Posted to commits@omid.apache.org by fp...@apache.org on 2016/05/11 18:06:36 UTC

[29/50] [abbrv] incubator-omid git commit: Improve TSO performance: Parallel batched writes to CT

Improve TSO performance: Parallel batched writes to CT

The architecture changed mainly by adding batch events and creating
handlers under PersistenceProcessorImpl.
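
For orientation, here is a condensed illustration of the new write path. This is not the committed code verbatim; it is assembled from PersistenceProcessorImpl and PersistenceProcessorHandler in the diff below, and the variables config, panicker, handlers, batchPool, batch and batchIDCnt are the injected collaborators and fields shown there. A single producer fills a Batch and publishes it to an LMAX Disruptor ring buffer; a WorkerPool of PersistenceProcessorHandler instances drains the batches in parallel, each one writing and flushing its batch to the commit table:

    // Wiring of the parallel persistence pipeline (condensed from the diff below)
    RingBuffer<PersistBatchEvent> ring = RingBuffer.createSingleProducer(
            PersistBatchEvent.EVENT_FACTORY, 1 << 20, new BusySpinWaitStrategy());

    WorkerPool<PersistBatchEvent> workers = new WorkerPool<>(
            ring, ring.newBarrier(), new FatalExceptionHandler(panicker), handlers);
    ring.addGatingSequences(workers.getWorkerSequences());
    workers.start(Executors.newFixedThreadPool(config.getPersistHandlerNum(),
            new ThreadFactoryBuilder().setNameFormat("persist-%d").build()));

    // Producer side (cf. persistFlush()): publish the current batch and take a
    // fresh one from the BatchPool; getNextEmptyBatch() blocks until one is free.
    long seq = ring.next();
    PersistBatchEvent event = ring.get(seq);
    PersistBatchEvent.makePersistBatch(event, batch, batchIDCnt++);
    ring.publish(seq);
    batch = batchPool.getNextEmptyBatch();

Each handler then iterates over the events in the batch it received, adds the committed transactions to the commit table writer, flushes once per batch, and hands the batch to the ReplyProcessor (see PersistenceProcessorHandler.onEvent() and flush() below).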

Old comments of squashed commits:

Scaling up the TSO by parallelizing the batched writes to the commit table. The architecture changed mainly by adding batch events and creating handlers under PersistenceProcessorImpl.
This commit increases the performance of Omid from 55K tps to 380K tps when using 6 region servers.
Please note that the addition of MonitoringContext significantly degrades performance and should therefore become optional to achieve good performance.

Integrating Igor's comments.

Integrating Igor's and Francisco's comments.

Fixing a bug in the Batch handling for future reuse.

Removing the busy-wait strategy from the batch allocation at the BatchPool.

Wrap the wait operation with a loop to overcome spurious wakeups.
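
The guarded-wait idiom referred to here is the standard one. A minimal generic sketch follows; lock and conditionHolds() are placeholders, while BatchPool.getNextEmptyBatch() in the diff below applies the same idiom to the stack of free batch indexes:

    synchronized (lock) {
        // Re-check the condition after every wakeup: wait() may return spuriously,
        // or another thread may have consumed the resource first.
        while (!conditionHolds()) {
            lock.wait();
        }
        // The condition is guaranteed to hold here, still under the lock.
    }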

Rebase, resolve rebase errors, re-update license

Change-Id: I30727efae4fc8c0d7be328b4cd98d615f5b0fb8a

Move com.yahoo -> org.apache

Change-Id: Ieb02b4f2e0e74464ad007da8b9d0892622e1bfa6

Fix code formatting and add TODOs

No functional changes done

Change-Id: Ia32541da5bd482723375399a49bdb236db56dcf6

Asserts, style, unit tests, fixed most warnings from FindBugs

Change-Id: I22122894b71a69c8fc7396e7e9166aca1a489566

Remove commented-out code, leftovers of merges ('<<<<<<< ')

Change-Id: If108c1a344447cef0533816a1b39a73a73657375


Project: http://git-wip-us.apache.org/repos/asf/incubator-omid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-omid/commit/f6220a84
Tree: http://git-wip-us.apache.org/repos/asf/incubator-omid/tree/f6220a84
Diff: http://git-wip-us.apache.org/repos/asf/incubator-omid/diff/f6220a84

Branch: refs/heads/master
Commit: f6220a846210b202a4b41459c612434bc43297ac
Parents: 41d2e23
Author: Ohad Shacham <oh...@yahoo-inc.com>
Authored: Wed Mar 23 10:38:52 2016 +0200
Committer: Francisco Perez-Sorrosal <fp...@yahoo-inc.com>
Committed: Fri Apr 29 09:38:48 2016 -0700

----------------------------------------------------------------------
 .../apache/omid/benchmarks/utils/Generator.java |   2 +-
 .../utils/ScrambledZipfianGenerator.java        |   2 +-
 .../org/apache/omid/transaction/TTable.java     |  38 +-
 .../apache/omid/transaction/TestTSOModule.java  |  13 +
 .../TSOForHBaseCompactorTestModule.java         |  12 +
 .../omid/tools/hbase/OmidTableManager.java      |  12 +-
 .../main/java/org/apache/omid/tso/Batch.java    | 146 +++++++
 .../java/org/apache/omid/tso/BatchPool.java     |  85 ++++
 .../apache/omid/tso/FatalExceptionHandler.java  |   4 +-
 .../org/apache/omid/tso/LeaseManagement.java    |   3 +-
 .../java/org/apache/omid/tso/PersistEvent.java  | 117 ++++++
 .../apache/omid/tso/PersistenceProcessor.java   |  22 +-
 .../omid/tso/PersistenceProcessorHandler.java   | 135 ++++++
 .../omid/tso/PersistenceProcessorImpl.java      | 417 ++++---------------
 .../org/apache/omid/tso/ReplyProcessor.java     |  17 +-
 .../org/apache/omid/tso/ReplyProcessorImpl.java | 273 +++++++-----
 .../apache/omid/tso/RequestProcessorImpl.java   |  79 +++-
 .../org/apache/omid/tso/RetryProcessorImpl.java |  53 +--
 .../java/org/apache/omid/tso/TSOModule.java     |  17 +-
 .../org/apache/omid/tso/TSOServerConfig.java    |  26 ++
 .../org/apache/omid/tso/TSOStateManager.java    |   6 +-
 .../apache/omid/tso/TSOStateManagerImpl.java    |   4 +-
 .../org/apache/omid/tso/VoidLeaseManager.java   |   2 +-
 tso-server/src/main/resources/default-omid.yml  |   3 +
 .../java/org/apache/omid/tso/TSOMockModule.java |  11 +
 .../java/org/apache/omid/tso/TestBatch.java     |  67 ++-
 .../java/org/apache/omid/tso/TestPanicker.java  |  64 ++-
 .../omid/tso/TestPersistenceProcessor.java      | 340 +++++++++------
 .../apache/omid/tso/TestRequestProcessor.java   |  10 +-
 .../org/apache/omid/tso/TestRetryProcessor.java |  23 +-
 30 files changed, 1303 insertions(+), 700 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/f6220a84/benchmarks/src/main/java/org/apache/omid/benchmarks/utils/Generator.java
----------------------------------------------------------------------
diff --git a/benchmarks/src/main/java/org/apache/omid/benchmarks/utils/Generator.java b/benchmarks/src/main/java/org/apache/omid/benchmarks/utils/Generator.java
index 159d5f9..ced6c82 100644
--- a/benchmarks/src/main/java/org/apache/omid/benchmarks/utils/Generator.java
+++ b/benchmarks/src/main/java/org/apache/omid/benchmarks/utils/Generator.java
@@ -1,5 +1,5 @@
 /**
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.                                                                                                                             
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
  *
  *   http://www.apache.org/licenses/LICENSE-2.0
  *

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/f6220a84/benchmarks/src/main/java/org/apache/omid/benchmarks/utils/ScrambledZipfianGenerator.java
----------------------------------------------------------------------
diff --git a/benchmarks/src/main/java/org/apache/omid/benchmarks/utils/ScrambledZipfianGenerator.java b/benchmarks/src/main/java/org/apache/omid/benchmarks/utils/ScrambledZipfianGenerator.java
index 1a5c429..e5e484a 100644
--- a/benchmarks/src/main/java/org/apache/omid/benchmarks/utils/ScrambledZipfianGenerator.java
+++ b/benchmarks/src/main/java/org/apache/omid/benchmarks/utils/ScrambledZipfianGenerator.java
@@ -1,5 +1,5 @@
 /**
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.                                                                                                                             
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
  *
  *   http://www.apache.org/licenses/LICENSE-2.0
  *

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/f6220a84/hbase-client/src/main/java/org/apache/omid/transaction/TTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/TTable.java b/hbase-client/src/main/java/org/apache/omid/transaction/TTable.java
index 285facb..2c673ff 100644
--- a/hbase-client/src/main/java/org/apache/omid/transaction/TTable.java
+++ b/hbase-client/src/main/java/org/apache/omid/transaction/TTable.java
@@ -23,8 +23,6 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Multimaps;
-import org.apache.omid.committable.CommitTable.CommitTimestamp;
-import org.apache.omid.transaction.HBaseTransactionManager.CommitTimestampLocatorImpl;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
@@ -44,6 +42,8 @@ import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.omid.committable.CommitTable.CommitTimestamp;
+import org.apache.omid.transaction.HBaseTransactionManager.CommitTimestampLocatorImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -189,10 +189,10 @@ public class TTable implements Closeable {
                                     CellUtils.DELETE_TOMBSTONE);
                         transaction.addWriteSetElement(
                                 new HBaseCellId(table,
-                                        delete.getRow(),
-                                        CellUtil.cloneFamily(cell),
-                                        CellUtil.cloneQualifier(cell),
-                                        cell.getTimestamp()));
+                                                delete.getRow(),
+                                                CellUtil.cloneFamily(cell),
+                                                CellUtil.cloneQualifier(cell),
+                                                cell.getTimestamp()));
                         break;
                     case DeleteFamily:
                         deleteG.addFamily(CellUtil.cloneFamily(cell));
@@ -206,10 +206,10 @@ public class TTable implements Closeable {
                                         CellUtils.DELETE_TOMBSTONE);
                             transaction.addWriteSetElement(
                                     new HBaseCellId(table,
-                                            delete.getRow(),
-                                            CellUtil.cloneFamily(cell),
-                                            CellUtil.cloneQualifier(cell),
-                                            cell.getTimestamp()));
+                                                    delete.getRow(),
+                                                    CellUtil.cloneFamily(cell),
+                                                    CellUtil.cloneQualifier(cell),
+                                                    cell.getTimestamp()));
                             break;
                         } else {
                             throw new UnsupportedOperationException(
@@ -268,10 +268,10 @@ public class TTable implements Closeable {
 
                 transaction.addWriteSetElement(
                         new HBaseCellId(table,
-                                CellUtil.cloneRow(kv),
-                                CellUtil.cloneFamily(kv),
-                                CellUtil.cloneQualifier(kv),
-                                kv.getTimestamp()));
+                                        CellUtil.cloneRow(kv),
+                                        CellUtil.cloneFamily(kv),
+                                        CellUtil.cloneQualifier(kv),
+                                        kv.getTimestamp()));
             }
         }
 
@@ -423,10 +423,10 @@ public class TTable implements Closeable {
                         epoch,
                         new CommitTimestampLocatorImpl(
                                 new HBaseCellId(table,
-                                        CellUtil.cloneRow(cell),
-                                        CellUtil.cloneFamily(cell),
-                                        CellUtil.cloneQualifier(cell),
-                                        cell.getTimestamp()),
+                                                CellUtil.cloneRow(cell),
+                                                CellUtil.cloneFamily(cell),
+                                                CellUtil.cloneQualifier(cell),
+                                                cell.getTimestamp()),
                                 commitCache));
 
         // If transaction that added the cell was invalidated
@@ -740,7 +740,7 @@ public class TTable implements Closeable {
         } else {
             throw new IllegalArgumentException(
                     String.format("The transaction object passed %s is not an instance of HBaseTransaction",
-                            tx.getClass().getName()));
+                                  tx.getClass().getName()));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/f6220a84/hbase-client/src/test/java/org/apache/omid/transaction/TestTSOModule.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestTSOModule.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestTSOModule.java
index 2d4479a..a8589bf 100644
--- a/hbase-client/src/test/java/org/apache/omid/transaction/TestTSOModule.java
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestTSOModule.java
@@ -18,6 +18,7 @@
 package org.apache.omid.transaction;
 
 import com.google.inject.AbstractModule;
+import com.google.inject.Provider;
 import com.google.inject.Provides;
 import org.apache.omid.committable.CommitTable;
 import org.apache.omid.committable.hbase.HBaseCommitTable;
@@ -30,15 +31,18 @@ import org.apache.omid.tso.RuntimeExceptionPanicker;
 import org.apache.omid.tso.NetworkInterfaceUtils;
 import org.apache.omid.tso.Panicker;
 import org.apache.omid.tso.PausableTimestampOracle;
+import org.apache.omid.tso.PersistenceProcessorHandler;
 import org.apache.omid.tso.TSOChannelHandler;
 import org.apache.omid.tso.TSOServerConfig;
 import org.apache.omid.tso.TSOStateManager;
 import org.apache.omid.tso.TSOStateManagerImpl;
 import org.apache.omid.tso.TimestampOracle;
+
 import org.apache.hadoop.conf.Configuration;
 
 import javax.inject.Named;
 import javax.inject.Singleton;
+
 import java.net.SocketException;
 import java.net.UnknownHostException;
 
@@ -95,4 +99,13 @@ class TestTSOModule extends AbstractModule {
         return NetworkInterfaceUtils.getTSOHostAndPort(config);
     }
 
+    @Provides
+    PersistenceProcessorHandler[] getPersistenceProcessorHandler(Provider<PersistenceProcessorHandler> provider) {
+        PersistenceProcessorHandler[] persistenceProcessorHandlers = new PersistenceProcessorHandler[config.getPersistHandlerNum()];
+        for (int i = 0; i < persistenceProcessorHandlers.length; i++) {
+            persistenceProcessorHandlers[i] = provider.get();
+        }
+        return persistenceProcessorHandlers;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/f6220a84/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TSOForHBaseCompactorTestModule.java
----------------------------------------------------------------------
diff --git a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TSOForHBaseCompactorTestModule.java b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TSOForHBaseCompactorTestModule.java
index 7a3b550..96afbd2 100644
--- a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TSOForHBaseCompactorTestModule.java
+++ b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TSOForHBaseCompactorTestModule.java
@@ -18,6 +18,7 @@
 package org.apache.omid.transaction;
 
 import com.google.inject.AbstractModule;
+import com.google.inject.Provider;
 import com.google.inject.Provides;
 import org.apache.omid.committable.CommitTable;
 import org.apache.omid.committable.hbase.HBaseCommitTable;
@@ -30,6 +31,7 @@ import org.apache.omid.tso.LeaseManagement;
 import org.apache.omid.tso.MockPanicker;
 import org.apache.omid.tso.NetworkInterfaceUtils;
 import org.apache.omid.tso.Panicker;
+import org.apache.omid.tso.PersistenceProcessorHandler;
 import org.apache.omid.tso.TSOChannelHandler;
 import org.apache.omid.tso.TSOServerConfig;
 import org.apache.omid.tso.TSOStateManager;
@@ -37,12 +39,14 @@ import org.apache.omid.tso.TSOStateManagerImpl;
 import org.apache.omid.tso.TimestampOracle;
 import org.apache.omid.tso.TimestampOracleImpl;
 import org.apache.omid.tso.VoidLeaseManager;
+
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 
 import javax.inject.Named;
 import javax.inject.Singleton;
+
 import java.io.File;
 import java.io.IOException;
 import java.net.SocketException;
@@ -115,6 +119,14 @@ class TSOForHBaseCompactorTestModule extends AbstractModule {
     @Named(TSO_HOST_AND_PORT_KEY)
     String provideTSOHostAndPort() throws SocketException, UnknownHostException {
         return NetworkInterfaceUtils.getTSOHostAndPort(config);
+    }
 
+    @Provides
+    PersistenceProcessorHandler[] getPersistenceProcessorHandler(Provider<PersistenceProcessorHandler> provider) {
+        PersistenceProcessorHandler[] persistenceProcessorHandlers = new PersistenceProcessorHandler[config.getPersistHandlerNum()];
+        for (int i = 0; i < persistenceProcessorHandlers.length; i++) {
+            persistenceProcessorHandlers[i] = provider.get();
+        }
+        return persistenceProcessorHandlers;
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/f6220a84/hbase-tools/src/main/java/org/apache/omid/tools/hbase/OmidTableManager.java
----------------------------------------------------------------------
diff --git a/hbase-tools/src/main/java/org/apache/omid/tools/hbase/OmidTableManager.java b/hbase-tools/src/main/java/org/apache/omid/tools/hbase/OmidTableManager.java
index 735ad1e..126cb99 100644
--- a/hbase-tools/src/main/java/org/apache/omid/tools/hbase/OmidTableManager.java
+++ b/hbase-tools/src/main/java/org/apache/omid/tools/hbase/OmidTableManager.java
@@ -23,12 +23,6 @@ import com.beust.jcommander.Parameter;
 import com.beust.jcommander.ParameterException;
 import com.beust.jcommander.Parameters;
 import com.beust.jcommander.ParametersDelegate;
-import org.apache.omid.HBaseShims;
-import org.apache.omid.committable.hbase.HBaseCommitTableConfig;
-import org.apache.omid.committable.hbase.KeyGenerator;
-import org.apache.omid.committable.hbase.KeyGeneratorImplementations;
-import org.apache.omid.committable.hbase.RegionSplitter;
-import org.apache.omid.timestamp.storage.HBaseTimestampStorageConfig;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -36,6 +30,12 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.omid.HBaseShims;
+import org.apache.omid.committable.hbase.HBaseCommitTableConfig;
+import org.apache.omid.committable.hbase.KeyGenerator;
+import org.apache.omid.committable.hbase.KeyGeneratorImplementations;
+import org.apache.omid.committable.hbase.RegionSplitter;
+import org.apache.omid.timestamp.storage.HBaseTimestampStorageConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/f6220a84/tso-server/src/main/java/org/apache/omid/tso/Batch.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/Batch.java b/tso-server/src/main/java/org/apache/omid/tso/Batch.java
new file mode 100644
index 0000000..8f665e1
--- /dev/null
+++ b/tso-server/src/main/java/org/apache/omid/tso/Batch.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.tso;
+
+import com.google.common.base.Preconditions;
+import org.apache.omid.tso.PersistEvent.Type;
+import org.jboss.netty.channel.Channel;
+
+public class Batch {
+
+    private final PersistEvent[] events;
+    private final int maxBatchSize;
+    private final BatchPool batchPool;
+    private final int id;
+    private int numEvents;
+
+    Batch(int maxBatchSize) {
+
+        this(maxBatchSize, 0, null);
+
+    }
+
+    Batch(int size, int id, BatchPool batchPool) {
+        Preconditions.checkArgument(size > 0, "Size must be positive");
+        this.maxBatchSize = size;
+        this.batchPool = batchPool;
+        this.id = id;
+        this.numEvents = 0;
+        this.events = new PersistEvent[size];
+        for (int i = 0; i < size; i++) {
+            this.events[i] = new PersistEvent();
+        }
+
+    }
+
+    boolean isFull() {
+        Preconditions.checkState(numEvents <= maxBatchSize, "numEvents > maxBatchSize");
+        return numEvents == maxBatchSize;
+
+    }
+
+    boolean isEmpty() {
+
+        return numEvents == 0;
+
+    }
+
+    boolean isLastEntryEmpty() {
+        Preconditions.checkState(numEvents <= maxBatchSize, "numEvents > maxBatchSize");
+        return numEvents == (maxBatchSize - 1);
+
+    }
+
+    int getNumEvents() {
+        return numEvents;
+    }
+
+    PersistEvent getEvent(int i) {
+
+        assert (0 <= i && i < numEvents);
+        return events[i];
+
+    }
+
+    void clear() {
+
+        numEvents = 0;
+        if (batchPool != null) {
+            batchPool.notifyEmptyBatch(id);
+        }
+
+    }
+
+    void addCommit(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext context) {
+        Preconditions.checkState(!isFull(), "batch is full");
+        int index = numEvents++;
+        PersistEvent e = events[index];
+        e.makePersistCommit(startTimestamp, commitTimestamp, c, context);
+
+    }
+
+    void addAbort(long startTimestamp, boolean isRetry, Channel c, MonitoringContext context) {
+        Preconditions.checkState(!isFull(), "batch is full");
+        int index = numEvents++;
+        PersistEvent e = events[index];
+        e.makePersistAbort(startTimestamp, isRetry, c, context);
+
+    }
+
+    void addTimestamp(long startTimestamp, Channel c, MonitoringContext context) {
+        Preconditions.checkState(!isFull(), "batch is full");
+        int index = numEvents++;
+        PersistEvent e = events[index];
+        e.makePersistTimestamp(startTimestamp, c, context);
+
+    }
+
+    void addLowWatermark(long lowWatermark, MonitoringContext context) {
+        Preconditions.checkState(!isFull(), "batch is full");
+        int index = numEvents++;
+        PersistEvent e = events[index];
+        e.makePersistLowWatermark(lowWatermark, context);
+
+    }
+
+    void sendReply(ReplyProcessor reply, RetryProcessor retryProc, long batchID, boolean isTSOInstanceMaster) {
+        int i = 0;
+        while (i < numEvents) {
+            PersistEvent e = events[i];
+            if (e.getType() == Type.ABORT && e.isRetry()) {
+                retryProc.disambiguateRetryRequestHeuristically(e.getStartTimestamp(), e.getChannel(), e.getMonCtx());
+                PersistEvent tmp = events[i];
+                //TODO: why assign it?
+                events[i] = events[numEvents - 1];
+                events[numEvents - 1] = tmp;
+                if (numEvents == 1) {
+                    clear();
+                    reply.batchResponse(null, batchID, !isTSOInstanceMaster);
+                    return;
+                }
+                numEvents--;
+                continue;
+            }
+            i++;
+        }
+
+        reply.batchResponse(this, batchID, !isTSOInstanceMaster);
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/f6220a84/tso-server/src/main/java/org/apache/omid/tso/BatchPool.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/BatchPool.java b/tso-server/src/main/java/org/apache/omid/tso/BatchPool.java
new file mode 100644
index 0000000..96bed25
--- /dev/null
+++ b/tso-server/src/main/java/org/apache/omid/tso/BatchPool.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.tso;
+
+import javax.inject.Inject;
+import java.util.Stack;
+
+class BatchPool {
+
+    final private Batch[] batches;
+    final private int poolSize;
+    final private Stack<Integer> availableBatches;
+
+    @Inject
+    public BatchPool(TSOServerConfig config) {
+
+        int numBuffersPerHandler = (config.getNumBuffersPerHandler() >= 2) ? config.getNumBuffersPerHandler() : 2;
+        poolSize = config.getPersistHandlerNum() * numBuffersPerHandler;
+        batches = new Batch[poolSize];
+        int batchSize = (config.getMaxBatchSize() / config.getPersistHandlerNum() > 0) ? (config.getMaxBatchSize() / config.getPersistHandlerNum()) : 2;
+
+        for (int i = 0; i < poolSize; i++) {
+            batches[i] = new Batch(batchSize, i, this);
+        }
+
+        availableBatches = new Stack<>();
+
+        for (int i = (poolSize - 1); i >= 0; i--) {
+            availableBatches.push(i);
+        }
+
+    }
+
+    Batch getNextEmptyBatch() throws InterruptedException {
+
+        synchronized (availableBatches) {
+            while (availableBatches.isEmpty()) {
+                availableBatches.wait();
+            }
+
+            Integer batchIdx = availableBatches.pop();
+            return batches[batchIdx];
+        }
+
+    }
+
+    void notifyEmptyBatch(int batchIdx) {
+
+        synchronized (availableBatches) {
+            availableBatches.push(batchIdx);
+            availableBatches.notify();
+        }
+
+    }
+
+    public void reset() {
+
+        for (int i = 0; i < poolSize; i++) {
+            batches[i].clear();
+        }
+
+        availableBatches.clear();
+
+        for (int i = (poolSize - 1); i >= 0; i--) {
+            availableBatches.push(i);
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/f6220a84/tso-server/src/main/java/org/apache/omid/tso/FatalExceptionHandler.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/FatalExceptionHandler.java b/tso-server/src/main/java/org/apache/omid/tso/FatalExceptionHandler.java
index 44c2156..2a9f77b 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/FatalExceptionHandler.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/FatalExceptionHandler.java
@@ -21,7 +21,8 @@ import com.lmax.disruptor.ExceptionHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class FatalExceptionHandler implements ExceptionHandler {
+class FatalExceptionHandler implements ExceptionHandler {
+
     private static final Logger LOG = LoggerFactory.getLogger(FatalExceptionHandler.class);
 
     Panicker panicker;
@@ -47,4 +48,5 @@ public class FatalExceptionHandler implements ExceptionHandler {
     public void handleOnStartException(Throwable ex) {
         panicker.panic("Uncaught exception starting up", ex);
     }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/f6220a84/tso-server/src/main/java/org/apache/omid/tso/LeaseManagement.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/LeaseManagement.java b/tso-server/src/main/java/org/apache/omid/tso/LeaseManagement.java
index 6908e7f..81d0e0a 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/LeaseManagement.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/LeaseManagement.java
@@ -36,8 +36,9 @@ public interface LeaseManagement {
 
     /**
      * Allows to start the service implementing the lease management
+     * @throws InterruptedException
      */
-    void startService() throws LeaseManagementException;
+    void startService() throws LeaseManagementException, InterruptedException;
 
     /**
      * Allows to stop the service implementing the lease management

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/f6220a84/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java b/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java
new file mode 100644
index 0000000..e816149
--- /dev/null
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.tso;
+
+import org.jboss.netty.channel.Channel;
+
+public final class PersistEvent {
+
+    private MonitoringContext monCtx;
+
+    enum Type {
+        TIMESTAMP, COMMIT, ABORT, LOW_WATERMARK
+    }
+
+    private Type type = null;
+    private Channel channel = null;
+
+    private boolean isRetry = false;
+    private long startTimestamp = 0L;
+    private long commitTimestamp = 0L;
+    private long lowWatermark = 0L;
+
+    void makePersistCommit(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx) {
+
+        this.type = Type.COMMIT;
+        this.startTimestamp = startTimestamp;
+        this.commitTimestamp = commitTimestamp;
+        this.channel = c;
+        this.monCtx = monCtx;
+
+    }
+
+    void makePersistAbort(long startTimestamp, boolean isRetry, Channel c, MonitoringContext monCtx) {
+
+        this.type = Type.ABORT;
+        this.startTimestamp = startTimestamp;
+        this.isRetry = isRetry;
+        this.channel = c;
+        this.monCtx = monCtx;
+
+    }
+
+    void makePersistTimestamp(long startTimestamp, Channel c, MonitoringContext monCtx) {
+
+        this.type = Type.TIMESTAMP;
+        this.startTimestamp = startTimestamp;
+        this.channel = c;
+        this.monCtx = monCtx;
+
+    }
+
+    void makePersistLowWatermark(long lowWatermark, MonitoringContext monCtx) {
+
+        this.type = Type.LOW_WATERMARK;
+        this.lowWatermark = lowWatermark;
+        this.monCtx = monCtx;
+
+    }
+
+    MonitoringContext getMonCtx() {
+
+        return monCtx;
+
+    }
+
+    Type getType() {
+
+        return type;
+
+    }
+
+    Channel getChannel() {
+
+        return channel;
+
+    }
+
+    boolean isRetry() {
+
+        return isRetry;
+
+    }
+
+    long getStartTimestamp() {
+
+        return startTimestamp;
+
+    }
+
+    long getCommitTimestamp() {
+
+        return commitTimestamp;
+
+    }
+
+    long getLowWatermark() {
+
+        return lowWatermark;
+
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/f6220a84/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
index e1b2092..823198b 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
@@ -19,12 +19,26 @@ package org.apache.omid.tso;
 
 import org.jboss.netty.channel.Channel;
 
+// TODO Check the names of all methods as they do not persist anything anymore
 interface PersistenceProcessor {
-    void persistCommit(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx);
 
-    void persistAbort(long startTimestamp, boolean isRetry, Channel c, MonitoringContext monCtx);
+    // TODO maybe it should be called addCommit(...) or addCommitToBatch(...)
+    void persistCommit(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx)
+            throws InterruptedException;
 
-    void persistTimestamp(long startTimestamp, Channel c, MonitoringContext monCtx);
+    // TODO maybe it should be called addAbort(...) or addAbortToBatch(...)
+    void persistAbort(long startTimestamp, boolean isRetry, Channel c, MonitoringContext monCtx)
+            throws InterruptedException;
+
+    // TODO maybe it should be called addTimestamp(...) or addTimestampToBatch(...)
+    void persistTimestamp(long startTimestamp, Channel c, MonitoringContext monCtx) throws InterruptedException;
+
+    // TODO maybe it should be called addLowWatermark(...) or addLowWatermarkToBatch(...)
+    void persistLowWatermark(long lowWatermark, MonitoringContext monCtx);
+
+    // TODO The name of this method is weird. Rename to "persist"
+    void persistFlush() throws InterruptedException;
+
+    void reset() throws InterruptedException;
 
-    void persistLowWatermark(long lowWatermark);
 }

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/f6220a84/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java
new file mode 100644
index 0000000..36b1959
--- /dev/null
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.tso;
+
+import com.lmax.disruptor.WorkHandler;
+import org.apache.omid.committable.CommitTable;
+import org.apache.omid.metrics.Histogram;
+import org.apache.omid.metrics.MetricsRegistry;
+import org.apache.omid.metrics.Timer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.inject.Inject;
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+
+import static com.codahale.metrics.MetricRegistry.name;
+
+public class PersistenceProcessorHandler implements WorkHandler<PersistenceProcessorImpl.PersistBatchEvent> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PersistenceProcessorHandler.class);
+
+    private final String tsoHostAndPort;
+    private final LeaseManagement leaseManager;
+
+    private final ReplyProcessor replyProcessor;
+    private final RetryProcessor retryProc;
+    private final CommitTable.Writer writer;
+    final Panicker panicker;
+
+    private final Timer flushTimer;
+    private final Histogram batchSizeHistogram;
+
+    @Inject
+    PersistenceProcessorHandler(MetricsRegistry metrics,
+                                String tsoHostAndPort,
+                                LeaseManagement leaseManager,
+                                CommitTable commitTable,
+                                ReplyProcessor replyProcessor,
+                                RetryProcessor retryProc,
+                                Panicker panicker)
+    throws InterruptedException, ExecutionException, IOException {
+
+        this.tsoHostAndPort = tsoHostAndPort;
+        this.leaseManager = leaseManager;
+        this.writer = commitTable.getWriter();
+        this.replyProcessor = replyProcessor;
+        this.retryProc = retryProc;
+        this.panicker = panicker;
+
+        flushTimer = metrics.timer(name("tso", "persist", "flush"));
+        batchSizeHistogram = metrics.histogram(name("tso", "persist", "batchsize"));
+
+    }
+
+    @Override
+    public void onEvent(PersistenceProcessorImpl.PersistBatchEvent event) throws Exception {
+
+        Batch batch = event.getBatch();
+        for (int i=0; i < batch.getNumEvents(); ++i) {
+            PersistEvent localEvent = batch.getEvent(i);
+
+            switch (localEvent.getType()) {
+            case COMMIT:
+                localEvent.getMonCtx().timerStart("commitPersistProcessor");
+                // TODO: What happens when the IOException is thrown?
+                writer.addCommittedTransaction(localEvent.getStartTimestamp(), localEvent.getCommitTimestamp());
+                break;
+            case ABORT:
+                break;
+            case TIMESTAMP:
+                localEvent.getMonCtx().timerStart("timestampPersistProcessor");
+                break;
+            case LOW_WATERMARK:
+                writer.updateLowWatermark(localEvent.getLowWatermark());
+                break;
+            default:
+                throw new RuntimeException("Unknown event type: " + localEvent.getType().name());
+            }
+        }
+        flush(batch, event.getBatchID());
+
+    }
+
+    // TODO Fix this method with the contents of PersistenceProcessor.flush() in master branch
+    // TODO This is related to the changes in TestPersistenceProcessor.testCommitPersistenceWithHALeaseManager().
+    // TODO Check also that test in the master branch
+    private void flush(Batch batch, long batchID) throws IOException {
+
+        long startFlushTimeInNs = System.nanoTime();
+
+        boolean areWeStillMaster = true;
+        if (!leaseManager.stillInLeasePeriod()) {
+            // The master TSO replica has changed, so we must inform the
+            // clients about it when sending the replies and avoid flushing
+            // the current batch of TXs
+            areWeStillMaster = false;
+            // We need also to clear the data in the buffer
+            writer.clearWriteBuffer();
+            LOG.trace("Replica {} lost mastership before flushing data", tsoHostAndPort);
+        } else {
+            try {
+                writer.flush();
+            } catch (IOException e) {
+                panicker.panic("Error persisting commit batch", e.getCause());
+            }
+            batchSizeHistogram.update(batch.getNumEvents());
+            if (!leaseManager.stillInLeasePeriod()) {
+                // If after flushing this TSO server is not the master
+                // replica we need inform the client about it
+                areWeStillMaster = false;
+                LOG.warn("Replica {} lost mastership after flushing data", tsoHostAndPort);
+            }
+        }
+        flushTimer.update((System.nanoTime() - startFlushTimeInNs));
+        batch.sendReply(replyProcessor, retryProc, batchID, areWeStillMaster);
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/f6220a84/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
index bc1d43c..e82f0b1 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
@@ -19,401 +19,154 @@ package org.apache.omid.tso;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.lmax.disruptor.BatchEventProcessor;
+import com.lmax.disruptor.BusySpinWaitStrategy;
 import com.lmax.disruptor.EventFactory;
-import com.lmax.disruptor.EventHandler;
 import com.lmax.disruptor.RingBuffer;
-import com.lmax.disruptor.SequenceBarrier;
-import com.lmax.disruptor.TimeoutBlockingWaitStrategy;
-import com.lmax.disruptor.TimeoutHandler;
-import org.apache.omid.committable.CommitTable;
-import org.apache.omid.metrics.Histogram;
-import org.apache.omid.metrics.Meter;
-import org.apache.omid.metrics.MetricsRegistry;
-import org.apache.omid.metrics.Timer;
+import com.lmax.disruptor.WorkerPool;
 import org.jboss.netty.channel.Channel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import javax.inject.Inject;
-import javax.inject.Named;
 import java.io.IOException;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 
-import static org.apache.omid.metrics.MetricsUtils.name;
-import static org.apache.omid.tso.TSOServer.TSO_HOST_AND_PORT_KEY;
+class PersistenceProcessorImpl implements PersistenceProcessor {
 
-class PersistenceProcessorImpl
-        implements EventHandler<PersistenceProcessorImpl.PersistEvent>, PersistenceProcessor, TimeoutHandler {
+    private static final long INITIAL_LWM_VALUE = -1L;
 
-    private static final Logger LOG = LoggerFactory.getLogger(PersistenceProcessor.class);
+    private final ReplyProcessor replyProcessor;
+    private final RingBuffer<PersistBatchEvent> persistRing;
 
-    private final String tsoHostAndPort;
-    private final LeaseManagement leaseManager;
-    final ReplyProcessor reply;
-    final RetryProcessor retryProc;
-    final CommitTable.Client commitTableClient;
-    final CommitTable.Writer writer;
-    final Panicker panicker;
-    final RingBuffer<PersistEvent> persistRing;
-    final Batch batch;
-    final Timer flushTimer;
-    final Histogram batchSizeHistogram;
-    final Meter timeoutMeter;
-    final int batchPersistTimeoutInMs;
+    private final BatchPool batchPool;
+    @VisibleForTesting
+    Batch batch;
+
+    // TODO Next two need to be either int or AtomicLong
+    volatile private long batchIDCnt;
+    volatile private long lowWatermark = INITIAL_LWM_VALUE;
 
-    AtomicLong lastFlush = new AtomicLong(System.nanoTime());
+    private MonitoringContext lowWatermarkContext;
 
     @Inject
     PersistenceProcessorImpl(TSOServerConfig config,
-                             MetricsRegistry metrics,
-                             @Named(TSO_HOST_AND_PORT_KEY) String tsoHostAndPort,
-                             LeaseManagement leaseManager,
-                             CommitTable commitTable,
-                             ReplyProcessor reply,
-                             RetryProcessor retryProc,
-                             Panicker panicker)
-            throws IOException {
-
-        this(config,
-             metrics,
-             tsoHostAndPort,
-             new Batch(config.getMaxBatchSize()),
-             leaseManager,
-             commitTable,
-             reply,
-             retryProc,
-             panicker);
+                             BatchPool batchPool,
+                             ReplyProcessor replyProcessor,
+                             Panicker panicker,
+                             PersistenceProcessorHandler[] handlers)
+            throws InterruptedException, ExecutionException, IOException {
 
-    }
+        this.batchIDCnt = 0L;
+        this.batchPool = batchPool;
+        this.batch = batchPool.getNextEmptyBatch();
 
-    @VisibleForTesting
-    PersistenceProcessorImpl(TSOServerConfig config,
-                             MetricsRegistry metrics,
-                             String tsoHostAndPort,
-                             Batch batch,
-                             LeaseManagement leaseManager,
-                             CommitTable commitTable,
-                             ReplyProcessor reply,
-                             RetryProcessor retryProc,
-                             Panicker panicker)
-            throws IOException {
-
-        this.tsoHostAndPort = tsoHostAndPort;
-        this.batch = batch;
-        this.batchPersistTimeoutInMs = config.getBatchPersistTimeoutInMs();
-        this.leaseManager = leaseManager;
-        this.commitTableClient = commitTable.getClient();
-        this.writer = commitTable.getWriter();
-        this.reply = reply;
-        this.retryProc = retryProc;
-        this.panicker = panicker;
-
-        LOG.info("Creating the persist processor with batch size {}, and timeout {}ms",
-                 config.getMaxBatchSize(), batchPersistTimeoutInMs);
-
-        flushTimer = metrics.timer(name("tso", "persist", "flush"));
-        batchSizeHistogram = metrics.histogram(name("tso", "persist", "batchsize"));
-        timeoutMeter = metrics.meter(name("tso", "persist", "timeout"));
-
-        // FIXME consider putting something more like a phased strategy here to avoid
-        // all the syscalls
-        final TimeoutBlockingWaitStrategy timeoutStrategy
-                = new TimeoutBlockingWaitStrategy(config.getBatchPersistTimeoutInMs(), TimeUnit.MILLISECONDS);
-
-        persistRing = RingBuffer.createSingleProducer(
-                PersistEvent.EVENT_FACTORY, 1 << 20, timeoutStrategy); // 2^20 entries in ringbuffer
-        SequenceBarrier persistSequenceBarrier = persistRing.newBarrier();
-        BatchEventProcessor<PersistEvent> persistProcessor = new BatchEventProcessor<>(
-                persistRing,
-                persistSequenceBarrier,
-                this);
-        persistRing.addGatingSequences(persistProcessor.getSequence());
-        persistProcessor.setExceptionHandler(new FatalExceptionHandler(panicker));
-
-        ExecutorService persistExec = Executors.newSingleThreadExecutor(
-                new ThreadFactoryBuilder().setNameFormat("persist-%d").build());
-        persistExec.submit(persistProcessor);
+        this.replyProcessor = replyProcessor;
 
-    }
+        this.persistRing = RingBuffer.createSingleProducer(
+                PersistBatchEvent.EVENT_FACTORY, 1 << 20, new BusySpinWaitStrategy());
 
-    @Override
-    public void onEvent(PersistEvent event, long sequence, boolean endOfBatch) throws Exception {
-
-        switch (event.getType()) {
-            case COMMIT:
-                event.getMonCtx().timerStart("commitPersistProcessor");
-                // TODO: What happens when the IOException is thrown?
-                writer.addCommittedTransaction(event.getStartTimestamp(), event.getCommitTimestamp());
-                batch.addCommit(event.getStartTimestamp(), event.getCommitTimestamp(), event.getChannel(),
-                                event.getMonCtx());
-                break;
-            case ABORT:
-                sendAbortOrIdentifyFalsePositive(event.getStartTimestamp(), event.isRetry(), event.getChannel(),
-                                                 event.getMonCtx());
-                break;
-            case TIMESTAMP:
-                event.getMonCtx().timerStart("timestampPersistProcessor");
-                batch.addTimestamp(event.getStartTimestamp(), event.getChannel(), event.getMonCtx());
-                break;
-        }
-        if (batch.isFull() || endOfBatch) {
-            maybeFlushBatch();
-        }
+        WorkerPool<PersistBatchEvent> persistProcessor = new WorkerPool<>(persistRing,
+                                                                          persistRing.newBarrier(),
+                                                                          new FatalExceptionHandler(panicker),
+                                                                          handlers);
+        this.persistRing.addGatingSequences(persistProcessor.getWorkerSequences());
 
-    }
-
-    private void sendAbortOrIdentifyFalsePositive(long startTimestamp, boolean isRetry, Channel channel,
-                                                  MonitoringContext monCtx) {
-
-        if (!isRetry) {
-            reply.abortResponse(startTimestamp, channel, monCtx);
-            return;
-        }
+        ExecutorService requestExec = Executors.newFixedThreadPool(config.getPersistHandlerNum(),
+                                                                   new ThreadFactoryBuilder().setNameFormat("persist-%d").build());
+        persistProcessor.start(requestExec);
 
-        // If is a retry, we must check if it is a already committed request abort.
-        // This can happen because a client could have missed the reply, so it
-        // retried the request after a timeout. So we added to the batch and when
-        // it's flushed we'll add events to the retry processor in order to check
-        // for false positive aborts. It needs to be done after the flush in case
-        // the commit has occurred but it hasn't been persisted yet.
-        batch.addUndecidedRetriedRequest(startTimestamp, channel, monCtx);
     }
 
-    // no event has been received in the timeout period
     @Override
-    public void onTimeout(final long sequence) {
-        maybeFlushBatch();
-    }
-
-    /**
-     * Flush the current batch if it's full, or the timeout has been elapsed since the last flush.
-     */
-    private void maybeFlushBatch() {
-        if (batch.isFull()) {
-            flush();
-        } else if ((System.nanoTime() - lastFlush.get()) > TimeUnit.MILLISECONDS.toNanos(batchPersistTimeoutInMs)) {
-            timeoutMeter.mark();
-            flush();
-        }
-    }
+    public void reset() throws InterruptedException {
 
-    synchronized private void flush() {
-
-        if (batch.getNumEvents() > 0) {
-            lastFlush.set(System.nanoTime());
-            commitSuicideIfNotMaster();
-            try {
-                writer.flush();
-                batchSizeHistogram.update(batch.getNumEvents());
-            } catch (IOException e) {
-                panicker.panic("Error persisting commit batch", e);
-            }
-            commitSuicideIfNotMaster(); // TODO Here, we can return the client responses before committing suicide
-            flushTimer.update((System.nanoTime() - lastFlush.get()));
-            batch.sendRepliesAndReset(reply, retryProc);
-        }
+        batchIDCnt = 0L;
+        batchPool.reset();
+        batch = batchPool.getNextEmptyBatch();
+        replyProcessor.reset();
 
     }
 
-    private void commitSuicideIfNotMaster() {
-        if (!leaseManager.stillInLeasePeriod()) {
-            panicker.panic("Replica " + tsoHostAndPort + " lost mastership whilst flushing data. Committing suicide");
-        }
-    }
-
     @Override
-    public void persistCommit(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx) {
-        long seq = persistRing.next();
-        PersistEvent e = persistRing.get(seq);
-        PersistEvent.makePersistCommit(e, startTimestamp, commitTimestamp, c, monCtx);
-        persistRing.publish(seq);
-    }
+    public void persistFlush() throws InterruptedException {
 
-    @Override
-    public void persistAbort(long startTimestamp, boolean isRetry, Channel c, MonitoringContext monCtx) {
+        if (batch.isEmpty()) {
+            return;
+        }
+        batch.addLowWatermark(this.lowWatermark, this.lowWatermarkContext);
         long seq = persistRing.next();
-        PersistEvent e = persistRing.get(seq);
-        PersistEvent.makePersistAbort(e, startTimestamp, isRetry, c, monCtx);
+        PersistBatchEvent e = persistRing.get(seq);
+        PersistBatchEvent.makePersistBatch(e, batch, batchIDCnt++);
         persistRing.publish(seq);
-    }
+        batch = batchPool.getNextEmptyBatch();
 
-    @Override
-    public void persistTimestamp(long startTimestamp, Channel c, MonitoringContext monCtx) {
-        long seq = persistRing.next();
-        PersistEvent e = persistRing.get(seq);
-        PersistEvent.makePersistTimestamp(e, startTimestamp, c, monCtx);
-        persistRing.publish(seq);
     }
 
     @Override
-    public void persistLowWatermark(long lowWatermark) {
-        try {
-            writer.updateLowWatermark(lowWatermark);
-        } catch (IOException e) {
-            LOG.error("Should not be thrown");
-        }
-    }
-
-    public static class Batch {
-
-        final PersistEvent[] events;
-        final int maxBatchSize;
-        int numEvents;
-
-        Batch(int maxBatchSize) {
-            assert (maxBatchSize > 0);
-            LOG.info("Creating the Batch with {} elements", maxBatchSize);
-            this.maxBatchSize = maxBatchSize;
-            events = new PersistEvent[maxBatchSize];
-            numEvents = 0;
-            for (int i = 0; i < maxBatchSize; i++) {
-                events[i] = new PersistEvent();
-            }
-        }
-
-        boolean isFull() {
-            assert (numEvents <= maxBatchSize);
-            return numEvents == maxBatchSize;
-        }
+    public void persistCommit(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx)
+            throws InterruptedException {
 
-        int getNumEvents() {
-            return numEvents;
+        batch.addCommit(startTimestamp, commitTimestamp, c, monCtx);
+        if (batch.isLastEntryEmpty()) {
+            persistFlush();
         }
 
-        void addCommit(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx) {
-            if (isFull()) {
-                throw new IllegalStateException("batch full");
-            }
-            int index = numEvents++;
-            PersistEvent e = events[index];
-            PersistEvent.makePersistCommit(e, startTimestamp, commitTimestamp, c, monCtx);
-        }
-
-        void addUndecidedRetriedRequest(long startTimestamp, Channel c, MonitoringContext monCtx) {
-            if (isFull()) {
-                throw new IllegalStateException("batch full");
-            }
-            int index = numEvents++;
-            PersistEvent e = events[index];
-            // We mark the event as an ABORT retry to identify the events to send
-            // to the retry processor
-            PersistEvent.makePersistAbort(e, startTimestamp, true, c, monCtx);
-        }
+    }
 
-        void addTimestamp(long startTimestamp, Channel c, MonitoringContext monCtx) {
-            if (isFull()) {
-                throw new IllegalStateException("batch full");
-            }
-            int index = numEvents++;
-            PersistEvent e = events[index];
-            PersistEvent.makePersistTimestamp(e, startTimestamp, c, monCtx);
-        }
+    @Override
+    public void persistAbort(long startTimestamp, boolean isRetry, Channel c, MonitoringContext context)
+            throws InterruptedException {
 
-        void sendRepliesAndReset(ReplyProcessor reply, RetryProcessor retryProc) {
-            for (int i = 0; i < numEvents; i++) {
-                PersistEvent e = events[i];
-                switch (e.getType()) {
-                    case TIMESTAMP:
-                        e.getMonCtx().timerStop("timestampPersistProcessor");
-                        reply.timestampResponse(e.getStartTimestamp(), e.getChannel(), e.getMonCtx());
-                        break;
-                    case COMMIT:
-                        e.getMonCtx().timerStop("commitPersistProcessor");
-
-                        reply.commitResponse(e.getStartTimestamp(), e.getCommitTimestamp(), e.getChannel(), e.getMonCtx());
-                        break;
-                    case ABORT:
-                        if (e.isRetry()) {
-                            retryProc.disambiguateRetryRequestHeuristically(e.getStartTimestamp(), e.getChannel(),
-                                                                            e.getMonCtx());
-                        } else {
-                            LOG.error("We should not be receiving non-retried aborted requests in here");
-                        }
-                        break;
-                    default:
-                        LOG.error("We should receive only COMMIT or ABORT event types. Received {}", e.getType());
-                        break;
-                }
-            }
-            numEvents = 0;
+        batch.addAbort(startTimestamp, isRetry, c, context);
+        if (batch.isLastEntryEmpty()) {
+            persistFlush();
         }
 
     }
 
-    public final static class PersistEvent {
-
-        private MonitoringContext monCtx;
+    @Override
+    public void persistTimestamp(long startTimestamp, Channel c, MonitoringContext context)
+            throws InterruptedException {
 
-        enum Type {
-            TIMESTAMP, COMMIT, ABORT
+        batch.addTimestamp(startTimestamp, c, context);
+        if (batch.isLastEntryEmpty()) {
+            persistFlush();
         }
 
-        private Type type = null;
-        private Channel channel = null;
-
-        private boolean isRetry = false;
-        private long startTimestamp = 0;
-        private long commitTimestamp = 0;
-
-        static void makePersistCommit(PersistEvent e, long startTimestamp, long commitTimestamp, Channel c,
-                                      MonitoringContext monCtx) {
-            e.type = Type.COMMIT;
-            e.startTimestamp = startTimestamp;
-            e.commitTimestamp = commitTimestamp;
-            e.channel = c;
-            e.monCtx = monCtx;
-        }
+    }
 
-        static void makePersistAbort(PersistEvent e, long startTimestamp, boolean isRetry, Channel c,
-                                     MonitoringContext monCtx) {
-            e.type = Type.ABORT;
-            e.startTimestamp = startTimestamp;
-            e.isRetry = isRetry;
-            e.channel = c;
-            e.monCtx = monCtx;
-        }
+    @Override
+    public void persistLowWatermark(long lowWatermark, MonitoringContext context) {
 
-        static void makePersistTimestamp(PersistEvent e, long startTimestamp, Channel c, MonitoringContext monCtx) {
-            e.type = Type.TIMESTAMP;
-            e.startTimestamp = startTimestamp;
-            e.channel = c;
-            e.monCtx = monCtx;
-        }
+        this.lowWatermark = lowWatermark;
+        this.lowWatermarkContext = context;
 
-        MonitoringContext getMonCtx() {
-            return monCtx;
-        }
+    }
 
-        Type getType() {
-            return type;
-        }
+    final static class PersistBatchEvent {
 
-        Channel getChannel() {
-            return channel;
-        }
+        private Batch batch;
+        private long batchID;
 
-        boolean isRetry() {
-            return isRetry;
+        static void makePersistBatch(PersistBatchEvent e, Batch batch, long batchID) {
+            e.batch = batch;
+            e.batchID = batchID;
         }
 
-        long getStartTimestamp() {
-            return startTimestamp;
+        Batch getBatch() {
+            return batch;
         }
 
-        long getCommitTimestamp() {
-            return commitTimestamp;
+        long getBatchID() {
+            return batchID;
         }
 
-        public final static EventFactory<PersistEvent> EVENT_FACTORY = new EventFactory<PersistEvent>() {
-            @Override
-            public PersistEvent newInstance() {
-                return new PersistEvent();
+        final static EventFactory<PersistBatchEvent> EVENT_FACTORY = new EventFactory<PersistBatchEvent>() {
+            public PersistBatchEvent newInstance() {
+                return new PersistBatchEvent();
             }
         };
+
     }
 
 }

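For illustration, the fill-then-flush pattern that persistCommit(), persistAbort() and persistTimestamp() follow above can be reduced to a few lines. The sketch below uses hypothetical names and plain java.util types (it is not the Omid Batch/BatchPool): each request is appended to the current batch, and once the batch fills up it is handed over to a writer and a fresh batch is started.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    class BatchingWriterSketch {

        static final int MAX_BATCH_SIZE = 4;

        private List<Long> currentBatch = new ArrayList<>(MAX_BATCH_SIZE);
        // Full batches handed off to a (hypothetical) commit-table writer thread
        private final BlockingQueue<List<Long>> writerQueue = new ArrayBlockingQueue<>(16);

        // Analogous to persistCommit()/persistTimestamp(): buffer first, flush when full
        void persist(long txId) throws InterruptedException {
            currentBatch.add(txId);
            if (currentBatch.size() == MAX_BATCH_SIZE) {
                flush();
            }
        }

        // Analogous to persistFlush(): also invoked on end-of-batch or on timeout
        void flush() throws InterruptedException {
            if (currentBatch.isEmpty()) {
                return;
            }
            writerQueue.put(currentBatch);                   // hand the full batch over
            currentBatch = new ArrayList<>(MAX_BATCH_SIZE);  // keep filling a fresh one
        }
    }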
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/f6220a84/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessor.java b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessor.java
index 6aa6faa..0a3a00d 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessor.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessor.java
@@ -20,9 +20,17 @@ package org.apache.omid.tso;
 import org.jboss.netty.channel.Channel;
 
 interface ReplyProcessor {
+
+    // TODO This documentation no longer corresponds to the method below. Put it in the right place or remove it
     /**
      * Informs the client about the outcome of the Tx it was trying to commit.
      *
+     * @param batch
+     *            the batch of operations
+     * @param batchID
+     *            the id of the batch, used to enforce order between replies
+     * @param makeHeuristicDecision
+     *            informs about whether heuristic actions are needed or not
      * @param startTimestamp
      *            the start timestamp of the transaction (a.k.a. tx id)
      * @param commitTimestamp
@@ -30,10 +38,13 @@ interface ReplyProcessor {
      * @param channel
      *            the communication channel with the client
      */
-    void commitResponse(long startTimestamp, long commitTimestamp, Channel channel, MonitoringContext monCtx);
+    void batchResponse(Batch batch, long batchID, boolean makeHeuristicDecision);
+
+    void addAbort(Batch batch, long startTimestamp, Channel c, MonitoringContext context);
+
+    void addCommit(Batch batch, long startTimestamp, long commitTimestamp, Channel c, MonitoringContext context);
 
-    void abortResponse(long startTimestamp, Channel c, MonitoringContext monCtx);
+    void reset();
 
-    void timestampResponse(long startTimestamp, Channel c, MonitoringContext monCtx);
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/f6220a84/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
index d87f9a7..55162c7 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
@@ -18,6 +18,7 @@
 package org.apache.omid.tso;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.inject.Inject;
 import com.lmax.disruptor.BatchEventProcessor;
 import com.lmax.disruptor.BusySpinWaitStrategy;
 import com.lmax.disruptor.EventFactory;
@@ -27,102 +28,181 @@ import com.lmax.disruptor.SequenceBarrier;
 import org.apache.omid.metrics.Meter;
 import org.apache.omid.metrics.MetricsRegistry;
 import org.apache.omid.proto.TSOProto;
+
 import org.jboss.netty.channel.Channel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.inject.Inject;
+import java.util.Comparator;
+import java.util.PriorityQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicLong;
 
-import static org.apache.omid.metrics.MetricsUtils.name;
+import static com.codahale.metrics.MetricRegistry.name;
 
-class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyEvent>, ReplyProcessor {
+class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEvent>, ReplyProcessor {
 
     private static final Logger LOG = LoggerFactory.getLogger(ReplyProcessorImpl.class);
 
-    final RingBuffer<ReplyEvent> replyRing;
-    final Meter abortMeter;
-    final Meter commitMeter;
-    final Meter timestampMeter;
+    private static final int NO_ORDER = (-1);
+
+    private final RingBuffer<ReplyBatchEvent> replyRing;
+
+    private AtomicLong nextIDToHandle = new AtomicLong();
+
+    private PriorityQueue<ReplyBatchEvent> futureEvents;
+
+    // Metrics
+    private final Meter abortMeter;
+    private final Meter commitMeter;
+    private final Meter timestampMeter;
 
     @Inject
     ReplyProcessorImpl(MetricsRegistry metrics, Panicker panicker) {
-        replyRing = RingBuffer.createMultiProducer(ReplyEvent.EVENT_FACTORY, 1 << 12, new BusySpinWaitStrategy());
+
+        this.nextIDToHandle.set(0);
+
+        this.replyRing = RingBuffer.createMultiProducer(ReplyBatchEvent.EVENT_FACTORY, 1 << 12, new BusySpinWaitStrategy());
+
         SequenceBarrier replySequenceBarrier = replyRing.newBarrier();
-        BatchEventProcessor<ReplyEvent> replyProcessor = new BatchEventProcessor<ReplyEvent>(
-                replyRing, replySequenceBarrier, this);
+        BatchEventProcessor<ReplyBatchEvent> replyProcessor = new BatchEventProcessor<>(replyRing, replySequenceBarrier, this);
         replyProcessor.setExceptionHandler(new FatalExceptionHandler(panicker));
 
         replyRing.addGatingSequences(replyProcessor.getSequence());
 
-        ExecutorService replyExec = Executors.newSingleThreadExecutor(
-                new ThreadFactoryBuilder().setNameFormat("reply-%d").build());
+        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("reply-%d").build();
+        ExecutorService replyExec = Executors.newSingleThreadExecutor(threadFactory);
         replyExec.submit(replyProcessor);
 
-        abortMeter = metrics.meter(name("tso", "aborts"));
-        commitMeter = metrics.meter(name("tso", "commits"));
-        timestampMeter = metrics.meter(name("tso", "timestampAllocation"));
+        this.futureEvents = new PriorityQueue<>(10, new Comparator<ReplyBatchEvent>() {
+            public int compare(ReplyBatchEvent replyBatchEvent1, ReplyBatchEvent replyBatchEvent2) {
+                return Long.compare(replyBatchEvent1.getBatchID(), replyBatchEvent2.getBatchID());
+            }
+        });
+
+        this.abortMeter = metrics.meter(name("tso", "aborts"));
+        this.commitMeter = metrics.meter(name("tso", "commits"));
+        this.timestampMeter = metrics.meter(name("tso", "timestampAllocation"));
+
     }
 
-    public void onEvent(ReplyEvent event, long sequence, boolean endOfBatch) throws Exception {
-        String name = null;
-        try {
-            switch (event.getType()) {
-                case COMMIT:
-                    name = "commitReplyProcessor";
-                    event.getMonCtx().timerStart(name);
-                    handleCommitResponse(event.getStartTimestamp(), event.getCommitTimestamp(), event.getChannel());
-                    break;
-                case ABORT:
-                    name = "abortReplyProcessor";
-                    event.getMonCtx().timerStart(name);
-                    handleAbortResponse(event.getStartTimestamp(), event.getChannel());
-                    break;
-                case TIMESTAMP:
-                    name = "timestampReplyProcessor";
-                    event.getMonCtx().timerStart(name);
-                    handleTimestampResponse(event.getStartTimestamp(), event.getChannel());
-                    break;
-                default:
-                    LOG.error("Unknown event {}", event.getType());
-                    break;
-            }
-        } finally {
-            if (name != null) {
-                event.getMonCtx().timerStop(name);
+    public void reset() {
+        nextIDToHandle.set(0);
+    }
+
+    private void handleReplyBatchEvent(ReplyBatchEvent event) {
+
+        String name;
+        Batch batch = event.getBatch();
+        for (int i = 0; batch != null && i < batch.getNumEvents(); ++i) {
+            PersistEvent localEvent = batch.getEvent(i);
+
+            switch (localEvent.getType()) {
+            case COMMIT:
+                name = "commitReplyProcessor";
+                localEvent.getMonCtx().timerStart(name);
+                handleCommitResponse(localEvent.getStartTimestamp(), localEvent.getCommitTimestamp(), localEvent.getChannel(), event.getMakeHeuristicDecision());
+                localEvent.getMonCtx().timerStop(name);
+                break;
+            case ABORT:
+                name = "abortReplyProcessor";
+                localEvent.getMonCtx().timerStart(name);
+                handleAbortResponse(localEvent.getStartTimestamp(), localEvent.getChannel());
+                localEvent.getMonCtx().timerStop(name);
+                break;
+            case TIMESTAMP:
+                name = "timestampReplyProcessor";
+                localEvent.getMonCtx().timerStart(name);
+                handleTimestampResponse(localEvent.getStartTimestamp(), localEvent.getChannel());
+                localEvent.getMonCtx().timerStop(name);
+                break;
+            case LOW_WATERMARK:
+                break;
+            default:
+                LOG.error("Unknown event {}", localEvent.getType());
+                break;
             }
+            localEvent.getMonCtx().publish();
+        }
+
+        if (batch != null) {
+            batch.clear();
         }
-        event.getMonCtx().publish();
+
+    }
+
+    private void processWaitingEvents() {
+
+        while (!futureEvents.isEmpty() && futureEvents.peek().getBatchID() == nextIDToHandle.get()) {
+            ReplyBatchEvent e = futureEvents.poll();
+            handleReplyBatchEvent(e);
+            nextIDToHandle.incrementAndGet();
+        }
+
+    }
+
+    public void onEvent(ReplyBatchEvent event, long sequence, boolean endOfBatch) throws Exception {
+
+        // The order in which replies are sent must be preserved to maintain snapshot isolation.
+        // Otherwise, a start timestamp N could be returned to a client while a commit with a
+        // smaller timestamp is still missing from the commit table.
+
+        // If earlier batches (those with smaller ids) have not been processed yet, park this event
+        if (event.getBatchID() > nextIDToHandle.get()) {
+            futureEvents.add(event);
+            return;
+        }
+
+        handleReplyBatchEvent(event);
+
+        if (event.getBatchID() == NO_ORDER) {
+            return;
+        }
+
+        nextIDToHandle.incrementAndGet();
+
+        // Process any events that arrived earlier and were parked in futureEvents.
+        processWaitingEvents();
+
     }
 
     @Override
-    public void commitResponse(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx) {
+    public void batchResponse(Batch batch, long batchID, boolean makeHeuristicDecision) {
+
         long seq = replyRing.next();
-        ReplyEvent e = replyRing.get(seq);
-        ReplyEvent.makeCommitResponse(e, startTimestamp, commitTimestamp, c, monCtx);
+        ReplyBatchEvent e = replyRing.get(seq);
+        ReplyBatchEvent.makeReplyBatch(e, batch, batchID, makeHeuristicDecision);
         replyRing.publish(seq);
+
     }
 
     @Override
-    public void abortResponse(long startTimestamp, Channel c, MonitoringContext monCtx) {
-        long seq = replyRing.next();
-        ReplyEvent e = replyRing.get(seq);
-        ReplyEvent.makeAbortResponse(e, startTimestamp, c, monCtx);
-        replyRing.publish(seq);
+    public void addAbort(Batch batch, long startTimestamp, Channel c, MonitoringContext context) {
+
+        batch.addAbort(startTimestamp, true, c, context);
+        batchResponse(batch, NO_ORDER, false);
+
     }
 
     @Override
-    public void timestampResponse(long startTimestamp, Channel c, MonitoringContext monCtx) {
-        long seq = replyRing.next();
-        ReplyEvent e = replyRing.get(seq);
-        ReplyEvent.makeTimestampReponse(e, startTimestamp, c, monCtx);
-        replyRing.publish(seq);
+    public void addCommit(Batch batch, long startTimestamp, long commitTimestamp, Channel c, MonitoringContext context) {
+
+        batch.addCommit(startTimestamp, commitTimestamp, c, context);
+        batchResponse(batch, NO_ORDER, false);
+
     }
 
-    void handleCommitResponse(long startTimestamp, long commitTimestamp, Channel c) {
+    private void handleCommitResponse(long startTimestamp, long commitTimestamp, Channel c,
+                                      boolean makeHeuristicDecision) {
+
         TSOProto.Response.Builder builder = TSOProto.Response.newBuilder();
         TSOProto.CommitResponse.Builder commitBuilder = TSOProto.CommitResponse.newBuilder();
+        // TODO Remove heuristic decisions, as they are not part of the protocol anymore
+        if (makeHeuristicDecision) { // If the commit is ambiguous, it is due to a new master TSO
+//            commitBuilder.setMakeHeuristicDecision(true);
+        }
         commitBuilder.setAborted(false)
                 .setStartTimestamp(startTimestamp)
                 .setCommitTimestamp(commitTimestamp);
@@ -130,20 +210,24 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyEvent>,
         c.write(builder.build());
 
         commitMeter.mark();
+
     }
 
-    void handleAbortResponse(long startTimestamp, Channel c) {
+    private void handleAbortResponse(long startTimestamp, Channel c) {
+
         TSOProto.Response.Builder builder = TSOProto.Response.newBuilder();
         TSOProto.CommitResponse.Builder commitBuilder = TSOProto.CommitResponse.newBuilder();
-        commitBuilder.setAborted(true)
-                .setStartTimestamp(startTimestamp);
+        commitBuilder.setAborted(true);
+        commitBuilder.setStartTimestamp(startTimestamp);
         builder.setCommitResponse(commitBuilder.build());
         c.write(builder.build());
 
         abortMeter.mark();
+
     }
 
-    void handleTimestampResponse(long startTimestamp, Channel c) {
+    private void handleTimestampResponse(long startTimestamp, Channel c) {
+
         TSOProto.Response.Builder builder = TSOProto.Response.newBuilder();
         TSOProto.TimestampResponse.Builder respBuilder = TSOProto.TimestampResponse.newBuilder();
         respBuilder.setStartTimestamp(startTimestamp);
@@ -151,69 +235,36 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyEvent>,
         c.write(builder.build());
 
         timestampMeter.mark();
-    }
-
-    public final static class ReplyEvent {
-
-        enum Type {
-            TIMESTAMP, COMMIT, ABORT
-        }
-
-        private Type type = null;
-        private Channel channel = null;
-
-        private long startTimestamp = 0;
-        private long commitTimestamp = 0;
-        private MonitoringContext monCtx;
 
-        Type getType() {
-            return type;
-        }
+    }
 
-        Channel getChannel() {
-            return channel;
-        }
+    final static class ReplyBatchEvent {
 
-        long getStartTimestamp() {
-            return startTimestamp;
-        }
+        private Batch batch;
+        private long batchID;
+        private boolean makeHeuristicDecision;
 
-        long getCommitTimestamp() {
-            return commitTimestamp;
+        static void makeReplyBatch(ReplyBatchEvent e, Batch batch, long batchID, boolean makeHeuristicDecision) {
+            e.batch = batch;
+            e.batchID = batchID;
+            e.makeHeuristicDecision = makeHeuristicDecision;
         }
 
-        MonitoringContext getMonCtx() {
-            return monCtx;
+        Batch getBatch() {
+            return batch;
         }
 
-        static void makeTimestampReponse(ReplyEvent e, long startTimestamp, Channel c, MonitoringContext monCtx) {
-            e.type = Type.TIMESTAMP;
-            e.startTimestamp = startTimestamp;
-            e.channel = c;
-            e.monCtx = monCtx;
-        }
-
-        static void makeCommitResponse(ReplyEvent e, long startTimestamp,
-                                       long commitTimestamp, Channel c, MonitoringContext monCtx) {
-
-            e.type = Type.COMMIT;
-            e.startTimestamp = startTimestamp;
-            e.commitTimestamp = commitTimestamp;
-            e.channel = c;
-            e.monCtx = monCtx;
+        long getBatchID() {
+            return batchID;
         }
 
-        static void makeAbortResponse(ReplyEvent e, long startTimestamp, Channel c, MonitoringContext monCtx) {
-            e.type = Type.ABORT;
-            e.startTimestamp = startTimestamp;
-            e.channel = c;
-            e.monCtx = monCtx;
+        boolean getMakeHeuristicDecision() {
+            return makeHeuristicDecision;
         }
 
-        public final static EventFactory<ReplyEvent> EVENT_FACTORY = new EventFactory<ReplyEvent>() {
-            @Override
-            public ReplyEvent newInstance() {
-                return new ReplyEvent();
+        final static EventFactory<ReplyBatchEvent> EVENT_FACTORY = new EventFactory<ReplyBatchEvent>() {
+            public ReplyBatchEvent newInstance() {
+                return new ReplyBatchEvent();
             }
         };
 

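For illustration, the reordering performed by onEvent()/processWaitingEvents() above can be reduced to the following sketch (hypothetical names, not ReplyProcessorImpl itself): persistence handlers may complete their batches out of order, so a batch whose id is greater than the next expected id is parked in a priority queue and replayed only once every smaller id has been answered.

    import java.util.Comparator;
    import java.util.PriorityQueue;

    class OrderedReplySketch {

        private long nextIdToHandle = 0;

        // Batches parked because an earlier batch has not been replied to yet; element [0] is the batch id
        private final PriorityQueue<long[]> futureBatches = new PriorityQueue<>(10, new Comparator<long[]>() {
            public int compare(long[] b1, long[] b2) {
                return Long.compare(b1[0], b2[0]);
            }
        });

        // Called when a persistence handler finishes writing a batch; batch[0] carries its ordered id
        void onBatchPersisted(long[] batch) {
            if (batch[0] > nextIdToHandle) {   // earlier batches still pending: park this one
                futureBatches.add(batch);
                return;
            }
            reply(batch);
            nextIdToHandle++;
            // Drain any parked batches whose turn has now come
            while (!futureBatches.isEmpty() && futureBatches.peek()[0] == nextIdToHandle) {
                reply(futureBatches.poll());
                nextIdToHandle++;
            }
        }

        private void reply(long[] batch) {
            // Send the client responses for every event in the batch (omitted in this sketch)
        }
    }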
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/f6220a84/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
index cf4b44d..709a529 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
@@ -19,11 +19,12 @@ package org.apache.omid.tso;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.lmax.disruptor.BatchEventProcessor;
-import com.lmax.disruptor.BusySpinWaitStrategy;
 import com.lmax.disruptor.EventFactory;
 import com.lmax.disruptor.EventHandler;
 import com.lmax.disruptor.RingBuffer;
 import com.lmax.disruptor.SequenceBarrier;
+import com.lmax.disruptor.TimeoutBlockingWaitStrategy;
+import com.lmax.disruptor.TimeoutHandler;
 import org.apache.omid.metrics.MetricsRegistry;
 import org.apache.omid.tso.TSOStateManager.TSOState;
 import org.jboss.netty.channel.Channel;
@@ -37,25 +38,26 @@ import java.util.Iterator;
 import java.util.NoSuchElementException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
-public class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestEvent>, RequestProcessor {
+class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestEvent>, RequestProcessor, TimeoutHandler {
 
     private static final Logger LOG = LoggerFactory.getLogger(RequestProcessorImpl.class);
 
     private final TimestampOracle timestampOracle;
-    public final CommitHashMap hashmap;
+    private final CommitHashMap hashmap;
     private final MetricsRegistry metrics;
     private final PersistenceProcessor persistProc;
     private final RingBuffer<RequestEvent> requestRing;
     private long lowWatermark = -1L;
-    private long epoch = -1L;
 
     @Inject
-    RequestProcessorImpl(TSOServerConfig config,
-                         MetricsRegistry metrics,
+    RequestProcessorImpl(MetricsRegistry metrics,
                          TimestampOracle timestampOracle,
                          PersistenceProcessor persistProc,
-                         Panicker panicker) throws IOException {
+                         Panicker panicker,
+                         TSOServerConfig config)
+            throws IOException {
 
         this.metrics = metrics;
 
@@ -64,11 +66,14 @@ public class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.R
 
         this.hashmap = new CommitHashMap(config.getMaxItems());
 
+        final TimeoutBlockingWaitStrategy timeoutStrategy
+                = new TimeoutBlockingWaitStrategy(config.getBatchPersistTimeoutInMs(), TimeUnit.MILLISECONDS);
+
         // Set up the disruptor thread
-        requestRing = RingBuffer.createMultiProducer(RequestEvent.EVENT_FACTORY, 1 << 12, new BusySpinWaitStrategy());
+        requestRing = RingBuffer.createMultiProducer(RequestEvent.EVENT_FACTORY, 1 << 12, timeoutStrategy);
         SequenceBarrier requestSequenceBarrier = requestRing.newBarrier();
         BatchEventProcessor<RequestEvent> requestProcessor =
-                new BatchEventProcessor<RequestEvent>(requestRing, requestSequenceBarrier, this);
+                new BatchEventProcessor<>(requestRing, requestSequenceBarrier, this);
         requestRing.addGatingSequences(requestProcessor.getSequence());
         requestProcessor.setExceptionHandler(new FatalExceptionHandler(panicker));
 
@@ -83,16 +88,17 @@ public class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.R
      * This should be called when the TSO gets leadership
      */
     @Override
-    public void update(TSOState state) {
+    public void update(TSOState state) throws InterruptedException {
         LOG.info("Initializing RequestProcessor...");
         this.lowWatermark = state.getLowWatermark();
-        persistProc.persistLowWatermark(lowWatermark);
-        this.epoch = state.getEpoch();
-        LOG.info("RequestProcessor initialized with LWMs {} and Epoch {}", lowWatermark, epoch);
+        persistProc.persistLowWatermark(lowWatermark, new MonitoringContext(metrics));
+        persistProc.reset();
+        LOG.info("RequestProcessor initialized with LWMs {} and Epoch {}", lowWatermark, state.getEpoch());
     }
 
     @Override
     public void onEvent(RequestEvent event, long sequence, boolean endOfBatch) throws Exception {
+
         String name = null;
         try {
             if (event.getType() == RequestEvent.Type.TIMESTAMP) {
@@ -110,25 +116,42 @@ public class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.R
             }
         }
 
+        if (endOfBatch) {
+            persistProc.persistFlush();
+        }
+
+    }
+
+    @Override
+    public void onTimeout(long sequence) throws Exception {
+
+        persistProc.persistFlush();
+
     }
 
     @Override
     public void timestampRequest(Channel c, MonitoringContext monCtx) {
+
         long seq = requestRing.next();
         RequestEvent e = requestRing.get(seq);
         RequestEvent.makeTimestampRequest(e, c, monCtx);
         requestRing.publish(seq);
+
     }
 
     @Override
-    public void commitRequest(long startTimestamp, Collection<Long> writeSet, boolean isRetry, Channel c, MonitoringContext monCtx) {
+    public void commitRequest(long startTimestamp, Collection<Long> writeSet, boolean isRetry, Channel c,
+                              MonitoringContext monCtx) {
+
         long seq = requestRing.next();
         RequestEvent e = requestRing.get(seq);
         RequestEvent.makeCommitRequest(e, startTimestamp, monCtx, writeSet, isRetry, c);
         requestRing.publish(seq);
+
     }
 
-    public void handleTimestamp(RequestEvent requestEvent) {
+    private void handleTimestamp(RequestEvent requestEvent) throws InterruptedException {
+
         long timestamp;
 
         try {
@@ -139,15 +162,17 @@ public class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.R
         }
 
         persistProc.persistTimestamp(timestamp, requestEvent.getChannel(), requestEvent.getMonCtx());
+
     }
 
-    public long handleCommit(RequestEvent event) {
+    private long handleCommit(RequestEvent event) throws InterruptedException {
+
         long startTimestamp = event.getStartTimestamp();
         Iterable<Long> writeSet = event.writeSet();
         boolean isRetry = event.isRetry();
         Channel c = event.getChannel();
 
-        boolean committed = false;
+        boolean committed;
         long commitTimestamp = 0L;
 
         int numCellsInWriteset = 0;
@@ -183,7 +208,7 @@ public class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.R
                     if (newLowWatermark != lowWatermark) {
                         LOG.trace("Setting new low Watermark to {}", newLowWatermark);
                         lowWatermark = newLowWatermark;
-                        persistProc.persistLowWatermark(newLowWatermark);
+                        persistProc.persistLowWatermark(newLowWatermark, event.getMonCtx());
                     }
                 }
                 persistProc.persistCommit(startTimestamp, commitTimestamp, c, event.getMonCtx());
@@ -195,6 +220,7 @@ public class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.R
         }
 
         return commitTimestamp;
+
     }
 
     final static class RequestEvent implements Iterable<Long> {
@@ -222,8 +248,11 @@ public class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.R
         }
 
         static void makeCommitRequest(RequestEvent e,
-                                      long startTimestamp, MonitoringContext monCtx, Collection<Long> writeSet,
-                                      boolean isRetry, Channel c) {
+                                      long startTimestamp,
+                                      MonitoringContext monCtx,
+                                      Collection<Long> writeSet,
+                                      boolean isRetry,
+                                      Channel c) {
             e.monCtx = monCtx;
             e.type = Type.COMMIT;
             e.channel = c;
@@ -241,6 +270,7 @@ public class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.R
                     i++;
                 }
             }
+
         }
 
         MonitoringContext getMonCtx() {
@@ -261,9 +291,11 @@ public class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.R
 
         @Override
         public Iterator<Long> iterator() {
+
             if (writeSetAsCollection != null) {
                 return writeSetAsCollection.iterator();
             }
+
             return new Iterator<Long>() {
                 int i = 0;
 
@@ -285,23 +317,26 @@ public class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.R
                     throw new UnsupportedOperationException();
                 }
             };
+
         }
 
         Iterable<Long> writeSet() {
+
             return this;
+
         }
 
         boolean isRetry() {
             return isRetry;
         }
 
-        public final static EventFactory<RequestEvent> EVENT_FACTORY
-                = new EventFactory<RequestEvent>() {
+        final static EventFactory<RequestEvent> EVENT_FACTORY = new EventFactory<RequestEvent>() {
             @Override
             public RequestEvent newInstance() {
                 return new RequestEvent();
             }
         };
+
     }
 
 }
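
The swap from BusySpinWaitStrategy to TimeoutBlockingWaitStrategy, together with the new onTimeout() callback above, means a partially filled batch is not left waiting indefinitely: onTimeout() flushes whatever has accumulated once getBatchPersistTimeoutInMs() passes without new requests, and endOfBatch flushes as soon as the ring drains. A plain-Java analogue of that behaviour, using a BlockingQueue in place of the Disruptor ring and hypothetical names, looks like this:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;

    class TimeoutFlushSketch implements Runnable {

        static final long BATCH_PERSIST_TIMEOUT_MS = 10; // stands in for getBatchPersistTimeoutInMs()

        final BlockingQueue<Long> requests = new LinkedBlockingQueue<>();
        private final List<Long> pending = new ArrayList<>();

        @Override
        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    Long request = requests.poll(BATCH_PERSIST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
                    if (request == null) {
                        flush();               // like onTimeout(): nothing arrived in time
                        continue;
                    }
                    pending.add(request);
                    if (requests.isEmpty()) {
                        flush();               // like endOfBatch: nothing else queued right now
                    }
                }
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }

        private void flush() {
            if (!pending.isEmpty()) {
                // Write the pending batch to the commit table (omitted) and start a new one
                pending.clear();
            }
        }
    }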