You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@omid.apache.org by fp...@apache.org on 2016/05/18 19:34:41 UTC

[6/7] incubator-omid git commit: Separate commit retry events from aborts in batch contents

Separate commit retry events from aborts in batch contents

Before, the Abort events were re-used for holding what in reality are
commit retries, which made the code cryptic in many places. Now commit
retries are treated as first-class events.

Change-Id: Ib9c9f424c3b7b94a02d5e985b388ef74f13c87da


Project: http://git-wip-us.apache.org/repos/asf/incubator-omid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-omid/commit/8954a342
Tree: http://git-wip-us.apache.org/repos/asf/incubator-omid/tree/8954a342
Diff: http://git-wip-us.apache.org/repos/asf/incubator-omid/diff/8954a342

Branch: refs/heads/master
Commit: 8954a3421803b70aa25de2d8ee574ca4f3ed05e4
Parents: 3659b96
Author: Francisco Perez-Sorrosal <fp...@yahoo-inc.com>
Authored: Mon May 16 15:07:59 2016 -0700
Committer: Francisco Perez-Sorrosal <fp...@yahoo-inc.com>
Committed: Wed May 18 12:26:31 2016 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/omid/tso/Batch.java    | 13 +++++-
 .../java/org/apache/omid/tso/PersistEvent.java  | 30 +++++--------
 .../apache/omid/tso/PersistenceProcessor.java   |  5 ++-
 .../omid/tso/PersistenceProcessorHandler.java   |  3 +-
 .../omid/tso/PersistenceProcessorImpl.java      | 12 ++++-
 .../org/apache/omid/tso/ReplyProcessorImpl.java | 12 +++--
 .../apache/omid/tso/RequestProcessorImpl.java   | 10 ++++-
 .../java/org/apache/omid/tso/TestBatch.java     | 16 ++++---
 .../tso/TestPersistenceProcessorHandler.java    | 46 ++++++++------------
 .../apache/omid/tso/TestRequestProcessor.java   |  6 +--
 10 files changed, 84 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/8954a342/tso-server/src/main/java/org/apache/omid/tso/Batch.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/Batch.java b/tso-server/src/main/java/org/apache/omid/tso/Batch.java
index 06ddc71..496bca2 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/Batch.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/Batch.java
@@ -101,11 +101,20 @@ public class Batch {
 
     }
 
