You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@omid.apache.org by fp...@apache.org on 2016/05/11 18:06:40 UTC

[33/50] [abbrv] incubator-omid git commit: Simplify Batch(Pool) initialization and fix tests

Simplify Batch(Pool) initialization and fix tests

Also improve the naming of variables and methods that now have different semantics

Change-Id: I262b0f5ad2c2dc47a02f41f049c5075c8b6a357e


Project: http://git-wip-us.apache.org/repos/asf/incubator-omid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-omid/commit/1d60f21d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-omid/tree/1d60f21d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-omid/diff/1d60f21d

Branch: refs/heads/master
Commit: 1d60f21df8d239412939f0692f398e1c01294568
Parents: 43261a3
Author: Francisco Perez-Sorrosal <fp...@yahoo-inc.com>
Authored: Tue May 3 11:16:17 2016 -0700
Committer: Francisco Perez-Sorrosal <fp...@yahoo-inc.com>
Committed: Wed May 4 09:50:57 2016 -0700

----------------------------------------------------------------------
 .../apache/omid/transaction/TestTSOModule.java  |   4 +-
 .../TSOForHBaseCompactorTestModule.java         |   2 +-
 .../main/java/org/apache/omid/tso/Batch.java    |  45 ++-
 .../java/org/apache/omid/tso/BatchPool.java     |  11 +-
 .../java/org/apache/omid/tso/PersistEvent.java  |  12 +
 .../apache/omid/tso/PersistenceProcessor.java   |  16 +-
 .../omid/tso/PersistenceProcessorHandler.java   |  10 +-
 .../omid/tso/PersistenceProcessorImpl.java      |  79 ++--
 .../org/apache/omid/tso/ReplyProcessor.java     |  23 +-
 .../org/apache/omid/tso/ReplyProcessorImpl.java |  10 +-
 .../apache/omid/tso/RequestProcessorImpl.java   |  16 +-
 .../java/org/apache/omid/tso/TSOModule.java     |   4 +-
 .../org/apache/omid/tso/TSOServerConfig.java    |  74 ++--
 tso-server/src/main/resources/default-omid.yml  |  14 +-
 .../java/org/apache/omid/tso/TSOMockModule.java |   4 +-
 .../java/org/apache/omid/tso/TestBatch.java     |  22 +-
 .../java/org/apache/omid/tso/TestPanicker.java  |  14 +-
 .../omid/tso/TestPersistenceProcessor.java      | 382 +++++++++++--------
 .../apache/omid/tso/TestRequestProcessor.java   |  32 +-
 .../org/apache/omid/tso/TestRetryProcessor.java |  41 +-
 ...stTSOClientRequestAndResponseBehaviours.java |   1 +
 tso-server/src/test/resources/test-omid.yml     |   2 +-
 22 files changed, 436 insertions(+), 382 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/1d60f21d/hbase-client/src/test/java/org/apache/omid/transaction/TestTSOModule.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestTSOModule.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestTSOModule.java
index 5d84b8b..79b84de 100644
--- a/hbase-client/src/test/java/org/apache/omid/transaction/TestTSOModule.java
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestTSOModule.java
@@ -17,6 +17,7 @@
  */
 package org.apache.omid.transaction;
 
+import com.google.common.base.Preconditions;
 import com.google.inject.AbstractModule;
 import com.google.inject.Provider;
 import com.google.inject.Provides;
