You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@omid.apache.org by fp...@apache.org on 2016/05/11 18:06:54 UTC
[47/50] [abbrv] incubator-omid git commit: Persist LWM directly without batching it
Persist LWM directly without batching it
This simplifies the batch creation as we don't need a special bucket
for the low watermark anymore. It also simplifies the treatment of the
low watermark in the processors.
Change-Id: Ie86d92dcba5418647b3803c72cadf60cd2e5e6a9
Project: http://git-wip-us.apache.org/repos/asf/incubator-omid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-omid/commit/5b27a074
Tree: http://git-wip-us.apache.org/repos/asf/incubator-omid/tree/5b27a074
Diff: http://git-wip-us.apache.org/repos/asf/incubator-omid/diff/5b27a074
Branch: refs/heads/master
Commit: 5b27a0740dbd2752872ebd516cdddb0ca73d5e4b
Parents: 1e74863
Author: Francisco Perez-Sorrosal <fp...@yahoo-inc.com>
Authored: Tue May 10 17:58:17 2016 -0700
Committer: Francisco Perez-Sorrosal <fp...@yahoo-inc.com>
Committed: Tue May 10 20:06:16 2016 -0700
----------------------------------------------------------------------
.../main/java/org/apache/omid/tso/Batch.java | 17 +----
.../org/apache/omid/tso/BatchPoolModule.java | 4 +-
.../java/org/apache/omid/tso/PersistEvent.java | 10 +--
.../apache/omid/tso/PersistenceProcessor.java | 6 +-
.../omid/tso/PersistenceProcessorHandler.java | 3 -
.../omid/tso/PersistenceProcessorImpl.java | 56 ++++++++++----
.../org/apache/omid/tso/ReplyProcessorImpl.java | 3 -
.../apache/omid/tso/RequestProcessorImpl.java | 6 +-
.../java/org/apache/omid/tso/TestBatch.java | 18 +----
.../java/org/apache/omid/tso/TestPanicker.java | 8 +-
.../omid/tso/TestPersistenceProcessor.java | 79 ++++++++++++++------
.../apache/omid/tso/TestRequestProcessor.java | 13 +++-
...stTSOClientRequestAndResponseBehaviours.java | 24 ++----
13 files changed, 137 insertions(+), 110 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/5b27a074/tso-server/src/main/java/org/apache/omid/tso/Batch.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/Batch.java b/tso-server/src/main/java/org/apache/omid/tso/Batch.java
index a36d2a7..c5ed696 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/Batch.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/Batch.java
@@ -64,13 +64,6 @@ public class Batch {
}
- boolean isLastEntryEmpty() {
-
- Preconditions.checkState(numEvents <= size, "numEvents > size");
- return numEvents == (size - 1);
-
- }
-
int getNumEvents() {
return numEvents;
}
@@ -82,7 +75,7 @@ public class Batch {
}
PersistEvent get(int idx) {
- Preconditions.checkState(numEvents > 0 && 0 <= idx && idx < numEvents);
+ Preconditions.checkState(numEvents > 0 && 0 <= idx && idx < numEvents);
return events[idx];
}
@@ -119,14 +112,6 @@ public class Batch {
}
- void addLowWatermark(long lowWatermark, MonitoringContext context) {
- Preconditions.checkState(!isFull(), "batch is full");
- int index = numEvents++;
- PersistEvent e = events[index];
- e.makePersistLowWatermark(lowWatermark, context);
-
- }
-
@Override
public String toString() {
return Objects.toStringHelper(this)
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/5b27a074/tso-server/src/main/java/org/apache/omid/tso/BatchPoolModule.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/BatchPoolModule.java b/tso-server/src/main/java/org/apache/omid/tso/BatchPoolModule.java
index 96e8717..c28f3aa 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/BatchPoolModule.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/BatchPoolModule.java
@@ -48,9 +48,9 @@ public class BatchPoolModule extends AbstractModule {
ObjectPool<Batch> getBatchPool() throws Exception {
int poolSize = config.getNumConcurrentCTWriters();
- int batchSize = config.getBatchSizePerCTWriter() + 1; // Add 1 element to batch size for storing LWM
+ int batchSize = config.getBatchSizePerCTWriter();
- LOG.info("Pool Size (# of Batches) {}; Batch Size {} (including LWM bucket)", poolSize, batchSize);
+ LOG.info("Pool Size (# of Batches) {}; Batch Size {}", poolSize, batchSize);
LOG.info("Total Batch Size (Pool size * Batch Size): {}", poolSize * batchSize);
GenericObjectPoolConfig config = new GenericObjectPoolConfig();
config.setMaxTotal(poolSize);
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/5b27a074/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java b/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java
index 32820f6..8d4fd85 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java
@@ -25,7 +25,7 @@ public final class PersistEvent {
private MonitoringContext monCtx;
enum Type {
- TIMESTAMP, COMMIT, ABORT, LOW_WATERMARK
+ TIMESTAMP, COMMIT, ABORT
}
private Type type = null;
@@ -65,14 +65,6 @@ public final class PersistEvent {
}
- void makePersistLowWatermark(long lowWatermark, MonitoringContext monCtx) {
-
- this.type = Type.LOW_WATERMARK;
- this.lowWatermark = lowWatermark;
- this.monCtx = monCtx;
-
- }
-
MonitoringContext getMonCtx() {
return monCtx;
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/5b27a074/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
index 9467c13..f4e4a51 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
@@ -19,6 +19,8 @@ package org.apache.omid.tso;
import org.jboss.netty.channel.Channel;
+import java.util.concurrent.Future;
+
interface PersistenceProcessor {
void addCommitToBatch(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx)
@@ -28,8 +30,8 @@ interface PersistenceProcessor {
void addTimestampToBatch(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception;
- void addLowWatermarkToBatch(long lowWatermark, MonitoringContext monCtx);
-
void triggerCurrentBatchFlush() throws Exception;
+ Future<Void> persistLowWatermark(long lowWatermark);
+
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/5b27a074/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java
index 7e84539..cf7557a 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java
@@ -86,9 +86,6 @@ public class PersistenceProcessorHandler implements WorkHandler<PersistenceProce
case TIMESTAMP:
localEvent.getMonCtx().timerStart("timestampPersistProcessor");
break;
- case LOW_WATERMARK:
- writer.updateLowWatermark(localEvent.getLowWatermark());
- break;
default:
throw new RuntimeException("Unknown event type: " + localEvent.getType().name());
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/5b27a074/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
index b78ad1a..761d30b 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
@@ -25,14 +25,21 @@ import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.WorkerPool;
import org.apache.commons.pool2.ObjectPool;
+import org.apache.omid.committable.CommitTable;
+import org.apache.omid.metrics.MetricsRegistry;
+import org.apache.omid.metrics.Timer;
import org.jboss.netty.channel.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.inject.Inject;
+import java.io.IOException;
+import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import static org.apache.omid.metrics.MetricsUtils.name;
import static org.apache.omid.tso.PersistenceProcessorImpl.PersistBatchEvent.EVENT_FACTORY;
import static org.apache.omid.tso.PersistenceProcessorImpl.PersistBatchEvent.makePersistBatch;
@@ -50,21 +57,32 @@ class PersistenceProcessorImpl implements PersistenceProcessor {
// TODO Next two need to be either int or AtomicLong
volatile private long batchSequence;
- volatile private long lowWatermark = INITIAL_LWM_VALUE;
- private MonitoringContext lowWatermarkContext;
+ private CommitTable.Writer lowWatermarkWriter;
+ private ExecutorService lowWatermarkWriterExecutor;
+
+ private MetricsRegistry metrics;
+ private final Timer lwmWriteTimer;
@Inject
PersistenceProcessorImpl(TSOServerConfig config,
+ CommitTable commitTable,
ObjectPool<Batch> batchPool,
Panicker panicker,
- PersistenceProcessorHandler[] handlers)
+ PersistenceProcessorHandler[] handlers,
+ MetricsRegistry metrics)
throws Exception {
+ this.metrics = metrics;
+ this.lowWatermarkWriter = commitTable.getWriter();
this.batchSequence = 0L;
this.batchPool = batchPool;
this.currentBatch = batchPool.borrowObject();
+ // Low Watermark writer
+ ThreadFactoryBuilder lwmThreadFactory = new ThreadFactoryBuilder().setNameFormat("lwm-writer-%d");
+ lowWatermarkWriterExecutor = Executors.newSingleThreadExecutor(lwmThreadFactory.build());
+
// Disruptor configuration
this.persistRing = RingBuffer.createSingleProducer(EVENT_FACTORY, 1 << 20, new BusySpinWaitStrategy());
@@ -79,6 +97,9 @@ class PersistenceProcessorImpl implements PersistenceProcessor {
this.persistRing.addGatingSequences(persistProcessor.getWorkerSequences());
persistProcessor.start(requestExec);
+ // Metrics config
+ this.lwmWriteTimer = metrics.timer(name("tso", "lwmWriter", "latency"));
+
}
@Override
@@ -87,7 +108,6 @@ class PersistenceProcessorImpl implements PersistenceProcessor {
if (currentBatch.isEmpty()) {
return;
}
- currentBatch.addLowWatermark(this.lowWatermark, this.lowWatermarkContext);
long seq = persistRing.next();
PersistBatchEvent e = persistRing.get(seq);
makePersistBatch(e, batchSequence++, currentBatch);
@@ -101,7 +121,7 @@ class PersistenceProcessorImpl implements PersistenceProcessor {
throws Exception {
currentBatch.addCommit(startTimestamp, commitTimestamp, c, monCtx);
- if (currentBatch.isLastEntryEmpty()) {
+ if (currentBatch.isFull()) {
triggerCurrentBatchFlush();
}
@@ -112,28 +132,38 @@ class PersistenceProcessorImpl implements PersistenceProcessor {
throws Exception {
currentBatch.addAbort(startTimestamp, isRetry, c, context);
- if (currentBatch.isLastEntryEmpty()) {
+ if (currentBatch.isFull()) {
triggerCurrentBatchFlush();
}
}
@Override
- public void addTimestampToBatch(long startTimestamp, Channel c, MonitoringContext context)
- throws Exception {
+ public void addTimestampToBatch(long startTimestamp, Channel c, MonitoringContext context) throws Exception {
currentBatch.addTimestamp(startTimestamp, c, context);
- if (currentBatch.isLastEntryEmpty()) {
+ if (currentBatch.isFull()) {
triggerCurrentBatchFlush();
}
}
@Override
- public void addLowWatermarkToBatch(long lowWatermark, MonitoringContext context) {
-
- this.lowWatermark = lowWatermark;
- this.lowWatermarkContext = context;
+ public Future<Void> persistLowWatermark(final long lowWatermark) {
+
+ return lowWatermarkWriterExecutor.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws IOException {
+ try {
+ lwmWriteTimer.start();
+ lowWatermarkWriter.updateLowWatermark(lowWatermark);
+ lowWatermarkWriter.flush();
+ } finally {
+ lwmWriteTimer.stop();
+ }
+ return null;
+ }
+ });
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/5b27a074/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
index a4e2ce1..c1709ef 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
@@ -116,9 +116,6 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
sendTimestampResponse(localEvent.getStartTimestamp(), localEvent.getChannel());
localEvent.getMonCtx().timerStop(name);
break;
- // TODO Check if we still need this
- case LOW_WATERMARK:
- break;
default:
LOG.error("Unknown event {}", localEvent.getType());
break;
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/5b27a074/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
index 12f8326..4e17850 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
@@ -38,6 +38,7 @@ import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestEvent>, RequestProcessor, TimeoutHandler {
@@ -91,8 +92,7 @@ class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestE
public void update(TSOState state) throws Exception {
LOG.info("Initializing RequestProcessor...");
this.lowWatermark = state.getLowWatermark();
- persistProc.addLowWatermarkToBatch(lowWatermark, new MonitoringContext(metrics));
- persistProc.triggerCurrentBatchFlush();
+ persistProc.persistLowWatermark(lowWatermark).get(); // Sync persist
LOG.info("RequestProcessor initialized with LWMs {} and Epoch {}", lowWatermark, state.getEpoch());
}
@@ -210,7 +210,7 @@ class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestE
if (newLowWatermark != lowWatermark) {
LOG.trace("Setting new low Watermark to {}", newLowWatermark);
lowWatermark = newLowWatermark;
- persistProc.addLowWatermarkToBatch(newLowWatermark, event.getMonCtx());
+ persistProc.persistLowWatermark(newLowWatermark); // Async persist
}
}
persistProc.addCommitToBatch(startTimestamp, commitTimestamp, c, event.getMonCtx());
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/5b27a074/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java b/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
index 45fa639..c472606 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
@@ -38,9 +38,6 @@ public class TestBatch {
private static final long ANY_ST = 1231;
private static final long ANY_CT = 2241;
- private static final int DUMMY_LWM = 212;
- private static final int NEW_LWM = 123;
-
@Mock
private Channel channel;
@Mock
@@ -72,7 +69,7 @@ public class TestBatch {
}
// Test when filling the batch with different types of events, that becomes full
- for (int i = 0; i < (BATCH_SIZE - 1); i++) {
+ for (int i = 0; i < BATCH_SIZE; i++) {
if (i % 3 == 0) {
batch.addTimestamp(ANY_ST, channel, monCtx);
} else if (i % 3 == 1) {
@@ -81,8 +78,6 @@ public class TestBatch {
batch.addAbort(ANY_ST, false, channel, monCtx);
}
}
- assertTrue(batch.isLastEntryEmpty(), "Should be only one entry left in the batch");
- batch.addLowWatermark(DUMMY_LWM, monCtx); // Add DUMMY_LWM as last element
assertFalse(batch.isEmpty(), "Batch should contain elements");
assertTrue(batch.isFull(), "Batch should be full");
assertEquals(batch.getNumEvents(), BATCH_SIZE, "Num events should be " + BATCH_SIZE);
@@ -93,7 +88,7 @@ public class TestBatch {
fail("Should throw an IllegalStateException");
} catch (IllegalStateException e) {
assertEquals(e.getMessage(), "batch is full", "message returned doesn't match");
- LOG.debug("IllegalStateException catched properly");
+ LOG.debug("IllegalStateException catch properly");
}
assertTrue(batch.isFull(), "Batch shouldn't be empty");
@@ -101,24 +96,19 @@ public class TestBatch {
assertTrue(batch.get(0).getType().equals(PersistEvent.Type.TIMESTAMP));
assertTrue(batch.get(1).getType().equals(PersistEvent.Type.COMMIT));
assertTrue(batch.get(2).getType().equals(PersistEvent.Type.ABORT));
- assertTrue(batch.get(BATCH_SIZE - 1).getType().equals(PersistEvent.Type.LOW_WATERMARK));
// Set a new value for last element in Batch and check we obtain the right result
batch.decreaseNumEvents();
assertEquals(batch.getNumEvents(), BATCH_SIZE - 1, "Num events should be " + (BATCH_SIZE - 1));
- assertTrue(batch.isLastEntryEmpty(), "Should be only one entry left in the batch");
try {
batch.get(BATCH_SIZE - 1);
fail();
} catch (IllegalStateException ex) {
// Expected, as we can not access elements in the batch greater than the current number of events
}
- batch.addLowWatermark(NEW_LWM, monCtx); // Add new LWM as last element
- assertTrue(batch.get(BATCH_SIZE - 1).getType().equals(PersistEvent.Type.LOW_WATERMARK));
- assertEquals(batch.get(BATCH_SIZE - 1).getLowWatermark(), NEW_LWM);
- // Re-check that batch is full again
- assertTrue(batch.isFull(), "Batch shouldn't be empty");
+ // Re-check that batch is NOT full
+ assertFalse(batch.isFull(), "Batch shouldn't be full");
// Clear the batch and goes back to its initial state
batch.clear();
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/5b27a074/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java b/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java
index 716f64f..a5f4eed 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java
@@ -132,9 +132,11 @@ public class TestPanicker {
}
PersistenceProcessor proc = new PersistenceProcessorImpl(config,
+ commitTable,
batchPool,
panicker,
- handlers);
+ handlers,
+ metrics);
proc.addCommitToBatch(1, 2, null, new MonitoringContext(metrics));
@@ -182,9 +184,11 @@ public class TestPanicker {
}
PersistenceProcessor proc = new PersistenceProcessorImpl(config,
+ commitTable,
batchPool,
panicker,
- handlers);
+ handlers,
+ metrics);
proc.addCommitToBatch(1, 2, null, new MonitoringContext(metrics));
new RequestProcessorImpl(metrics, mock(TimestampOracle.class), proc, panicker, mock(TSOServerConfig.class));
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/5b27a074/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java
index 91b9504..6e5c040 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java
@@ -22,6 +22,7 @@ import org.apache.omid.committable.CommitTable;
import org.apache.omid.metrics.MetricsRegistry;
import org.apache.omid.metrics.NullMetricsProvider;
import org.jboss.netty.channel.Channel;
+import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
@@ -43,13 +44,14 @@ import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertEquals;
// TODO Refactor: Make visible currentBatch in PersistenceProcessorImpl to add proper verifications
public class TestPersistenceProcessor {
private static final Logger LOG = LoggerFactory.getLogger(TestPersistenceProcessor.class);
- private static final long ANY_LWM = 0L;
+ private static final long ANY_LWM = 1234L;
private static final int ANY_ST = 0;
private static final int ANY_CT = 1;
@@ -93,7 +95,41 @@ public class TestPersistenceProcessor {
Mockito.reset(mockWriter);
}
- @Test
+ @Test(timeOut = 10_000)
+ public void testLowWatermarkIsPersisted() throws Exception {
+
+ TSOServerConfig tsoConfig = new TSOServerConfig();
+
+ PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[tsoConfig.getNumConcurrentCTWriters()];
+ for (int i = 0; i < tsoConfig.getNumConcurrentCTWriters(); i++) {
+ handlers[i] = new PersistenceProcessorHandler(metrics,
+ "localhost:1234",
+ mock(LeaseManager.class),
+ commitTable,
+ mock(ReplyProcessor.class),
+ retryProcessor,
+ panicker);
+ }
+
+ // Component under test
+ PersistenceProcessorImpl persistenceProcessor =
+ new PersistenceProcessorImpl(tsoConfig,
+ commitTable,
+ mock(ObjectPool.class),
+ panicker,
+ handlers,
+ metrics);
+
+ persistenceProcessor.persistLowWatermark(ANY_LWM).get();
+
+ ArgumentCaptor<Long> lwmCapture = ArgumentCaptor.forClass(Long.class);
+ CommitTable.Writer lwmWriter = commitTable.getWriter();
+ verify(lwmWriter, timeout(100).times(1)).updateLowWatermark(lwmCapture.capture());
+ assertEquals(lwmCapture.getValue().longValue(), ANY_LWM);
+
+ }
+
+ @Test(timeOut = 10_000)
public void testCommitPersistenceWithSingleCommitTableWriter() throws Exception {
final int NUM_CT_WRITERS = 1;
@@ -122,8 +158,8 @@ public class TestPersistenceProcessor {
}
// Component under test
- PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, batchPool, panicker, handlers);
- proc.addLowWatermarkToBatch(ANY_LWM, new MonitoringContext(metrics)); // Add a fake DUMMY_LWM
+ PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, commitTable, batchPool,
+ panicker, handlers, metrics);
verify(batchPool, times(1)).borrowObject(); // Called during initialization
@@ -166,9 +202,8 @@ public class TestPersistenceProcessor {
}
// Component under test
- PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, batchPool, panicker, handlers);
-
- proc.addLowWatermarkToBatch(ANY_LWM, mock(MonitoringContext.class)); // Add a fake DUMMY_LWM
+ PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, commitTable, batchPool,
+ panicker, handlers, metrics);
verify(batchPool, times(1)).borrowObject(); // Called during initialization
@@ -238,9 +273,8 @@ public class TestPersistenceProcessor {
}
// Component under test
- PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, batchPool, panicker, handlers);
-
- proc.addLowWatermarkToBatch(ANY_LWM, new MonitoringContext(metrics)); // Add a fake DUMMY_LWM
+ PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, commitTable, batchPool,
+ panicker, handlers, metrics);
// The non-ha lease manager always return true for
// stillInLeasePeriod(), so verify the currentBatch sends replies as master
@@ -294,9 +328,8 @@ public class TestPersistenceProcessor {
PersistenceProcessorHandler[] handlers = configureHandlers (tsoConfig, simulatedHALeaseManager, batchPool);
// Component under test
- PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, batchPool, panicker, handlers);
-
- proc.addLowWatermarkToBatch(ANY_LWM, new MonitoringContext(metrics)); // Add a fake DUMMY_LWM
+ PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, commitTable, batchPool,
+ panicker, handlers, metrics);
// Test: Configure the lease manager to return true always
doReturn(true).when(simulatedHALeaseManager).stillInLeasePeriod();
@@ -316,11 +349,11 @@ public class TestPersistenceProcessor {
PersistenceProcessorHandler[] handlers = configureHandlers (tsoConfig, simulatedHALeaseManager, batchPool);
// Component under test
- PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, batchPool, panicker, handlers);
+ PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, commitTable, batchPool,
+ panicker, handlers, metrics);
// Test: Configure the lease manager to return true first and false later for stillInLeasePeriod
doReturn(true).doReturn(false).when(simulatedHALeaseManager).stillInLeasePeriod();
- //batchPool.notifyEmptyBatch(0); // Unlock this thread to check the panicker
proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
proc.triggerCurrentBatchFlush();
verify(simulatedHALeaseManager, timeout(1000).times(2)).stillInLeasePeriod();
@@ -337,7 +370,8 @@ public class TestPersistenceProcessor {
PersistenceProcessorHandler[] handlers = configureHandlers (tsoConfig, simulatedHALeaseManager, batchPool);
// Component under test
- PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, batchPool, panicker, handlers);
+ PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, commitTable, batchPool,
+ panicker, handlers, metrics);
// Test: Configure the lease manager to return false for stillInLeasePeriod
doReturn(false).when(simulatedHALeaseManager).stillInLeasePeriod();
@@ -358,7 +392,8 @@ public class TestPersistenceProcessor {
PersistenceProcessorHandler[] handlers = configureHandlers (tsoConfig, simulatedHALeaseManager, batchPool);
// Component under test
- PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, batchPool, panicker, handlers);
+ PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, commitTable, batchPool,
+ panicker, handlers, metrics);
// Test: Configure the lease manager to return true first and false later for stillInLeasePeriod and raise
// an exception when flush
@@ -413,7 +448,8 @@ public class TestPersistenceProcessor {
panicker);
}
- PersistenceProcessorImpl proc = new PersistenceProcessorImpl(config, batchPool, panicker, handlers);
+ PersistenceProcessorImpl proc = new PersistenceProcessorImpl(config, commitTable, batchPool,
+ panicker, handlers, metrics);
MonitoringContext monCtx = new MonitoringContext(metrics);
@@ -423,8 +459,6 @@ public class TestPersistenceProcessor {
// Configure commit table writer to explode when flushing changes to DB
doThrow(new IOException("Unable to write@TestPersistenceProcessor2")).when(mockWriter).flush();
- proc.addLowWatermarkToBatch(ANY_LWM, new MonitoringContext(metrics)); // Add a fake DUMMY_LWM
-
// Check the panic is extended!
proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), monCtx);
proc.triggerCurrentBatchFlush();
@@ -452,14 +486,13 @@ public class TestPersistenceProcessor {
panicker);
}
- PersistenceProcessorImpl proc = new PersistenceProcessorImpl(config, batchPool, panicker, handlers);
+ PersistenceProcessorImpl proc = new PersistenceProcessorImpl(config, commitTable, batchPool,
+ panicker, handlers, metrics);
// Configure writer to explode with a runtime exception
doThrow(new RuntimeException("Kaboom!")).when(mockWriter).addCommittedTransaction(anyLong(), anyLong());
MonitoringContext monCtx = new MonitoringContext(metrics);
- proc.addLowWatermarkToBatch(ANY_LWM, new MonitoringContext(metrics)); // Add a fake DUMMY_LWM
-
// Check the panic is extended!
proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), monCtx);
proc.triggerCurrentBatchFlush();
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/5b27a074/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
index 11f1a99..5b17d1e 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
@@ -18,6 +18,7 @@
package org.apache.omid.tso;
import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.SettableFuture;
import org.apache.omid.metrics.MetricsRegistry;
import org.apache.omid.metrics.NullMetricsProvider;
import org.jboss.netty.channel.Channel;
@@ -34,6 +35,7 @@ import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
@@ -67,6 +69,9 @@ public class TestRequestProcessor {
stateManager = new TSOStateManagerImpl(timestampOracle);
persist = mock(PersistenceProcessor.class);
+ SettableFuture<Void> f = SettableFuture.create();
+ f.set(null);
+ doReturn(f).when(persist).persistLowWatermark(any(Long.class));
TSOServerConfig config = new TSOServerConfig();
config.setMaxItems(CONFLICT_MAP_SIZE);
@@ -165,7 +170,7 @@ public class TestRequestProcessor {
final long FIRST_COMMIT_TS_EVICTED = 1L;
final long NEXT_COMMIT_TS_THAT_SHOULD_BE_EVICTED = 2L;
- // Fill the cache to provoke a cache eviction
+ // Fill the cache to provoke a cache eviction
for (long i = 0; i < CONFLICT_MAP_SIZE + CONFLICT_MAP_ASSOCIATIVITY; i++) {
long writeSetElementHash = i + 1; // This is to match the assigned CT: K/V in cache = WS Element Hash/CT
List<Long> writeSet = Lists.newArrayList(writeSetElementHash);
@@ -175,11 +180,11 @@ public class TestRequestProcessor {
Thread.currentThread().sleep(3000); // Allow the Request processor to finish the request processing
// Check that first time its called is on init
- verify(persist, timeout(100).times(1)).addLowWatermarkToBatch(eq(0L), any(MonitoringContext.class));
+ verify(persist, timeout(100).times(1)).persistLowWatermark(eq(0L));
// Then, check it is called when cache is full and the first element is evicted (should be a 1)
- verify(persist, timeout(100).times(1)).addLowWatermarkToBatch(eq(FIRST_COMMIT_TS_EVICTED), any(MonitoringContext.class));
+ verify(persist, timeout(100).times(1)).persistLowWatermark(eq(FIRST_COMMIT_TS_EVICTED));
// Finally it should never be called with the next element
- verify(persist, timeout(100).never()).addLowWatermarkToBatch(eq(NEXT_COMMIT_TS_THAT_SHOULD_BE_EVICTED), any(MonitoringContext.class));
+ verify(persist, timeout(100).never()).persistLowWatermark(eq(NEXT_COMMIT_TS_THAT_SHOULD_BE_EVICTED));
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/5b27a074/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientRequestAndResponseBehaviours.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientRequestAndResponseBehaviours.java b/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientRequestAndResponseBehaviours.java
index 203f46f..e41e656 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientRequestAndResponseBehaviours.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientRequestAndResponseBehaviours.java
@@ -69,8 +69,8 @@ public class TestTSOClientRequestAndResponseBehaviours {
private PausableTimestampOracle pausableTSOracle;
private CommitTable commitTable;
- @BeforeClass
- public void setup() throws Exception {
+ @BeforeMethod
+ public void beforeMethod() throws Exception {
TSOServerConfig tsoConfig = new TSOServerConfig();
tsoConfig.setMaxItems(1000);
@@ -94,19 +94,6 @@ public class TestTSOClientRequestAndResponseBehaviours {
pausableTSOracle = (PausableTimestampOracle) injector.getInstance(TimestampOracle.class);
commitTable = injector.getInstance(CommitTable.class);
- }
-
- @AfterClass
- public void tearDown() throws Exception {
-
- tsoServer.stopAndWait();
- tsoServer = null;
- TestUtils.waitForSocketNotListening(TSO_SERVER_HOST, TSO_SERVER_PORT, 1000);
-
- }
-
- @BeforeMethod
- public void beforeMethod() {
OmidClientConfiguration tsoClientConf = new OmidClientConfiguration();
tsoClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
@@ -115,7 +102,12 @@ public class TestTSOClientRequestAndResponseBehaviours {
}
@AfterMethod
- public void afterMethod() {
+ public void afterMethod() throws Exception {
+
+
+ tsoServer.stopAndWait();
+ tsoServer = null;
+ TestUtils.waitForSocketNotListening(TSO_SERVER_HOST, TSO_SERVER_PORT, 1000);
pausableTSOracle.resume();