You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@omid.apache.org by fp...@apache.org on 2016/05/18 19:34:40 UTC

[5/7] incubator-omid git commit: Fix bug in retry request management that didn't return batches to pool

Fix bug in retry request management that didn't return batches to pool

Add new complete tests for PersistentProcessorHandler component.
Also improve readability.

Change-Id: Ied72d420bb229c3466b7b89cf14b0756c93b2e31


Project: http://git-wip-us.apache.org/repos/asf/incubator-omid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-omid/commit/3659b96c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-omid/tree/3659b96c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-omid/diff/3659b96c

Branch: refs/heads/master
Commit: 3659b96c5a775bd0f9bfbf9b65a67e5e7f0c6da2
Parents: 484d116
Author: Francisco Perez-Sorrosal <fp...@yahoo-inc.com>
Authored: Fri May 13 14:08:09 2016 -0700
Committer: Francisco Perez-Sorrosal <fp...@yahoo-inc.com>
Committed: Wed May 18 12:26:31 2016 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/omid/tso/Batch.java    |  49 +-
 .../java/org/apache/omid/tso/PersistEvent.java  |  12 +-
 .../apache/omid/tso/PersistenceProcessor.java   |   2 +-
 .../omid/tso/PersistenceProcessorHandler.java   | 107 ++--
 .../omid/tso/PersistenceProcessorImpl.java      |   4 +-
 .../org/apache/omid/tso/ReplyProcessorImpl.java |  39 +-
 .../apache/omid/tso/RequestProcessorImpl.java   |  13 +-
 .../tso/TestPersistenceProcessorHandler.java    | 488 +++++++++++++++++++
 8 files changed, 607 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/3659b96c/tso-server/src/main/java/org/apache/omid/tso/Batch.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/Batch.java b/tso-server/src/main/java/org/apache/omid/tso/Batch.java