-    void addAbort(long startTimestamp, boolean isCommitRetry, Channel c, MonitoringContext context) {
+    public void addCommitRetry(long startTimestamp, Channel c, MonitoringContext context) {
+
+        Preconditions.checkState(!isFull(), "batch is full");
+        int index = numEvents++;
+        PersistEvent e = events[index];
+        e.makeCommitRetry(startTimestamp, c, context);
+
+    }
+
+    void addAbort(long startTimestamp, Channel c, MonitoringContext context) {
         Preconditions.checkState(!isFull(), "batch is full");
         int index = numEvents++;
         PersistEvent e = events[index];
-        e.makePersistAbort(startTimestamp, isCommitRetry, c, context);
+        e.makePersistAbort(startTimestamp, c, context);
 
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/8954a342/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java b/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java
index 9bf9e89..db58677 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java
@@ -25,16 +25,14 @@ public final class PersistEvent {
     private MonitoringContext monCtx;
 
     enum Type {
-        TIMESTAMP, COMMIT, ABORT
+        TIMESTAMP, COMMIT, ABORT, COMMIT_RETRY
     }
 
     private Type type = null;
     private Channel channel = null;
 
-    private boolean isCommitRetry = false;
     private long startTimestamp = 0L;
     private long commitTimestamp = 0L;
-    private long lowWatermark = 0L;
 
     void makePersistCommit(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx) {
 
@@ -46,11 +44,19 @@ public final class PersistEvent {
 
     }
 
-    void makePersistAbort(long startTimestamp, boolean isCommitRetry, Channel c, MonitoringContext monCtx) {
+    void makeCommitRetry(long startTimestamp, Channel c, MonitoringContext monCtx) {
+
+        this.type = Type.COMMIT_RETRY;
+        this.startTimestamp = startTimestamp;
+        this.channel = c;
+        this.monCtx = monCtx;
+
+    }
+
+    void makePersistAbort(long startTimestamp, Channel c, MonitoringContext monCtx) {
 
         this.type = Type.ABORT;
         this.startTimestamp = startTimestamp;
-        this.isCommitRetry = isCommitRetry;
         this.channel = c;
         this.monCtx = monCtx;
 
@@ -83,12 +89,6 @@ public final class PersistEvent {
 
     }
 
-    boolean isCommitRetry() {
-
-        return isCommitRetry;
-
-    }
-
     long getStartTimestamp() {
 
         return startTimestamp;
@@ -101,20 +101,12 @@ public final class PersistEvent {
 
     }
 
-    long getLowWatermark() {
-
-        return lowWatermark;
-
-    }
-
     @Override
     public String toString() {
         return Objects.toStringHelper(this)
                 .add("type", type)
-                .add("isCommitRetry", isCommitRetry)
                 .add("ST", startTimestamp)
                 .add("CT", commitTimestamp)
-                .add("LWM", lowWatermark)
                 .toString();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/8954a342/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
index 0b1a34f..07893f6 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
@@ -26,12 +26,13 @@ interface PersistenceProcessor {
     void addCommitToBatch(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx)
             throws Exception;
 
-    void addAbortToBatch(long startTimestamp, boolean isCommitRetry, Channel c, MonitoringContext monCtx) throws Exception;
+    void addCommitRetryToBatch(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception;
+
+    void addAbortToBatch(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception;
 
     void addTimestampToBatch(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception;
 
     void triggerCurrentBatchFlush() throws Exception;
 
     Future<Void> persistLowWatermark(long lowWatermark);
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/8954a342/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java
index 079c8c4..bfe0036 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java
@@ -30,6 +30,7 @@ import java.io.IOException;
 import java.util.concurrent.ExecutionException;
 
 import static com.codahale.metrics.MetricRegistry.name;
+import static org.apache.omid.tso.PersistEvent.Type.*;
 
 public class PersistenceProcessorHandler implements WorkHandler<PersistenceProcessorImpl.PersistBatchEvent> {
 
@@ -132,7 +133,7 @@ public class PersistenceProcessorHandler implements WorkHandler<PersistenceProce
         int currentEventIdx = 0;
         while (currentEventIdx <= batch.getLastEventIdx()) {
             PersistEvent event = batch.get(currentEventIdx);
-            if (event.isCommitRetry()) {
+            if (event.getType() == COMMIT_RETRY) {
                 retryProcessor.disambiguateRetryRequestHeuristically(event.getStartTimestamp(), event.getChannel(), event.getMonCtx());
                 // Swap the disambiguated event with the last batch event & decrease the # of remaining elems to process
                 swapBatchElements(batch, currentEventIdx, batch.getLastEventIdx());

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/8954a342/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
index 0da0c91..4fe21d2 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
@@ -128,10 +128,18 @@ class PersistenceProcessorImpl implements PersistenceProcessor {
     }
 
     @Override
-    public void addAbortToBatch(long startTimestamp, boolean isCommitRetry, Channel c, MonitoringContext context)
+    public void addCommitRetryToBatch(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception {
+        currentBatch.addCommitRetry(startTimestamp, c, monCtx);
+        if (currentBatch.isFull()) {
+            triggerCurrentBatchFlush();
+        }
+    }
+
+    @Override
+    public void addAbortToBatch(long startTimestamp, Channel c, MonitoringContext context)
             throws Exception {
 
-        currentBatch.addAbort(startTimestamp, isCommitRetry, c, context);
+        currentBatch.addAbort(startTimestamp, c, context);
         if (currentBatch.isFull()) {
             triggerCurrentBatchFlush();
         }

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/8954a342/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
index 91df214..a46c240 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
@@ -17,6 +17,7 @@
  */
 package org.apache.omid.tso;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.inject.Inject;
 import com.lmax.disruptor.BatchEventProcessor;
@@ -50,9 +51,11 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
 
     private final RingBuffer<ReplyBatchEvent> replyRing;
 
-    private AtomicLong nextIDToHandle = new AtomicLong();
+    @VisibleForTesting
+    AtomicLong nextIDToHandle = new AtomicLong();
 
-    private PriorityQueue<ReplyBatchEvent> futureEvents;
+    @VisibleForTesting
+    PriorityQueue<ReplyBatchEvent> futureEvents;
 
     // Metrics
     private final Meter abortMeter;
@@ -90,7 +93,8 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
 
     }
 
-    private void handleReplyBatchEvent(ReplyBatchEvent replyBatchEvent) throws Exception {
+    @VisibleForTesting
+    void handleReplyBatchEvent(ReplyBatchEvent replyBatchEvent) throws Exception {
 
         String name;
         Batch batch = replyBatchEvent.getBatch();
@@ -116,6 +120,8 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
                 sendTimestampResponse(event.getStartTimestamp(), event.getChannel());
                 event.getMonCtx().timerStop(name);
                 break;
+            case COMMIT_RETRY:
+                throw new RuntimeException("COMMIT_RETRY events must be filtered before this step. Event: {}");
             }
             event.getMonCtx().publish();
         }

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/8954a342/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
index 372733e..32f5b8c 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
@@ -205,8 +205,14 @@ class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestE
             }
             persistProc.addCommitToBatch(startTimestamp, commitTimestamp, c, event.getMonCtx());
 
-        } else { // add it to the aborted list
-            persistProc.addAbortToBatch(startTimestamp, isCommitRetry, c, event.getMonCtx());
+        } else {
+
+            if (isCommitRetry) { // Re-check if it was already committed but the client retried due to a lag replying
+                persistProc.addCommitRetryToBatch(startTimestamp, c, event.getMonCtx());
+            } else {
+                persistProc.addAbortToBatch(startTimestamp, c, event.getMonCtx());
+            }
+
         }
 
     }

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/8954a342/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java b/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
index c472606..448eeaa 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
@@ -70,12 +70,14 @@ public class TestBatch {
 
         // Test when filling the batch with different types of events, that becomes full
         for (int i = 0; i < BATCH_SIZE; i++) {
-            if (i % 3 == 0) {
+            if (i % 4 == 0) {
                 batch.addTimestamp(ANY_ST, channel, monCtx);
-            } else if (i % 3 == 1) {
+            } else if (i % 4 == 1) {
                 batch.addCommit(ANY_ST, ANY_CT, channel, monCtx);
+            } else if (i % 4 == 2) {
+                batch.addCommitRetry(ANY_ST, channel, monCtx);
             } else {
-                batch.addAbort(ANY_ST, false, channel, monCtx);
+                batch.addAbort(ANY_ST, channel, monCtx);
             }
         }
         assertFalse(batch.isEmpty(), "Batch should contain elements");
@@ -95,7 +97,8 @@ public class TestBatch {
         // Check the first 3 events and the last one correspond to the filling done above
         assertTrue(batch.get(0).getType().equals(PersistEvent.Type.TIMESTAMP));
         assertTrue(batch.get(1).getType().equals(PersistEvent.Type.COMMIT));
-        assertTrue(batch.get(2).getType().equals(PersistEvent.Type.ABORT));
+        assertTrue(batch.get(2).getType().equals(PersistEvent.Type.COMMIT_RETRY));
+        assertTrue(batch.get(3).getType().equals(PersistEvent.Type.ABORT));
 
         // Set a new value for last element in Batch and check we obtain the right result
         batch.decreaseNumEvents();
@@ -137,10 +140,11 @@ public class TestBatch {
         // Put some elements in the batch...
         batch.addTimestamp(ANY_ST, channel, monCtx);
         batch.addCommit(ANY_ST, ANY_CT, channel, monCtx);
-        batch.addAbort(ANY_ST, false, channel, monCtx);
+        batch.addCommitRetry(ANY_ST, channel, monCtx);
+        batch.addAbort(ANY_ST, channel, monCtx);
         assertFalse(batch.isEmpty(), "Batch should contain elements");
         assertFalse(batch.isFull(), "Batch should NOT be full");
-        assertEquals(batch.getNumEvents(), 3, "Num events should be 3");
+        assertEquals(batch.getNumEvents(), 4, "Num events should be 4");
 
         // ... and passivate the object through the factory. It should reset the state of the batch
         factory.passivateObject(pooledBatch);

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/8954a342/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java
index 4dcc2b0..d60d019 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java
@@ -185,11 +185,9 @@ public class TestPersistenceProcessorHandler {
     @Test(timeOut = 10_000)
     public void testProcessingOfBatchPersistEventWithASingleAbortEventNoRetry() throws Exception {
 
-        final boolean IS_RETRY = false;
-
         // Prepare test batch
         Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
-        batch.addAbort(FIRST_ST, IS_RETRY, null, mock(MonitoringContext.class));
+        batch.addAbort(FIRST_ST, null, mock(MonitoringContext.class));
         PersistBatchEvent batchEvent = new PersistBatchEvent();
         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
         persistenceHandler.onEvent(batchEvent);
@@ -204,13 +202,11 @@ public class TestPersistenceProcessorHandler {
     }
 
     @Test(timeOut = 10_000)
-    public void testProcessingOfBatchPersistEventWithASingleAbortEventWithRetry() throws Exception {
-
-        final boolean IS_RETRY = true;
+    public void testProcessingOfBatchPersistEventWithASingleCommitRetryEvent() throws Exception {
 
         // Prepare test batch
         Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
-        batch.addAbort(FIRST_ST, IS_RETRY, null, mock(MonitoringContext.class));
+        batch.addCommitRetry(FIRST_ST, null, mock(MonitoringContext.class));
         PersistBatchEvent batchEvent = new PersistBatchEvent();
         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
 
@@ -226,14 +222,12 @@ public class TestPersistenceProcessorHandler {
     }
 
     @Test(timeOut = 10_000)
-    public void testProcessingOfBatchPersistEventWith2EventsCommitAndAbortWithRetry() throws Exception {
-
-        final boolean IS_RETRY = true;
+    public void testProcessingOfBatchPersistEventWith2EventsCommitAndCommitRetry() throws Exception {
 
         // Prepare test batch
         Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
         batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContext.class));
-        batch.addAbort(SECOND_ST, IS_RETRY, null, mock(MonitoringContext.class));
+        batch.addCommitRetry(SECOND_ST, null, mock(MonitoringContext.class));
         PersistBatchEvent batchEvent = new PersistBatchEvent();
         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
 
@@ -254,16 +248,14 @@ public class TestPersistenceProcessorHandler {
     }
 
     @Test(timeOut = 10_000)
-    public void testProcessingOfBatchPersistEventWith2EventsAbortWithRetryAndCommit() throws Exception {
+    public void testProcessingOfBatchPersistEventWith2EventsCommitRetryAndCommit() throws Exception {
         // ------------------------------------------------------------------------------------------------------------
-        // Same test as testProcessingOfBatchPersistEventWith2EventsCommitAndAbortWithRetry but swapped events
+        // Same test as testProcessingOfBatchPersistEventWith2EventsCommitAndCommitRetry but swapped events
         // ------------------------------------------------------------------------------------------------------------
 
-        final boolean IS_RETRY = true;
-
         // Prepare test batch
         Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
-        batch.addAbort(FIRST_ST, IS_RETRY, null, mock(MonitoringContext.class));
+        batch.addCommitRetry(FIRST_ST, null, mock(MonitoringContext.class));
         batch.addCommit(SECOND_ST, SECOND_CT, null, mock(MonitoringContext.class));
         PersistBatchEvent batchEvent = new PersistBatchEvent();
         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
@@ -285,14 +277,12 @@ public class TestPersistenceProcessorHandler {
     }
 
     @Test(timeOut = 10_000)
-    public void testProcessingOfBatchPersistEventWith2AbortWithRetryEvents() throws Exception {
-
-        final boolean IS_RETRY = true;
+    public void testProcessingOfBatchPersistEventWith2CommitRetryEvents() throws Exception {
 
         // Prepare test batch
         Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
-        batch.addAbort(FIRST_ST, IS_RETRY, null, mock(MonitoringContext.class));
-        batch.addAbort(SECOND_ST, IS_RETRY, null, mock(MonitoringContext.class));
+        batch.addCommitRetry(FIRST_ST, null, mock(MonitoringContext.class));
+        batch.addCommitRetry(SECOND_ST, null, mock(MonitoringContext.class));
         PersistBatchEvent batchEvent = new PersistBatchEvent();
         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
 
@@ -312,14 +302,12 @@ public class TestPersistenceProcessorHandler {
     }
 
     @Test(timeOut = 10_000)
-    public void testProcessingOfBatchPersistEventWith2NonRetryAbortEvents() throws Exception {
-
-        final boolean IS_RETRY = false;
+    public void testProcessingOfBatchPersistEventWith2AbortEvents() throws Exception {
 
         // Prepare test batch
         Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
-        batch.addAbort(FIRST_ST, IS_RETRY, null, mock(MonitoringContext.class));
-        batch.addAbort(SECOND_ST, IS_RETRY, null, mock(MonitoringContext.class));
+        batch.addAbort(FIRST_ST, null, mock(MonitoringContext.class));
+        batch.addAbort(SECOND_ST, null, mock(MonitoringContext.class));
         PersistBatchEvent batchEvent = new PersistBatchEvent();
         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
 
@@ -347,11 +335,11 @@ public class TestPersistenceProcessorHandler {
         Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
 
         batch.addTimestamp(FIRST_ST, null, mock(MonitoringContext.class));
-        batch.addAbort(SECOND_ST, true, null, mock(MonitoringContext.class));
+        batch.addCommitRetry(SECOND_ST, null, mock(MonitoringContext.class));
         batch.addCommit(THIRD_ST, THIRD_CT, null, mock(MonitoringContext.class));
-        batch.addAbort(FOURTH_ST, false, null, mock(MonitoringContext.class));
+        batch.addAbort(FOURTH_ST, null, mock(MonitoringContext.class));
         batch.addCommit(FIFTH_ST, FIFTH_CT, null, mock(MonitoringContext.class));
-        batch.addAbort(SIXTH_ST, true, null, mock(MonitoringContext.class));
+        batch.addCommitRetry(SIXTH_ST, null, mock(MonitoringContext.class));
         PersistBatchEvent batchEvent = new PersistBatchEvent();
         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
 

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/8954a342/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
index 5b17d1e..8f2b06e 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
@@ -112,7 +112,7 @@ public class TestRequestProcessor {
 
         List<Long> writeSet = Lists.newArrayList(1L, 20L, 203L);
         requestProc.commitRequest(firstTS - 1, writeSet, false, null, new MonitoringContext(metrics));
-        verify(persist, timeout(100).times(1)).addAbortToBatch(eq(firstTS - 1), anyBoolean(), any(Channel.class), any(MonitoringContext.class));
+        verify(persist, timeout(100).times(1)).addAbortToBatch(eq(firstTS - 1), any(Channel.class), any(MonitoringContext.class));
 
         requestProc.commitRequest(firstTS, writeSet, false, null, new MonitoringContext(metrics));
         ArgumentCaptor<Long> commitTScapture = ArgumentCaptor.forClass(Long.class);
@@ -136,7 +136,7 @@ public class TestRequestProcessor {
         requestProc.commitRequest(thirdTS, writeSet, false, null, new MonitoringContext(metrics));
         verify(persist, timeout(100).times(1)).addCommitToBatch(eq(thirdTS), anyLong(), any(Channel.class), any(MonitoringContext.class));
         requestProc.commitRequest(secondTS, writeSet, false, null, new MonitoringContext(metrics));
-        verify(persist, timeout(100).times(1)).addAbortToBatch(eq(secondTS), anyBoolean(), any(Channel.class), any(MonitoringContext.class));
+        verify(persist, timeout(100).times(1)).addAbortToBatch(eq(secondTS), any(Channel.class), any(MonitoringContext.class));
 
     }
 
@@ -159,7 +159,7 @@ public class TestRequestProcessor {
 
         // ...check that the transaction is aborted when trying to commit
         requestProc.commitRequest(startTS, writeSet, false, null, new MonitoringContext(metrics));
-        verify(persist, timeout(100).times(1)).addAbortToBatch(eq(startTS), anyBoolean(), any(Channel.class), any(MonitoringContext.class));
+        verify(persist, timeout(100).times(1)).addAbortToBatch(eq(startTS), any(Channel.class), any(MonitoringContext.class));
 
     }