You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@omid.apache.org by fp...@apache.org on 2016/05/11 18:06:40 UTC
[33/50] [abbrv] incubator-omid git commit: Simplify Batch(Pool)
initialization and fix tests
Simplify Batch(Pool) initialization and fix tests
Also improve naming of variables and methods that now have different semantics
Change-Id: I262b0f5ad2c2dc47a02f41f049c5075c8b6a357e
Project: http://git-wip-us.apache.org/repos/asf/incubator-omid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-omid/commit/1d60f21d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-omid/tree/1d60f21d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-omid/diff/1d60f21d
Branch: refs/heads/master
Commit: 1d60f21df8d239412939f0692f398e1c01294568
Parents: 43261a3
Author: Francisco Perez-Sorrosal <fp...@yahoo-inc.com>
Authored: Tue May 3 11:16:17 2016 -0700
Committer: Francisco Perez-Sorrosal <fp...@yahoo-inc.com>
Committed: Wed May 4 09:50:57 2016 -0700
----------------------------------------------------------------------
.../apache/omid/transaction/TestTSOModule.java | 4 +-
.../TSOForHBaseCompactorTestModule.java | 2 +-
.../main/java/org/apache/omid/tso/Batch.java | 45 ++-
.../java/org/apache/omid/tso/BatchPool.java | 11 +-
.../java/org/apache/omid/tso/PersistEvent.java | 12 +
.../apache/omid/tso/PersistenceProcessor.java | 16 +-
.../omid/tso/PersistenceProcessorHandler.java | 10 +-
.../omid/tso/PersistenceProcessorImpl.java | 79 ++--
.../org/apache/omid/tso/ReplyProcessor.java | 23 +-
.../org/apache/omid/tso/ReplyProcessorImpl.java | 10 +-
.../apache/omid/tso/RequestProcessorImpl.java | 16 +-
.../java/org/apache/omid/tso/TSOModule.java | 4 +-
.../org/apache/omid/tso/TSOServerConfig.java | 74 ++--
tso-server/src/main/resources/default-omid.yml | 14 +-
.../java/org/apache/omid/tso/TSOMockModule.java | 4 +-
.../java/org/apache/omid/tso/TestBatch.java | 22 +-
.../java/org/apache/omid/tso/TestPanicker.java | 14 +-
.../omid/tso/TestPersistenceProcessor.java | 382 +++++++++++--------
.../apache/omid/tso/TestRequestProcessor.java | 32 +-
.../org/apache/omid/tso/TestRetryProcessor.java | 41 +-
...stTSOClientRequestAndResponseBehaviours.java | 1 +
tso-server/src/test/resources/test-omid.yml | 2 +-
22 files changed, 436 insertions(+), 382 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/1d60f21d/hbase-client/src/test/java/org/apache/omid/transaction/TestTSOModule.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestTSOModule.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestTSOModule.java
index 5d84b8b..79b84de 100644
--- a/hbase-client/src/test/java/org/apache/omid/transaction/TestTSOModule.java
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestTSOModule.java
@@ -17,6 +17,7 @@
*/
package org.apache.omid.transaction;
+import com.google.common.base.Preconditions;
import com.google.inject.AbstractModule;
import com.google.inject.Provider;
import com.google.inject.Provides;
@@ -55,6 +56,7 @@ class TestTSOModule extends AbstractModule {
private final TSOServerConfig config;
TestTSOModule(Configuration hBaseConfig, TSOServerConfig config) {
+ Preconditions.checkArgument(config.getNumConcurrentCTWriters() >= 2, "# of Commit Table writers must be >= 2");
this.hBaseConfig = hBaseConfig;
this.config = config;
}
@@ -104,7 +106,7 @@ class TestTSOModule extends AbstractModule {
@Provides
PersistenceProcessorHandler[] getPersistenceProcessorHandler(Provider<PersistenceProcessorHandler> provider) {
- PersistenceProcessorHandler[] persistenceProcessorHandlers = new PersistenceProcessorHandler[config.getPersistHandlerNum()];
+ PersistenceProcessorHandler[] persistenceProcessorHandlers = new PersistenceProcessorHandler[config.getNumConcurrentCTWriters()];
for (int i = 0; i < persistenceProcessorHandlers.length; i++) {
persistenceProcessorHandlers[i] = provider.get();
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/1d60f21d/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TSOForHBaseCompactorTestModule.java
----------------------------------------------------------------------
diff --git a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TSOForHBaseCompactorTestModule.java b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TSOForHBaseCompactorTestModule.java
index 96afbd2..7c7e643 100644
--- a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TSOForHBaseCompactorTestModule.java
+++ b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TSOForHBaseCompactorTestModule.java
@@ -123,7 +123,7 @@ class TSOForHBaseCompactorTestModule extends AbstractModule {
@Provides
PersistenceProcessorHandler[] getPersistenceProcessorHandler(Provider<PersistenceProcessorHandler> provider) {
- PersistenceProcessorHandler[] persistenceProcessorHandlers = new PersistenceProcessorHandler[config.getPersistHandlerNum()];
+ PersistenceProcessorHandler[] persistenceProcessorHandlers = new PersistenceProcessorHandler[config.getNumConcurrentCTWriters()];
for (int i = 0; i < persistenceProcessorHandlers.length; i++) {
persistenceProcessorHandlers[i] = provider.get();
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/1d60f21d/tso-server/src/main/java/org/apache/omid/tso/Batch.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/Batch.java b/tso-server/src/main/java/org/apache/omid/tso/Batch.java
index e778eee..2b17f23 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/Batch.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/Batch.java
@@ -17,32 +17,29 @@
*/
package org.apache.omid.tso;
+import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import org.apache.omid.tso.PersistEvent.Type;
import org.jboss.netty.channel.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Arrays;
+
public class Batch {
private static final Logger LOG = LoggerFactory.getLogger(Batch.class);
- private final PersistEvent[] events;
- private final int maxBatchSize;
- private final BatchPool batchPool;
private final int id;
+ private final int size;
private int numEvents;
-
- Batch(int maxBatchSize) {
-
- this(maxBatchSize, 0, null);
-
- }
+ private final PersistEvent[] events;
+ private final BatchPool batchPool;
Batch(int size, int id, BatchPool batchPool) {
+
Preconditions.checkArgument(size > 0, "Size must be positive");
- LOG.info("Batch id {} created with size {}", id, size);
- this.maxBatchSize = size;
+ this.size = size;
this.batchPool = batchPool;
this.id = id;
this.numEvents = 0;
@@ -50,12 +47,14 @@ public class Batch {
for (int i = 0; i < size; i++) {
this.events[i] = new PersistEvent();
}
+ LOG.info("Batch id {} created with size {}", id, size);
}
boolean isFull() {
- Preconditions.checkState(numEvents <= maxBatchSize, "numEvents > maxBatchSize");
- return numEvents == maxBatchSize;
+
+ Preconditions.checkState(numEvents <= size, "numEvents > size");
+ return numEvents == size;
}
@@ -66,8 +65,9 @@ public class Batch {
}
boolean isLastEntryEmpty() {
- Preconditions.checkState(numEvents <= maxBatchSize, "numEvents > maxBatchSize");
- return numEvents == (maxBatchSize - 1);
+
+ Preconditions.checkState(numEvents <= size, "numEvents > size");
+ return numEvents == (size - 1);
}
@@ -124,6 +124,7 @@ public class Batch {
}
void sendReply(ReplyProcessor reply, RetryProcessor retryProc, long batchID) {
+
int i = 0;
while (i < numEvents) {
PersistEvent e = events[i];
@@ -135,7 +136,7 @@ public class Batch {
events[numEvents - 1] = tmp;
if (numEvents == 1) {
clear();
- reply.batchResponse(null, batchID);
+ reply.manageResponsesBatch(batchID, null);
return;
}
numEvents--;
@@ -144,8 +145,18 @@ public class Batch {
i++;
}
- reply.batchResponse(this, batchID);
+ reply.manageResponsesBatch(batchID, this);
+
+ }
+ @Override
+ public String toString() {
+ return Objects.toStringHelper(this)
+ .add("id", id)
+ .add("size", size)
+ .add("num events", numEvents)
+ .add("events", Arrays.toString(events))
+ .toString();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/1d60f21d/tso-server/src/main/java/org/apache/omid/tso/BatchPool.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/BatchPool.java b/tso-server/src/main/java/org/apache/omid/tso/BatchPool.java
index 88cfd1e..a5d7759 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/BatchPool.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/BatchPool.java
@@ -34,13 +34,12 @@ public class BatchPool {
@Inject
public BatchPool(TSOServerConfig config) {
- int numBuffersPerHandler = (config.getNumBuffersPerHandler() >= 2) ? config.getNumBuffersPerHandler() : 2;
- poolSize = config.getPersistHandlerNum() * numBuffersPerHandler;
- LOG.info("Config param maxBatchSize {}", config.getMaxBatchSize());
- LOG.info("Persistent Handlers {} ; Buffers per handler {}", config.getPersistHandlerNum(), numBuffersPerHandler);
+ poolSize = config.getNumConcurrentCTWriters();
+ int batchSize = config.getBatchSizePerCTWriter() + 1; // Add 1 element to batch size for storing LWM
+
+ LOG.info("Pool Size (Batches) {}; Batch Size {} (including LWM bucket)", poolSize, batchSize);
+ LOG.info("Total Batch Size (Pool size * Batch Size): {}", poolSize * batchSize);
batches = new Batch[poolSize];
- // TODO The + 1 is because the low watermark is added in each flush and needs one extra position. Fix and clarify initialization
- int batchSize = (config.getMaxBatchSize() / config.getPersistHandlerNum() > 0) ? (config.getMaxBatchSize() / config.getPersistHandlerNum()) + 1 : 2;
LOG.info("Creating {} Batches with {} elements each", poolSize, batchSize);
for (int i = 0; i < poolSize; i++) {
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/1d60f21d/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java b/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java
index e816149..32820f6 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java
@@ -17,6 +17,7 @@
*/
package org.apache.omid.tso;
+import com.google.common.base.Objects;
import org.jboss.netty.channel.Channel;
public final class PersistEvent {
@@ -114,4 +115,15 @@ public final class PersistEvent {
}
+ @Override
+ public String toString() {
+ return Objects.toStringHelper(this)
+ .add("type", type)
+ .add("isRetry", isRetry)
+ .add("ST", startTimestamp)
+ .add("CT", commitTimestamp)
+ .add("LWM", lowWatermark)
+ .toString();
+ }
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/1d60f21d/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
index 47dc56b..aa0e7df 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
@@ -19,24 +19,18 @@ package org.apache.omid.tso;
import org.jboss.netty.channel.Channel;
-// TODO Check the names of all methods as they do not persist anything anymore
interface PersistenceProcessor {
- // TODO maybe it should be called addCommit(...) or addCommitToBatch(...)
- void persistCommit(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx)
+ void addCommitToBatch(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx)
throws InterruptedException;
- // TODO maybe it should be called addAbort(...) or addAbortToBatch(...)
- void persistAbort(long startTimestamp, boolean isRetry, Channel c, MonitoringContext monCtx)
+ void addAbortToBatch(long startTimestamp, boolean isRetry, Channel c, MonitoringContext monCtx)
throws InterruptedException;
- // TODO maybe it should be called addTimestamp(...) or addTimestampToBatch(...)
- void persistTimestamp(long startTimestamp, Channel c, MonitoringContext monCtx) throws InterruptedException;
+ void addTimestampToBatch(long startTimestamp, Channel c, MonitoringContext monCtx) throws InterruptedException;
- // TODO maybe it should be called addLowWatermark(...) or addLowWatermarkToBatch(...)
- void persistLowWatermark(long lowWatermark, MonitoringContext monCtx);
+ void addLowWatermarkToBatch(long lowWatermark, MonitoringContext monCtx);
- // TODO The name of this method is weird. Rename to "persist"
- void persistFlush() throws InterruptedException;
+ void triggerCurrentBatchFlush() throws InterruptedException;
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/1d60f21d/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java
index 19b6b77..84890b9 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java
@@ -17,6 +17,7 @@
*/
package org.apache.omid.tso;
+import com.lmax.disruptor.LifecycleAware;
import com.lmax.disruptor.WorkHandler;
import org.apache.omid.committable.CommitTable;
import org.apache.omid.metrics.Histogram;
@@ -93,14 +94,11 @@ public class PersistenceProcessorHandler implements WorkHandler<PersistenceProce
throw new RuntimeException("Unknown event type: " + localEvent.getType().name());
}
}
- flush(batch, event.getBatchID());
+ flush(batch, event.getBatchSequence());
}
- // TODO Fix this method with the contents of PersistenceProcessor.flush() in master branch
- // TODO This is related to the changes in TestPersistenceProcessor.testCommitPersistenceWithHALeaseManager().
- // TODO Check also that test in the master branch
- private void flush(Batch batch, long batchID) {
+ private void flush(Batch batch, long batchSequence) {
if (batch.getNumEvents() > 0) {
commitSuicideIfNotMaster();
@@ -113,7 +111,7 @@ public class PersistenceProcessorHandler implements WorkHandler<PersistenceProce
panicker.panic("Error persisting commit batch", e);
}
commitSuicideIfNotMaster(); // TODO Here, we can return the client responses before committing suicide
- batch.sendReply(replyProcessor, retryProc, batchID);
+ batch.sendReply(replyProcessor, retryProc, batchSequence);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/1d60f21d/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
index e57cda3..4d1eb4c 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
@@ -18,6 +18,7 @@
package org.apache.omid.tso;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.EventFactory;
@@ -33,21 +34,23 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import static org.apache.omid.tso.PersistenceProcessorImpl.PersistBatchEvent.EVENT_FACTORY;
+import static org.apache.omid.tso.PersistenceProcessorImpl.PersistBatchEvent.makePersistBatch;
+
class PersistenceProcessorImpl implements PersistenceProcessor {
private static final Logger LOG = LoggerFactory.getLogger(PersistenceProcessorImpl.class);
private static final long INITIAL_LWM_VALUE = -1L;
- private final ReplyProcessor replyProcessor;
private final RingBuffer<PersistBatchEvent> persistRing;
private final BatchPool batchPool;
@VisibleForTesting
- Batch batch;
+ Batch currentBatch;
// TODO Next two need to be either int or AtomicLong
- volatile private long batchIDCnt;
+ volatile private long batchSequence;
volatile private long lowWatermark = INITIAL_LWM_VALUE;
private MonitoringContext lowWatermarkContext;
@@ -55,82 +58,80 @@ class PersistenceProcessorImpl implements PersistenceProcessor {
@Inject
PersistenceProcessorImpl(TSOServerConfig config,
BatchPool batchPool,
- ReplyProcessor replyProcessor,
Panicker panicker,
PersistenceProcessorHandler[] handlers)
throws InterruptedException, ExecutionException, IOException {
- this.batchIDCnt = 0L;
+ this.batchSequence = 0L;
this.batchPool = batchPool;
- this.batch = batchPool.getNextEmptyBatch();
+ this.currentBatch = batchPool.getNextEmptyBatch();
- this.replyProcessor = replyProcessor;
+ // Disruptor configuration
+ this.persistRing = RingBuffer.createSingleProducer(EVENT_FACTORY, 1 << 20, new BusySpinWaitStrategy());
- this.persistRing = RingBuffer.createSingleProducer(
- PersistBatchEvent.EVENT_FACTORY, 1 << 20, new BusySpinWaitStrategy());
+ ThreadFactoryBuilder threadFactory = new ThreadFactoryBuilder().setNameFormat("persist-%d");
+ ExecutorService requestExec = Executors.newFixedThreadPool(config.getNumConcurrentCTWriters(),
+ threadFactory.build());
WorkerPool<PersistBatchEvent> persistProcessor = new WorkerPool<>(persistRing,
persistRing.newBarrier(),
new FatalExceptionHandler(panicker),
handlers);
this.persistRing.addGatingSequences(persistProcessor.getWorkerSequences());
-
- ExecutorService requestExec = Executors.newFixedThreadPool(config.getPersistHandlerNum(),
- new ThreadFactoryBuilder().setNameFormat("persist-%d").build());
persistProcessor.start(requestExec);
}
@Override
- public void persistFlush() throws InterruptedException {
+ public void triggerCurrentBatchFlush() throws InterruptedException {
- if (batch.isEmpty()) {
+ if (currentBatch.isEmpty()) {
return;
}
- batch.addLowWatermark(this.lowWatermark, this.lowWatermarkContext);
+ currentBatch.addLowWatermark(this.lowWatermark, this.lowWatermarkContext);
long seq = persistRing.next();
PersistBatchEvent e = persistRing.get(seq);
- PersistBatchEvent.makePersistBatch(e, batch, batchIDCnt++);
+ makePersistBatch(e, batchSequence++, currentBatch);
persistRing.publish(seq);
- batch = batchPool.getNextEmptyBatch();
+ currentBatch = batchPool.getNextEmptyBatch();
}
@Override
- public void persistCommit(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx)
+ public void addCommitToBatch(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx)
throws InterruptedException {
- batch.addCommit(startTimestamp, commitTimestamp, c, monCtx);
- if (batch.isLastEntryEmpty()) {
- persistFlush();
+ currentBatch.addCommit(startTimestamp, commitTimestamp, c, monCtx);
+ if (currentBatch.isLastEntryEmpty()) {
+ triggerCurrentBatchFlush();
}
}
@Override
- public void persistAbort(long startTimestamp, boolean isRetry, Channel c, MonitoringContext context)
+ public void addAbortToBatch(long startTimestamp, boolean isRetry, Channel c, MonitoringContext context)
throws InterruptedException {
- batch.addAbort(startTimestamp, isRetry, c, context);
- if (batch.isLastEntryEmpty()) {
- persistFlush();
+ currentBatch.addAbort(startTimestamp, isRetry, c, context);
+ if (currentBatch.isLastEntryEmpty()) {
+ triggerCurrentBatchFlush();
}
}
@Override
- public void persistTimestamp(long startTimestamp, Channel c, MonitoringContext context)
+ public void addTimestampToBatch(long startTimestamp, Channel c, MonitoringContext context)
throws InterruptedException {
- batch.addTimestamp(startTimestamp, c, context);
- if (batch.isLastEntryEmpty()) {
- persistFlush();
+ currentBatch.addTimestamp(startTimestamp, c, context);
+ if (currentBatch.isLastEntryEmpty()) {
+ triggerCurrentBatchFlush();
}
}
@Override
- public void persistLowWatermark(long lowWatermark, MonitoringContext context) {
+ public void addLowWatermarkToBatch(long lowWatermark, MonitoringContext context) {
this.lowWatermark = lowWatermark;
this.lowWatermarkContext = context;
@@ -139,20 +140,20 @@ class PersistenceProcessorImpl implements PersistenceProcessor {
final static class PersistBatchEvent {
+ private long batchSequence;
private Batch batch;
- private long batchID;
- static void makePersistBatch(PersistBatchEvent e, Batch batch, long batchID) {
+ static void makePersistBatch(PersistBatchEvent e, long batchSequence, Batch batch) {
e.batch = batch;
- e.batchID = batchID;
+ e.batchSequence = batchSequence;
}
Batch getBatch() {
return batch;
}
- long getBatchID() {
- return batchID;
+ long getBatchSequence() {
+ return batchSequence;
}
final static EventFactory<PersistBatchEvent> EVENT_FACTORY = new EventFactory<PersistBatchEvent>() {
@@ -161,6 +162,14 @@ class PersistenceProcessorImpl implements PersistenceProcessor {
}
};
+ @Override
+ public String toString() {
+ return Objects.toStringHelper(this)
+ .add("batchSequence", batchSequence)
+ .add("batch", batch)
+ .toString();
+ }
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/1d60f21d/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessor.java b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessor.java
index 37a9dd9..04f40cc 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessor.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessor.java
@@ -21,27 +21,20 @@ import org.jboss.netty.channel.Channel;
interface ReplyProcessor {
- // TODO This documentation does not corresponds to the method below anymore. Put in the right place or remove
/**
- * Informs the client about the outcome of the Tx it was trying to commit.
* Each reply to a transactional operation for a client is contained in a batch. The batch must be ordered
* before sending the replies in order not to break snapshot isolation properties.
*
+ * @param batchSequence
+ * a batch sequence number, used to enforce order between replies
* @param batch
- * the batch of operations
- * @param batchID
- * the id of the batch, used to enforce order between replies
- * @param makeHeuristicDecision
- * informs about whether heuristic actions are needed or not
- * @param startTimestamp
- * the start timestamp of the transaction (a.k.a. tx id)
- * @param commitTimestamp
- * the commit timestamp of the transaction
- * @param channel
- * the communication channed with the client
+ * a batch containing the transaction operations
*/
- void batchResponse(Batch batch, long batchID);
+ void manageResponsesBatch(long batchSequence, Batch batch);
+ // TODO This method can be removed if we return the responses from the retry processor
void addAbort(Batch batch, long startTimestamp, Channel c, MonitoringContext context);
-
+ // TODO This method can be removed if we return the responses from the retry processor
void addCommit(Batch batch, long startTimestamp, long commitTimestamp, Channel c, MonitoringContext context);
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/1d60f21d/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
index 85f465c..0f5fcb4 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
@@ -165,28 +165,30 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
}
@Override
- public void batchResponse(Batch batch, long batchID) {
+ public void manageResponsesBatch(long batchSequence, Batch batch) {
long seq = replyRing.next();
ReplyBatchEvent e = replyRing.get(seq);
- ReplyBatchEvent.makeReplyBatch(e, batch, batchID);
+ ReplyBatchEvent.makeReplyBatch(e, batch, batchSequence);
replyRing.publish(seq);
}
+ // TODO This method can be removed if we return the responses from the retry processor
@Override
public void addAbort(Batch batch, long startTimestamp, Channel c, MonitoringContext context) {
batch.addAbort(startTimestamp, true, c, context);
- batchResponse(batch, NO_ORDER);
+ manageResponsesBatch(NO_ORDER, batch);
}
+ // TODO This method can be removed if we return the responses from the retry processor
@Override
public void addCommit(Batch batch, long startTimestamp, long commitTimestamp, Channel c, MonitoringContext context) {
batch.addCommit(startTimestamp, commitTimestamp, c, context);
- batchResponse(batch, NO_ORDER);
+ manageResponsesBatch(NO_ORDER, batch);
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/1d60f21d/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
index 2bbf1cf..ba0cb74 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
@@ -91,8 +91,8 @@ class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestE
public void update(TSOState state) throws InterruptedException {
LOG.info("Initializing RequestProcessor...");
this.lowWatermark = state.getLowWatermark();
- persistProc.persistLowWatermark(lowWatermark, new MonitoringContext(metrics));
- persistProc.persistFlush();
+ persistProc.addLowWatermarkToBatch(lowWatermark, new MonitoringContext(metrics));
+ persistProc.triggerCurrentBatchFlush();
LOG.info("RequestProcessor initialized with LWMs {} and Epoch {}", lowWatermark, state.getEpoch());
}
@@ -117,7 +117,7 @@ class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestE
}
if (endOfBatch) {
- persistProc.persistFlush();
+ persistProc.triggerCurrentBatchFlush();
}
}
@@ -125,7 +125,7 @@ class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestE
@Override
public void onTimeout(long sequence) throws Exception {
- persistProc.persistFlush();
+ persistProc.triggerCurrentBatchFlush();
}
@@ -161,7 +161,7 @@ class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestE
return;
}
- persistProc.persistTimestamp(timestamp, requestEvent.getChannel(), requestEvent.getMonCtx());
+ persistProc.addTimestampToBatch(timestamp, requestEvent.getChannel(), requestEvent.getMonCtx());
}
@@ -208,15 +208,15 @@ class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestE
if (newLowWatermark != lowWatermark) {
LOG.trace("Setting new low Watermark to {}", newLowWatermark);
lowWatermark = newLowWatermark;
- persistProc.persistLowWatermark(newLowWatermark, event.getMonCtx());
+ persistProc.addLowWatermarkToBatch(newLowWatermark, event.getMonCtx());
}
}
- persistProc.persistCommit(startTimestamp, commitTimestamp, c, event.getMonCtx());
+ persistProc.addCommitToBatch(startTimestamp, commitTimestamp, c, event.getMonCtx());
} catch (IOException e) {
LOG.error("Error committing", e);
}
} else { // add it to the aborted list
- persistProc.persistAbort(startTimestamp, isRetry, c, event.getMonCtx());
+ persistProc.addAbortToBatch(startTimestamp, isRetry, c, event.getMonCtx());
}
return commitTimestamp;
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/1d60f21d/tso-server/src/main/java/org/apache/omid/tso/TSOModule.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/TSOModule.java b/tso-server/src/main/java/org/apache/omid/tso/TSOModule.java
index f195ebb..bfa88ef 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/TSOModule.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/TSOModule.java
@@ -17,6 +17,7 @@
*/
package org.apache.omid.tso;
+import com.google.common.base.Preconditions;
import com.google.inject.AbstractModule;
import com.google.inject.Provider;
import com.google.inject.Provides;
@@ -34,6 +35,7 @@ class TSOModule extends AbstractModule {
private final TSOServerConfig config;
TSOModule(TSOServerConfig config) {
+ Preconditions.checkArgument(config.getNumConcurrentCTWriters() >= 2, "# of Commit Table writers must be >= 2");
this.config = config;
}
@@ -65,7 +67,7 @@ class TSOModule extends AbstractModule {
@Provides
PersistenceProcessorHandler[] getPersistenceProcessorHandler(Provider<PersistenceProcessorHandler> provider) {
- PersistenceProcessorHandler[] persistenceProcessorHandlers = new PersistenceProcessorHandler[config.getPersistHandlerNum()];
+ PersistenceProcessorHandler[] persistenceProcessorHandlers = new PersistenceProcessorHandler[config.getNumConcurrentCTWriters()];
for (int i = 0; i < persistenceProcessorHandlers.length; i++) {
persistenceProcessorHandlers[i] = provider.get();
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/1d60f21d/tso-server/src/main/java/org/apache/omid/tso/TSOServerConfig.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/TSOServerConfig.java b/tso-server/src/main/java/org/apache/omid/tso/TSOServerConfig.java
index c1fb5d2..05cb3f4 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/TSOServerConfig.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/TSOServerConfig.java
@@ -72,15 +72,11 @@ public class TSOServerConfig extends SecureHBaseConfig {
private int maxItems;
- private int maxBatchSize;
+ private int numConcurrentCTWriters;
- private int batchPersistTimeoutInMs;
-
- // TODO Rename this (e.g. numOfConcurrentWriters)
- private int persistHandlerNum;
+ private int batchSizePerCTWriter;
- // TODO Rename this
- private int numBuffersPerHandler;
+ private int batchPersistTimeoutInMs;
private String networkIfaceName = getDefaultNetworkInterface();
@@ -92,8 +88,32 @@ public class TSOServerConfig extends SecureHBaseConfig {
this.port = port;
}
- public void setMaxBatchSize(int maxBatchSize) {
- this.maxBatchSize = maxBatchSize;
+ public int getMaxItems() {
+ return maxItems;
+ }
+
+ public void setMaxItems(int maxItems) {
+ this.maxItems = maxItems;
+ }
+
+ public int getNumConcurrentCTWriters() {
+ return numConcurrentCTWriters;
+ }
+
+ public void setNumConcurrentCTWriters(int numConcurrentCTWriters) {
+ this.numConcurrentCTWriters = numConcurrentCTWriters;
+ }
+
+ public int getBatchSizePerCTWriter() {
+ return batchSizePerCTWriter;
+ }
+
+ public void setBatchSizePerCTWriter(int batchSizePerCTWriter) {
+ this.batchSizePerCTWriter = batchSizePerCTWriter;
+ }
+
+ public int getBatchPersistTimeoutInMs() {
+ return batchPersistTimeoutInMs;
}
public void setBatchPersistTimeoutInMs(int value) {
@@ -132,22 +152,6 @@ public class TSOServerConfig extends SecureHBaseConfig {
this.leaseModule = leaseModule;
}
- public int getMaxItems() {
- return maxItems;
- }
-
- public void setMaxItems(int maxItems) {
- this.maxItems = maxItems;
- }
-
- public int getMaxBatchSize() {
- return maxBatchSize;
- }
-
- public int getBatchPersistTimeoutInMs() {
- return batchPersistTimeoutInMs;
- }
-
public MetricsRegistry getMetrics() {
return metrics;
}
@@ -156,26 +160,6 @@ public class TSOServerConfig extends SecureHBaseConfig {
this.metrics = metrics;
}
- // TODO Rename this (e.g. numOfConcurrentWriters)
- public int getPersistHandlerNum() {
- return persistHandlerNum;
- }
-
- // TODO Rename this
- public void setPersistHandlerNum(int persistHandlerNum) {
- this.persistHandlerNum = persistHandlerNum;
- }
-
- // TODO Rename this
- public int getNumBuffersPerHandler() {
- return numBuffersPerHandler;
- }
-
- // TODO Rename this
- public void setNumBuffersPerHandler(int numBuffersPerHandler) {
- this.numBuffersPerHandler = numBuffersPerHandler;
- }
-
// ----------------------------------------------------------------------------------------------------------------
// Helper methods
// ----------------------------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/1d60f21d/tso-server/src/main/resources/default-omid.yml
----------------------------------------------------------------------
diff --git a/tso-server/src/main/resources/default-omid.yml b/tso-server/src/main/resources/default-omid.yml
index e3e559d..552a77a 100644
--- a/tso-server/src/main/resources/default-omid.yml
+++ b/tso-server/src/main/resources/default-omid.yml
@@ -5,7 +5,6 @@
# ---------------------------------------------------------------------------------------------------------------------
# Basic configuration parameters
# ---------------------------------------------------------------------------------------------------------------------
-# TODO Explain rest of the parameters and/or rename properly
# Network interface for TSO server communication. Uncomment the following line to use a specific interface
# networkIfaceName: eth0
@@ -14,12 +13,16 @@
# Port reserved by the Status Oracle
port: 54758
+# The number of elements reserved in the conflict map to perform conflict resolution
+# TODO Rename to conflictMapSize
maxItems: 100000000
-maxBatchSize: 25
+# The number of Commit Table writers that persist data concurrently to the datastore. It has to be at least 2.
+numConcurrentCTWriters: 2
+# The size of the batch of operations that each Commit Table writer has. The maximum number of operations that can be
+# batched in the system at a certain point in time is: numConcurrentCTWriters * batchSizePerCTWriter
+batchSizePerCTWriter: 25
+# When this timeout expires, the contents of the batch are flushed to the datastore
batchPersistTimeoutInMs: 10
-# TODO rename the next two parameters properly (e.g. numOfConcurrentWriters...)
-persistHandlerNum: 1
-numBuffersPerHandler: 10
# Default module configuration (No TSO High Availability & in-memory storage for timestamp and commit tables)
timestampStoreModule: !!org.apache.omid.tso.InMemoryTimestampStorageModule [ ]
@@ -64,7 +67,6 @@ metrics: !!org.apache.omid.metrics.NullMetricsProvider [ ]
# }
# ]
-
# Example of multiple reporter configuration (to CSV files and console)
#
# metrics: !!org.apache.omid.metrics.CodahaleMetricsProvider [
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/1d60f21d/tso-server/src/test/java/org/apache/omid/tso/TSOMockModule.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TSOMockModule.java b/tso-server/src/test/java/org/apache/omid/tso/TSOMockModule.java
index c10df9d..92582f6 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TSOMockModule.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TSOMockModule.java
@@ -17,6 +17,7 @@
*/
package org.apache.omid.tso;
+import com.google.common.base.Preconditions;
import com.google.inject.AbstractModule;
import com.google.inject.Provider;
import com.google.inject.Provides;
@@ -40,6 +41,7 @@ public class TSOMockModule extends AbstractModule {
private final TSOServerConfig config;
public TSOMockModule(TSOServerConfig config) {
+ Preconditions.checkArgument(config.getNumConcurrentCTWriters() >= 2, "# of Commit Table writers must be >= 2");
this.config = config;
}
@@ -78,7 +80,7 @@ public class TSOMockModule extends AbstractModule {
@Provides
PersistenceProcessorHandler[] getPersistenceProcessorHandler(Provider<PersistenceProcessorHandler> provider) {
- PersistenceProcessorHandler[] persistenceProcessorHandlers = new PersistenceProcessorHandler[config.getPersistHandlerNum()];
+ PersistenceProcessorHandler[] persistenceProcessorHandlers = new PersistenceProcessorHandler[config.getNumConcurrentCTWriters()];
for (int i = 0; i < persistenceProcessorHandlers.length; i++) {
persistenceProcessorHandlers[i] = provider.get();
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/1d60f21d/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java b/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
index 08b38be..c003f34 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
@@ -55,21 +55,13 @@ public class TestBatch {
@BeforeMethod(alwaysRun = true, timeOut = 30_000)
public void initMocksAndComponents() {
MockitoAnnotations.initMocks(this);
- batch = new Batch(BATCH_SIZE);
+ batch = new Batch(BATCH_SIZE, 0, null);
}
- // TODO. Check this test with the contents of the master branch. See commented asserts below
+ // TODO. Refactor the batch and test it properly. See commented asserts below
@Test
public void testBatchFunctionality() {
- // Required mocks
- Channel channel = Mockito.mock(Channel.class);
- ReplyProcessor replyProcessor = Mockito.mock(ReplyProcessor.class);
- RetryProcessor retryProcessor = Mockito.mock(RetryProcessor.class);
-
- // The batch element to test
- Batch batch = new Batch(BATCH_SIZE);
-
// Test initial state is OK
assertFalse(batch.isFull(), "Batch shouldn't be full");
assertEquals(batch.getNumEvents(), 0, "Num events should be 0");
@@ -119,7 +111,7 @@ public class TestBatch {
// assertEquals(batch.getNumEvents(), 0, "Num events should be 0");
//=======
batch.sendReply(replyProcessor, retryProcessor, (-1));
- verify(replyProcessor, timeout(100).times(1)).batchResponse(batch, (-1));
+ verify(replyProcessor, timeout(100).times(1)).manageResponsesBatch((-1), batch);
assertTrue(batch.isFull(), "Batch shouldn't be empty");
}
@@ -141,12 +133,10 @@ public class TestBatch {
}
}
- // Test that sending replies empties the batch also when the replica
- // is NOT master and calls the ambiguousCommitResponse() method on the
- // reply processor
+ // Test that sending replies empties the batch also when the replica is NOT master and calls the
+ // manageResponsesBatch() method on the reply processor
batch.sendReply(replyProcessor, retryProcessor, (-1));
- verify(replyProcessor, timeout(100).times(1))
- .batchResponse(batch, (-1));
+ verify(replyProcessor, timeout(100).times(1)).manageResponsesBatch((-1), batch);
assertTrue(batch.isFull(), "Batch should be full");
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/1d60f21d/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java b/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java
index 0343716..e8dd8ad 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java
@@ -119,8 +119,8 @@ public class TestPanicker {
TSOServerConfig config = new TSOServerConfig();
BatchPool batchPool = new BatchPool(config);
- PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[config.getPersistHandlerNum()];
- for (int i = 0; i < config.getPersistHandlerNum(); i++) {
+ PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[config.getNumConcurrentCTWriters()];
+ for (int i = 0; i < config.getNumConcurrentCTWriters(); i++) {
handlers[i] = new PersistenceProcessorHandler(metrics,
"localhost:1234",
leaseManager,
@@ -132,11 +132,10 @@ public class TestPanicker {
PersistenceProcessor proc = new PersistenceProcessorImpl(config,
batchPool,
- mock(ReplyProcessor.class),
panicker,
handlers);
- proc.persistCommit(1, 2, null, new MonitoringContext(metrics));
+ proc.addCommitToBatch(1, 2, null, new MonitoringContext(metrics));
new RequestProcessorImpl(metrics, mock(TimestampOracle.class), proc, panicker, mock(TSOServerConfig.class));
@@ -170,8 +169,8 @@ public class TestPanicker {
TSOServerConfig config = new TSOServerConfig();
BatchPool batchPool = new BatchPool(config);
- PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[config.getPersistHandlerNum()];
- for (int i = 0; i < config.getPersistHandlerNum(); i++) {
+ PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[config.getNumConcurrentCTWriters()];
+ for (int i = 0; i < config.getNumConcurrentCTWriters(); i++) {
handlers[i] = new PersistenceProcessorHandler(metrics,
"localhost:1234",
mock(LeaseManager.class),
@@ -183,10 +182,9 @@ public class TestPanicker {
PersistenceProcessor proc = new PersistenceProcessorImpl(config,
batchPool,
- mock(ReplyProcessor.class),
panicker,
handlers);
- proc.persistCommit(1, 2, null, new MonitoringContext(metrics));
+ proc.addCommitToBatch(1, 2, null, new MonitoringContext(metrics));
new RequestProcessorImpl(metrics, mock(TimestampOracle.class), proc, panicker, mock(TSOServerConfig.class));
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/1d60f21d/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java
index bc0b0f0..8e18119 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java
@@ -44,12 +44,15 @@ import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
-// TODO Add timers
-// TODO Make visible currentBatch in PersistenceProcessorImpl to add proper verifications
+// TODO Refactor: Make visible currentBatch in PersistenceProcessorImpl to add proper verifications
public class TestPersistenceProcessor {
private static final Logger LOG = LoggerFactory.getLogger(TestPersistenceProcessor.class);
+ private static final long ANY_LWM = 0L;
+ private static final int ANY_ST = 0;
+ private static final int ANY_CT = 1;
+
@Mock
private CommitTable.Writer mockWriter;
@Mock
@@ -57,8 +60,6 @@ public class TestPersistenceProcessor {
@Mock
private RetryProcessor retryProcessor;
@Mock
- private ReplyProcessor replyProcessor;
- @Mock
private Panicker panicker;
private MetricsRegistry metrics;
@@ -69,9 +70,6 @@ public class TestPersistenceProcessor {
MockitoAnnotations.initMocks(this);
- // Configure mock writer to flush successfully
-// doThrow(new IOException("Unable to write")).when(mockWriter).flush();
-
// Configure null metrics provider
metrics = new NullMetricsProvider();
@@ -96,21 +94,23 @@ public class TestPersistenceProcessor {
}
@Test
- public void testCommitPersistenceWithMultiHandlers() throws Exception {
+ public void testCommitPersistenceWithSingleCommitTableWriter() throws Exception {
- final int MAX_BATCH_SIZE = 4;
- final int NUM_PERSIST_HANDLERS = 4;
+ final int NUM_CT_WRITERS = 1;
+ final int BATCH_SIZE_PER_CT_WRITER = 2;
// Init a non-HA lease manager
VoidLeaseManager leaseManager = spy(new VoidLeaseManager(mock(TSOChannelHandler.class),
mock(TSOStateManager.class)));
TSOServerConfig tsoConfig = new TSOServerConfig();
- tsoConfig.setMaxBatchSize(MAX_BATCH_SIZE);
- tsoConfig.setPersistHandlerNum(NUM_PERSIST_HANDLERS);
+ tsoConfig.setBatchSizePerCTWriter(BATCH_SIZE_PER_CT_WRITER);
+ tsoConfig.setNumConcurrentCTWriters(NUM_CT_WRITERS);
- PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[tsoConfig.getPersistHandlerNum()];
- for (int i = 0; i < tsoConfig.getPersistHandlerNum(); i++) {
+ ReplyProcessor replyProcessor = new ReplyProcessorImpl(metrics, panicker);
+
+ PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[tsoConfig.getNumConcurrentCTWriters()];
+ for (int i = 0; i < tsoConfig.getNumConcurrentCTWriters(); i++) {
handlers[i] = new PersistenceProcessorHandler(metrics, "localhost:1234",
leaseManager,
commitTable,
@@ -122,46 +122,39 @@ public class TestPersistenceProcessor {
// Component under test
BatchPool batchPool = spy(new BatchPool(tsoConfig));
- PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig,
- batchPool,
- replyProcessor,
- panicker,
- handlers);
-
- verify(batchPool, times(1)).getNextEmptyBatch();
-
- proc.persistCommit(1, 2, mock(Channel.class), mock(MonitoringContext.class));
- proc.persistFlush();
+ PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, batchPool, panicker, handlers);
- proc.persistCommit(3, 4, mock(Channel.class), mock(MonitoringContext.class));
- proc.persistFlush();
+ proc.addLowWatermarkToBatch(ANY_LWM, new MonitoringContext(metrics)); // Add a fake LWM
- proc.persistCommit(5, 6, mock(Channel.class), mock(MonitoringContext.class));
- proc.persistFlush();
+ verify(batchPool, times(1)).getNextEmptyBatch(); // Called during initialization
- proc.persistCommit(7, 8, mock(Channel.class), mock(MonitoringContext.class));
- proc.persistFlush();
+ proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
+ proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // Flush: batch full
+ proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
+ proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // Flush: batch full
- verify(batchPool, times(5)).getNextEmptyBatch(); // 5 Times: 1 in initialization + 4 when flushing above
+ verify(batchPool, times(1 + BATCH_SIZE_PER_CT_WRITER)).getNextEmptyBatch(); // 3: 1 in init + 2 when flushing
}
- @Test
- public void testCommitPersistenceWithSingleHandlerInMultiHandlersEnvironment() throws Exception {
+ @Test(timeOut=10_000)
+ public void testCommitPersistenceWithMultipleCommitTableWriters() throws Exception {
- final int MAX_BATCH_SIZE = 16;
- final int NUM_PERSIST_HANDLERS = 4;
+ final int NUM_CT_WRITERS = 2;
+ final int BATCH_SIZE_PER_CT_WRITER = 2;
// Init a non-HA lease manager
VoidLeaseManager leaseManager = spy(new VoidLeaseManager(mock(TSOChannelHandler.class),
mock(TSOStateManager.class)));
TSOServerConfig tsoConfig = new TSOServerConfig();
- tsoConfig.setMaxBatchSize(MAX_BATCH_SIZE);
- tsoConfig.setPersistHandlerNum(NUM_PERSIST_HANDLERS);
+ tsoConfig.setBatchSizePerCTWriter(BATCH_SIZE_PER_CT_WRITER);
+ tsoConfig.setNumConcurrentCTWriters(NUM_CT_WRITERS);
+
+ ReplyProcessor replyProcessor = new ReplyProcessorImpl(metrics, panicker);
- PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[tsoConfig.getPersistHandlerNum()];
- for (int i = 0; i < tsoConfig.getPersistHandlerNum(); i++) {
+ PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[tsoConfig.getNumConcurrentCTWriters()];
+ for (int i = 0; i < tsoConfig.getNumConcurrentCTWriters(); i++) {
handlers[i] = new PersistenceProcessorHandler(metrics,
"localhost:1234",
leaseManager,
@@ -173,66 +166,66 @@ public class TestPersistenceProcessor {
// Component under test
BatchPool batchPool = spy(new BatchPool(tsoConfig));
- PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig,
- batchPool,
- replyProcessor,
- panicker,
- handlers);
-
- verify(batchPool, times(1)).getNextEmptyBatch();
-
- // Fill one Batch completely
- proc.persistCommit(1, 2, mock(Channel.class), mock(MonitoringContext.class));
- proc.persistCommit(3, 4, mock(Channel.class), mock(MonitoringContext.class));
- proc.persistCommit(5, 6, mock(Channel.class), mock(MonitoringContext.class));
- verify(batchPool, times(1)).getNextEmptyBatch();
- proc.persistCommit(7, 8, mock(Channel.class), mock(MonitoringContext.class)); // Should be full here
- verify(batchPool, times(2)).getNextEmptyBatch();
+ PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, batchPool, panicker, handlers);
- // Test empty flush does not trigger response in getting a new batch
- proc.persistFlush();
- verify(batchPool, times(2)).getNextEmptyBatch();
+ proc.addLowWatermarkToBatch(ANY_LWM, mock(MonitoringContext.class)); // Add a fake LWM
+
+ verify(batchPool, times(1)).getNextEmptyBatch(); // Called during initialization
- // Fill a second Batch completely
- proc.persistCommit(1, 2, mock(Channel.class), mock(MonitoringContext.class));
- proc.persistCommit(3, 4, mock(Channel.class), mock(MonitoringContext.class));
- proc.persistCommit(5, 6, mock(Channel.class), mock(MonitoringContext.class));
- proc.persistCommit(7, 8, mock(Channel.class), mock(MonitoringContext.class)); // Should be full here
+ // Fill 1st handler Batches completely
+ proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
+ proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // 1st batch full
+ verify(batchPool, times(2)).getNextEmptyBatch();
+ proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
+ proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // 2nd batch full
verify(batchPool, times(3)).getNextEmptyBatch();
- // Start filling a new batch and flush it immediately
- proc.persistCommit(9, 10, mock(Channel.class), mock(MonitoringContext.class));
+ // Test empty flush does not trigger response in getting a new currentBatch
+ proc.triggerCurrentBatchFlush();
verify(batchPool, times(3)).getNextEmptyBatch();
- proc.persistFlush();
- verify(batchPool, times(4)).getNextEmptyBatch();
+
+ // Fill 2nd handler Batches completely
+ proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
+ proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // 1st batch full
+ proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
+ proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // 2nd batch full
+ verify(batchPool, times(1 + (NUM_CT_WRITERS * BATCH_SIZE_PER_CT_WRITER))).getNextEmptyBatch();
+
+ // Start filling a new currentBatch and flush it immediately
+ proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // Batch not full
+ verify(batchPool, times(5)).getNextEmptyBatch();
+ proc.triggerCurrentBatchFlush(); // Flushing should provoke invocation of a new batch
+ verify(batchPool, times(6)).getNextEmptyBatch();
// Test empty flush does not trigger response
- proc.persistFlush();
- proc.persistFlush();
- proc.persistFlush();
- proc.persistFlush();
- proc.persistFlush();
- verify(batchPool, times(4)).getNextEmptyBatch();
+ proc.triggerCurrentBatchFlush();
+ proc.triggerCurrentBatchFlush();
+ proc.triggerCurrentBatchFlush();
+ proc.triggerCurrentBatchFlush();
+ proc.triggerCurrentBatchFlush();
+ verify(batchPool, times(6)).getNextEmptyBatch();
}
- @Test
+ @Test(timeOut=10_000)
public void testCommitPersistenceWithNonHALeaseManager() throws Exception {
- final int MAX_BATCH_SIZE = 4;
- final int NUM_PERSIST_HANDLERS = 4;
+ final int NUM_CT_WRITERS = 1;
+ final int BATCH_SIZE_PER_CT_WRITER = 1;
TSOServerConfig tsoConfig = new TSOServerConfig();
- tsoConfig.setMaxBatchSize(MAX_BATCH_SIZE);
- tsoConfig.setPersistHandlerNum(NUM_PERSIST_HANDLERS);
+ tsoConfig.setBatchSizePerCTWriter(BATCH_SIZE_PER_CT_WRITER);
+ tsoConfig.setNumConcurrentCTWriters(NUM_CT_WRITERS);
tsoConfig.setBatchPersistTimeoutInMs(100);
+ ReplyProcessor replyProcessor = new ReplyProcessorImpl(metrics, panicker);
+
// Init a non-HA lease manager
VoidLeaseManager leaseManager = spy(new VoidLeaseManager(mock(TSOChannelHandler.class),
mock(TSOStateManager.class)));
- PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[tsoConfig.getPersistHandlerNum()];
- for (int i = 0; i < tsoConfig.getPersistHandlerNum(); i++) {
+ PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[tsoConfig.getNumConcurrentCTWriters()];
+ for (int i = 0; i < tsoConfig.getNumConcurrentCTWriters(); i++) {
handlers[i] = new PersistenceProcessorHandler(metrics,
"localhost:1234",
leaseManager,
@@ -242,111 +235,160 @@ public class TestPersistenceProcessor {
panicker);
}
- // Component under test
BatchPool batchPool = spy(new BatchPool(tsoConfig));
- PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig,
- batchPool,
- replyProcessor,
- panicker,
- handlers);
+
+ // Component under test
+ PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, batchPool, panicker, handlers);
+
+ proc.addLowWatermarkToBatch(ANY_LWM, new MonitoringContext(metrics)); // Add a fake LWM
// The non-ha lease manager always return true for
- // stillInLeasePeriod(), so verify the batch sends replies as master
- proc.persistCommit(1, 2, mock(Channel.class), mock(MonitoringContext.class));
- proc.persistFlush();
+ // stillInLeasePeriod(), so verify the currentBatch sends replies as master
+ proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
+ proc.triggerCurrentBatchFlush();
verify(leaseManager, timeout(1000).times(2)).stillInLeasePeriod();
verify(batchPool, times(2)).getNextEmptyBatch();
}
- @Test
- public void testCommitPersistenceWithHALeaseManagerMultiHandlers() throws Exception {
- final int MAX_BATCH_SIZE = 4;
- final int NUM_PERSIST_HANDLERS = 4;
+ @Test(timeOut=10_000)
+ public void testCommitPersistenceWithHALeaseManagerAndSingleCommitTableWriter() throws Exception {
+
+ final int NUM_PERSIST_HANDLERS = 1;
TSOServerConfig tsoConfig = new TSOServerConfig();
- tsoConfig.setMaxBatchSize(MAX_BATCH_SIZE);
- tsoConfig.setPersistHandlerNum(NUM_PERSIST_HANDLERS);
- tsoConfig.setBatchPersistTimeoutInMs(100);
+ tsoConfig.setNumConcurrentCTWriters(NUM_PERSIST_HANDLERS);
+
+ testPersistenceWithHALeaseManagerPreservingLease(tsoConfig);
+ testPersistenceWithHALeaseManagerFailingToPreserveLease1(tsoConfig);
+ testPersistenceWithHALeaseManagerFailingToPreserveLease2(tsoConfig);
+ testPersistenceWithHALeaseManagerFailingToPreserveLease3(tsoConfig);
- testCommitPersistenceWithHALeaseManagerPerConfig(tsoConfig);
}
- @Test
- public void testCommitPersistenceWithHALeaseManager() throws Exception {
+ @Test(timeOut=10_000)
+ public void testCommitPersistenceWithHALeaseManagerAndMultipleCommitTableWriters() throws Exception {
+
+ final int NUM_CT_WRITERS = 4;
+ final int BATCH_SIZE_PER_CT_WRITER = 4;
TSOServerConfig tsoConfig = new TSOServerConfig();
- testCommitPersistenceWithHALeaseManagerPerConfig(tsoConfig);
+ tsoConfig.setNumConcurrentCTWriters(NUM_CT_WRITERS);
+ tsoConfig.setBatchSizePerCTWriter(BATCH_SIZE_PER_CT_WRITER);
+ tsoConfig.setBatchPersistTimeoutInMs(100);
+
+ testPersistenceWithHALeaseManagerPreservingLease(tsoConfig);
+ testPersistenceWithHALeaseManagerFailingToPreserveLease1(tsoConfig);
+ testPersistenceWithHALeaseManagerFailingToPreserveLease2(tsoConfig);
+ testPersistenceWithHALeaseManagerFailingToPreserveLease3(tsoConfig);
}
- // TODO Recheck this tests comparing with previous master
- private void testCommitPersistenceWithHALeaseManagerPerConfig (TSOServerConfig tsoConfig) throws Exception {
+ private void testPersistenceWithHALeaseManagerPreservingLease(TSOServerConfig tsoConfig) throws Exception {
// Init a HA lease manager
- LeaseManager leaseManager = mock(LeaseManager.class);
+ LeaseManager simulatedHALeaseManager = mock(LeaseManager.class);
- PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[tsoConfig.getPersistHandlerNum()];
- for (int i = 0; i < tsoConfig.getPersistHandlerNum(); i++) {
- handlers[i] = new PersistenceProcessorHandler(metrics,
- "localhost:1234",
- leaseManager,
- commitTable,
- replyProcessor,
- retryProcessor,
- new RuntimeExceptionPanicker());
- }
+ PersistenceProcessorHandler[] handlers = configureHandlers (tsoConfig, simulatedHALeaseManager);
+
+ BatchPool batchPool = spy(new BatchPool(tsoConfig));
+
+ // Component under test
+ PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, batchPool, panicker, handlers);
+
+ proc.addLowWatermarkToBatch(ANY_LWM, new MonitoringContext(metrics)); // Add a fake LWM
+
+ // Test: Configure the lease manager to return true always
+ doReturn(true).when(simulatedHALeaseManager).stillInLeasePeriod();
+ proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
+ proc.triggerCurrentBatchFlush();
+ verify(simulatedHALeaseManager, timeout(1000).times(2)).stillInLeasePeriod();
+ verify(batchPool, times(2)).getNextEmptyBatch();
+ }
+
+ private void testPersistenceWithHALeaseManagerFailingToPreserveLease1(TSOServerConfig tsoConfig) throws Exception {
+
+ // Init a HA lease manager
+ LeaseManager simulatedHALeaseManager = mock(LeaseManager.class);
+
+ PersistenceProcessorHandler[] handlers = configureHandlers (tsoConfig, simulatedHALeaseManager);
+
+ BatchPool batchPool = spy(new BatchPool(tsoConfig));
// Component under test
+ PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, batchPool, panicker, handlers);
+
+ // Test: Configure the lease manager to return true first and false later for stillInLeasePeriod
+ doReturn(true).doReturn(false).when(simulatedHALeaseManager).stillInLeasePeriod();
+ batchPool.notifyEmptyBatch(0); // Unlock this thread to check the panicker
+ proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
+ proc.triggerCurrentBatchFlush();
+ verify(simulatedHALeaseManager, timeout(1000).times(2)).stillInLeasePeriod();
+ verify(batchPool, times(2)).getNextEmptyBatch();
+ }
+
+ private void testPersistenceWithHALeaseManagerFailingToPreserveLease2(TSOServerConfig tsoConfig) throws Exception {
+
+ // Init a HA lease manager
+ LeaseManager simulatedHALeaseManager = mock(LeaseManager.class);
+
+ PersistenceProcessorHandler[] handlers = configureHandlers (tsoConfig, simulatedHALeaseManager);
+
BatchPool batchPool = spy(new BatchPool(tsoConfig));
- PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig,
- batchPool,
- replyProcessor,
- panicker,
- handlers);
- doReturn(true).when(leaseManager).stillInLeasePeriod();
- MonitoringContext monCtx = new MonitoringContext(metrics);
- proc.persistCommit(1, 2, mock(Channel.class), mock(MonitoringContext.class));
- proc.persistFlush();
- verify(leaseManager, timeout(1000).times(2)).stillInLeasePeriod();
+ // Component under test
+ PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, batchPool, panicker, handlers);
+
+ // Test: Configure the lease manager to return false for stillInLeasePeriod
+ doReturn(false).when(simulatedHALeaseManager).stillInLeasePeriod();
+ batchPool.notifyEmptyBatch(0); // Unlock this thread to check the panicker
+ proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
+ proc.triggerCurrentBatchFlush();
+ verify(simulatedHALeaseManager, timeout(1000).times(1)).stillInLeasePeriod();
verify(batchPool, times(2)).getNextEmptyBatch();
+ }
- // Test 2: Configure the lease manager to return true first and false later for stillInLeasePeriod
+ private void testPersistenceWithHALeaseManagerFailingToPreserveLease3(TSOServerConfig tsoConfig) throws Exception {
- // Reset stuff
- reset(leaseManager);
- doReturn(true).doReturn(false).when(leaseManager).stillInLeasePeriod();
- proc.persistCommit(1, 2, mock(Channel.class), mock(MonitoringContext.class));
- proc.persistFlush();
- verify(leaseManager, timeout(1000).times(2)).stillInLeasePeriod();
- verify(batchPool, times(3)).getNextEmptyBatch();
+ // Init a HA lease manager
+ LeaseManager simulatedHALeaseManager = mock(LeaseManager.class);
- // Test 3: Configure the lease manager to return false for stillInLeasePeriod
+ PersistenceProcessorHandler[] handlers = configureHandlers (tsoConfig, simulatedHALeaseManager);
- // Reset stuff
- reset(leaseManager);
- doReturn(false).when(leaseManager).stillInLeasePeriod();
- proc.persistCommit(1, 2, mock(Channel.class), mock(MonitoringContext.class));
- proc.persistFlush();
- verify(leaseManager, timeout(1000).times(1)).stillInLeasePeriod();
- verify(batchPool, times(4)).getNextEmptyBatch();
+ BatchPool batchPool = spy(new BatchPool(tsoConfig));
- // Test 4: Configure the lease manager to return true first and false later for stillInLeasePeriod and raise
- // an exception when flush
+ // Component under test
+ PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, batchPool, panicker, handlers);
- // Reset stuff
- reset(leaseManager);
- // Configure mock writer to flush successfully
+ // Test: Configure the lease manager to return true first and false later for stillInLeasePeriod and raise
+ // an exception when flush
+ // Configure mock writer to flush unsuccessfully
doThrow(new IOException("Unable to write")).when(mockWriter).flush();
- doReturn(true).doReturn(false).when(leaseManager).stillInLeasePeriod();
- proc.persistCommit(1, 2, mock(Channel.class), mock(MonitoringContext.class));
- proc.persistFlush();
- verify(leaseManager, timeout(1000).times(1)).stillInLeasePeriod();
- verify(batchPool, times(5)).getNextEmptyBatch();
+ doReturn(true).doReturn(false).when(simulatedHALeaseManager).stillInLeasePeriod();
+ batchPool.notifyEmptyBatch(0); // Unlock this thread to check the panicker
+ proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
+ proc.triggerCurrentBatchFlush();
+ verify(simulatedHALeaseManager, timeout(1000).times(1)).stillInLeasePeriod();
+ verify(batchPool, times(2)).getNextEmptyBatch();
+
}
- @Test
+ private PersistenceProcessorHandler[] configureHandlers(TSOServerConfig tsoConfig, LeaseManager leaseManager)
+ throws Exception {
+ PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[tsoConfig.getNumConcurrentCTWriters()];
+ for (int i = 0; i < tsoConfig.getNumConcurrentCTWriters(); i++) {
+ handlers[i] = new PersistenceProcessorHandler(metrics,
+ "localhost:1234",
+ leaseManager,
+ commitTable,
+ new ReplyProcessorImpl(metrics, panicker),
+ retryProcessor,
+ new RuntimeExceptionPanicker());
+ }
+ return handlers;
+ }
+
+ @Test(timeOut=10_000)
public void testCommitTableExceptionOnCommitPersistenceTakesDownDaemon() throws Exception {
// Init lease management (doesn't matter if HA or not)
@@ -354,22 +396,20 @@ public class TestPersistenceProcessor {
TSOServerConfig config = new TSOServerConfig();
BatchPool batchPool = new BatchPool(config);
- PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[config.getPersistHandlerNum()];
- for (int i = 0; i < config.getPersistHandlerNum(); i++) {
+ ReplyProcessor replyProcessor = new ReplyProcessorImpl(metrics, panicker);
+
+ PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[config.getNumConcurrentCTWriters()];
+ for (int i = 0; i < config.getNumConcurrentCTWriters(); i++) {
handlers[i] = new PersistenceProcessorHandler(metrics,
"localhost:1234",
leaseManager,
commitTable,
- mock(ReplyProcessor.class),
+ replyProcessor,
mock(RetryProcessor.class),
panicker);
}
- PersistenceProcessorImpl proc = new PersistenceProcessorImpl(config,
- batchPool,
- mock(ReplyProcessor.class),
- panicker,
- handlers);
+ PersistenceProcessorImpl proc = new PersistenceProcessorImpl(config, batchPool, panicker, handlers);
MonitoringContext monCtx = new MonitoringContext(metrics);
@@ -379,20 +419,25 @@ public class TestPersistenceProcessor {
// Configure commit table writer to explode when flushing changes to DB
doThrow(new IOException("Unable to write@TestPersistenceProcessor2")).when(mockWriter).flush();
+ proc.addLowWatermarkToBatch(ANY_LWM, new MonitoringContext(metrics)); // Add a fake LWM
+
// Check the panic is extended!
- proc.persistCommit(1, 2, null, monCtx);
- proc.persistFlush();
+ proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), monCtx);
+ proc.triggerCurrentBatchFlush();
verify(panicker, timeout(1000).atLeastOnce()).panic(anyString(), any(Throwable.class));
+
}
- @Test
+ @Test(timeOut=10_000)
public void testRuntimeExceptionOnCommitPersistenceTakesDownDaemon() throws Exception {
TSOServerConfig config = new TSOServerConfig();
BatchPool batchPool = new BatchPool(config);
- PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[config.getPersistHandlerNum()];
- for (int i = 0; i < config.getPersistHandlerNum(); i++) {
+ ReplyProcessor replyProcessor = new ReplyProcessorImpl(metrics, panicker);
+
+ PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[config.getNumConcurrentCTWriters()];
+ for (int i = 0; i < config.getNumConcurrentCTWriters(); i++) {
handlers[i] = new PersistenceProcessorHandler(metrics,
"localhost:1234",
mock(LeaseManager.class),
@@ -402,19 +447,18 @@ public class TestPersistenceProcessor {
panicker);
}
- PersistenceProcessorImpl proc = new PersistenceProcessorImpl(config,
- batchPool,
- replyProcessor,
- panicker,
- handlers);
+ PersistenceProcessorImpl proc = new PersistenceProcessorImpl(config, batchPool, panicker, handlers);
// Configure writer to explode with a runtime exception
doThrow(new RuntimeException("Kaboom!")).when(mockWriter).addCommittedTransaction(anyLong(), anyLong());
MonitoringContext monCtx = new MonitoringContext(metrics);
+ proc.addLowWatermarkToBatch(ANY_LWM, new MonitoringContext(metrics)); // Add a fake LWM
+
+ batchPool.notifyEmptyBatch(0); // Unlock this thread to check the panicker
// Check the panic is extended!
- proc.persistCommit(1, 2, null, monCtx);
- proc.persistFlush();
+ proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), monCtx);
+ proc.triggerCurrentBatchFlush();
verify(panicker, timeout(1000).atLeastOnce()).panic(anyString(), any(Throwable.class));
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/1d60f21d/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
index 2c612a4..11f1a99 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
@@ -84,14 +84,14 @@ public class TestRequestProcessor {
requestProc.timestampRequest(null, new MonitoringContext(metrics));
ArgumentCaptor<Long> firstTScapture = ArgumentCaptor.forClass(Long.class);
- verify(persist, timeout(100).times(1)).persistTimestamp(
+ verify(persist, timeout(100).times(1)).addTimestampToBatch(
firstTScapture.capture(), any(Channel.class), any(MonitoringContext.class));
long firstTS = firstTScapture.getValue();
// verify that timestamps increase monotonically
for (int i = 0; i < 100; i++) {
requestProc.timestampRequest(null, new MonitoringContext(metrics));
- verify(persist, timeout(100).times(1)).persistTimestamp(eq(firstTS++), any(Channel.class), any(MonitoringContext.class));
+ verify(persist, timeout(100).times(1)).addTimestampToBatch(eq(firstTS++), any(Channel.class), any(MonitoringContext.class));
}
}
@@ -101,37 +101,37 @@ public class TestRequestProcessor {
requestProc.timestampRequest(null, new MonitoringContext(metrics));
ArgumentCaptor<Long> TScapture = ArgumentCaptor.forClass(Long.class);
- verify(persist, timeout(100).times(1)).persistTimestamp(
+ verify(persist, timeout(100).times(1)).addTimestampToBatch(
TScapture.capture(), any(Channel.class), any(MonitoringContext.class));
long firstTS = TScapture.getValue();
List<Long> writeSet = Lists.newArrayList(1L, 20L, 203L);
requestProc.commitRequest(firstTS - 1, writeSet, false, null, new MonitoringContext(metrics));
- verify(persist, timeout(100).times(1)).persistAbort(eq(firstTS - 1), anyBoolean(), any(Channel.class), any(MonitoringContext.class));
+ verify(persist, timeout(100).times(1)).addAbortToBatch(eq(firstTS - 1), anyBoolean(), any(Channel.class), any(MonitoringContext.class));
requestProc.commitRequest(firstTS, writeSet, false, null, new MonitoringContext(metrics));
ArgumentCaptor<Long> commitTScapture = ArgumentCaptor.forClass(Long.class);
- verify(persist, timeout(100).times(1)).persistCommit(eq(firstTS), commitTScapture.capture(), any(Channel.class), any(MonitoringContext.class));
+ verify(persist, timeout(100).times(1)).addCommitToBatch(eq(firstTS), commitTScapture.capture(), any(Channel.class), any(MonitoringContext.class));
assertTrue(commitTScapture.getValue() > firstTS, "Commit TS must be greater than start TS");
// test conflict
requestProc.timestampRequest(null, new MonitoringContext(metrics));
TScapture = ArgumentCaptor.forClass(Long.class);
- verify(persist, timeout(100).times(2)).persistTimestamp(
+ verify(persist, timeout(100).times(2)).addTimestampToBatch(
TScapture.capture(), any(Channel.class), any(MonitoringContext.class));
long secondTS = TScapture.getValue();
requestProc.timestampRequest(null, new MonitoringContext(metrics));
TScapture = ArgumentCaptor.forClass(Long.class);
- verify(persist, timeout(100).times(3)).persistTimestamp(
+ verify(persist, timeout(100).times(3)).addTimestampToBatch(
TScapture.capture(), any(Channel.class), any(MonitoringContext.class));
long thirdTS = TScapture.getValue();
requestProc.commitRequest(thirdTS, writeSet, false, null, new MonitoringContext(metrics));
- verify(persist, timeout(100).times(1)).persistCommit(eq(thirdTS), anyLong(), any(Channel.class), any(MonitoringContext.class));
+ verify(persist, timeout(100).times(1)).addCommitToBatch(eq(thirdTS), anyLong(), any(Channel.class), any(MonitoringContext.class));
requestProc.commitRequest(secondTS, writeSet, false, null, new MonitoringContext(metrics));
- verify(persist, timeout(100).times(1)).persistAbort(eq(secondTS), anyBoolean(), any(Channel.class), any(MonitoringContext.class));
+ verify(persist, timeout(100).times(1)).addAbortToBatch(eq(secondTS), anyBoolean(), any(Channel.class), any(MonitoringContext.class));
}
@@ -143,9 +143,9 @@ public class TestRequestProcessor {
// Start a transaction...
requestProc.timestampRequest(null, new MonitoringContext(metrics));
ArgumentCaptor<Long> capturedTS = ArgumentCaptor.forClass(Long.class);
- verify(persist, timeout(100).times(1)).persistTimestamp(capturedTS.capture(),
- any(Channel.class),
- any(MonitoringContext.class));
+ verify(persist, timeout(100).times(1)).addTimestampToBatch(capturedTS.capture(),
+ any(Channel.class),
+ any(MonitoringContext.class));
long startTS = capturedTS.getValue();
// ... simulate the reset of the RequestProcessor state (e.g. due to
@@ -154,7 +154,7 @@ public class TestRequestProcessor {
// ...check that the transaction is aborted when trying to commit
requestProc.commitRequest(startTS, writeSet, false, null, new MonitoringContext(metrics));
- verify(persist, timeout(100).times(1)).persistAbort(eq(startTS), anyBoolean(), any(Channel.class), any(MonitoringContext.class));
+ verify(persist, timeout(100).times(1)).addAbortToBatch(eq(startTS), anyBoolean(), any(Channel.class), any(MonitoringContext.class));
}
@@ -175,11 +175,11 @@ public class TestRequestProcessor {
Thread.currentThread().sleep(3000); // Allow the Request processor to finish the request processing
// Check that first time its called is on init
- verify(persist, timeout(100).times(1)).persistLowWatermark(eq(0L), any(MonitoringContext.class));
+ verify(persist, timeout(100).times(1)).addLowWatermarkToBatch(eq(0L), any(MonitoringContext.class));
// Then, check it is called when cache is full and the first element is evicted (should be a 1)
- verify(persist, timeout(100).times(1)).persistLowWatermark(eq(FIRST_COMMIT_TS_EVICTED), any(MonitoringContext.class));
+ verify(persist, timeout(100).times(1)).addLowWatermarkToBatch(eq(FIRST_COMMIT_TS_EVICTED), any(MonitoringContext.class));
// Finally it should never be called with the next element
- verify(persist, timeout(100).never()).persistLowWatermark(eq(NEXT_COMMIT_TS_THAT_SHOULD_BE_EVICTED), any(MonitoringContext.class));
+ verify(persist, timeout(100).never()).addLowWatermarkToBatch(eq(NEXT_COMMIT_TS_THAT_SHOULD_BE_EVICTED), any(MonitoringContext.class));
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/1d60f21d/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java
index c5270c0..afa191e 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java
@@ -66,10 +66,7 @@ public class TestRetryProcessor {
}
@Test(timeOut = 10_000)
- public void testBasicFunctionality() throws Exception {
-
- commitTable.getWriter().addCommittedTransaction(ST_TX_1, CT_TX_1);
-
+ public void testRetriedRequestForANonExistingTxReturnsAbort() throws Exception {
BatchPool batchPool = new BatchPool(new TSOServerConfig());
// The element to test
@@ -77,22 +74,35 @@ public class TestRetryProcessor {
// Test we'll reply with an abort for a retry request when the start timestamp IS NOT in the commit table
retryProc.disambiguateRetryRequestHeuristically(NON_EXISTING_ST_TX, channel, new MonitoringContext(metrics));
- ArgumentCaptor<Long> firstTScapture = ArgumentCaptor.forClass(Long.class);
+ ArgumentCaptor<Long> firstTSCapture = ArgumentCaptor.forClass(Long.class);
verify(replyProc, timeout(100).times(1))
- .addAbort(any(Batch.class), firstTScapture.capture(), any(Channel.class), any(MonitoringContext.class));
- long startTS = firstTScapture.getValue();
+ .addAbort(any(Batch.class), firstTSCapture.capture(), any(Channel.class), any(MonitoringContext.class));
+ long startTS = firstTSCapture.getValue();
assertEquals(startTS, NON_EXISTING_ST_TX, "Captured timestamp should be the same as NON_EXISTING_ST_TX");
+ }
+
+ @Test(timeOut = 10_000)
+ public void testRetriedRequestForAnExistingTxReturnsCommit() throws Exception {
+ BatchPool batchPool = new BatchPool(new TSOServerConfig());
+
+ // The element to test
+ RetryProcessor retryProc = new RetryProcessorImpl(metrics, commitTable, replyProc, panicker, batchPool);
// Test we'll reply with a commit for a retry request when the start timestamp IS in the commit table
+ commitTable.getWriter().addCommittedTransaction(ST_TX_1, CT_TX_1);
retryProc.disambiguateRetryRequestHeuristically(ST_TX_1, channel, new MonitoringContext(metrics));
- ArgumentCaptor<Long> secondTScapture = ArgumentCaptor.forClass(Long.class);
+ ArgumentCaptor<Long> firstTSCapture = ArgumentCaptor.forClass(Long.class);
+ ArgumentCaptor<Long> secondTSCapture = ArgumentCaptor.forClass(Long.class);
- verify(replyProc, timeout(100).times(1))
- .addCommit(any(Batch.class), firstTScapture.capture(), secondTScapture.capture(), any(Channel.class), any(MonitoringContext.class));
+ verify(replyProc, timeout(100).times(1)).addCommit(any(Batch.class),
+ firstTSCapture.capture(),
+ secondTSCapture.capture(),
+ any(Channel.class),
+ any(MonitoringContext.class));
- startTS = firstTScapture.getValue();
- long commitTS = secondTScapture.getValue();
+ long startTS = firstTSCapture.getValue();
+ long commitTS = secondTSCapture.getValue();
assertEquals(startTS, ST_TX_1, "Captured timestamp should be the same as ST_TX_1");
assertEquals(commitTS, CT_TX_1, "Captured timestamp should be the same as CT_TX_1");
}
@@ -117,9 +127,10 @@ public class TestRetryProcessor {
// Test we'll reply with an abort for a retry request when the
// transaction id IS in the commit table BUT invalidated
retryProc.disambiguateRetryRequestHeuristically(ST_TX_1, channel, new MonitoringContext(metrics));
- ArgumentCaptor<Long> startTScapture = ArgumentCaptor.forClass(Long.class);
- verify(replyProc, timeout(100).times(1)).addAbort(any(Batch.class), startTScapture.capture(), any(Channel.class), any(MonitoringContext.class));
- long startTS = startTScapture.getValue();
+ ArgumentCaptor<Long> startTSCapture = ArgumentCaptor.forClass(Long.class);
+ verify(replyProc, timeout(100).times(1))
+ .addAbort(any(Batch.class), startTSCapture.capture(), any(Channel.class), any(MonitoringContext.class));
+ long startTS = startTSCapture.getValue();
Assert.assertEquals(startTS, ST_TX_1, "Captured timestamp should be the same as NON_EXISTING_ST_TX");
}