index c5ed696..06ddc71 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/Batch.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/Batch.java
@@ -35,11 +35,11 @@ public class Batch {
     private final int id;
     private final int size;
     private int numEvents;
-    private final PersistEvent[] events;
+    private final PersistEvent[] events; // TODO Check if it's worth to have a dynamic structure for this
 
     Batch(int id, int size) {
 
-        Preconditions.checkArgument(size > 0, "Size must be positive");
+        Preconditions.checkArgument(size > 0, "Size [%s] must be positive", size);
         this.size = size;
         this.id = id;
         this.numEvents = 0;
@@ -51,41 +51,46 @@ public class Batch {
 
     }
 
-    boolean isFull() {
-
-        Preconditions.checkState(numEvents <= size, "numEvents > size");
-        return numEvents == size;
+    PersistEvent get(int idx) {
+        Preconditions.checkState(numEvents > 0 && 0 <= idx && idx < numEvents,
+                                 "Accessing Events array (Size = %s) with wrong index [%s]", numEvents, idx);
+        return events[idx];
+    }
 
+    void set(int idx, PersistEvent event) {
+        Preconditions.checkState(0 <= idx && idx < numEvents);
+        events[idx] = event;
     }
 
-    boolean isEmpty() {
+    void clear() {
 
-        return numEvents == 0;
+        numEvents = 0;
 
     }
 
+    void decreaseNumEvents() {
+        numEvents--;
+    }
+
     int getNumEvents() {
         return numEvents;
     }
 
-    void clear() {
+    int getLastEventIdx() {
+        return numEvents - 1;
+    }
 
-        numEvents = 0;
+    boolean isFull() {
 
-    }
+        Preconditions.checkState(numEvents <= size, "Batch Full: numEvents [%s] > size [%s]", numEvents, size);
+        return numEvents == size;
 
-    PersistEvent get(int idx) {
-        Preconditions.checkState(numEvents > 0 && 0 <= idx && idx < numEvents);
-        return events[idx];
     }
 
-    void set(int idx, PersistEvent event) {
-        Preconditions.checkState(0 <= idx && idx < numEvents);
-        events[idx] = event;
-    }
+    boolean isEmpty() {
+
+        return numEvents == 0;
 
-    void decreaseNumEvents() {
-        numEvents--;
     }
 
     void addCommit(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext context) {
@@ -96,11 +101,11 @@ public class Batch {
 
     }
 
-    void addAbort(long startTimestamp, boolean isRetry, Channel c, MonitoringContext context) {
+    void addAbort(long startTimestamp, boolean isCommitRetry, Channel c, MonitoringContext context) {
         Preconditions.checkState(!isFull(), "batch is full");
         int index = numEvents++;
         PersistEvent e = events[index];
-        e.makePersistAbort(startTimestamp, isRetry, c, context);
+        e.makePersistAbort(startTimestamp, isCommitRetry, c, context);
 
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/3659b96c/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java b/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java
index 8d4fd85..9bf9e89 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java
@@ -31,7 +31,7 @@ public final class PersistEvent {
     private Type type = null;
     private Channel channel = null;
 
-    private boolean isRetry = false;
+    private boolean isCommitRetry = false;
     private long startTimestamp = 0L;
     private long commitTimestamp = 0L;
     private long lowWatermark = 0L;
@@ -46,11 +46,11 @@ public final class PersistEvent {
 
     }
 
-    void makePersistAbort(long startTimestamp, boolean isRetry, Channel c, MonitoringContext monCtx) {
+    void makePersistAbort(long startTimestamp, boolean isCommitRetry, Channel c, MonitoringContext monCtx) {
 
         this.type = Type.ABORT;
         this.startTimestamp = startTimestamp;
-        this.isRetry = isRetry;
+        this.isCommitRetry = isCommitRetry;
         this.channel = c;
         this.monCtx = monCtx;
 
@@ -83,9 +83,9 @@ public final class PersistEvent {
 
     }
 
-    boolean isRetry() {
+    boolean isCommitRetry() {
 
-        return isRetry;
+        return isCommitRetry;
 
     }
 
@@ -111,7 +111,7 @@ public final class PersistEvent {
     public String toString() {
         return Objects.toStringHelper(this)
                 .add("type", type)
-                .add("isRetry", isRetry)
+                .add("isCommitRetry", isCommitRetry)
                 .add("ST", startTimestamp)
                 .add("CT", commitTimestamp)
                 .add("LWM", lowWatermark)

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/3659b96c/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
index f4e4a51..0b1a34f 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
@@ -26,7 +26,7 @@ interface PersistenceProcessor {
     void addCommitToBatch(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx)
             throws Exception;
 
-    void addAbortToBatch(long startTimestamp, boolean isRetry, Channel c, MonitoringContext monCtx) throws Exception;
+    void addAbortToBatch(long startTimestamp, boolean isCommitRetry, Channel c, MonitoringContext monCtx) throws Exception;
 
     void addTimestampToBatch(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception;
 

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/3659b96c/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java
index cf7557a..079c8c4 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java
@@ -39,12 +39,13 @@ public class PersistenceProcessorHandler implements WorkHandler<PersistenceProce
     private final LeaseManagement leaseManager;
 
     private final ReplyProcessor replyProcessor;
-    private final RetryProcessor retryProc;
+    private final RetryProcessor retryProcessor;
     private final CommitTable.Writer writer;
     final Panicker panicker;
 
     private final Timer flushTimer;
     private final Histogram batchSizeHistogram;
+    private final Histogram flushedCommitEventsHistogram;
 
     @Inject
     PersistenceProcessorHandler(MetricsRegistry metrics,
@@ -52,7 +53,7 @@ public class PersistenceProcessorHandler implements WorkHandler<PersistenceProce
                                 LeaseManagement leaseManager,
                                 CommitTable commitTable,
                                 ReplyProcessor replyProcessor,
-                                RetryProcessor retryProc,
+                                RetryProcessor retryProcessor,
                                 Panicker panicker)
     throws InterruptedException, ExecutionException, IOException {
 
@@ -60,54 +61,63 @@ public class PersistenceProcessorHandler implements WorkHandler<PersistenceProce
         this.leaseManager = leaseManager;
         this.writer = commitTable.getWriter();
         this.replyProcessor = replyProcessor;
-        this.retryProc = retryProc;
+        this.retryProcessor = retryProcessor;
         this.panicker = panicker;
 
-        flushTimer = metrics.timer(name("tso", "persist", "flush"));
-        batchSizeHistogram = metrics.histogram(name("tso", "persist", "batchsize"));
+        // Metrics in this component
+        flushTimer = metrics.timer(name("tso", "persist", "flush", "latency"));
+        flushedCommitEventsHistogram = metrics.histogram(name("tso", "persist", "flushed", "commits", "size"));
+        batchSizeHistogram = metrics.histogram(name("tso", "persist", "batch", "size"));
 
     }
 
     @Override
-    public void onEvent(PersistenceProcessorImpl.PersistBatchEvent event) throws Exception {
+    public void onEvent(PersistenceProcessorImpl.PersistBatchEvent batchEvent) throws Exception {
 
-        Batch batch = event.getBatch();
-        for (int i=0; i < batch.getNumEvents(); ++i) {
-            PersistEvent localEvent = batch.get(i);
+        int commitEventsToFlush = 0;
+        Batch batch = batchEvent.getBatch();
+        int numOfBatchedEvents = batch.getNumEvents();
+        batchSizeHistogram.update(numOfBatchedEvents);
+        for (int i=0; i < numOfBatchedEvents; ++i) {
+            PersistEvent event = batch.get(i);
 
-            switch (localEvent.getType()) {
+            switch (event.getType()) {
             case COMMIT:
-                localEvent.getMonCtx().timerStart("commitPersistProcessor");
+                event.getMonCtx().timerStart("commitPersistProcessor");
                 // TODO: What happens when the IOException is thrown?
-                writer.addCommittedTransaction(localEvent.getStartTimestamp(), localEvent.getCommitTimestamp());
+                writer.addCommittedTransaction(event.getStartTimestamp(), event.getCommitTimestamp());
+                commitEventsToFlush++;
                 break;
             case ABORT:
                 break;
             case TIMESTAMP:
-                localEvent.getMonCtx().timerStart("timestampPersistProcessor");
+                event.getMonCtx().timerStart("timestampPersistProcessor");
                 break;
-            default:
-                throw new RuntimeException("Unknown event type: " + localEvent.getType().name());
             }
         }
-        if (batch.getNumEvents() > 0) {
-            flush(batch.getNumEvents());
-            sendReplies(batch, event.getBatchSequence());
-        }
+
+        // Flush and send the responses back to the client. WARNING: Before sending the responses, first we need
+        // to filter commit retries in the batch to disambiguate them.
+        flush(commitEventsToFlush);
+        filterAndDissambiguateClientRetries(batch);
+        replyProcessor.manageResponsesBatch(batchEvent.getBatchSequence(), batch);
+
     }
 
-    private void flush(int numBatchedEvents) {
+    void flush(int commitEventsToFlush) {
 
-            commitSuicideIfNotMaster();
-            try {
-                long startFlushTimeInNs = System.nanoTime();
+        commitSuicideIfNotMaster();
+        try {
+            long startFlushTimeInNs = System.nanoTime();
+            if(commitEventsToFlush > 0) {
                 writer.flush();
-                flushTimer.update(System.nanoTime() - startFlushTimeInNs);
-                batchSizeHistogram.update(numBatchedEvents);
-            } catch (IOException e) {
-                panicker.panic("Error persisting commit batch", e);
             }
-            commitSuicideIfNotMaster(); // TODO Here, we can return the client responses before committing suicide
+            flushTimer.update(System.nanoTime() - startFlushTimeInNs);
+            flushedCommitEventsHistogram.update(commitEventsToFlush);
+        } catch (IOException e) {
+            panicker.panic("Error persisting commit batch", e);
+        }
+        commitSuicideIfNotMaster();
 
     }
 
@@ -117,30 +127,33 @@ public class PersistenceProcessorHandler implements WorkHandler<PersistenceProce
         }
     }
 
-    private void sendReplies(Batch batch, long batchSequence) {
-
-        int i = 0;
-        while (i < batch.getNumEvents()) {
-            PersistEvent e = batch.get(i);
-            if (e.getType() == PersistEvent.Type.ABORT && e.isRetry()) {
-                retryProc.disambiguateRetryRequestHeuristically(e.getStartTimestamp(), e.getChannel(), e.getMonCtx());
-                PersistEvent tmp = batch.get(i);
-                //TODO: why assign it?
-                batch.set(i, batch.get(batch.getNumEvents() - 1));
-                batch.set(batch.getNumEvents()  - 1, tmp);
-                if (batch.getNumEvents()  == 1) {
-                    batch.clear();
-                    replyProcessor.manageResponsesBatch(batchSequence, null);
-                    return;
-                }
+    void filterAndDissambiguateClientRetries(Batch batch) {
+
+        int currentEventIdx = 0;
+        while (currentEventIdx <= batch.getLastEventIdx()) {
+            PersistEvent event = batch.get(currentEventIdx);
+            if (event.isCommitRetry()) {
+                retryProcessor.disambiguateRetryRequestHeuristically(event.getStartTimestamp(), event.getChannel(), event.getMonCtx());
+                // Swap the disambiguated event with the last batch event & decrease the # of remaining elems to process
+                swapBatchElements(batch, currentEventIdx, batch.getLastEventIdx());
                 batch.decreaseNumEvents();
-                continue;
+                if (batch.isEmpty()) {
+                    break; // We're OK to call now the reply processor
+                } else {
+                    continue; // Otherwise we continue checking for retries from the new event in the current position
+                }
+            } else {
+                currentEventIdx++; // Let's check if the next event was a retry
             }
-            i++;
         }
 
-        replyProcessor.manageResponsesBatch(batchSequence, batch);
+    }
 
+    private void swapBatchElements(Batch batch, int firstIdx, int lastIdx) {
+        PersistEvent tmpEvent = batch.get(firstIdx);
+        PersistEvent lastEventInBatch = batch.get(lastIdx);
+        batch.set(firstIdx, lastEventInBatch);
+        batch.set(lastIdx, tmpEvent);
     }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/3659b96c/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
index 761d30b..0da0c91 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
@@ -128,10 +128,10 @@ class PersistenceProcessorImpl implements PersistenceProcessor {
     }
 
     @Override
-    public void addAbortToBatch(long startTimestamp, boolean isRetry, Channel c, MonitoringContext context)
+    public void addAbortToBatch(long startTimestamp, boolean isCommitRetry, Channel c, MonitoringContext context)
             throws Exception {
 
-        currentBatch.addAbort(startTimestamp, isRetry, c, context);
+        currentBatch.addAbort(startTimestamp, isCommitRetry, c, context);
         if (currentBatch.isFull()) {
             triggerCurrentBatchFlush();
         }

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/3659b96c/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
index c1709ef..91df214 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
@@ -90,42 +90,37 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
 
     }
 
-    private void handleReplyBatchEvent(ReplyBatchEvent event) throws Exception {
+    private void handleReplyBatchEvent(ReplyBatchEvent replyBatchEvent) throws Exception {
 
         String name;
-        Batch batch = event.getBatch();
-        for (int i = 0; batch != null && i < batch.getNumEvents(); ++i) {
-            PersistEvent localEvent = batch.get(i);
+        Batch batch = replyBatchEvent.getBatch();
+        for (int i = 0; i < batch.getNumEvents(); i++) {
+            PersistEvent event = batch.get(i);
 
-            switch (localEvent.getType()) {
+            switch (event.getType()) {
             case COMMIT:
                 name = "commitReplyProcessor";
-                localEvent.getMonCtx().timerStart(name);
-                sendCommitResponse(localEvent.getStartTimestamp(), localEvent.getCommitTimestamp(), localEvent.getChannel());
-                localEvent.getMonCtx().timerStop(name);
-                 break;
+                event.getMonCtx().timerStart(name);
+                sendCommitResponse(event.getStartTimestamp(), event.getCommitTimestamp(), event.getChannel());
+                event.getMonCtx().timerStop(name);
+                break;
             case ABORT:
                 name = "abortReplyProcessor";
-                localEvent.getMonCtx().timerStart(name);
-                sendAbortResponse(localEvent.getStartTimestamp(), localEvent.getChannel());
-                localEvent.getMonCtx().timerStop(name);
+                event.getMonCtx().timerStart(name);
+                sendAbortResponse(event.getStartTimestamp(), event.getChannel());
+                event.getMonCtx().timerStop(name);
                 break;
             case TIMESTAMP:
                 name = "timestampReplyProcessor";
-                localEvent.getMonCtx().timerStart(name);
-                sendTimestampResponse(localEvent.getStartTimestamp(), localEvent.getChannel());
-                localEvent.getMonCtx().timerStop(name);
-                break;
-            default:
-                LOG.error("Unknown event {}", localEvent.getType());
+                event.getMonCtx().timerStart(name);
+                sendTimestampResponse(event.getStartTimestamp(), event.getChannel());
+                event.getMonCtx().timerStop(name);
                 break;
             }
-            localEvent.getMonCtx().publish();
+            event.getMonCtx().publish();
         }
 
-        if (batch != null) {
-            batchPool.returnObject(batch);
-        }
+        batchPool.returnObject(batch);
 
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/3659b96c/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
index 0ad5c08..372733e 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
@@ -38,7 +38,6 @@ import java.util.Iterator;
 import java.util.NoSuchElementException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestEvent>, RequestProcessor, TimeoutHandler {
@@ -163,7 +162,7 @@ class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestE
 
         long startTimestamp = event.getStartTimestamp();
         Iterable<Long> writeSet = event.writeSet();
-        boolean isRetry = event.isRetry();
+        boolean isCommitRetry = event.isCommitRetry();
         Channel c = event.getChannel();
 
         boolean txCanCommit;
@@ -207,7 +206,7 @@ class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestE
             persistProc.addCommitToBatch(startTimestamp, commitTimestamp, c, event.getMonCtx());
 
         } else { // add it to the aborted list
-            persistProc.addAbortToBatch(startTimestamp, isRetry, c, event.getMonCtx());
+            persistProc.addAbortToBatch(startTimestamp, isCommitRetry, c, event.getMonCtx());
         }
 
     }
@@ -221,7 +220,7 @@ class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestE
         private Type type = null;
         private Channel channel = null;
 
-        private boolean isRetry = false;
+        private boolean isCommitRetry = false;
         private long startTimestamp = 0;
         private MonitoringContext monCtx;
         private long numCells = 0;
@@ -246,7 +245,7 @@ class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestE
             e.type = Type.COMMIT;
             e.channel = c;
             e.startTimestamp = startTimestamp;
-            e.isRetry = isRetry;
+            e.isCommitRetry = isRetry;
             if (writeSet.size() > MAX_INLINE) {
                 e.numCells = writeSet.size();
                 e.writeSetAsCollection = writeSet;
@@ -315,8 +314,8 @@ class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestE
 
         }
 
-        boolean isRetry() {
-            return isRetry;
+        boolean isCommitRetry() {
+            return isCommitRetry;
         }
 
         final static EventFactory<RequestEvent> EVENT_FACTORY = new EventFactory<RequestEvent>() {

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/3659b96c/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java
new file mode 100644
index 0000000..4dcc2b0
--- /dev/null
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java
@@ -0,0 +1,488 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.tso;
+
+import org.apache.omid.committable.CommitTable;
+import org.apache.omid.metrics.MetricsRegistry;
+import org.apache.omid.metrics.NullMetricsProvider;
+import org.apache.omid.tso.PersistenceProcessorImpl.PersistBatchEvent;
+import org.jboss.netty.channel.Channel;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+public class TestPersistenceProcessorHandler {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TestPersistenceProcessorHandler.class);
+
+    private static final int BATCH_ID = 0;
+    private static final int BATCH_SIZE = 6;
+    private static final long BATCH_SEQUENCE = 0;
+
+    private static final long FIRST_ST = 0L;
+    private static final long FIRST_CT = 1L;
+    private static final long SECOND_ST = 2L;
+    private static final long SECOND_CT = 3L;
+    private static final long THIRD_ST = 4L;
+    private static final long THIRD_CT = 5L;
+    private static final long FOURTH_ST = 6L;
+    private static final long FOURTH_CT = 7L;
+    private static final long FIFTH_ST = 8L;
+    private static final long FIFTH_CT = 9L;
+    private static final long SIXTH_ST = 10L;
+
+    @Mock
+    private CommitTable.Writer mockWriter;
+    @Mock
+    private CommitTable.Client mockClient;
+    @Mock
+    private LeaseManager leaseManager;
+    @Mock
+    private ReplyProcessor replyProcessor;
+    @Mock
+    private RetryProcessor retryProcessor;
+    @Mock
+    private Panicker panicker;
+
+    private CommitTable commitTable;
+
+    private MetricsRegistry metrics;
+
+    // Component under test
+    private PersistenceProcessorHandler persistenceHandler;
+
+    @BeforeMethod(alwaysRun = true, timeOut = 30_000)
+    public void initMocksAndComponents() throws Exception {
+
+        MockitoAnnotations.initMocks(this);
+
+        // Configure null metrics provider
+        metrics = new NullMetricsProvider();
+
+        // Configure commit table to return the mocked writer and client
+        commitTable = new CommitTable() {
+            @Override
+            public Writer getWriter() {
+                return mockWriter;
+            }
+
+            @Override
+            public Client getClient() {
+                return mockClient;
+            }
+        };
+
+        // Simulate we're master for most of the tests
+        doReturn(true).when(leaseManager).stillInLeasePeriod();
+
+        persistenceHandler = spy(new PersistenceProcessorHandler(metrics,
+                                                                 "localhost:1234",
+                                                                 leaseManager,
+                                                                 commitTable,
+                                                                 replyProcessor,
+                                                                 retryProcessor,
+                                                                 panicker));
+
+    }
+
+    @AfterMethod
+    void afterMethod() {
+        Mockito.reset(mockWriter);
+    }
+
+    @Test(timeOut = 10_000)
+    public void testProcessingOfEmptyBatchPersistEvent() throws Exception {
+
+        // Prepare test batch
+        Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
+        PersistBatchEvent batchEvent = new PersistBatchEvent();
+        PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
+        persistenceHandler.onEvent(batchEvent);
+
+        verify(persistenceHandler, times(1)).flush(eq(0));
+        verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch));
+        verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContext.class));
+        verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
+        assertTrue(batch.isEmpty());
+
+    }
+
+    @Test(timeOut = 10_000)
+    public void testProcessingOfBatchPersistEventWithASingleTimestampEvent() throws Exception {
+
+        // Prepare test batch
+        Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
+        batch.addTimestamp(FIRST_ST, null, mock(MonitoringContext.class));
+        PersistBatchEvent batchEvent = new PersistBatchEvent();
+        PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
+        persistenceHandler.onEvent(batchEvent);
+
+        verify(persistenceHandler, times(1)).flush(eq(0));
+        verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch));
+        verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContext.class));
+        verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
+        assertEquals(batch.getNumEvents(), 1);
+        assertEquals(batch.get(0).getStartTimestamp(), FIRST_ST);
+
+    }
+
+    @Test(timeOut = 10_000)
+    public void testProcessingOfBatchPersistEventWithASingleCommitEvent() throws Exception {
+
+        // Prepare test batch
+        Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
+        batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContext.class));
+        PersistBatchEvent batchEvent = new PersistBatchEvent();
+        PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
+        persistenceHandler.onEvent(batchEvent);
+
+        verify(persistenceHandler, times(1)).flush(eq(1));
+        verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(batch);
+        verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContext.class));
+        verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
+        assertEquals(batch.getNumEvents(), 1);
+        assertEquals(batch.get(0).getStartTimestamp(), FIRST_ST);
+        assertEquals(batch.get(0).getCommitTimestamp(), FIRST_CT);
+
+    }
+
+    @Test(timeOut = 10_000)
+    public void testProcessingOfBatchPersistEventWithASingleAbortEventNoRetry() throws Exception {
+
+        final boolean IS_RETRY = false;
+
+        // Prepare test batch
+        Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
+        batch.addAbort(FIRST_ST, IS_RETRY, null, mock(MonitoringContext.class));
+        PersistBatchEvent batchEvent = new PersistBatchEvent();
+        PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
+        persistenceHandler.onEvent(batchEvent);
+
+        verify(persistenceHandler, times(1)).flush(eq(0));
+        verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(batch);
+        verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContext.class));
+        verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
+        assertEquals(batch.getNumEvents(), 1);
+        assertEquals(batch.get(0).getStartTimestamp(), FIRST_ST);
+
+    }
+
+    @Test(timeOut = 10_000)
+    public void testProcessingOfBatchPersistEventWithASingleAbortEventWithRetry() throws Exception {
+
+        final boolean IS_RETRY = true;
+
+        // Prepare test batch
+        Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
+        batch.addAbort(FIRST_ST, IS_RETRY, null, mock(MonitoringContext.class));
+        PersistBatchEvent batchEvent = new PersistBatchEvent();
+        PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
+
+        // Call process method
+        persistenceHandler.onEvent(batchEvent);
+
+        verify(persistenceHandler, times(1)).flush(eq(0));
+        verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(batch);
+        verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(FIRST_ST), any(Channel.class), any(MonitoringContext.class));
+        verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
+        assertEquals(batch.getNumEvents(), 0);
+
+    }
+
+    @Test(timeOut = 10_000)
+    public void testProcessingOfBatchPersistEventWith2EventsCommitAndAbortWithRetry() throws Exception {
+
+        final boolean IS_RETRY = true;
+
+        // Prepare test batch
+        Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
+        batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContext.class));
+        batch.addAbort(SECOND_ST, IS_RETRY, null, mock(MonitoringContext.class));
+        PersistBatchEvent batchEvent = new PersistBatchEvent();
+        PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
+
+        // Initial assertion
+        assertEquals(batch.getNumEvents(), 2);
+
+        // Call process method
+        persistenceHandler.onEvent(batchEvent);
+
+        verify(persistenceHandler, times(1)).flush(eq(1));
+        verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch));
+        verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(SECOND_ST), any(Channel.class), any(MonitoringContext.class));
+        verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
+        assertEquals(batch.getNumEvents(), 1);
+        assertEquals(batch.get(0).getStartTimestamp(), FIRST_ST);
+        assertEquals(batch.get(0).getCommitTimestamp(), FIRST_CT);
+
+    }
+
+    @Test(timeOut = 10_000)
+    public void testProcessingOfBatchPersistEventWith2EventsAbortWithRetryAndCommit() throws Exception {
+        // ------------------------------------------------------------------------------------------------------------
+        // Same test as testProcessingOfBatchPersistEventWith2EventsCommitAndAbortWithRetry but swapped events
+        // ------------------------------------------------------------------------------------------------------------
+
+        final boolean IS_RETRY = true;
+
+        // Prepare test batch
+        Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
+        batch.addAbort(FIRST_ST, IS_RETRY, null, mock(MonitoringContext.class));
+        batch.addCommit(SECOND_ST, SECOND_CT, null, mock(MonitoringContext.class));
+        PersistBatchEvent batchEvent = new PersistBatchEvent();
+        PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
+
+        // Initial assertion
+        assertEquals(batch.getNumEvents(), 2);
+
+        // Call process method
+        persistenceHandler.onEvent(batchEvent);
+
+        verify(persistenceHandler, times(1)).flush(eq(1));
+        verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch));
+        verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(FIRST_ST), any(Channel.class), any(MonitoringContext.class));
+        verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
+        assertEquals(batch.getNumEvents(), 1);
+        assertEquals(batch.get(0).getStartTimestamp(), SECOND_ST);
+        assertEquals(batch.get(0).getCommitTimestamp(), SECOND_CT);
+
+    }
+
+    @Test(timeOut = 10_000)
+    public void testProcessingOfBatchPersistEventWith2AbortWithRetryEvents() throws Exception {
+
+        final boolean IS_RETRY = true;
+
+        // Prepare test batch
+        Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
+        batch.addAbort(FIRST_ST, IS_RETRY, null, mock(MonitoringContext.class));
+        batch.addAbort(SECOND_ST, IS_RETRY, null, mock(MonitoringContext.class));
+        PersistBatchEvent batchEvent = new PersistBatchEvent();
+        PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
+
+        // Initial assertion
+        assertEquals(batch.getNumEvents(), 2);
+
+        // Call process method
+        persistenceHandler.onEvent(batchEvent);
+
+        verify(persistenceHandler, times(1)).flush(eq(0));
+        verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch));
+        verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(FIRST_ST), any(Channel.class), any(MonitoringContext.class));
+        verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(SECOND_ST), any(Channel.class), any(MonitoringContext.class));
+        verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
+        assertEquals(batch.getNumEvents(), 0);
+
+    }
+
+    @Test(timeOut = 10_000)
+    public void testProcessingOfBatchPersistEventWith2NonRetryAbortEvents() throws Exception {
+
+        final boolean IS_RETRY = false;
+
+        // Prepare test batch
+        Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
+        batch.addAbort(FIRST_ST, IS_RETRY, null, mock(MonitoringContext.class));
+        batch.addAbort(SECOND_ST, IS_RETRY, null, mock(MonitoringContext.class));
+        PersistBatchEvent batchEvent = new PersistBatchEvent();
+        PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
+
+        // Initial assertion
+        assertEquals(batch.getNumEvents(), 2);
+
+        // Call process method
+        persistenceHandler.onEvent(batchEvent);
+
+        verify(persistenceHandler, times(1)).flush(eq(0));
+        verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch));
+        verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContext.class));
+        verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
+        assertEquals(batch.getNumEvents(), 2);
+        assertEquals(batch.get(0).getStartTimestamp(), FIRST_ST);
+        assertEquals(batch.get(1).getStartTimestamp(), SECOND_ST);
+
+    }
+
+
+    @Test(timeOut = 10_000)
+    public void testProcessingOfBatchPersistEventWithMultipleRetryAndNonRetryEvents() throws Exception {
+
+        // Prepare test batch
+        Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
+
+        batch.addTimestamp(FIRST_ST, null, mock(MonitoringContext.class));
+        batch.addAbort(SECOND_ST, true, null, mock(MonitoringContext.class));
+        batch.addCommit(THIRD_ST, THIRD_CT, null, mock(MonitoringContext.class));
+        batch.addAbort(FOURTH_ST, false, null, mock(MonitoringContext.class));
+        batch.addCommit(FIFTH_ST, FIFTH_CT, null, mock(MonitoringContext.class));
+        batch.addAbort(SIXTH_ST, true, null, mock(MonitoringContext.class));
+        PersistBatchEvent batchEvent = new PersistBatchEvent();
+        PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
+
+        // Initial assertion
+        assertEquals(batch.getNumEvents(), 6);
+
+        // Call process method
+        persistenceHandler.onEvent(batchEvent);
+
+        verify(persistenceHandler, times(1)).flush(2); // 2 commits to flush
+        verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch));
+        verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(SECOND_ST), any(Channel.class), any(MonitoringContext.class));
+        verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
+        assertEquals(batch.getNumEvents(), 4);
+        assertEquals(batch.get(0).getStartTimestamp(), FIRST_ST);
+        assertEquals(batch.get(1).getStartTimestamp(), FIFTH_ST);
+        assertEquals(batch.get(1).getCommitTimestamp(), FIFTH_CT);
+        assertEquals(batch.get(2).getStartTimestamp(), THIRD_ST);
+        assertEquals(batch.get(2).getCommitTimestamp(), THIRD_CT);
+        assertEquals(batch.get(3).getStartTimestamp(), FOURTH_ST);
+
+    }
+
+    @Test(timeOut = 10_000)
+    public void testPanicPersistingEvents() throws Exception {
+
+        // User the real panicker
+        Panicker panicker = spy(new RuntimeExceptionPanicker());
+        persistenceHandler = spy(new PersistenceProcessorHandler(metrics,
+                                                                 "localhost:1234",
+                                                                 leaseManager,
+                                                                 commitTable,
+                                                                 replyProcessor,
+                                                                 retryProcessor,
+                                                                 panicker));
+
+        // Prepare test batch
+        Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
+        batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContext.class));
+        PersistBatchEvent batchEvent = new PersistBatchEvent();
+        PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
+
+        doThrow(IOException.class).when(mockWriter).flush();
+
+        try {
+            persistenceHandler.onEvent(batchEvent);
+            fail();
+        } catch (RuntimeException re) {
+            // Expected
+        }
+
+        verify(persistenceHandler, times(1)).flush(1);
+        verify(panicker, times(1)).panic(eq("Error persisting commit batch"), any(IOException.class));
+        verify(persistenceHandler, never()).filterAndDissambiguateClientRetries(any(Batch.class));
+        verify(replyProcessor, never()).manageResponsesBatch(anyLong(), any(Batch.class));
+
+    }
+
+    @Test(timeOut = 10_000)
+    public void testPanicBecauseMasterLosesMastership() throws Exception {
+
+        // ------------------------------------------------------------------------------------------------------------
+        // 1) Test panic before flushing
+        // ------------------------------------------------------------------------------------------------------------
+
+        // Simulate we lose mastership BEFORE flushing
+        doReturn(false).when(leaseManager).stillInLeasePeriod();
+
+        // User the real panicker
+        Panicker panicker = spy(new RuntimeExceptionPanicker());
+        persistenceHandler = spy(new PersistenceProcessorHandler(metrics,
+                                                                 "localhost:1234",
+                                                                 leaseManager,
+                                                                 commitTable,
+                                                                 replyProcessor,
+                                                                 retryProcessor,
+                                                                 panicker));
+
+        // Prepare test batch
+        Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
+        batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContext.class));
+        PersistBatchEvent batchEvent = new PersistBatchEvent();
+        PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
+
+        try {
+            persistenceHandler.onEvent(batchEvent);
+            fail();
+        } catch (RuntimeException re) {
+            // Expected
+        }
+        verify(persistenceHandler, times(1)).flush(eq(1));
+        verify(mockWriter, never()).flush();
+        verify(panicker, times(1)).panic(eq("Replica localhost:1234 lost mastership whilst flushing data. Committing suicide"), any(IOException.class));
+        verify(persistenceHandler, never()).filterAndDissambiguateClientRetries(any(Batch.class));
+        verify(replyProcessor, never()).manageResponsesBatch(anyLong(), any(Batch.class));
+
+        // ------------------------------------------------------------------------------------------------------------
+        // 2) Test panic after flushing
+        // ------------------------------------------------------------------------------------------------------------
+
+        // Simulate we lose mastership AFTER flushing
+        doReturn(true).doReturn(false).when(leaseManager).stillInLeasePeriod();
+
+        // User the real panicker
+        panicker = spy(new RuntimeExceptionPanicker());
+        persistenceHandler = spy(new PersistenceProcessorHandler(metrics,
+                                                                 "localhost:1234",
+                                                                 leaseManager,
+                                                                 commitTable,
+                                                                 replyProcessor,
+                                                                 retryProcessor,
+                                                                 panicker));
+
+        // Prepare test batch
+        batch = new Batch(BATCH_ID, BATCH_SIZE);
+        batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContext.class));
+        batchEvent = new PersistBatchEvent();
+        PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
+
+        try {
+            persistenceHandler.onEvent(batchEvent);
+            fail();
+        } catch (RuntimeException re) {
+            // Expected
+        }
+        verify(persistenceHandler, times(1)).flush(eq(1));
+        verify(mockWriter, times(1)).flush();
+        verify(panicker, times(1)).panic(eq("Replica localhost:1234 lost mastership whilst flushing data. Committing suicide"), any(IOException.class));
+        verify(persistenceHandler, never()).filterAndDissambiguateClientRetries(any(Batch.class));
+        verify(replyProcessor, never()).manageResponsesBatch(anyLong(), any(Batch.class));
+
+    }
+
+}