@@ -55,6 +56,7 @@ class TestTSOModule extends AbstractModule {
     private final TSOServerConfig config;
 
     TestTSOModule(Configuration hBaseConfig, TSOServerConfig config) {
+        Preconditions.checkArgument(config.getNumConcurrentCTWriters() >= 2, "# of Commit Table writers must be >= 2");
         this.hBaseConfig = hBaseConfig;
         this.config = config;
     }
@@ -104,7 +106,7 @@ class TestTSOModule extends AbstractModule {
 
     @Provides
     PersistenceProcessorHandler[] getPersistenceProcessorHandler(Provider<PersistenceProcessorHandler> provider) {
-        PersistenceProcessorHandler[] persistenceProcessorHandlers = new PersistenceProcessorHandler[config.getPersistHandlerNum()];
+        PersistenceProcessorHandler[] persistenceProcessorHandlers = new PersistenceProcessorHandler[config.getNumConcurrentCTWriters()];
         for (int i = 0; i < persistenceProcessorHandlers.length; i++) {
             persistenceProcessorHandlers[i] = provider.get();
         }

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/1d60f21d/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TSOForHBaseCompactorTestModule.java
----------------------------------------------------------------------
diff --git a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TSOForHBaseCompactorTestModule.java b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TSOForHBaseCompactorTestModule.java
index 96afbd2..7c7e643 100644
--- a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TSOForHBaseCompactorTestModule.java
+++ b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TSOForHBaseCompactorTestModule.java
@@ -123,7 +123,7 @@ class TSOForHBaseCompactorTestModule extends AbstractModule {
 
     @Provides
     PersistenceProcessorHandler[] getPersistenceProcessorHandler(Provider<PersistenceProcessorHandler> provider) {
-        PersistenceProcessorHandler[] persistenceProcessorHandlers = new PersistenceProcessorHandler[config.getPersistHandlerNum()];
+        PersistenceProcessorHandler[] persistenceProcessorHandlers = new PersistenceProcessorHandler[config.getNumConcurrentCTWriters()];
         for (int i = 0; i < persistenceProcessorHandlers.length; i++) {
             persistenceProcessorHandlers[i] = provider.get();
         }

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/1d60f21d/tso-server/src/main/java/org/apache/omid/tso/Batch.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/Batch.java b/tso-server/src/main/java/org/apache/omid/tso/Batch.java
index e778eee..2b17f23 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/Batch.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/Batch.java
@@ -17,32 +17,29 @@
  */
 package org.apache.omid.tso;
 
+import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 import org.apache.omid.tso.PersistEvent.Type;
 import org.jboss.netty.channel.Channel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
+
 public class Batch {
 
     private static final Logger LOG = LoggerFactory.getLogger(Batch.class);
 
-    private final PersistEvent[] events;
-    private final int maxBatchSize;
-    private final BatchPool batchPool;
     private final int id;
+    private final int size;
     private int numEvents;
-
-    Batch(int maxBatchSize) {
-
-        this(maxBatchSize, 0, null);
-
-    }
+    private final PersistEvent[] events;
+    private final BatchPool batchPool;
 
     Batch(int size, int id, BatchPool batchPool) {
+
         Preconditions.checkArgument(size > 0, "Size must be positive");
-        LOG.info("Batch id {} created with size {}", id, size);
-        this.maxBatchSize = size;
+        this.size = size;
         this.batchPool = batchPool;
         this.id = id;
         this.numEvents = 0;
@@ -50,12 +47,14 @@ public class Batch {
         for (int i = 0; i < size; i++) {
             this.events[i] = new PersistEvent();
         }
+        LOG.info("Batch id {} created with size {}", id, size);
 
     }
 
     boolean isFull() {
-        Preconditions.checkState(numEvents <= maxBatchSize, "numEvents > maxBatchSize");
-        return numEvents == maxBatchSize;
+
+        Preconditions.checkState(numEvents <= size, "numEvents > size");
+        return numEvents == size;
 
     }
 
@@ -66,8 +65,9 @@ public class Batch {
     }
 
     boolean isLastEntryEmpty() {
-        Preconditions.checkState(numEvents <= maxBatchSize, "numEvents > maxBatchSize");
-        return numEvents == (maxBatchSize - 1);
+
+        Preconditions.checkState(numEvents <= size, "numEvents > size");
+        return numEvents == (size - 1);
 
     }
 
@@ -124,6 +124,7 @@ public class Batch {
     }
 
     void sendReply(ReplyProcessor reply, RetryProcessor retryProc, long batchID) {
+
         int i = 0;
         while (i < numEvents) {
             PersistEvent e = events[i];
@@ -135,7 +136,7 @@ public class Batch {
                 events[numEvents - 1] = tmp;
                 if (numEvents == 1) {
                     clear();
-                    reply.batchResponse(null, batchID);
+                    reply.manageResponsesBatch(batchID, null);
                     return;
                 }
                 numEvents--;
@@ -144,8 +145,18 @@ public class Batch {
             i++;
         }
 
-        reply.batchResponse(this, batchID);
+        reply.manageResponsesBatch(batchID, this);
+
+    }
 
+    @Override
+    public String toString() {
+        return Objects.toStringHelper(this)
+                .add("id", id)
+                .add("size", size)
+                .add("num events", numEvents)
+                .add("events", Arrays.toString(events))
+                .toString();
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/1d60f21d/tso-server/src/main/java/org/apache/omid/tso/BatchPool.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/BatchPool.java b/tso-server/src/main/java/org/apache/omid/tso/BatchPool.java
index 88cfd1e..a5d7759 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/BatchPool.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/BatchPool.java
@@ -34,13 +34,12 @@ public class BatchPool {
     @Inject
     public BatchPool(TSOServerConfig config) {
 
-        int numBuffersPerHandler = (config.getNumBuffersPerHandler() >= 2) ? config.getNumBuffersPerHandler() : 2;
-        poolSize = config.getPersistHandlerNum() * numBuffersPerHandler;
-        LOG.info("Config param maxBatchSize {}", config.getMaxBatchSize());
-        LOG.info("Persistent Handlers {} ; Buffers per handler {}", config.getPersistHandlerNum(), numBuffersPerHandler);
+        poolSize = config.getNumConcurrentCTWriters();
+        int batchSize = config.getBatchSizePerCTWriter() + 1; // Add 1 element to batch size for storing LWM
+
+        LOG.info("Pool Size (Batches) {}; Batch Size {} (including LWM bucket)", poolSize, batchSize);
+        LOG.info("Total Batch Size (Pool size * Batch Size): {}", poolSize * batchSize);
         batches = new Batch[poolSize];
-        // TODO The + 1 is because the low watermark is added in each flush and needs one extra position. Fix and clarify initialization
-        int batchSize = (config.getMaxBatchSize() / config.getPersistHandlerNum() > 0) ? (config.getMaxBatchSize() / config.getPersistHandlerNum()) + 1 : 2;
 
         LOG.info("Creating {} Batches with {} elements each", poolSize, batchSize);
         for (int i = 0; i < poolSize; i++) {

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/1d60f21d/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java b/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java
index e816149..32820f6 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java
@@ -17,6 +17,7 @@
  */
 package org.apache.omid.tso;
 
+import com.google.common.base.Objects;
 import org.jboss.netty.channel.Channel;
 
 public final class PersistEvent {
@@ -114,4 +115,15 @@ public final class PersistEvent {
 
     }
 
+    @Override
+    public String toString() {
+        return Objects.toStringHelper(this)
+                .add("type", type)
+                .add("isRetry", isRetry)
+                .add("ST", startTimestamp)
+                .add("CT", commitTimestamp)
+                .add("LWM", lowWatermark)
+                .toString();
+    }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/1d60f21d/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
index 47dc56b..aa0e7df 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
@@ -19,24 +19,18 @@ package org.apache.omid.tso;
 
 import org.jboss.netty.channel.Channel;
 
-// TODO Check the names of all methods as they do not persist anything anymore
 interface PersistenceProcessor {
 
-    // TODO maybe it should be called addCommit(...) or addCommitToBatch(...)
-    void persistCommit(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx)
+    void addCommitToBatch(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx)
             throws InterruptedException;
 
-    // TODO maybe it should be called addAbort(...) or addAbortToBatch(...)
-    void persistAbort(long startTimestamp, boolean isRetry, Channel c, MonitoringContext monCtx)
+    void addAbortToBatch(long startTimestamp, boolean isRetry, Channel c, MonitoringContext monCtx)
             throws InterruptedException;
 
-    // TODO maybe it should be called addTimestamp(...) or addTimestampToBatch(...)
-    void persistTimestamp(long startTimestamp, Channel c, MonitoringContext monCtx) throws InterruptedException;
+    void addTimestampToBatch(long startTimestamp, Channel c, MonitoringContext monCtx) throws InterruptedException;
 
-    // TODO maybe it should be called addLowWatermark(...) or addLowWatermarkToBatch(...)
-    void persistLowWatermark(long lowWatermark, MonitoringContext monCtx);
+    void addLowWatermarkToBatch(long lowWatermark, MonitoringContext monCtx);
 
-    // TODO The name of this method is weird. Rename to "persist"
-    void persistFlush() throws InterruptedException;
+    void triggerCurrentBatchFlush() throws InterruptedException;
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/1d60f21d/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java
index 19b6b77..84890b9 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java
@@ -17,6 +17,7 @@
  */
 package org.apache.omid.tso;
 
+import com.lmax.disruptor.LifecycleAware;
 import com.lmax.disruptor.WorkHandler;
 import org.apache.omid.committable.CommitTable;
 import org.apache.omid.metrics.Histogram;
@@ -93,14 +94,11 @@ public class PersistenceProcessorHandler implements WorkHandler<PersistenceProce
                 throw new RuntimeException("Unknown event type: " + localEvent.getType().name());
             }
         }
-        flush(batch, event.getBatchID());
+        flush(batch, event.getBatchSequence());
 
     }
 
-    // TODO Fix this method with the contents of PersistenceProcessor.flush() in master branch
-    // TODO This is related to the changes in TestPersistenceProcessor.testCommitPersistenceWithHALeaseManager().
-    // TODO Check also that test in the master branch
-    private void flush(Batch batch, long batchID) {
+    private void flush(Batch batch, long batchSequence) {
 
         if (batch.getNumEvents() > 0) {
             commitSuicideIfNotMaster();
@@ -113,7 +111,7 @@ public class PersistenceProcessorHandler implements WorkHandler<PersistenceProce
                 panicker.panic("Error persisting commit batch", e);
             }
             commitSuicideIfNotMaster(); // TODO Here, we can return the client responses before committing suicide
-            batch.sendReply(replyProcessor, retryProc, batchID);
+            batch.sendReply(replyProcessor, retryProc, batchSequence);
         }
 
     }

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/1d60f21d/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
index e57cda3..4d1eb4c 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
@@ -18,6 +18,7 @@
 package org.apache.omid.tso;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.lmax.disruptor.BusySpinWaitStrategy;
 import com.lmax.disruptor.EventFactory;
@@ -33,21 +34,23 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
+import static org.apache.omid.tso.PersistenceProcessorImpl.PersistBatchEvent.EVENT_FACTORY;
+import static org.apache.omid.tso.PersistenceProcessorImpl.PersistBatchEvent.makePersistBatch;
+
 class PersistenceProcessorImpl implements PersistenceProcessor {
 
     private static final Logger LOG = LoggerFactory.getLogger(PersistenceProcessorImpl.class);
 
     private static final long INITIAL_LWM_VALUE = -1L;
 
-    private final ReplyProcessor replyProcessor;
     private final RingBuffer<PersistBatchEvent> persistRing;
 
     private final BatchPool batchPool;
     @VisibleForTesting
-    Batch batch;
+    Batch currentBatch;
 
     // TODO Next two need to be either int or AtomicLong
-    volatile private long batchIDCnt;
+    volatile private long batchSequence;
     volatile private long lowWatermark = INITIAL_LWM_VALUE;
 
     private MonitoringContext lowWatermarkContext;
@@ -55,82 +58,80 @@ class PersistenceProcessorImpl implements PersistenceProcessor {
     @Inject
     PersistenceProcessorImpl(TSOServerConfig config,
                              BatchPool batchPool,
-                             ReplyProcessor replyProcessor,
                              Panicker panicker,
                              PersistenceProcessorHandler[] handlers)
             throws InterruptedException, ExecutionException, IOException {
 
-        this.batchIDCnt = 0L;
+        this.batchSequence = 0L;
         this.batchPool = batchPool;
-        this.batch = batchPool.getNextEmptyBatch();
+        this.currentBatch = batchPool.getNextEmptyBatch();
 
-        this.replyProcessor = replyProcessor;
+        // Disruptor configuration
+        this.persistRing = RingBuffer.createSingleProducer(EVENT_FACTORY, 1 << 20, new BusySpinWaitStrategy());
 
-        this.persistRing = RingBuffer.createSingleProducer(
-                PersistBatchEvent.EVENT_FACTORY, 1 << 20, new BusySpinWaitStrategy());
+        ThreadFactoryBuilder threadFactory = new ThreadFactoryBuilder().setNameFormat("persist-%d");
+        ExecutorService requestExec = Executors.newFixedThreadPool(config.getNumConcurrentCTWriters(),
+                                                                   threadFactory.build());
 
         WorkerPool<PersistBatchEvent> persistProcessor = new WorkerPool<>(persistRing,
                                                                           persistRing.newBarrier(),
                                                                           new FatalExceptionHandler(panicker),
                                                                           handlers);
         this.persistRing.addGatingSequences(persistProcessor.getWorkerSequences());
-
-        ExecutorService requestExec = Executors.newFixedThreadPool(config.getPersistHandlerNum(),
-                                                                   new ThreadFactoryBuilder().setNameFormat("persist-%d").build());
         persistProcessor.start(requestExec);
 
     }
 
     @Override
-    public void persistFlush() throws InterruptedException {
+    public void triggerCurrentBatchFlush() throws InterruptedException {
 
-        if (batch.isEmpty()) {
+        if (currentBatch.isEmpty()) {
             return;
         }
-        batch.addLowWatermark(this.lowWatermark, this.lowWatermarkContext);
+        currentBatch.addLowWatermark(this.lowWatermark, this.lowWatermarkContext);
         long seq = persistRing.next();
         PersistBatchEvent e = persistRing.get(seq);
-        PersistBatchEvent.makePersistBatch(e, batch, batchIDCnt++);
+        makePersistBatch(e, batchSequence++, currentBatch);
         persistRing.publish(seq);
-        batch = batchPool.getNextEmptyBatch();
+        currentBatch = batchPool.getNextEmptyBatch();
 
     }
 
     @Override
-    public void persistCommit(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx)
+    public void addCommitToBatch(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx)
             throws InterruptedException {
 
-        batch.addCommit(startTimestamp, commitTimestamp, c, monCtx);
-        if (batch.isLastEntryEmpty()) {
-            persistFlush();
+        currentBatch.addCommit(startTimestamp, commitTimestamp, c, monCtx);
+        if (currentBatch.isLastEntryEmpty()) {
+            triggerCurrentBatchFlush();
         }
 
     }
 
     @Override
-    public void persistAbort(long startTimestamp, boolean isRetry, Channel c, MonitoringContext context)
+    public void addAbortToBatch(long startTimestamp, boolean isRetry, Channel c, MonitoringContext context)
             throws InterruptedException {
 
-        batch.addAbort(startTimestamp, isRetry, c, context);
-        if (batch.isLastEntryEmpty()) {
-            persistFlush();
+        currentBatch.addAbort(startTimestamp, isRetry, c, context);
+        if (currentBatch.isLastEntryEmpty()) {
+            triggerCurrentBatchFlush();
         }
 
     }
 
     @Override
-    public void persistTimestamp(long startTimestamp, Channel c, MonitoringContext context)
+    public void addTimestampToBatch(long startTimestamp, Channel c, MonitoringContext context)
             throws InterruptedException {
 
-        batch.addTimestamp(startTimestamp, c, context);
-        if (batch.isLastEntryEmpty()) {
-            persistFlush();
+        currentBatch.addTimestamp(startTimestamp, c, context);
+        if (currentBatch.isLastEntryEmpty()) {
+            triggerCurrentBatchFlush();
         }
 
     }
 
     @Override
-    public void persistLowWatermark(long lowWatermark, MonitoringContext context) {
+    public void addLowWatermarkToBatch(long lowWatermark, MonitoringContext context) {
 
         this.lowWatermark = lowWatermark;
         this.lowWatermarkContext = context;
@@ -139,20 +140,20 @@ class PersistenceProcessorImpl implements PersistenceProcessor {
 
     final static class PersistBatchEvent {
 
+        private long batchSequence;
         private Batch batch;
-        private long batchID;
 
-        static void makePersistBatch(PersistBatchEvent e, Batch batch, long batchID) {
+        static void makePersistBatch(PersistBatchEvent e, long batchSequence, Batch batch) {
             e.batch = batch;
-            e.batchID = batchID;
+            e.batchSequence = batchSequence;
         }
 
         Batch getBatch() {
             return batch;
         }
 
-        long getBatchID() {
-            return batchID;
+        long getBatchSequence() {
+            return batchSequence;
         }
 
         final static EventFactory<PersistBatchEvent> EVENT_FACTORY = new EventFactory<PersistBatchEvent>() {
@@ -161,6 +162,14 @@ class PersistenceProcessorImpl implements PersistenceProcessor {
             }
         };
 
+        @Override
+        public String toString() {
+            return Objects.toStringHelper(this)
+                    .add("batchSequence", batchSequence)
+                    .add("batch", batch)
+                    .toString();
+        }
+
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/1d60f21d/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessor.java b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessor.java
index 37a9dd9..04f40cc 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessor.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessor.java
@@ -21,27 +21,20 @@ import org.jboss.netty.channel.Channel;
 
 interface ReplyProcessor {
 
-    // TODO This documentation does not corresponds to the method below anymore. Put in the right place or remove
     /**
-     * Informs the client about the outcome of the Tx it was trying to commit.
+     * Each reply to a transactional operation for a client is contained in a batch. The batch must be ordered
+     * before sending the replies in order not to break snapshot isolation properties.
      *
+     * @param batchSequence
+     *            a batch sequence number, used to enforce order between replies
      * @param batch
-     *            the batch of operations
-     * @param batchID
-     *            the id of the batch, used to enforce order between replies
-     * @param makeHeuristicDecision
-     *            informs about whether heuristic actions are needed or not
-     * @param startTimestamp
-     *            the start timestamp of the transaction (a.k.a. tx id)
-     * @param commitTimestamp
-     *            the commit timestamp of the transaction
-     * @param channel
-     *            the communication channed with the client
+     *            a batch containing the transaction operations
      */
-    void batchResponse(Batch batch, long batchID);
+    void manageResponsesBatch(long batchSequence, Batch batch);
 
+    // TODO This method can be removed if we return the responses from the retry processor
     void addAbort(Batch batch, long startTimestamp, Channel c, MonitoringContext context);
-
+    // TODO This method can be removed if we return the responses from the retry processor
     void addCommit(Batch batch, long startTimestamp, long commitTimestamp, Channel c, MonitoringContext context);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/1d60f21d/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
index 85f465c..0f5fcb4 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
@@ -165,28 +165,30 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
     }
 
     @Override
-    public void batchResponse(Batch batch, long batchID) {
+    public void manageResponsesBatch(long batchSequence, Batch batch) {
 
         long seq = replyRing.next();
         ReplyBatchEvent e = replyRing.get(seq);
-        ReplyBatchEvent.makeReplyBatch(e, batch, batchID);
+        ReplyBatchEvent.makeReplyBatch(e, batch, batchSequence);
         replyRing.publish(seq);
 
     }
 
+    // TODO This method can be removed if we return the responses from the retry processor
     @Override
     public void addAbort(Batch batch, long startTimestamp, Channel c, MonitoringContext context) {
 
         batch.addAbort(startTimestamp, true, c, context);
-        batchResponse(batch, NO_ORDER);
+        manageResponsesBatch(NO_ORDER, batch);
 
     }
 
+    // TODO This method can be removed if we return the responses from the retry processor
     @Override
     public void addCommit(Batch batch, long startTimestamp, long commitTimestamp, Channel c, MonitoringContext context) {
 
         batch.addCommit(startTimestamp, commitTimestamp, c, context);
-        batchResponse(batch, NO_ORDER);
+        manageResponsesBatch(NO_ORDER, batch);
 
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/1d60f21d/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
index 2bbf1cf..ba0cb74 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
@@ -91,8 +91,8 @@ class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestE
     public void update(TSOState state) throws InterruptedException {
         LOG.info("Initializing RequestProcessor...");
         this.lowWatermark = state.getLowWatermark();
-        persistProc.persistLowWatermark(lowWatermark, new MonitoringContext(metrics));
-        persistProc.persistFlush();
+        persistProc.addLowWatermarkToBatch(lowWatermark, new MonitoringContext(metrics));
+        persistProc.triggerCurrentBatchFlush();
         LOG.info("RequestProcessor initialized with LWMs {} and Epoch {}", lowWatermark, state.getEpoch());
     }
 
@@ -117,7 +117,7 @@ class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestE
         }
 
         if (endOfBatch) {
-            persistProc.persistFlush();
+            persistProc.triggerCurrentBatchFlush();
         }
 
     }
@@ -125,7 +125,7 @@ class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestE
     @Override
     public void onTimeout(long sequence) throws Exception {
 
-        persistProc.persistFlush();
+        persistProc.triggerCurrentBatchFlush();
 
     }
 
@@ -161,7 +161,7 @@ class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestE
             return;
         }
 
-        persistProc.persistTimestamp(timestamp, requestEvent.getChannel(), requestEvent.getMonCtx());
+        persistProc.addTimestampToBatch(timestamp, requestEvent.getChannel(), requestEvent.getMonCtx());
 
     }
 
@@ -208,15 +208,15 @@ class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestE
                     if (newLowWatermark != lowWatermark) {
                         LOG.trace("Setting new low Watermark to {}", newLowWatermark);
                         lowWatermark = newLowWatermark;
-                        persistProc.persistLowWatermark(newLowWatermark, event.getMonCtx());
+                        persistProc.addLowWatermarkToBatch(newLowWatermark, event.getMonCtx());
                     }
                 }
-                persistProc.persistCommit(startTimestamp, commitTimestamp, c, event.getMonCtx());
+                persistProc.addCommitToBatch(startTimestamp, commitTimestamp, c, event.getMonCtx());
             } catch (IOException e) {
                 LOG.error("Error committing", e);
             }
         } else { // add it to the aborted list
-            persistProc.persistAbort(startTimestamp, isRetry, c, event.getMonCtx());
+            persistProc.addAbortToBatch(startTimestamp, isRetry, c, event.getMonCtx());
         }
 
         return commitTimestamp;

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/1d60f21d/tso-server/src/main/java/org/apache/omid/tso/TSOModule.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/TSOModule.java b/tso-server/src/main/java/org/apache/omid/tso/TSOModule.java
index f195ebb..bfa88ef 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/TSOModule.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/TSOModule.java
@@ -17,6 +17,7 @@
  */
 package org.apache.omid.tso;
 
+import com.google.common.base.Preconditions;
 import com.google.inject.AbstractModule;
 import com.google.inject.Provider;
 import com.google.inject.Provides;
@@ -34,6 +35,7 @@ class TSOModule extends AbstractModule {
     private final TSOServerConfig config;
 
     TSOModule(TSOServerConfig config) {
+        Preconditions.checkArgument(config.getNumConcurrentCTWriters() >= 2, "# of Commit Table writers must be >= 2");
         this.config = config;
     }
 
@@ -65,7 +67,7 @@ class TSOModule extends AbstractModule {
 
     @Provides
     PersistenceProcessorHandler[] getPersistenceProcessorHandler(Provider<PersistenceProcessorHandler> provider) {
-        PersistenceProcessorHandler[] persistenceProcessorHandlers = new PersistenceProcessorHandler[config.getPersistHandlerNum()];
+        PersistenceProcessorHandler[] persistenceProcessorHandlers = new PersistenceProcessorHandler[config.getNumConcurrentCTWriters()];
         for (int i = 0; i < persistenceProcessorHandlers.length; i++) {
             persistenceProcessorHandlers[i] = provider.get();
         }

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/1d60f21d/tso-server/src/main/java/org/apache/omid/tso/TSOServerConfig.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/TSOServerConfig.java b/tso-server/src/main/java/org/apache/omid/tso/TSOServerConfig.java
index c1fb5d2..05cb3f4 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/TSOServerConfig.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/TSOServerConfig.java
@@ -72,15 +72,11 @@ public class TSOServerConfig extends SecureHBaseConfig {
 
     private int maxItems;
 
-    private int maxBatchSize;
+    private int numConcurrentCTWriters;
 
-    private int batchPersistTimeoutInMs;
-
-    // TODO Rename this (e.g. numOfConcurrentWriters)
-    private int persistHandlerNum;
+    private int batchSizePerCTWriter;
 
-    // TODO Rename this
-    private int numBuffersPerHandler;
+    private int batchPersistTimeoutInMs;
 
     private String networkIfaceName = getDefaultNetworkInterface();
 
@@ -92,8 +88,32 @@ public class TSOServerConfig extends SecureHBaseConfig {
         this.port = port;
     }
 
-    public void setMaxBatchSize(int maxBatchSize) {
-        this.maxBatchSize = maxBatchSize;
+    public int getMaxItems() {
+        return maxItems;
+    }
+
+    public void setMaxItems(int maxItems) {
+        this.maxItems = maxItems;
+    }
+
+    public int getNumConcurrentCTWriters() {
+        return numConcurrentCTWriters;
+    }
+
+    public void setNumConcurrentCTWriters(int numConcurrentCTWriters) {
+        this.numConcurrentCTWriters = numConcurrentCTWriters;
+    }
+
+    public int getBatchSizePerCTWriter() {
+        return batchSizePerCTWriter;
+    }
+
+    public void setBatchSizePerCTWriter(int batchSizePerCTWriter) {
+        this.batchSizePerCTWriter = batchSizePerCTWriter;
+    }
+
+    public int getBatchPersistTimeoutInMs() {
+        return batchPersistTimeoutInMs;
     }
 
     public void setBatchPersistTimeoutInMs(int value) {
@@ -132,22 +152,6 @@ public class TSOServerConfig extends SecureHBaseConfig {
         this.leaseModule = leaseModule;
     }
 
-    public int getMaxItems() {
-        return maxItems;
-    }
-
-    public void setMaxItems(int maxItems) {
-        this.maxItems = maxItems;
-    }
-
-    public int getMaxBatchSize() {
-        return maxBatchSize;
-    }
-
-    public int getBatchPersistTimeoutInMs() {
-        return batchPersistTimeoutInMs;
-    }
-
     public MetricsRegistry getMetrics() {
         return metrics;
     }
@@ -156,26 +160,6 @@ public class TSOServerConfig extends SecureHBaseConfig {
         this.metrics = metrics;
     }
 
-    // TODO Rename this (e.g. numOfConcurrentWriters)
-    public int getPersistHandlerNum() {
-        return persistHandlerNum;
-    }
-
-    // TODO Rename this
-    public void setPersistHandlerNum(int persistHandlerNum) {
-        this.persistHandlerNum = persistHandlerNum;
-    }
-
-    // TODO Rename this
-    public int getNumBuffersPerHandler() {
-        return numBuffersPerHandler;
-    }
-
-    // TODO Rename this
-    public void setNumBuffersPerHandler(int numBuffersPerHandler) {
-        this.numBuffersPerHandler = numBuffersPerHandler;
-    }
-
     // ----------------------------------------------------------------------------------------------------------------
     // Helper methods
     // ----------------------------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/1d60f21d/tso-server/src/main/resources/default-omid.yml
----------------------------------------------------------------------
diff --git a/tso-server/src/main/resources/default-omid.yml b/tso-server/src/main/resources/default-omid.yml
index e3e559d..552a77a 100644
--- a/tso-server/src/main/resources/default-omid.yml
+++ b/tso-server/src/main/resources/default-omid.yml
@@ -5,7 +5,6 @@
 # ---------------------------------------------------------------------------------------------------------------------
 # Basic configuration parameters
 # ---------------------------------------------------------------------------------------------------------------------
-# TODO Explain rest of the parameters and/or rename properly
 
 # Network interface for TSO server communication. Uncomment the following line to use a specific interface
 # networkIfaceName: eth0
@@ -14,12 +13,16 @@
 
 # Port reserved by the Status Oracle
 port: 54758
+# The number of elements reserved in the conflict map to perform conflict resolution
+# TODO Rename to conflictMapSize
 maxItems: 100000000
-maxBatchSize: 25
+# The number of Commit Table writers that persist data concurrently to the datastore. It has to be at least 2.
+numConcurrentCTWriters: 2
+# The size of the batch of operations that each Commit Table writer has. The maximum number of operations that can be
+# batched in the system at a certain point in time is: numConcurrentCTWriters * batchSizePerCTWriter
+batchSizePerCTWriter: 25
+# When this timeout expires, the contents of the batch are flushed to the datastore
 batchPersistTimeoutInMs: 10
-# TODO rename the next two parameters properly (e.g. numOfConcurrentWriters...)
-persistHandlerNum: 1
-numBuffersPerHandler: 10
 
 # Default module configuration (No TSO High Availability & in-memory storage for timestamp and commit tables)
 timestampStoreModule: !!org.apache.omid.tso.InMemoryTimestampStorageModule [ ]
@@ -64,7 +67,6 @@ metrics: !!org.apache.omid.metrics.NullMetricsProvider [ ]
 #     }
 # ]
 
-
 # Example of multiple reporter configuration (to CSV files and console)
 #
 # metrics: !!org.apache.omid.metrics.CodahaleMetricsProvider [

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/1d60f21d/tso-server/src/test/java/org/apache/omid/tso/TSOMockModule.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TSOMockModule.java b/tso-server/src/test/java/org/apache/omid/tso/TSOMockModule.java
index c10df9d..92582f6 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TSOMockModule.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TSOMockModule.java
@@ -17,6 +17,7 @@
  */
 package org.apache.omid.tso;
 
+import com.google.common.base.Preconditions;
 import com.google.inject.AbstractModule;
 import com.google.inject.Provider;
 import com.google.inject.Provides;
@@ -40,6 +41,7 @@ public class TSOMockModule extends AbstractModule {
     private final TSOServerConfig config;
 
     public TSOMockModule(TSOServerConfig config) {
+        Preconditions.checkArgument(config.getNumConcurrentCTWriters() >= 2, "# of Commit Table writers must be >= 2");
         this.config = config;
     }
 
@@ -78,7 +80,7 @@ public class TSOMockModule extends AbstractModule {
 
     @Provides
     PersistenceProcessorHandler[] getPersistenceProcessorHandler(Provider<PersistenceProcessorHandler> provider) {
-        PersistenceProcessorHandler[] persistenceProcessorHandlers = new PersistenceProcessorHandler[config.getPersistHandlerNum()];
+        PersistenceProcessorHandler[] persistenceProcessorHandlers = new PersistenceProcessorHandler[config.getNumConcurrentCTWriters()];
         for (int i = 0; i < persistenceProcessorHandlers.length; i++) {
             persistenceProcessorHandlers[i] = provider.get();
         }

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/1d60f21d/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java b/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
index 08b38be..c003f34 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
@@ -55,21 +55,13 @@ public class TestBatch {
     @BeforeMethod(alwaysRun = true, timeOut = 30_000)
     public void initMocksAndComponents() {
         MockitoAnnotations.initMocks(this);
-        batch = new Batch(BATCH_SIZE);
+        batch = new Batch(BATCH_SIZE, 0, null);
     }
 
-    // TODO. Check this test with the contents of the master branch. See commented asserts below
+    // TODO. Refactor the batch and test it properly. See commented asserts below
     @Test
     public void testBatchFunctionality() {
 
-        // Required mocks
-        Channel channel = Mockito.mock(Channel.class);
-        ReplyProcessor replyProcessor = Mockito.mock(ReplyProcessor.class);
-        RetryProcessor retryProcessor = Mockito.mock(RetryProcessor.class);
-
-        // The batch element to test
-        Batch batch = new Batch(BATCH_SIZE);
-
         // Test initial state is OK
         assertFalse(batch.isFull(), "Batch shouldn't be full");
         assertEquals(batch.getNumEvents(), 0, "Num events should be 0");
@@ -119,7 +111,7 @@ public class TestBatch {
 //        assertEquals(batch.getNumEvents(), 0, "Num events should be 0");
 //=======
         batch.sendReply(replyProcessor, retryProcessor, (-1));
-        verify(replyProcessor, timeout(100).times(1)).batchResponse(batch, (-1));
+        verify(replyProcessor, timeout(100).times(1)).manageResponsesBatch((-1), batch);
         assertTrue(batch.isFull(), "Batch shouldn't be empty");
     }
 
@@ -141,12 +133,10 @@ public class TestBatch {
             }
         }
 
-        // Test that sending replies empties the batch also when the replica
-        // is NOT master and calls the ambiguousCommitResponse() method on the
-        // reply processor
+        // Test that sending replies empties the batch also when the replica is NOT master and calls the
+        // ambiguousCommitResponse() method on the reply processor
         batch.sendReply(replyProcessor, retryProcessor, (-1));
-        verify(replyProcessor, timeout(100).times(1))
-               .batchResponse(batch, (-1));
+        verify(replyProcessor, timeout(100).times(1)).manageResponsesBatch((-1), batch);
         assertTrue(batch.isFull(), "Batch should be full");
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/1d60f21d/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java b/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java
index 0343716..e8dd8ad 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java
@@ -119,8 +119,8 @@ public class TestPanicker {
         TSOServerConfig config = new TSOServerConfig();
         BatchPool batchPool = new BatchPool(config);
 
-        PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[config.getPersistHandlerNum()];
-        for (int i = 0; i < config.getPersistHandlerNum(); i++) {
+        PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[config.getNumConcurrentCTWriters()];
+        for (int i = 0; i < config.getNumConcurrentCTWriters(); i++) {
             handlers[i] = new PersistenceProcessorHandler(metrics,
                                                           "localhost:1234",
                                                           leaseManager,
@@ -132,11 +132,10 @@ public class TestPanicker {
 
         PersistenceProcessor proc = new PersistenceProcessorImpl(config,
                                                                  batchPool,
-                                                                 mock(ReplyProcessor.class),
                                                                  panicker,
                                                                  handlers);
 
-        proc.persistCommit(1, 2, null, new MonitoringContext(metrics));
+        proc.addCommitToBatch(1, 2, null, new MonitoringContext(metrics));
 
         new RequestProcessorImpl(metrics, mock(TimestampOracle.class), proc, panicker, mock(TSOServerConfig.class));
 
@@ -170,8 +169,8 @@ public class TestPanicker {
         TSOServerConfig config = new TSOServerConfig();
         BatchPool batchPool = new BatchPool(config);
 
-        PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[config.getPersistHandlerNum()];
-        for (int i = 0; i < config.getPersistHandlerNum(); i++) {
+        PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[config.getNumConcurrentCTWriters()];
+        for (int i = 0; i < config.getNumConcurrentCTWriters(); i++) {
             handlers[i] = new PersistenceProcessorHandler(metrics,
                                                           "localhost:1234",
                                                           mock(LeaseManager.class),
@@ -183,10 +182,9 @@ public class TestPanicker {
 
         PersistenceProcessor proc = new PersistenceProcessorImpl(config,
                                                                  batchPool,
-                                                                 mock(ReplyProcessor.class),
                                                                  panicker,
                                                                  handlers);
-        proc.persistCommit(1, 2, null, new MonitoringContext(metrics));
+        proc.addCommitToBatch(1, 2, null, new MonitoringContext(metrics));
 
         new RequestProcessorImpl(metrics, mock(TimestampOracle.class), proc, panicker, mock(TSOServerConfig.class));
 

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/1d60f21d/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java
index bc0b0f0..8e18119 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java
@@ -44,12 +44,15 @@ import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
-// TODO Add timers
-// TODO Make visible currentBatch in PersistenceProcessorImpl to add proper verifications
+// TODO Refactor: Make visible currentBatch in PersistenceProcessorImpl to add proper verifications
 public class TestPersistenceProcessor {
 
     private static final Logger LOG = LoggerFactory.getLogger(TestPersistenceProcessor.class);
 
+    private static final long ANY_LWM = 0L;
+    private static final int ANY_ST = 0;
+    private static final int ANY_CT = 1;
+
     @Mock
     private CommitTable.Writer mockWriter;
     @Mock
@@ -57,8 +60,6 @@ public class TestPersistenceProcessor {
     @Mock
     private RetryProcessor retryProcessor;
     @Mock
-    private ReplyProcessor replyProcessor;
-    @Mock
     private Panicker panicker;
 
     private MetricsRegistry metrics;
@@ -69,9 +70,6 @@ public class TestPersistenceProcessor {
 
         MockitoAnnotations.initMocks(this);
 
-        // Configure mock writer to flush successfully
-//        doThrow(new IOException("Unable to write")).when(mockWriter).flush();
-
         // Configure null metrics provider
         metrics = new NullMetricsProvider();
 
@@ -96,21 +94,23 @@ public class TestPersistenceProcessor {
     }
 
     @Test
-    public void testCommitPersistenceWithMultiHandlers() throws Exception {
+    public void testCommitPersistenceWithSingleCommitTableWriter() throws Exception {
 
-        final int MAX_BATCH_SIZE = 4;
-        final int NUM_PERSIST_HANDLERS = 4;
+        final int NUM_CT_WRITERS = 1;
+        final int BATCH_SIZE_PER_CT_WRITER = 2;
 
         // Init a non-HA lease manager
         VoidLeaseManager leaseManager = spy(new VoidLeaseManager(mock(TSOChannelHandler.class),
                                                                  mock(TSOStateManager.class)));
 
         TSOServerConfig tsoConfig = new TSOServerConfig();
-        tsoConfig.setMaxBatchSize(MAX_BATCH_SIZE);
-        tsoConfig.setPersistHandlerNum(NUM_PERSIST_HANDLERS);
+        tsoConfig.setBatchSizePerCTWriter(BATCH_SIZE_PER_CT_WRITER);
+        tsoConfig.setNumConcurrentCTWriters(NUM_CT_WRITERS);
 
-        PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[tsoConfig.getPersistHandlerNum()];
-        for (int i = 0; i < tsoConfig.getPersistHandlerNum(); i++) {
+        ReplyProcessor replyProcessor = new ReplyProcessorImpl(metrics, panicker);
+
+        PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[tsoConfig.getNumConcurrentCTWriters()];
+        for (int i = 0; i < tsoConfig.getNumConcurrentCTWriters(); i++) {
             handlers[i] = new PersistenceProcessorHandler(metrics, "localhost:1234",
                                                           leaseManager,
                                                           commitTable,
@@ -122,46 +122,39 @@ public class TestPersistenceProcessor {
         // Component under test
         BatchPool batchPool = spy(new BatchPool(tsoConfig));
 
-        PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig,
-                                                                     batchPool,
-                                                                     replyProcessor,
-                                                                     panicker,
-                                                                     handlers);
-
-        verify(batchPool, times(1)).getNextEmptyBatch();
-
-        proc.persistCommit(1, 2, mock(Channel.class), mock(MonitoringContext.class));
-        proc.persistFlush();
+        PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, batchPool, panicker, handlers);
 
-        proc.persistCommit(3, 4, mock(Channel.class), mock(MonitoringContext.class));
-        proc.persistFlush();
+        proc.addLowWatermarkToBatch(ANY_LWM, new MonitoringContext(metrics)); // Add a fake LWM
 
-        proc.persistCommit(5, 6, mock(Channel.class), mock(MonitoringContext.class));
-        proc.persistFlush();
+        verify(batchPool, times(1)).getNextEmptyBatch(); // Called during initialization
 
-        proc.persistCommit(7, 8, mock(Channel.class), mock(MonitoringContext.class));
-        proc.persistFlush();
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // Flush: batch full
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // Flush: batch full
 
-        verify(batchPool, times(5)).getNextEmptyBatch(); // 5 Times: 1 in initialization + 4 when flushing above
+        verify(batchPool, times(1 + BATCH_SIZE_PER_CT_WRITER)).getNextEmptyBatch(); // 3: 1 in init + 2 when flushing
 
     }
 
-    @Test
-    public void testCommitPersistenceWithSingleHandlerInMultiHandlersEnvironment() throws Exception {
+    @Test(timeOut=10_000)
+    public void testCommitPersistenceWithMultipleCommitTableWriters() throws Exception {
 
-        final int MAX_BATCH_SIZE = 16;
-        final int NUM_PERSIST_HANDLERS = 4;
+        final int NUM_CT_WRITERS = 2;
+        final int BATCH_SIZE_PER_CT_WRITER = 2;
 
         // Init a non-HA lease manager
         VoidLeaseManager leaseManager = spy(new VoidLeaseManager(mock(TSOChannelHandler.class),
                                                                  mock(TSOStateManager.class)));
 
         TSOServerConfig tsoConfig = new TSOServerConfig();
-        tsoConfig.setMaxBatchSize(MAX_BATCH_SIZE);
-        tsoConfig.setPersistHandlerNum(NUM_PERSIST_HANDLERS);
+        tsoConfig.setBatchSizePerCTWriter(BATCH_SIZE_PER_CT_WRITER);
+        tsoConfig.setNumConcurrentCTWriters(NUM_CT_WRITERS);
+
+        ReplyProcessor replyProcessor = new ReplyProcessorImpl(metrics, panicker);
 
-        PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[tsoConfig.getPersistHandlerNum()];
-        for (int i = 0; i < tsoConfig.getPersistHandlerNum(); i++) {
+        PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[tsoConfig.getNumConcurrentCTWriters()];
+        for (int i = 0; i < tsoConfig.getNumConcurrentCTWriters(); i++) {
             handlers[i] = new PersistenceProcessorHandler(metrics,
                                                           "localhost:1234",
                                                           leaseManager,
@@ -173,66 +166,66 @@ public class TestPersistenceProcessor {
 
         // Component under test
         BatchPool batchPool = spy(new BatchPool(tsoConfig));
-        PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig,
-                                                                     batchPool,
-                                                                     replyProcessor,
-                                                                     panicker,
-                                                                     handlers);
-
-        verify(batchPool, times(1)).getNextEmptyBatch();
-
-        // Fill one Batch completely
-        proc.persistCommit(1, 2, mock(Channel.class), mock(MonitoringContext.class));
-        proc.persistCommit(3, 4, mock(Channel.class), mock(MonitoringContext.class));
-        proc.persistCommit(5, 6, mock(Channel.class), mock(MonitoringContext.class));
-        verify(batchPool, times(1)).getNextEmptyBatch();
-        proc.persistCommit(7, 8, mock(Channel.class), mock(MonitoringContext.class)); // Should be full here
-        verify(batchPool, times(2)).getNextEmptyBatch();
+        PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, batchPool, panicker, handlers);
 
-        // Test empty flush does not trigger response in getting a new batch
-        proc.persistFlush();
-        verify(batchPool, times(2)).getNextEmptyBatch();
+        proc.addLowWatermarkToBatch(ANY_LWM, mock(MonitoringContext.class)); // Add a fake LWM
+
+        verify(batchPool, times(1)).getNextEmptyBatch(); // Called during initialization
 
-        // Fill a second Batch completely
-        proc.persistCommit(1, 2, mock(Channel.class), mock(MonitoringContext.class));
-        proc.persistCommit(3, 4, mock(Channel.class), mock(MonitoringContext.class));
-        proc.persistCommit(5, 6, mock(Channel.class), mock(MonitoringContext.class));
-        proc.persistCommit(7, 8, mock(Channel.class), mock(MonitoringContext.class)); // Should be full here
+        // Fill 1st handler Batches completely
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // 1st batch full
+        verify(batchPool, times(2)).getNextEmptyBatch();
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // 2nd batch full
         verify(batchPool, times(3)).getNextEmptyBatch();
 
-        // Start filling a new batch and flush it immediately
-        proc.persistCommit(9, 10, mock(Channel.class), mock(MonitoringContext.class));
+        // Test that an empty flush does not trigger getting a new currentBatch
+        proc.triggerCurrentBatchFlush();
         verify(batchPool, times(3)).getNextEmptyBatch();
-        proc.persistFlush();
-        verify(batchPool, times(4)).getNextEmptyBatch();
+
+        // Fill 2nd handler Batches completely
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // 1st batch full
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // 2nd batch full
+        verify(batchPool, times(1 + (NUM_CT_WRITERS * BATCH_SIZE_PER_CT_WRITER))).getNextEmptyBatch();
+
+        // Start filling a new currentBatch and flush it immediately
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // Batch not full
+        verify(batchPool, times(5)).getNextEmptyBatch();
+        proc.triggerCurrentBatchFlush(); // Flushing should provoke invocation of a new batch
+        verify(batchPool, times(6)).getNextEmptyBatch();
 
         // Test empty flush does not trigger response
-        proc.persistFlush();
-        proc.persistFlush();
-        proc.persistFlush();
-        proc.persistFlush();
-        proc.persistFlush();
-        verify(batchPool, times(4)).getNextEmptyBatch();
+        proc.triggerCurrentBatchFlush();
+        proc.triggerCurrentBatchFlush();
+        proc.triggerCurrentBatchFlush();
+        proc.triggerCurrentBatchFlush();
+        proc.triggerCurrentBatchFlush();
+        verify(batchPool, times(6)).getNextEmptyBatch();
 
     }
 
-    @Test
+    @Test(timeOut=10_000)
     public void testCommitPersistenceWithNonHALeaseManager() throws Exception {
 
-        final int MAX_BATCH_SIZE = 4;
-        final int NUM_PERSIST_HANDLERS = 4;
+        final int NUM_CT_WRITERS = 1;
+        final int BATCH_SIZE_PER_CT_WRITER = 1;
 
         TSOServerConfig tsoConfig = new TSOServerConfig();
-        tsoConfig.setMaxBatchSize(MAX_BATCH_SIZE);
-        tsoConfig.setPersistHandlerNum(NUM_PERSIST_HANDLERS);
+        tsoConfig.setBatchSizePerCTWriter(NUM_CT_WRITERS);
+        tsoConfig.setNumConcurrentCTWriters(BATCH_SIZE_PER_CT_WRITER);
         tsoConfig.setBatchPersistTimeoutInMs(100);
 
+        ReplyProcessor replyProcessor = new ReplyProcessorImpl(metrics, panicker);
+
         // Init a non-HA lease manager
         VoidLeaseManager leaseManager = spy(new VoidLeaseManager(mock(TSOChannelHandler.class),
                 mock(TSOStateManager.class)));
 
-        PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[tsoConfig.getPersistHandlerNum()];
-        for (int i = 0; i < tsoConfig.getPersistHandlerNum(); i++) {
+        PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[tsoConfig.getNumConcurrentCTWriters()];
+        for (int i = 0; i < tsoConfig.getNumConcurrentCTWriters(); i++) {
             handlers[i] = new PersistenceProcessorHandler(metrics,
                                                           "localhost:1234",
                                                           leaseManager,
@@ -242,111 +235,160 @@ public class TestPersistenceProcessor {
                                                           panicker);
         }
 
-        // Component under test
         BatchPool batchPool = spy(new BatchPool(tsoConfig));
-        PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig,
-                                                                     batchPool,
-                                                                     replyProcessor,
-                                                                     panicker,
-                                                                     handlers);
+
+        // Component under test
+        PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, batchPool, panicker, handlers);
+
+        proc.addLowWatermarkToBatch(ANY_LWM, new MonitoringContext(metrics)); // Add a fake LWM
 
         // The non-ha lease manager always return true for
-        // stillInLeasePeriod(), so verify the batch sends replies as master
-        proc.persistCommit(1, 2, mock(Channel.class), mock(MonitoringContext.class));
-        proc.persistFlush();
+        // stillInLeasePeriod(), so verify the currentBatch sends replies as master
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
+        proc.triggerCurrentBatchFlush();
         verify(leaseManager, timeout(1000).times(2)).stillInLeasePeriod();
         verify(batchPool, times(2)).getNextEmptyBatch();
 
     }
 
-    @Test
-    public void testCommitPersistenceWithHALeaseManagerMultiHandlers() throws Exception {
-        final int MAX_BATCH_SIZE = 4;
-        final int NUM_PERSIST_HANDLERS = 4;
+    @Test(timeOut=10_000)
+    public void testCommitPersistenceWithHALeaseManagerAndSingleCommitTableWriter() throws Exception {
+
+        final int NUM_CT_WRITERS = 1;
 
         TSOServerConfig tsoConfig = new TSOServerConfig();
-        tsoConfig.setMaxBatchSize(MAX_BATCH_SIZE);
-        tsoConfig.setPersistHandlerNum(NUM_PERSIST_HANDLERS);
-        tsoConfig.setBatchPersistTimeoutInMs(100);
+        tsoConfig.setNumConcurrentCTWriters(NUM_CT_WRITERS);
+
+        testPersistenceWithHALeaseManagerPreservingLease(tsoConfig);
+        testPersistenceWithHALeaseManagerFailingToPreserveLease1(tsoConfig);
+        testPersistenceWithHALeaseManagerFailingToPreserveLease2(tsoConfig);
+        testPersistenceWithHALeaseManagerFailingToPreserveLease3(tsoConfig);
 
-        testCommitPersistenceWithHALeaseManagerPerConfig(tsoConfig);
     }
 
-    @Test
-    public void testCommitPersistenceWithHALeaseManager() throws Exception {
+    @Test(timeOut=10_000)
+    public void testCommitPersistenceWithHALeaseManagerAndMultipleCommitTableWriters() throws Exception {
+
+        final int NUM_CT_WRITERS = 4;
+        final int BATCH_SIZE_PER_CT_WRITER = 4;
 
         TSOServerConfig tsoConfig = new TSOServerConfig();
-        testCommitPersistenceWithHALeaseManagerPerConfig(tsoConfig);
+        tsoConfig.setNumConcurrentCTWriters(NUM_CT_WRITERS);
+        tsoConfig.setBatchSizePerCTWriter(BATCH_SIZE_PER_CT_WRITER);
+        tsoConfig.setBatchPersistTimeoutInMs(100);
+
+        testPersistenceWithHALeaseManagerPreservingLease(tsoConfig);
+        testPersistenceWithHALeaseManagerFailingToPreserveLease1(tsoConfig);
+        testPersistenceWithHALeaseManagerFailingToPreserveLease2(tsoConfig);
+        testPersistenceWithHALeaseManagerFailingToPreserveLease3(tsoConfig);
 
     }
 
-    // TODO Recheck this tests comparing with previous master
-    private void testCommitPersistenceWithHALeaseManagerPerConfig (TSOServerConfig tsoConfig) throws Exception {
+    private void testPersistenceWithHALeaseManagerPreservingLease(TSOServerConfig tsoConfig) throws Exception {
 
         // Init a HA lease manager
-        LeaseManager leaseManager = mock(LeaseManager.class);
+        LeaseManager simulatedHALeaseManager = mock(LeaseManager.class);
 
-        PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[tsoConfig.getPersistHandlerNum()];
-        for (int i = 0; i < tsoConfig.getPersistHandlerNum(); i++) {
-            handlers[i] = new PersistenceProcessorHandler(metrics,
-                                                          "localhost:1234",
-                                                          leaseManager,
-                                                          commitTable,
-                                                          replyProcessor,
-                                                          retryProcessor,
-                                                          new RuntimeExceptionPanicker());
-        }
+        PersistenceProcessorHandler[] handlers = configureHandlers (tsoConfig, simulatedHALeaseManager);
+
+        BatchPool batchPool = spy(new BatchPool(tsoConfig));
+
+        // Component under test
+        PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, batchPool, panicker, handlers);
+
+        proc.addLowWatermarkToBatch(ANY_LWM, new MonitoringContext(metrics)); // Add a fake LWM
+
+        // Test: Configure the lease manager to return true always
+        doReturn(true).when(simulatedHALeaseManager).stillInLeasePeriod();
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
+        proc.triggerCurrentBatchFlush();
+        verify(simulatedHALeaseManager, timeout(1000).times(2)).stillInLeasePeriod();
+        verify(batchPool, times(2)).getNextEmptyBatch();
+    }
+
+    private void testPersistenceWithHALeaseManagerFailingToPreserveLease1(TSOServerConfig tsoConfig) throws Exception {
+
+        // Init a HA lease manager
+        LeaseManager simulatedHALeaseManager = mock(LeaseManager.class);
+
+        PersistenceProcessorHandler[] handlers = configureHandlers (tsoConfig, simulatedHALeaseManager);
+
+        BatchPool batchPool = spy(new BatchPool(tsoConfig));
 
         // Component under test
+        PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, batchPool, panicker, handlers);
+
+        // Test: Configure the lease manager to return true first and false later for stillInLeasePeriod
+        doReturn(true).doReturn(false).when(simulatedHALeaseManager).stillInLeasePeriod();
+        batchPool.notifyEmptyBatch(0); // Unlock this thread to check the panicker
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
+        proc.triggerCurrentBatchFlush();
+        verify(simulatedHALeaseManager, timeout(1000).times(2)).stillInLeasePeriod();
+        verify(batchPool, times(2)).getNextEmptyBatch();
+    }
+
+    private void testPersistenceWithHALeaseManagerFailingToPreserveLease2(TSOServerConfig tsoConfig) throws Exception {
+
+        // Init a HA lease manager
+        LeaseManager simulatedHALeaseManager = mock(LeaseManager.class);
+
+        PersistenceProcessorHandler[] handlers = configureHandlers (tsoConfig, simulatedHALeaseManager);
+
         BatchPool batchPool = spy(new BatchPool(tsoConfig));
-        PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig,
-                                                                     batchPool,
-                                                                     replyProcessor,
-                                                                     panicker,
-                                                                     handlers);
 
-        doReturn(true).when(leaseManager).stillInLeasePeriod();
-        MonitoringContext monCtx = new MonitoringContext(metrics);
-        proc.persistCommit(1, 2, mock(Channel.class), mock(MonitoringContext.class));
-        proc.persistFlush();
-        verify(leaseManager, timeout(1000).times(2)).stillInLeasePeriod();
+        // Component under test
+        PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, batchPool, panicker, handlers);
+
+        // Test: Configure the lease manager to return false for stillInLeasePeriod
+        doReturn(false).when(simulatedHALeaseManager).stillInLeasePeriod();
+        batchPool.notifyEmptyBatch(0); // Unlock this thread to check the panicker
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
+        proc.triggerCurrentBatchFlush();
+        verify(simulatedHALeaseManager, timeout(1000).times(1)).stillInLeasePeriod();
         verify(batchPool, times(2)).getNextEmptyBatch();
+    }
 
-        // Test 2: Configure the lease manager to return true first and false later for stillInLeasePeriod
+    private void testPersistenceWithHALeaseManagerFailingToPreserveLease3(TSOServerConfig tsoConfig) throws Exception {
 
-        // Reset stuff
-        reset(leaseManager);
-        doReturn(true).doReturn(false).when(leaseManager).stillInLeasePeriod();
-        proc.persistCommit(1, 2, mock(Channel.class), mock(MonitoringContext.class));
-        proc.persistFlush();
-        verify(leaseManager, timeout(1000).times(2)).stillInLeasePeriod();
-        verify(batchPool, times(3)).getNextEmptyBatch();
+        // Init a HA lease manager
+        LeaseManager simulatedHALeaseManager = mock(LeaseManager.class);
 
-        // Test 3: Configure the lease manager to return false for stillInLeasePeriod
+        PersistenceProcessorHandler[] handlers = configureHandlers (tsoConfig, simulatedHALeaseManager);
 
-        // Reset stuff
-        reset(leaseManager);
-        doReturn(false).when(leaseManager).stillInLeasePeriod();
-        proc.persistCommit(1, 2, mock(Channel.class), mock(MonitoringContext.class));
-        proc.persistFlush();
-        verify(leaseManager, timeout(1000).times(1)).stillInLeasePeriod();
-        verify(batchPool, times(4)).getNextEmptyBatch();
+        BatchPool batchPool = spy(new BatchPool(tsoConfig));
 
-        // Test 4: Configure the lease manager to return true first and false later for stillInLeasePeriod and raise
-        // an exception when flush
+        // Component under test
+        PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, batchPool, panicker, handlers);
 
-        // Reset stuff
-        reset(leaseManager);
-        // Configure mock writer to flush successfully
+        // Test: Configure the lease manager to return true first and false later for stillInLeasePeriod,
+        // and make the flush of the batch fail with an exception:
+        // configure the mock writer to throw an IOException when flushing
         doThrow(new IOException("Unable to write")).when(mockWriter).flush();
-        doReturn(true).doReturn(false).when(leaseManager).stillInLeasePeriod();
-        proc.persistCommit(1, 2, mock(Channel.class), mock(MonitoringContext.class));
-        proc.persistFlush();
-        verify(leaseManager, timeout(1000).times(1)).stillInLeasePeriod();
-        verify(batchPool, times(5)).getNextEmptyBatch();
+        doReturn(true).doReturn(false).when(simulatedHALeaseManager).stillInLeasePeriod();
+        batchPool.notifyEmptyBatch(0); // Unlock this thread to check the panicker
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
+        proc.triggerCurrentBatchFlush();
+        verify(simulatedHALeaseManager, timeout(1000).times(1)).stillInLeasePeriod();
+        verify(batchPool, times(2)).getNextEmptyBatch();
+
     }
 
-    @Test
+    private PersistenceProcessorHandler[] configureHandlers(TSOServerConfig tsoConfig, LeaseManager leaseManager)
+            throws Exception {
+        PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[tsoConfig.getNumConcurrentCTWriters()];
+        for (int i = 0; i < tsoConfig.getNumConcurrentCTWriters(); i++) {
+            handlers[i] = new PersistenceProcessorHandler(metrics,
+                                                          "localhost:1234",
+                                                          leaseManager,
+                                                          commitTable,
+                                                          new ReplyProcessorImpl(metrics, panicker),
+                                                          retryProcessor,
+                                                          new RuntimeExceptionPanicker());
+        }
+        return handlers;
+    }
+
+    @Test(timeOut=10_000)
     public void testCommitTableExceptionOnCommitPersistenceTakesDownDaemon() throws Exception {
 
         // Init lease management (doesn't matter if HA or not)
@@ -354,22 +396,20 @@ public class TestPersistenceProcessor {
         TSOServerConfig config = new TSOServerConfig();
         BatchPool batchPool = new BatchPool(config);
 
-        PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[config.getPersistHandlerNum()];
-        for (int i = 0; i < config.getPersistHandlerNum(); i++) {
+        ReplyProcessor replyProcessor = new ReplyProcessorImpl(metrics, panicker);
+
+        PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[config.getNumConcurrentCTWriters()];
+        for (int i = 0; i < config.getNumConcurrentCTWriters(); i++) {
             handlers[i] = new PersistenceProcessorHandler(metrics,
                                                           "localhost:1234",
                                                           leaseManager,
                                                           commitTable,
-                                                          mock(ReplyProcessor.class),
+                                                          replyProcessor,
                                                           mock(RetryProcessor.class),
                                                           panicker);
         }
 
-        PersistenceProcessorImpl proc = new PersistenceProcessorImpl(config,
-                                                                 batchPool,
-                                                                 mock(ReplyProcessor.class),
-                                                                 panicker,
-                                                                 handlers);
+        PersistenceProcessorImpl proc = new PersistenceProcessorImpl(config, batchPool, panicker, handlers);
 
         MonitoringContext monCtx = new MonitoringContext(metrics);
 
@@ -379,20 +419,25 @@ public class TestPersistenceProcessor {
         // Configure commit table writer to explode when flushing changes to DB
         doThrow(new IOException("Unable to write@TestPersistenceProcessor2")).when(mockWriter).flush();
 
+        proc.addLowWatermarkToBatch(ANY_LWM, new MonitoringContext(metrics)); // Add a fake LWM
+
         // Check the panic is extended!
-        proc.persistCommit(1, 2, null, monCtx);
-        proc.persistFlush();
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), monCtx);
+        proc.triggerCurrentBatchFlush();
         verify(panicker, timeout(1000).atLeastOnce()).panic(anyString(), any(Throwable.class));
+
     }
 
-    @Test
+    @Test(timeOut=10_000)
     public void testRuntimeExceptionOnCommitPersistenceTakesDownDaemon() throws Exception {
 
         TSOServerConfig config = new TSOServerConfig();
         BatchPool batchPool = new BatchPool(config);
 
-        PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[config.getPersistHandlerNum()];
-        for (int i = 0; i < config.getPersistHandlerNum(); i++) {
+        ReplyProcessor replyProcessor = new ReplyProcessorImpl(metrics, panicker);
+
+        PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[config.getNumConcurrentCTWriters()];
+        for (int i = 0; i < config.getNumConcurrentCTWriters(); i++) {
             handlers[i] = new PersistenceProcessorHandler(metrics,
                                                           "localhost:1234",
                                                           mock(LeaseManager.class),
@@ -402,19 +447,18 @@ public class TestPersistenceProcessor {
                                                           panicker);
         }
 
-        PersistenceProcessorImpl proc = new PersistenceProcessorImpl(config,
-                                                                 batchPool,
-                                                                 replyProcessor,
-                                                                 panicker,
-                                                                 handlers);
+        PersistenceProcessorImpl proc = new PersistenceProcessorImpl(config, batchPool, panicker, handlers);
 
         // Configure writer to explode with a runtime exception
         doThrow(new RuntimeException("Kaboom!")).when(mockWriter).addCommittedTransaction(anyLong(), anyLong());
         MonitoringContext monCtx = new MonitoringContext(metrics);
 
+        proc.addLowWatermarkToBatch(ANY_LWM, new MonitoringContext(metrics)); // Add a fake LWM
+
+        batchPool.notifyEmptyBatch(0); // Unlock this thread to check the panicker
         // Check the panic is extended!
-        proc.persistCommit(1, 2, null, monCtx);
-        proc.persistFlush();
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), monCtx);
+        proc.triggerCurrentBatchFlush();
         verify(panicker, timeout(1000).atLeastOnce()).panic(anyString(), any(Throwable.class));
 
     }

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/1d60f21d/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
index 2c612a4..11f1a99 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
@@ -84,14 +84,14 @@ public class TestRequestProcessor {
 
         requestProc.timestampRequest(null, new MonitoringContext(metrics));
         ArgumentCaptor<Long> firstTScapture = ArgumentCaptor.forClass(Long.class);
-        verify(persist, timeout(100).times(1)).persistTimestamp(
+        verify(persist, timeout(100).times(1)).addTimestampToBatch(
                 firstTScapture.capture(), any(Channel.class), any(MonitoringContext.class));
 
         long firstTS = firstTScapture.getValue();
         // verify that timestamps increase monotonically
         for (int i = 0; i < 100; i++) {
             requestProc.timestampRequest(null, new MonitoringContext(metrics));
-            verify(persist, timeout(100).times(1)).persistTimestamp(eq(firstTS++), any(Channel.class), any(MonitoringContext.class));
+            verify(persist, timeout(100).times(1)).addTimestampToBatch(eq(firstTS++), any(Channel.class), any(MonitoringContext.class));
         }
 
     }
@@ -101,37 +101,37 @@ public class TestRequestProcessor {
 
         requestProc.timestampRequest(null, new MonitoringContext(metrics));
         ArgumentCaptor<Long> TScapture = ArgumentCaptor.forClass(Long.class);
-        verify(persist, timeout(100).times(1)).persistTimestamp(
+        verify(persist, timeout(100).times(1)).addTimestampToBatch(
                 TScapture.capture(), any(Channel.class), any(MonitoringContext.class));
         long firstTS = TScapture.getValue();
 
         List<Long> writeSet = Lists.newArrayList(1L, 20L, 203L);
         requestProc.commitRequest(firstTS - 1, writeSet, false, null, new MonitoringContext(metrics));
-        verify(persist, timeout(100).times(1)).persistAbort(eq(firstTS - 1), anyBoolean(), any(Channel.class), any(MonitoringContext.class));
+        verify(persist, timeout(100).times(1)).addAbortToBatch(eq(firstTS - 1), anyBoolean(), any(Channel.class), any(MonitoringContext.class));
 
         requestProc.commitRequest(firstTS, writeSet, false, null, new MonitoringContext(metrics));
         ArgumentCaptor<Long> commitTScapture = ArgumentCaptor.forClass(Long.class);
 
-        verify(persist, timeout(100).times(1)).persistCommit(eq(firstTS), commitTScapture.capture(), any(Channel.class), any(MonitoringContext.class));
+        verify(persist, timeout(100).times(1)).addCommitToBatch(eq(firstTS), commitTScapture.capture(), any(Channel.class), any(MonitoringContext.class));
         assertTrue(commitTScapture.getValue() > firstTS, "Commit TS must be greater than start TS");
 
         // test conflict
         requestProc.timestampRequest(null, new MonitoringContext(metrics));
         TScapture = ArgumentCaptor.forClass(Long.class);
-        verify(persist, timeout(100).times(2)).persistTimestamp(
+        verify(persist, timeout(100).times(2)).addTimestampToBatch(
                 TScapture.capture(), any(Channel.class), any(MonitoringContext.class));
         long secondTS = TScapture.getValue();
 
         requestProc.timestampRequest(null, new MonitoringContext(metrics));
         TScapture = ArgumentCaptor.forClass(Long.class);
-        verify(persist, timeout(100).times(3)).persistTimestamp(
+        verify(persist, timeout(100).times(3)).addTimestampToBatch(
                 TScapture.capture(), any(Channel.class), any(MonitoringContext.class));
         long thirdTS = TScapture.getValue();
 
         requestProc.commitRequest(thirdTS, writeSet, false, null, new MonitoringContext(metrics));
-        verify(persist, timeout(100).times(1)).persistCommit(eq(thirdTS), anyLong(), any(Channel.class), any(MonitoringContext.class));
+        verify(persist, timeout(100).times(1)).addCommitToBatch(eq(thirdTS), anyLong(), any(Channel.class), any(MonitoringContext.class));
         requestProc.commitRequest(secondTS, writeSet, false, null, new MonitoringContext(metrics));
-        verify(persist, timeout(100).times(1)).persistAbort(eq(secondTS), anyBoolean(), any(Channel.class), any(MonitoringContext.class));
+        verify(persist, timeout(100).times(1)).addAbortToBatch(eq(secondTS), anyBoolean(), any(Channel.class), any(MonitoringContext.class));
 
     }
 
@@ -143,9 +143,9 @@ public class TestRequestProcessor {
         // Start a transaction...
         requestProc.timestampRequest(null, new MonitoringContext(metrics));
         ArgumentCaptor<Long> capturedTS = ArgumentCaptor.forClass(Long.class);
-        verify(persist, timeout(100).times(1)).persistTimestamp(capturedTS.capture(),
-                                                                any(Channel.class),
-                                                                any(MonitoringContext.class));
+        verify(persist, timeout(100).times(1)).addTimestampToBatch(capturedTS.capture(),
+                                                                   any(Channel.class),
+                                                                   any(MonitoringContext.class));
         long startTS = capturedTS.getValue();
 
         // ... simulate the reset of the RequestProcessor state (e.g. due to
@@ -154,7 +154,7 @@ public class TestRequestProcessor {
 
         // ...check that the transaction is aborted when trying to commit
         requestProc.commitRequest(startTS, writeSet, false, null, new MonitoringContext(metrics));
-        verify(persist, timeout(100).times(1)).persistAbort(eq(startTS), anyBoolean(), any(Channel.class), any(MonitoringContext.class));
+        verify(persist, timeout(100).times(1)).addAbortToBatch(eq(startTS), anyBoolean(), any(Channel.class), any(MonitoringContext.class));
 
     }
 
@@ -175,11 +175,11 @@ public class TestRequestProcessor {
         Thread.currentThread().sleep(3000); // Allow the Request processor to finish the request processing
 
         // Check that first time its called is on init
-        verify(persist, timeout(100).times(1)).persistLowWatermark(eq(0L), any(MonitoringContext.class));
+        verify(persist, timeout(100).times(1)).addLowWatermarkToBatch(eq(0L), any(MonitoringContext.class));
         // Then, check it is called when cache is full and the first element is evicted (should be a 1)
-        verify(persist, timeout(100).times(1)).persistLowWatermark(eq(FIRST_COMMIT_TS_EVICTED), any(MonitoringContext.class));
+        verify(persist, timeout(100).times(1)).addLowWatermarkToBatch(eq(FIRST_COMMIT_TS_EVICTED), any(MonitoringContext.class));
         // Finally it should never be called with the next element
-        verify(persist, timeout(100).never()).persistLowWatermark(eq(NEXT_COMMIT_TS_THAT_SHOULD_BE_EVICTED), any(MonitoringContext.class));
+        verify(persist, timeout(100).never()).addLowWatermarkToBatch(eq(NEXT_COMMIT_TS_THAT_SHOULD_BE_EVICTED), any(MonitoringContext.class));
 
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/1d60f21d/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java
index c5270c0..afa191e 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java
@@ -66,10 +66,7 @@ public class TestRetryProcessor {
     }
 
     @Test(timeOut = 10_000)
-    public void testBasicFunctionality() throws Exception {
-
-        commitTable.getWriter().addCommittedTransaction(ST_TX_1, CT_TX_1);
-
+    public void testRetriedRequestForANonExistingTxReturnsAbort() throws Exception {
         BatchPool batchPool = new BatchPool(new TSOServerConfig());
 
         // The element to test
@@ -77,22 +74,35 @@ public class TestRetryProcessor {
 
         // Test we'll reply with an abort for a retry request when the start timestamp IS NOT in the commit table
         retryProc.disambiguateRetryRequestHeuristically(NON_EXISTING_ST_TX, channel, new MonitoringContext(metrics));
-        ArgumentCaptor<Long> firstTScapture = ArgumentCaptor.forClass(Long.class);
+        ArgumentCaptor<Long> firstTSCapture = ArgumentCaptor.forClass(Long.class);
 
         verify(replyProc, timeout(100).times(1))
-                .addAbort(any(Batch.class), firstTScapture.capture(), any(Channel.class), any(MonitoringContext.class));
-        long startTS = firstTScapture.getValue();
+                .addAbort(any(Batch.class), firstTSCapture.capture(), any(Channel.class), any(MonitoringContext.class));
+        long startTS = firstTSCapture.getValue();
         assertEquals(startTS, NON_EXISTING_ST_TX, "Captured timestamp should be the same as NON_EXISTING_ST_TX");
+    }
+
+    @Test(timeOut = 10_000)
+    public void testRetriedRequestForAnExistingTxReturnsCommit() throws Exception {
+        BatchPool batchPool = new BatchPool(new TSOServerConfig());
+
+        // The element to test
+        RetryProcessor retryProc = new RetryProcessorImpl(metrics, commitTable, replyProc, panicker, batchPool);
 
         // Test we'll reply with a commit for a retry request when the start timestamp IS in the commit table
+        commitTable.getWriter().addCommittedTransaction(ST_TX_1, CT_TX_1);
         retryProc.disambiguateRetryRequestHeuristically(ST_TX_1, channel, new MonitoringContext(metrics));
-        ArgumentCaptor<Long> secondTScapture = ArgumentCaptor.forClass(Long.class);
+        ArgumentCaptor<Long> firstTSCapture = ArgumentCaptor.forClass(Long.class);
+        ArgumentCaptor<Long> secondTSCapture = ArgumentCaptor.forClass(Long.class);
 
-        verify(replyProc, timeout(100).times(1))
-                .addCommit(any(Batch.class), firstTScapture.capture(), secondTScapture.capture(), any(Channel.class), any(MonitoringContext.class));
+        verify(replyProc, timeout(100).times(1)).addCommit(any(Batch.class),
+                                                           firstTSCapture.capture(),
+                                                           secondTSCapture.capture(),
+                                                           any(Channel.class),
+                                                           any(MonitoringContext.class));
 
-        startTS = firstTScapture.getValue();
-        long commitTS = secondTScapture.getValue();
+        long startTS = firstTSCapture.getValue();
+        long commitTS = secondTSCapture.getValue();
         assertEquals(startTS, ST_TX_1, "Captured timestamp should be the same as ST_TX_1");
         assertEquals(commitTS, CT_TX_1, "Captured timestamp should be the same as CT_TX_1");
     }
@@ -117,9 +127,10 @@ public class TestRetryProcessor {
         // Test we'll reply with an abort for a retry request when the
         // transaction id IS in the commit table BUT invalidated
         retryProc.disambiguateRetryRequestHeuristically(ST_TX_1, channel, new MonitoringContext(metrics));
-        ArgumentCaptor<Long> startTScapture = ArgumentCaptor.forClass(Long.class);
-        verify(replyProc, timeout(100).times(1)).addAbort(any(Batch.class), startTScapture.capture(), any(Channel.class), any(MonitoringContext.class));
-        long startTS = startTScapture.getValue();
+        ArgumentCaptor<Long> startTSCapture = ArgumentCaptor.forClass(Long.class);
+        verify(replyProc, timeout(100).times(1))
+                .addAbort(any(Batch.class), startTSCapture.capture(), any(Channel.class), any(MonitoringContext.class));
+        long startTS = startTSCapture.getValue();
         Assert.assertEquals(startTS, ST_TX_1, "Captured timestamp should be the same as NON_EXISTING_ST_TX");
 
     }