You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@omid.apache.org by fp...@apache.org on 2016/05/18 19:34:41 UTC
[6/7] incubator-omid git commit: Separate commit retry events from
aborts in batch contents
Separate commit retry events from aborts in batch contents
Before, the Abort events were re-used for holding what in reality are
commit retries, which made the code cryptic in many places. Now commit
retries are treated as first-class events.
Change-Id: Ib9c9f424c3b7b94a02d5e985b388ef74f13c87da
Project: http://git-wip-us.apache.org/repos/asf/incubator-omid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-omid/commit/8954a342
Tree: http://git-wip-us.apache.org/repos/asf/incubator-omid/tree/8954a342
Diff: http://git-wip-us.apache.org/repos/asf/incubator-omid/diff/8954a342
Branch: refs/heads/master
Commit: 8954a3421803b70aa25de2d8ee574ca4f3ed05e4
Parents: 3659b96
Author: Francisco Perez-Sorrosal <fp...@yahoo-inc.com>
Authored: Mon May 16 15:07:59 2016 -0700
Committer: Francisco Perez-Sorrosal <fp...@yahoo-inc.com>
Committed: Wed May 18 12:26:31 2016 -0700
----------------------------------------------------------------------
.../main/java/org/apache/omid/tso/Batch.java | 13 +++++-
.../java/org/apache/omid/tso/PersistEvent.java | 30 +++++--------
.../apache/omid/tso/PersistenceProcessor.java | 5 ++-
.../omid/tso/PersistenceProcessorHandler.java | 3 +-
.../omid/tso/PersistenceProcessorImpl.java | 12 ++++-
.../org/apache/omid/tso/ReplyProcessorImpl.java | 12 +++--
.../apache/omid/tso/RequestProcessorImpl.java | 10 ++++-
.../java/org/apache/omid/tso/TestBatch.java | 16 ++++---
.../tso/TestPersistenceProcessorHandler.java | 46 ++++++++------------
.../apache/omid/tso/TestRequestProcessor.java | 6 +--
10 files changed, 84 insertions(+), 69 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/8954a342/tso-server/src/main/java/org/apache/omid/tso/Batch.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/Batch.java b/tso-server/src/main/java/org/apache/omid/tso/Batch.java
index 06ddc71..496bca2 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/Batch.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/Batch.java
@@ -101,11 +101,20 @@ public class Batch {
}
- void addAbort(long startTimestamp, boolean isCommitRetry, Channel c, MonitoringContext context) {
+ public void addCommitRetry(long startTimestamp, Channel c, MonitoringContext context) {
+
+ Preconditions.checkState(!isFull(), "batch is full");
+ int index = numEvents++;
+ PersistEvent e = events[index];
+ e.makeCommitRetry(startTimestamp, c, context);
+
+ }
+
+ void addAbort(long startTimestamp, Channel c, MonitoringContext context) {
Preconditions.checkState(!isFull(), "batch is full");
int index = numEvents++;
PersistEvent e = events[index];
- e.makePersistAbort(startTimestamp, isCommitRetry, c, context);
+ e.makePersistAbort(startTimestamp, c, context);
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/8954a342/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java b/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java
index 9bf9e89..db58677 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java
@@ -25,16 +25,14 @@ public final class PersistEvent {
private MonitoringContext monCtx;
enum Type {
- TIMESTAMP, COMMIT, ABORT
+ TIMESTAMP, COMMIT, ABORT, COMMIT_RETRY
}
private Type type = null;
private Channel channel = null;
- private boolean isCommitRetry = false;
private long startTimestamp = 0L;
private long commitTimestamp = 0L;
- private long lowWatermark = 0L;
void makePersistCommit(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx) {
@@ -46,11 +44,19 @@ public final class PersistEvent {
}
- void makePersistAbort(long startTimestamp, boolean isCommitRetry, Channel c, MonitoringContext monCtx) {
+ void makeCommitRetry(long startTimestamp, Channel c, MonitoringContext monCtx) {
+
+ this.type = Type.COMMIT_RETRY;
+ this.startTimestamp = startTimestamp;
+ this.channel = c;
+ this.monCtx = monCtx;
+
+ }
+
+ void makePersistAbort(long startTimestamp, Channel c, MonitoringContext monCtx) {
this.type = Type.ABORT;
this.startTimestamp = startTimestamp;
- this.isCommitRetry = isCommitRetry;
this.channel = c;
this.monCtx = monCtx;
@@ -83,12 +89,6 @@ public final class PersistEvent {
}
- boolean isCommitRetry() {
-
- return isCommitRetry;
-
- }
-
long getStartTimestamp() {
return startTimestamp;
@@ -101,20 +101,12 @@ public final class PersistEvent {
}
- long getLowWatermark() {
-
- return lowWatermark;
-
- }
-
@Override
public String toString() {
return Objects.toStringHelper(this)
.add("type", type)
- .add("isCommitRetry", isCommitRetry)
.add("ST", startTimestamp)
.add("CT", commitTimestamp)
- .add("LWM", lowWatermark)
.toString();
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/8954a342/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
index 0b1a34f..07893f6 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
@@ -26,12 +26,13 @@ interface PersistenceProcessor {
void addCommitToBatch(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx)
throws Exception;
- void addAbortToBatch(long startTimestamp, boolean isCommitRetry, Channel c, MonitoringContext monCtx) throws Exception;
+ void addCommitRetryToBatch(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception;
+
+ void addAbortToBatch(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception;
void addTimestampToBatch(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception;
void triggerCurrentBatchFlush() throws Exception;
Future<Void> persistLowWatermark(long lowWatermark);
-
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/8954a342/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java
index 079c8c4..bfe0036 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java
@@ -30,6 +30,7 @@ import java.io.IOException;
import java.util.concurrent.ExecutionException;
import static com.codahale.metrics.MetricRegistry.name;
+import static org.apache.omid.tso.PersistEvent.Type.*;
public class PersistenceProcessorHandler implements WorkHandler<PersistenceProcessorImpl.PersistBatchEvent> {
@@ -132,7 +133,7 @@ public class PersistenceProcessorHandler implements WorkHandler<PersistenceProce
int currentEventIdx = 0;
while (currentEventIdx <= batch.getLastEventIdx()) {
PersistEvent event = batch.get(currentEventIdx);
- if (event.isCommitRetry()) {
+ if (event.getType() == COMMIT_RETRY) {
retryProcessor.disambiguateRetryRequestHeuristically(event.getStartTimestamp(), event.getChannel(), event.getMonCtx());
// Swap the disambiguated event with the last batch event & decrease the # of remaining elems to process
swapBatchElements(batch, currentEventIdx, batch.getLastEventIdx());
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/8954a342/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
index 0da0c91..4fe21d2 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
@@ -128,10 +128,18 @@ class PersistenceProcessorImpl implements PersistenceProcessor {
}
@Override
- public void addAbortToBatch(long startTimestamp, boolean isCommitRetry, Channel c, MonitoringContext context)
+ public void addCommitRetryToBatch(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception {
+ currentBatch.addCommitRetry(startTimestamp, c, monCtx);
+ if (currentBatch.isFull()) {
+ triggerCurrentBatchFlush();
+ }
+ }
+
+ @Override
+ public void addAbortToBatch(long startTimestamp, Channel c, MonitoringContext context)
throws Exception {
- currentBatch.addAbort(startTimestamp, isCommitRetry, c, context);
+ currentBatch.addAbort(startTimestamp, c, context);
if (currentBatch.isFull()) {
triggerCurrentBatchFlush();
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/8954a342/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
index 91df214..a46c240 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
@@ -17,6 +17,7 @@
*/
package org.apache.omid.tso;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import com.lmax.disruptor.BatchEventProcessor;
@@ -50,9 +51,11 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
private final RingBuffer<ReplyBatchEvent> replyRing;
- private AtomicLong nextIDToHandle = new AtomicLong();
+ @VisibleForTesting
+ AtomicLong nextIDToHandle = new AtomicLong();
- private PriorityQueue<ReplyBatchEvent> futureEvents;
+ @VisibleForTesting
+ PriorityQueue<ReplyBatchEvent> futureEvents;
// Metrics
private final Meter abortMeter;
@@ -90,7 +93,8 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
}
- private void handleReplyBatchEvent(ReplyBatchEvent replyBatchEvent) throws Exception {
+ @VisibleForTesting
+ void handleReplyBatchEvent(ReplyBatchEvent replyBatchEvent) throws Exception {
String name;
Batch batch = replyBatchEvent.getBatch();
@@ -116,6 +120,8 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
sendTimestampResponse(event.getStartTimestamp(), event.getChannel());
event.getMonCtx().timerStop(name);
break;
+ case COMMIT_RETRY:
+ throw new RuntimeException("COMMIT_RETRY events must be filtered before this step. Event: {}");
}
event.getMonCtx().publish();
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/8954a342/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
index 372733e..32f5b8c 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
@@ -205,8 +205,14 @@ class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestE
}
persistProc.addCommitToBatch(startTimestamp, commitTimestamp, c, event.getMonCtx());
- } else { // add it to the aborted list
- persistProc.addAbortToBatch(startTimestamp, isCommitRetry, c, event.getMonCtx());
+ } else {
+
+ if (isCommitRetry) { // Re-check if it was already committed but the client retried due to a lag replying
+ persistProc.addCommitRetryToBatch(startTimestamp, c, event.getMonCtx());
+ } else {
+ persistProc.addAbortToBatch(startTimestamp, c, event.getMonCtx());
+ }
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/8954a342/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java b/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
index c472606..448eeaa 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
@@ -70,12 +70,14 @@ public class TestBatch {
// Test when filling the batch with different types of events, that becomes full
for (int i = 0; i < BATCH_SIZE; i++) {
- if (i % 3 == 0) {
+ if (i % 4 == 0) {
batch.addTimestamp(ANY_ST, channel, monCtx);
- } else if (i % 3 == 1) {
+ } else if (i % 4 == 1) {
batch.addCommit(ANY_ST, ANY_CT, channel, monCtx);
+ } else if (i % 4 == 2) {
+ batch.addCommitRetry(ANY_ST, channel, monCtx);
} else {
- batch.addAbort(ANY_ST, false, channel, monCtx);
+ batch.addAbort(ANY_ST, channel, monCtx);
}
}
assertFalse(batch.isEmpty(), "Batch should contain elements");
@@ -95,7 +97,8 @@ public class TestBatch {
// Check the first 3 events and the last one correspond to the filling done above
assertTrue(batch.get(0).getType().equals(PersistEvent.Type.TIMESTAMP));
assertTrue(batch.get(1).getType().equals(PersistEvent.Type.COMMIT));
- assertTrue(batch.get(2).getType().equals(PersistEvent.Type.ABORT));
+ assertTrue(batch.get(2).getType().equals(PersistEvent.Type.COMMIT_RETRY));
+ assertTrue(batch.get(3).getType().equals(PersistEvent.Type.ABORT));
// Set a new value for last element in Batch and check we obtain the right result
batch.decreaseNumEvents();
@@ -137,10 +140,11 @@ public class TestBatch {
// Put some elements in the batch...
batch.addTimestamp(ANY_ST, channel, monCtx);
batch.addCommit(ANY_ST, ANY_CT, channel, monCtx);
- batch.addAbort(ANY_ST, false, channel, monCtx);
+ batch.addCommitRetry(ANY_ST, channel, monCtx);
+ batch.addAbort(ANY_ST, channel, monCtx);
assertFalse(batch.isEmpty(), "Batch should contain elements");
assertFalse(batch.isFull(), "Batch should NOT be full");
- assertEquals(batch.getNumEvents(), 3, "Num events should be 3");
+ assertEquals(batch.getNumEvents(), 4, "Num events should be 4");
// ... and passivate the object through the factory. It should reset the state of the batch
factory.passivateObject(pooledBatch);
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/8954a342/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java
index 4dcc2b0..d60d019 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java
@@ -185,11 +185,9 @@ public class TestPersistenceProcessorHandler {
@Test(timeOut = 10_000)
public void testProcessingOfBatchPersistEventWithASingleAbortEventNoRetry() throws Exception {
- final boolean IS_RETRY = false;
-
// Prepare test batch
Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
- batch.addAbort(FIRST_ST, IS_RETRY, null, mock(MonitoringContext.class));
+ batch.addAbort(FIRST_ST, null, mock(MonitoringContext.class));
PersistBatchEvent batchEvent = new PersistBatchEvent();
PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
persistenceHandler.onEvent(batchEvent);
@@ -204,13 +202,11 @@ public class TestPersistenceProcessorHandler {
}
@Test(timeOut = 10_000)
- public void testProcessingOfBatchPersistEventWithASingleAbortEventWithRetry() throws Exception {
-
- final boolean IS_RETRY = true;
+ public void testProcessingOfBatchPersistEventWithASingleCommitRetryEvent() throws Exception {
// Prepare test batch
Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
- batch.addAbort(FIRST_ST, IS_RETRY, null, mock(MonitoringContext.class));
+ batch.addCommitRetry(FIRST_ST, null, mock(MonitoringContext.class));
PersistBatchEvent batchEvent = new PersistBatchEvent();
PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
@@ -226,14 +222,12 @@ public class TestPersistenceProcessorHandler {
}
@Test(timeOut = 10_000)
- public void testProcessingOfBatchPersistEventWith2EventsCommitAndAbortWithRetry() throws Exception {
-
- final boolean IS_RETRY = true;
+ public void testProcessingOfBatchPersistEventWith2EventsCommitAndCommitRetry() throws Exception {
// Prepare test batch
Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContext.class));
- batch.addAbort(SECOND_ST, IS_RETRY, null, mock(MonitoringContext.class));
+ batch.addCommitRetry(SECOND_ST, null, mock(MonitoringContext.class));
PersistBatchEvent batchEvent = new PersistBatchEvent();
PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
@@ -254,16 +248,14 @@ public class TestPersistenceProcessorHandler {
}
@Test(timeOut = 10_000)
- public void testProcessingOfBatchPersistEventWith2EventsAbortWithRetryAndCommit() throws Exception {
+ public void testProcessingOfBatchPersistEventWith2EventsCommitRetryAndCommit() throws Exception {
// ------------------------------------------------------------------------------------------------------------
- // Same test as testProcessingOfBatchPersistEventWith2EventsCommitAndAbortWithRetry but swapped events
+ // Same test as testProcessingOfBatchPersistEventWith2EventsCommitAndCommitRetry but swapped events
// ------------------------------------------------------------------------------------------------------------
- final boolean IS_RETRY = true;
-
// Prepare test batch
Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
- batch.addAbort(FIRST_ST, IS_RETRY, null, mock(MonitoringContext.class));
+ batch.addCommitRetry(FIRST_ST, null, mock(MonitoringContext.class));
batch.addCommit(SECOND_ST, SECOND_CT, null, mock(MonitoringContext.class));
PersistBatchEvent batchEvent = new PersistBatchEvent();
PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
@@ -285,14 +277,12 @@ public class TestPersistenceProcessorHandler {
}
@Test(timeOut = 10_000)
- public void testProcessingOfBatchPersistEventWith2AbortWithRetryEvents() throws Exception {
-
- final boolean IS_RETRY = true;
+ public void testProcessingOfBatchPersistEventWith2CommitRetryEvents() throws Exception {
// Prepare test batch
Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
- batch.addAbort(FIRST_ST, IS_RETRY, null, mock(MonitoringContext.class));
- batch.addAbort(SECOND_ST, IS_RETRY, null, mock(MonitoringContext.class));
+ batch.addCommitRetry(FIRST_ST, null, mock(MonitoringContext.class));
+ batch.addCommitRetry(SECOND_ST, null, mock(MonitoringContext.class));
PersistBatchEvent batchEvent = new PersistBatchEvent();
PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
@@ -312,14 +302,12 @@ public class TestPersistenceProcessorHandler {
}
@Test(timeOut = 10_000)
- public void testProcessingOfBatchPersistEventWith2NonRetryAbortEvents() throws Exception {
-
- final boolean IS_RETRY = false;
+ public void testProcessingOfBatchPersistEventWith2AbortEvents() throws Exception {
// Prepare test batch
Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
- batch.addAbort(FIRST_ST, IS_RETRY, null, mock(MonitoringContext.class));
- batch.addAbort(SECOND_ST, IS_RETRY, null, mock(MonitoringContext.class));
+ batch.addAbort(FIRST_ST, null, mock(MonitoringContext.class));
+ batch.addAbort(SECOND_ST, null, mock(MonitoringContext.class));
PersistBatchEvent batchEvent = new PersistBatchEvent();
PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
@@ -347,11 +335,11 @@ public class TestPersistenceProcessorHandler {
Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
batch.addTimestamp(FIRST_ST, null, mock(MonitoringContext.class));
- batch.addAbort(SECOND_ST, true, null, mock(MonitoringContext.class));
+ batch.addCommitRetry(SECOND_ST, null, mock(MonitoringContext.class));
batch.addCommit(THIRD_ST, THIRD_CT, null, mock(MonitoringContext.class));
- batch.addAbort(FOURTH_ST, false, null, mock(MonitoringContext.class));
+ batch.addAbort(FOURTH_ST, null, mock(MonitoringContext.class));
batch.addCommit(FIFTH_ST, FIFTH_CT, null, mock(MonitoringContext.class));
- batch.addAbort(SIXTH_ST, true, null, mock(MonitoringContext.class));
+ batch.addCommitRetry(SIXTH_ST, null, mock(MonitoringContext.class));
PersistBatchEvent batchEvent = new PersistBatchEvent();
PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/8954a342/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
index 5b17d1e..8f2b06e 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
@@ -112,7 +112,7 @@ public class TestRequestProcessor {
List<Long> writeSet = Lists.newArrayList(1L, 20L, 203L);
requestProc.commitRequest(firstTS - 1, writeSet, false, null, new MonitoringContext(metrics));
- verify(persist, timeout(100).times(1)).addAbortToBatch(eq(firstTS - 1), anyBoolean(), any(Channel.class), any(MonitoringContext.class));
+ verify(persist, timeout(100).times(1)).addAbortToBatch(eq(firstTS - 1), any(Channel.class), any(MonitoringContext.class));
requestProc.commitRequest(firstTS, writeSet, false, null, new MonitoringContext(metrics));
ArgumentCaptor<Long> commitTScapture = ArgumentCaptor.forClass(Long.class);
@@ -136,7 +136,7 @@ public class TestRequestProcessor {
requestProc.commitRequest(thirdTS, writeSet, false, null, new MonitoringContext(metrics));
verify(persist, timeout(100).times(1)).addCommitToBatch(eq(thirdTS), anyLong(), any(Channel.class), any(MonitoringContext.class));
requestProc.commitRequest(secondTS, writeSet, false, null, new MonitoringContext(metrics));
- verify(persist, timeout(100).times(1)).addAbortToBatch(eq(secondTS), anyBoolean(), any(Channel.class), any(MonitoringContext.class));
+ verify(persist, timeout(100).times(1)).addAbortToBatch(eq(secondTS), any(Channel.class), any(MonitoringContext.class));
}
@@ -159,7 +159,7 @@ public class TestRequestProcessor {
// ...check that the transaction is aborted when trying to commit
requestProc.commitRequest(startTS, writeSet, false, null, new MonitoringContext(metrics));
- verify(persist, timeout(100).times(1)).addAbortToBatch(eq(startTS), anyBoolean(), any(Channel.class), any(MonitoringContext.class));
+ verify(persist, timeout(100).times(1)).addAbortToBatch(eq(startTS), any(Channel.class), any(MonitoringContext.class));
}