You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@omid.apache.org by yo...@apache.org on 2018/12/03 13:46:31 UTC
[36/50] [abbrv] incubator-omid git commit: OMID-90 Add omid low latency mode
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/08caa204/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
deleted file mode 100644
index e5fbee8..0000000
--- a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
+++ /dev/null
@@ -1,435 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.omid.tso;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.lmax.disruptor.EventFactory;
-import com.lmax.disruptor.EventHandler;
-import com.lmax.disruptor.RingBuffer;
-import com.lmax.disruptor.TimeoutBlockingWaitStrategy;
-import com.lmax.disruptor.TimeoutHandler;
-import com.lmax.disruptor.dsl.Disruptor;
-
-import org.apache.omid.metrics.MetricsRegistry;
-import org.apache.omid.tso.TSOStateManager.TSOState;
-import org.jboss.netty.channel.Channel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.inject.Inject;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-
-import static com.lmax.disruptor.dsl.ProducerType.MULTI;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.omid.tso.RequestProcessorImpl.RequestEvent.EVENT_FACTORY;
-
-class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestEvent>, RequestProcessor, TimeoutHandler {
-
- private static final Logger LOG = LoggerFactory.getLogger(RequestProcessorImpl.class);
-
- // Disruptor-related attributes
- private final ExecutorService disruptorExec;
- private final Disruptor<RequestEvent> disruptor;
- private final RingBuffer<RequestEvent> requestRing;
-
- private final TimestampOracle timestampOracle;
- private final CommitHashMap hashmap;
- private final Map<Long, Long> tableFences;
- private final MetricsRegistry metrics;
- private final PersistenceProcessor persistProc;
-
- private long lowWatermark = -1L;
-
- @Inject
- RequestProcessorImpl(MetricsRegistry metrics,
- TimestampOracle timestampOracle,
- PersistenceProcessor persistProc,
- Panicker panicker,
- TSOServerConfig config)
- throws IOException {
-
- // ------------------------------------------------------------------------------------------------------------
- // Disruptor initialization
- // ------------------------------------------------------------------------------------------------------------
-
- TimeoutBlockingWaitStrategy timeoutStrategy = new TimeoutBlockingWaitStrategy(config.getBatchPersistTimeoutInMs(), MILLISECONDS);
-
- ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("request-%d").build();
- this.disruptorExec = Executors.newSingleThreadExecutor(threadFactory);
-
- this.disruptor = new Disruptor<>(EVENT_FACTORY, 1 << 12, disruptorExec, MULTI, timeoutStrategy);
- disruptor.handleExceptionsWith(new FatalExceptionHandler(panicker)); // This must be before handleEventsWith()
- disruptor.handleEventsWith(this);
- this.requestRing = disruptor.start();
-
- // ------------------------------------------------------------------------------------------------------------
- // Attribute initialization
- // ------------------------------------------------------------------------------------------------------------
-
- this.metrics = metrics;
- this.persistProc = persistProc;
- this.timestampOracle = timestampOracle;
- this.hashmap = new CommitHashMap(config.getConflictMapSize());
- this.tableFences = new HashMap<Long, Long>();
-
- LOG.info("RequestProcessor initialized");
-
- }
-
- /**
- * This should be called when the TSO gets leadership
- */
- @Override
- public void update(TSOState state) throws Exception {
- LOG.info("Initializing RequestProcessor state...");
- this.lowWatermark = state.getLowWatermark();
- persistProc.persistLowWatermark(lowWatermark).get(); // Sync persist
- LOG.info("RequestProcessor state initialized with LWMs {} and Epoch {}", lowWatermark, state.getEpoch());
- }
-
- @Override
- public void onEvent(RequestEvent event, long sequence, boolean endOfBatch) throws Exception {
-
- switch (event.getType()) {
- case TIMESTAMP:
- handleTimestamp(event);
- break;
- case COMMIT:
- handleCommit(event);
- break;
- case FENCE:
- handleFence(event);
- break;
- default:
- throw new IllegalStateException("Event not allowed in Request Processor: " + event);
- }
-
- }
-
- @Override
- public void onTimeout(long sequence) throws Exception {
-
- // TODO We can not use this as a timeout trigger for flushing. This timeout is related to the time between
- // TODO (cont) arrivals of requests to the disruptor. We need another mechanism to trigger timeouts
- // TODO (cont) WARNING!!! Take care with the implementation because if there's other thread than request-0
- // TODO (cont) thread the one that calls persistProc.triggerCurrentBatchFlush(); we'll incur in concurrency issues
- // TODO (cont) This is because, in the current implementation, only the request-0 thread calls the public methods
- // TODO (cont) in persistProc and it is guaranteed that access them serially.
- persistProc.triggerCurrentBatchFlush();
-
- }
-
- @Override
- public void timestampRequest(Channel c, MonitoringContext monCtx) {
-
- monCtx.timerStart("request.processor.timestamp.latency");
- long seq = requestRing.next();
- RequestEvent e = requestRing.get(seq);
- RequestEvent.makeTimestampRequest(e, c, monCtx);
- requestRing.publish(seq);
-
- }
-
- @Override
- public void commitRequest(long startTimestamp, Collection<Long> writeSet, Collection<Long> tableIdSet, boolean isRetry, Channel c,
- MonitoringContext monCtx) {
-
- monCtx.timerStart("request.processor.commit.latency");
- long seq = requestRing.next();
- RequestEvent e = requestRing.get(seq);
- RequestEvent.makeCommitRequest(e, startTimestamp, monCtx, writeSet, tableIdSet, isRetry, c);
- requestRing.publish(seq);
-
- }
-
- @Override
- public void fenceRequest(long tableID, Channel c, MonitoringContext monCtx) {
-
- monCtx.timerStart("request.processor.fence.latency");
- long seq = requestRing.next();
- RequestEvent e = requestRing.get(seq);
- RequestEvent.makeFenceRequest(e, tableID, c, monCtx);
- requestRing.publish(seq);
-
- }
-
- private void handleTimestamp(RequestEvent requestEvent) throws Exception {
-
- long timestamp = timestampOracle.next();
- requestEvent.getMonCtx().timerStop("request.processor.timestamp.latency");
- persistProc.addTimestampToBatch(timestamp, requestEvent.getChannel(), requestEvent.getMonCtx());
-
- }
-
- // Checks whether transaction transactionId started before a fence creation of a table transactionId modified.
- private boolean hasConflictsWithFences(long startTimestamp, Collection<Long> tableIdSet) {
- if (!tableFences.isEmpty()) {
- for (long tableId: tableIdSet) {
- Long fence = tableFences.get(tableId);
- if (fence != null && fence > startTimestamp) {
- return true;
- }
- if (fence != null && fence < lowWatermark) {
- tableFences.remove(tableId); // Garbage collect entries of old fences.
- }
- }
- }
-
- return false;
- }
-
- // Checks whether transactionId has a write-write conflict with a transaction committed after transactionId.
- private boolean hasConflictsWithCommittedTransactions(long startTimestamp, Iterable<Long> writeSet) {
- for (long cellId : writeSet) {
- long value = hashmap.getLatestWriteForCell(cellId);
- if (value != 0 && value >= startTimestamp) {
- return true;
- }
- }
-
- return false;
- }
-
- private void handleCommit(RequestEvent event) throws Exception {
-
- long startTimestamp = event.getStartTimestamp();
- Iterable<Long> writeSet = event.writeSet();
- Collection<Long> tableIdSet = event.getTableIdSet();
- boolean isCommitRetry = event.isCommitRetry();
- Channel c = event.getChannel();
-
- boolean nonEmptyWriteSet = writeSet.iterator().hasNext();
-
- // If the transaction started before the low watermark, or
- // it started before a fence and modified the table the fence created for, or
- // it has a write-write conflict with a transaction committed after it started
- // Then it should abort. Otherwise, it can commit.
- if (startTimestamp > lowWatermark &&
- !hasConflictsWithFences(startTimestamp, tableIdSet) &&
- !hasConflictsWithCommittedTransactions(startTimestamp, writeSet)) {
-
- long commitTimestamp = timestampOracle.next();
-
- if (nonEmptyWriteSet) {
- long newLowWatermark = lowWatermark;
-
- for (long r : writeSet) {
- long removed = hashmap.putLatestWriteForCell(r, commitTimestamp);
- newLowWatermark = Math.max(removed, newLowWatermark);
- }
-
- if (newLowWatermark != lowWatermark) {
- LOG.trace("Setting new low Watermark to {}", newLowWatermark);
- lowWatermark = newLowWatermark;
- persistProc.persistLowWatermark(newLowWatermark); // Async persist
- }
- }
- event.getMonCtx().timerStop("request.processor.commit.latency");
- persistProc.addCommitToBatch(startTimestamp, commitTimestamp, c, event.getMonCtx());
-
- } else {
-
- event.getMonCtx().timerStop("request.processor.commit.latency");
- if (isCommitRetry) { // Re-check if it was already committed but the client retried due to a lag replying
- persistProc.addCommitRetryToBatch(startTimestamp, c, event.getMonCtx());
- } else {
- persistProc.addAbortToBatch(startTimestamp, c, event.getMonCtx());
- }
-
- }
-
- }
-
- private void handleFence(RequestEvent event) throws Exception {
- long tableID = event.getTableId();
- Channel c = event.getChannel();
-
- long fenceTimestamp = timestampOracle.next();
-
- tableFences.put(tableID, fenceTimestamp);
- persistProc.addFenceToBatch(tableID, fenceTimestamp, c, event.getMonCtx());
- }
-
- @Override
- public void close() throws IOException {
-
- LOG.info("Terminating Request Processor...");
- disruptor.halt();
- disruptor.shutdown();
- LOG.info("\tRequest Processor Disruptor shutdown");
- disruptorExec.shutdownNow();
- try {
- disruptorExec.awaitTermination(3, SECONDS);
- LOG.info("\tRequest Processor Disruptor executor shutdown");
- } catch (InterruptedException e) {
- LOG.error("Interrupted whilst finishing Request Processor Disruptor executor");
- Thread.currentThread().interrupt();
- }
- LOG.info("Request Processor terminated");
-
- }
-
- final static class RequestEvent implements Iterable<Long> {
-
- enum Type {
- TIMESTAMP, COMMIT, FENCE
- }
-
- private Type type = null;
- private Channel channel = null;
-
- private boolean isCommitRetry = false;
- private long startTimestamp = 0;
- private MonitoringContext monCtx;
- private long numCells = 0;
-
- private static final int MAX_INLINE = 40;
- private Long writeSet[] = new Long[MAX_INLINE];
- private Collection<Long> writeSetAsCollection = null; // for the case where there's more than MAX_INLINE
-
- private Collection<Long> tableIdSet = null;
- private long tableID = 0;
-
- static void makeTimestampRequest(RequestEvent e, Channel c, MonitoringContext monCtx) {
- e.type = Type.TIMESTAMP;
- e.channel = c;
- e.monCtx = monCtx;
- }
-
- static void makeCommitRequest(RequestEvent e,
- long startTimestamp,
- MonitoringContext monCtx,
- Collection<Long> writeSet,
- Collection<Long> TableIdSet,
- boolean isRetry,
- Channel c) {
- e.monCtx = monCtx;
- e.type = Type.COMMIT;
- e.channel = c;
- e.startTimestamp = startTimestamp;
- e.isCommitRetry = isRetry;
- if (writeSet.size() > MAX_INLINE) {
- e.numCells = writeSet.size();
- e.writeSetAsCollection = writeSet;
- } else {
- e.writeSetAsCollection = null;
- e.numCells = writeSet.size();
- int i = 0;
- for (Long cellId : writeSet) {
- e.writeSet[i] = cellId;
- ++i;
- }
- }
- e.tableIdSet = TableIdSet;
- }
-
- static void makeFenceRequest(RequestEvent e,
- long tableID,
- Channel c,
- MonitoringContext monCtx) {
- e.type = Type.FENCE;
- e.channel = c;
- e.monCtx = monCtx;
- e.tableID = tableID;
- }
-
- MonitoringContext getMonCtx() {
- return monCtx;
- }
-
- Type getType() {
- return type;
- }
-
- long getStartTimestamp() {
- return startTimestamp;
- }
-
- Channel getChannel() {
- return channel;
- }
-
- Collection<Long> getTableIdSet() {
- return tableIdSet;
- }
-
- long getTableId() {
- return tableID;
- }
-
- @Override
- public Iterator<Long> iterator() {
-
- if (writeSetAsCollection != null) {
- return writeSetAsCollection.iterator();
- }
-
- return new Iterator<Long>() {
- int i = 0;
-
- @Override
- public boolean hasNext() {
- return i < numCells;
- }
-
- @Override
- public Long next() {
- if (!hasNext()) {
- throw new NoSuchElementException();
- }
- return writeSet[i++];
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- };
-
- }
-
- Iterable<Long> writeSet() {
-
- return this;
-
- }
-
- boolean isCommitRetry() {
- return isCommitRetry;
- }
-
- final static EventFactory<RequestEvent> EVENT_FACTORY = new EventFactory<RequestEvent>() {
- @Override
- public RequestEvent newInstance() {
- return new RequestEvent();
- }
- };
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/08caa204/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorPersistCT.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorPersistCT.java b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorPersistCT.java
new file mode 100644
index 0000000..0a58b0e
--- /dev/null
+++ b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorPersistCT.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.tso;
+
+import com.google.inject.Inject;
+import org.apache.omid.metrics.MetricsRegistry;
+import org.jboss.netty.channel.Channel;
+
+import java.io.IOException;
+
+public class RequestProcessorPersistCT extends AbstractRequestProcessor {
+ // Request processor variant that hands every timestamp/commit/abort outcome to the PersistenceProcessor.
+ private final PersistenceProcessor persistenceProcessor;
+ // All arguments except persistenceProcessor are passed straight through to the AbstractRequestProcessor constructor.
+ @Inject
+ RequestProcessorPersistCT(MetricsRegistry metrics,
+ TimestampOracle timestampOracle,
+ PersistenceProcessor persistenceProcessor,
+ Panicker panicker,
+ TSOServerConfig config,
+ LowWatermarkWriter lowWatermarkWriter,
+ ReplyProcessor replyProcessor) throws IOException {
+
+ super(metrics, timestampOracle, panicker, config, lowWatermarkWriter, replyProcessor);
+ this.persistenceProcessor = persistenceProcessor;
+ requestRing = disruptor.start(); // started after the field above is assigned; requestRing/disruptor are not declared in this class (presumably inherited)
+ }
+ // Passes the start/commit timestamp pair to PersistenceProcessor.addCommitToBatch.
+ @Override
+ public void forwardCommit(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx) throws Exception {
+ persistenceProcessor.addCommitToBatch(startTimestamp,commitTimestamp,c,monCtx);
+ }
+ // Passes the retried transaction's start timestamp to PersistenceProcessor.addCommitRetryToBatch.
+ @Override
+ public void forwardCommitRetry(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception {
+ persistenceProcessor.addCommitRetryToBatch(startTimestamp,c,monCtx);
+ }
+ // Passes the aborted transaction's start timestamp to PersistenceProcessor.addAbortToBatch.
+ @Override
+ public void forwardAbort(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception {
+ persistenceProcessor.addAbortToBatch(startTimestamp,c,monCtx);
+ }
+ // Passes the timestamp to PersistenceProcessor.addTimestampToBatch.
+ @Override
+ public void forwardTimestamp(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception {
+ persistenceProcessor.addTimestampToBatch(startTimestamp,c,monCtx);
+ }
+ // On timeout, asks the persistence processor to flush its current batch.
+ @Override
+ public void onTimeout() throws Exception {
+ persistenceProcessor.triggerCurrentBatchFlush();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/08caa204/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorSkipCT.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorSkipCT.java b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorSkipCT.java
new file mode 100644
index 0000000..41798f5
--- /dev/null
+++ b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorSkipCT.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.tso;
+
+import com.google.inject.Inject;
+import org.apache.omid.metrics.MetricsRegistry;
+import org.jboss.netty.channel.Channel;
+
+import java.io.IOException;
+
+public class RequestProcessorSkipCT extends AbstractRequestProcessor {
+ // Request processor variant that answers clients directly through the ReplyProcessor;
+ // no PersistenceProcessor is involved in this class.
+ private final ReplyProcessor replyProcessor;
+
+ private final LeaseManagement leaseManager;
+ private final Panicker panicker;
+ private final String tsoHostAndPort;
+ // Assigns every collaborator first and starts the disruptor as the very last step.
+ @Inject
+ RequestProcessorSkipCT(MetricsRegistry metrics,
+ TimestampOracle timestampOracle,
+ ReplyProcessor replyProcessor,
+ Panicker panicker,
+ LeaseManagement leaseManager,
+ TSOServerConfig config,
+ LowWatermarkWriter lowWatermarkWriter,
+ String tsoHostAndPort) throws IOException {
+ super(metrics, timestampOracle, panicker, config, lowWatermarkWriter, replyProcessor);
+ this.replyProcessor = replyProcessor;
+ this.tsoHostAndPort = tsoHostAndPort;
+ this.leaseManager = leaseManager;
+ this.panicker = panicker;
+ requestRing = disruptor.start(); // started only once all fields are set, matching RequestProcessorPersistCT; requestRing/disruptor are not declared here (presumably inherited)
+ }
+ // Panics when the lease manager reports that this replica is no longer inside its lease period.
+ private void commitSuicideIfNotMaster() {
+ if (!leaseManager.stillInLeasePeriod()) {
+ panicker.panic("Replica " + tsoHostAndPort + " lost mastership whilst flushing data. Committing suicide");
+ }
+ }
+ // Checks the lease, starts the commit reply timer and sends the commit response through the ReplyProcessor.
+ @Override
+ public void forwardCommit(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx) {
+ commitSuicideIfNotMaster();
+ monCtx.timerStart("reply.processor.commit.latency");
+ replyProcessor.sendCommitResponse(startTimestamp, commitTimestamp, c, monCtx);
+ }
+ // NOTE(review): a commit retry is answered with an abort response here (abort timer included) - confirm this is intended.
+ @Override
+ public void forwardCommitRetry(long startTimestamp, Channel c, MonitoringContext monCtx) {
+ monCtx.timerStart("reply.processor.abort.latency");
+ replyProcessor.sendAbortResponse(startTimestamp, c, monCtx);
+ }
+ // Starts the abort reply timer and sends the abort response through the ReplyProcessor.
+ @Override
+ public void forwardAbort(long startTimestamp, Channel c, MonitoringContext monCtx) {
+ monCtx.timerStart("reply.processor.abort.latency");
+ replyProcessor.sendAbortResponse(startTimestamp, c, monCtx);
+ }
+ // Starts the timestamp reply timer and sends the timestamp response through the ReplyProcessor.
+ @Override
+ public void forwardTimestamp(long startTimestamp, Channel c, MonitoringContext monCtx) {
+ monCtx.timerStart("reply.processor.timestamp.latency");
+ replyProcessor.sendTimestampResponse(startTimestamp, c, monCtx);
+ }
+ // No-op on timeout: this class holds nothing of its own to flush.
+ @Override
+ public void onTimeout() {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/08caa204/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java
index 6d923be..610e760 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java
@@ -133,16 +133,16 @@ class RetryProcessorImpl implements EventHandler<RetryProcessorImpl.RetryEvent>,
if (commitTimestamp.isPresent()) {
if (commitTimestamp.get().isValid()) {
LOG.trace("Tx {}: Valid commit TS found in Commit Table. Sending Commit to client.", startTimestamp);
- replyProc.sendCommitResponse(startTimestamp, commitTimestamp.get().getValue(), event.getChannel());
+ replyProc.sendCommitResponse(startTimestamp, commitTimestamp.get().getValue(), event.getChannel(), event.getMonCtx());
txAlreadyCommittedMeter.mark();
} else {
LOG.trace("Tx {}: Invalid tx marker found. Sending Abort to client.", startTimestamp);
- replyProc.sendAbortResponse(startTimestamp, event.getChannel());
+ replyProc.sendAbortResponse(startTimestamp, event.getChannel(), event.getMonCtx());
invalidTxMeter.mark();
}
} else {
LOG.trace("Tx {}: No Commit TS found in Commit Table. Sending Abort to client.", startTimestamp);
- replyProc.sendAbortResponse(startTimestamp, event.getChannel());
+ replyProc.sendAbortResponse(startTimestamp, event.getChannel(), event.getMonCtx());
noCTFoundMeter.mark();
}
} catch (InterruptedException e) {
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/08caa204/tso-server/src/main/java/org/apache/omid/tso/TSOChannelHandler.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/TSOChannelHandler.java b/tso-server/src/main/java/org/apache/omid/tso/TSOChannelHandler.java
index a218a1d..f936e88 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/TSOChannelHandler.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/TSOChannelHandler.java
@@ -165,7 +165,7 @@ public class TSOChannelHandler extends SimpleChannelHandler implements Closeable
}
if (request.hasTimestampRequest()) {
- requestProcessor.timestampRequest(ctx.getChannel(), new MonitoringContext(metrics));
+ requestProcessor.timestampRequest(ctx.getChannel(), MonitoringContextFactory.getInstance(config,metrics));
} else if (request.hasCommitRequest()) {
TSOProto.CommitRequest cr = request.getCommitRequest();
requestProcessor.commitRequest(cr.getStartTimestamp(),
@@ -173,10 +173,12 @@ public class TSOChannelHandler extends SimpleChannelHandler implements Closeable
cr.getTableIdList(),
cr.getIsRetry(),
ctx.getChannel(),
- new MonitoringContext(metrics));
+ MonitoringContextFactory.getInstance(config,metrics));
} else if (request.hasFenceRequest()) {
TSOProto.FenceRequest fr = request.getFenceRequest();
- requestProcessor.fenceRequest(fr.getTableId(), ctx.getChannel(), new MonitoringContext(metrics));
+ requestProcessor.fenceRequest(fr.getTableId(),
+ ctx.getChannel(),
+ MonitoringContextFactory.getInstance(config,metrics));
} else {
LOG.error("Invalid request {}. Closing channel {}", request, ctx.getChannel());
ctx.getChannel().close();
@@ -193,7 +195,7 @@ public class TSOChannelHandler extends SimpleChannelHandler implements Closeable
LOG.warn("ClosedChannelException caught. Cause: ", e.getCause());
return;
}
- LOG.warn("Unexpected exception from downstream. Closing channel {}", ctx.getChannel(), e.getCause());
+ LOG.warn("Unexpected exception from downstream. Closing channel {}", ctx.getChannel(), e.getCause()); // trailing Throwable deliberately has no placeholder so SLF4J logs it as the exception
ctx.getChannel().close();
}
@@ -244,6 +246,7 @@ public class TSOChannelHandler extends SimpleChannelHandler implements Closeable
} else {
response.setClientCompatible(false);
}
+ response.setLowLatency(config.getLowLatency());
ctx.getChannel().write(TSOProto.Response.newBuilder().setHandshakeResponse(response.build()).build());
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/08caa204/tso-server/src/main/java/org/apache/omid/tso/TSOModule.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/TSOModule.java b/tso-server/src/main/java/org/apache/omid/tso/TSOModule.java
index 4d0d844..5c96aa3 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/TSOModule.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/TSOModule.java
@@ -52,7 +52,7 @@ class TSOModule extends AbstractModule {
} else {
bind(TimestampOracle.class).to(TimestampOracleImpl.class).in(Singleton.class);
}
-
+ bind(LowWatermarkWriter.class).to(LowWatermarkWriterImpl.class).in(Singleton.class);
bind(Panicker.class).to(SystemExitPanicker.class).in(Singleton.class);
install(new BatchPoolModule(config));
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/08caa204/tso-server/src/main/java/org/apache/omid/tso/TSOServer.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/TSOServer.java b/tso-server/src/main/java/org/apache/omid/tso/TSOServer.java
index f30e64d..d97b824 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/TSOServer.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/TSOServer.java
@@ -51,8 +51,9 @@ public class TSOServer extends AbstractIdleService {
@Inject
private RetryProcessor retryProcessor;
@Inject
- private ReplyProcessor replyProcessor;
-
+ public ReplyProcessor replyProcessor;
+ @Inject
+ private LowWatermarkWriter lowWatermarkWriter;
// ----------------------------------------------------------------------------------------------------------------
// High availability related variables
// ----------------------------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/08caa204/tso-server/src/main/java/org/apache/omid/tso/TSOServerConfig.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/TSOServerConfig.java b/tso-server/src/main/java/org/apache/omid/tso/TSOServerConfig.java
index 8f061a1..e28add3 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/TSOServerConfig.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/TSOServerConfig.java
@@ -89,6 +89,26 @@ public class TSOServerConfig extends SecureHBaseConfig {
private String timestampType;
+ private Boolean lowLatency; // boxed and without initializer, so it stays null until setLowLatency is called
+
+ public boolean monitorContext; // NOTE(review): public field alongside its accessors - presumably meant to be private; confirm before narrowing
+ // Returns the current value of the monitorContext field.
+ public boolean getMonitorContext() {
+ return monitorContext;
+ }
+ // Overwrites the monitorContext field.
+ public void setMonitorContext(boolean monitorContext) {
+ this.monitorContext = monitorContext;
+ }
+ // May return null when the value was never set; callers that auto-unbox the result should guard against that.
+ public Boolean getLowLatency() {
+ return lowLatency;
+ }
+ // Overwrites the lowLatency field; a null argument is stored as-is.
+ public void setLowLatency(Boolean lowLatency) {
+ this.lowLatency = lowLatency;
+ }
+
public int getPort() {
return port;
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/08caa204/tso-server/src/main/java/org/apache/omid/tso/TimestampOracleImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/TimestampOracleImpl.java b/tso-server/src/main/java/org/apache/omid/tso/TimestampOracleImpl.java
index 454526f..fec82af 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/TimestampOracleImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/TimestampOracleImpl.java
@@ -83,8 +83,8 @@ public class TimestampOracleImpl implements TimestampOracle {
}
- static final long TIMESTAMP_BATCH = 10_000_000; // 10 million
- private static final long TIMESTAMP_REMAINING_THRESHOLD = 1_000_000; // 1 million
+ static final long TIMESTAMP_BATCH = 10_000_000L * AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN; // 10 million x MAX_CHECKPOINTS_PER_TXN; long literal keeps the product out of int arithmetic
+ private static final long TIMESTAMP_REMAINING_THRESHOLD = 1_000_000L * AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN; // 1 million x MAX_CHECKPOINTS_PER_TXN, computed as long
private long lastTimestamp;
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/08caa204/tso-server/src/main/resources/default-omid-server-configuration.yml
----------------------------------------------------------------------
diff --git a/tso-server/src/main/resources/default-omid-server-configuration.yml b/tso-server/src/main/resources/default-omid-server-configuration.yml
index 74895a4..3129904 100644
--- a/tso-server/src/main/resources/default-omid-server-configuration.yml
+++ b/tso-server/src/main/resources/default-omid-server-configuration.yml
@@ -44,6 +44,7 @@ batchPersistTimeoutInMs: 10
# INCREMENTAL - [Default] regular counter
# WORLD_TIME - world time based counter
timestampType: INCREMENTAL
+lowLatency: false
# Default module configuration (No TSO High Availability & in-memory storage for timestamp and commit tables)
timestampStoreModule: !!org.apache.omid.tso.InMemoryTimestampStorageModule [ ]
commitTableStoreModule: !!org.apache.omid.tso.InMemoryCommitTableStorageModule [ ]
@@ -52,6 +53,8 @@ leaseModule: !!org.apache.omid.tso.VoidLeaseManagementModule [ ]
# Default stats/metrics configuration
metrics: !!org.apache.omid.metrics.NullMetricsProvider [ ]
+monitorContext: false
+
# ---------------------------------------------------------------------------------------------------------------------
# Timestamp storage configuration options
# ---------------------------------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/08caa204/tso-server/src/test/java/org/apache/omid/tso/TSOMockModule.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TSOMockModule.java b/tso-server/src/test/java/org/apache/omid/tso/TSOMockModule.java
index fc30e60..a346e5e 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TSOMockModule.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TSOMockModule.java
@@ -58,6 +58,7 @@ public class TSOMockModule extends AbstractModule {
bind(TimestampOracle.class).to(PausableTimestampOracle.class).in(Singleton.class);
}
bind(Panicker.class).to(MockPanicker.class).in(Singleton.class);
+ bind(LowWatermarkWriter.class).to(LowWatermarkWriterImpl.class).in(Singleton.class);
install(new BatchPoolModule(config));
install(config.getLeaseModule());
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/08caa204/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java b/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
index 573cd89..c286f85 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
@@ -43,7 +43,7 @@ public class TestBatch {
@Mock
private Channel channel;
@Mock
- private MonitoringContext monCtx;
+ private MonitoringContextImpl monCtx;
@BeforeMethod
void setup() {
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/08caa204/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java b/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java
index ae89f01..779111d 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java
@@ -137,9 +137,12 @@ public class TestPanicker {
handlers,
metrics);
- proc.addCommitToBatch(1, 2, null, new MonitoringContext(metrics));
+ proc.addCommitToBatch(1, 2, null, new MonitoringContextImpl(metrics));
- new RequestProcessorImpl(metrics, mock(TimestampOracle.class), proc, panicker, mock(TSOServerConfig.class));
+ LowWatermarkWriter lowWatermarkWriter = new LowWatermarkWriterImpl(config, commitTable, metrics);
+
+ new RequestProcessorPersistCT(metrics, mock(TimestampOracle.class), proc, panicker,
+ mock(TSOServerConfig.class), lowWatermarkWriter, mock(ReplyProcessor.class));
verify(panicker, timeout(1000).atLeastOnce()).panic(anyString(), any(Throwable.class));
@@ -189,9 +192,12 @@ public class TestPanicker {
panicker,
handlers,
metrics);
- proc.addCommitToBatch(1, 2, null, new MonitoringContext(metrics));
+ proc.addCommitToBatch(1, 2, null, new MonitoringContextImpl(metrics));
+
+ LowWatermarkWriter lowWatermarkWriter = new LowWatermarkWriterImpl(config, commitTable, metrics);
- new RequestProcessorImpl(metrics, mock(TimestampOracle.class), proc, panicker, mock(TSOServerConfig.class));
+ new RequestProcessorPersistCT(metrics, mock(TimestampOracle.class), proc, panicker, mock(TSOServerConfig.class),
+ lowWatermarkWriter, mock(ReplyProcessor.class));
verify(panicker, timeout(1000).atLeastOnce()).panic(anyString(), any(Throwable.class));
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/08caa204/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java
index 4779608..5d9e2c2 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java
@@ -68,6 +68,7 @@ public class TestPersistenceProcessor {
private MetricsRegistry metrics;
private CommitTable commitTable;
+ private LowWatermarkWriter lowWatermarkWriter;
@BeforeMethod(alwaysRun = true, timeOut = 30_000)
public void initMocksAndComponents() throws Exception {
@@ -101,6 +102,7 @@ public class TestPersistenceProcessor {
public void testLowWatermarkIsPersisted() throws Exception {
TSOServerConfig tsoConfig = new TSOServerConfig();
+ lowWatermarkWriter = new LowWatermarkWriterImpl(tsoConfig, commitTable, metrics);
PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[tsoConfig.getNumConcurrentCTWriters()];
for (int i = 0; i < tsoConfig.getNumConcurrentCTWriters(); i++) {
@@ -123,7 +125,7 @@ public class TestPersistenceProcessor {
handlers,
metrics);
- persistenceProcessor.persistLowWatermark(ANY_LWM).get();
+ lowWatermarkWriter.persistLowWatermark(ANY_LWM).get();
ArgumentCaptor<Long> lwmCapture = ArgumentCaptor.forClass(Long.class);
CommitTable.Writer lwmWriter = commitTable.getWriter();
@@ -166,10 +168,10 @@ public class TestPersistenceProcessor {
verify(batchPool, times(1)).borrowObject(); // Called during initialization
- proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
- proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // Flush: batch full
- proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
- proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // Flush: batch full
+ proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
+ proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class)); // Flush: batch full
+ proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
+ proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class)); // Flush: batch full
verify(batchPool, times(1 + BATCH_SIZE_PER_CT_WRITER)).borrowObject(); // 3: 1 in init + 2 when flushing
@@ -211,8 +213,8 @@ public class TestPersistenceProcessor {
verify(batchPool, times(1)).borrowObject(); // Called during initialization
// Fill 1st handler Batches completely
- proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
- proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // 1st batch full
+ proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
+ proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class)); // 1st batch full
verify(batchPool, times(2)).borrowObject();
proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // 2nd batch full
@@ -223,14 +225,14 @@ public class TestPersistenceProcessor {
verify(batchPool, times(3)).borrowObject();
// Fill 2nd handler Batches completely
- proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
- proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // 1st batch full
- proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
- proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // 2nd batch full
+ proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
+ proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class)); // 1st batch full
+ proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
+ proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class)); // 2nd batch full
verify(batchPool, times(1 + (NUM_CT_WRITERS * BATCH_SIZE_PER_CT_WRITER))).borrowObject();
// Start filling a new currentBatch and flush it immediately
- proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // Batch not full
+ proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class)); // Batch not full
verify(batchPool, times(5)).borrowObject();
proc.triggerCurrentBatchFlush(); // Flushing should provoke invocation of a new batch
verify(batchPool, times(6)).borrowObject();
@@ -281,7 +283,7 @@ public class TestPersistenceProcessor {
// The non-ha lease manager always return true for
// stillInLeasePeriod(), so verify the currentBatch sends replies as master
- proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
+ proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
proc.triggerCurrentBatchFlush();
verify(leaseManager, timeout(1000).times(2)).stillInLeasePeriod();
verify(batchPool, times(2)).borrowObject();
@@ -336,7 +338,7 @@ public class TestPersistenceProcessor {
// Test: Configure the lease manager to return true always
doReturn(true).when(simulatedHALeaseManager).stillInLeasePeriod();
- proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
+ proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
proc.triggerCurrentBatchFlush();
verify(simulatedHALeaseManager, timeout(1000).times(2)).stillInLeasePeriod();
verify(batchPool, times(2)).borrowObject();
@@ -357,7 +359,7 @@ public class TestPersistenceProcessor {
// Test: Configure the lease manager to return true first and false later for stillInLeasePeriod
doReturn(true).doReturn(false).when(simulatedHALeaseManager).stillInLeasePeriod();
- proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
+ proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
proc.triggerCurrentBatchFlush();
verify(simulatedHALeaseManager, timeout(1000).times(2)).stillInLeasePeriod();
verify(batchPool, times(2)).borrowObject();
@@ -378,7 +380,7 @@ public class TestPersistenceProcessor {
// Test: Configure the lease manager to return false for stillInLeasePeriod
doReturn(false).when(simulatedHALeaseManager).stillInLeasePeriod();
- proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
+ proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
proc.triggerCurrentBatchFlush();
verify(simulatedHALeaseManager, timeout(1000).times(1)).stillInLeasePeriod();
verify(batchPool, times(2)).borrowObject();
@@ -402,7 +404,7 @@ public class TestPersistenceProcessor {
// Configure mock writer to flush unsuccessfully
doThrow(new IOException("Unable to write")).when(mockWriter).flush();
doReturn(true).doReturn(false).when(simulatedHALeaseManager).stillInLeasePeriod();
- proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
+ proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
proc.triggerCurrentBatchFlush();
verify(simulatedHALeaseManager, timeout(1000).times(1)).stillInLeasePeriod();
verify(batchPool, times(2)).borrowObject();
@@ -452,7 +454,7 @@ public class TestPersistenceProcessor {
PersistenceProcessorImpl proc = new PersistenceProcessorImpl(config, new BlockingWaitStrategy(), commitTable, batchPool,
panicker, handlers, metrics);
- MonitoringContext monCtx = new MonitoringContext(metrics);
+ MonitoringContextImpl monCtx = new MonitoringContextImpl(metrics);
// Configure lease manager to work normally
doReturn(true).when(leaseManager).stillInLeasePeriod();
@@ -492,7 +494,7 @@ public class TestPersistenceProcessor {
// Configure writer to explode with a runtime exception
doThrow(new RuntimeException("Kaboom!")).when(mockWriter).addCommittedTransaction(anyLong(), anyLong());
- MonitoringContext monCtx = new MonitoringContext(metrics);
+ MonitoringContextImpl monCtx = new MonitoringContextImpl(metrics);
// Check the panic is extended!
proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), monCtx);
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/08caa204/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java
index 43f354f..4f190f9 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java
@@ -167,7 +167,7 @@ public class TestPersistenceProcessorHandler {
verify(persistenceHandler, times(1)).flush(eq(0));
verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch));
- verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContext.class));
+ verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContextImpl.class));
verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
assertTrue(batch.isEmpty());
@@ -178,14 +178,14 @@ public class TestPersistenceProcessorHandler {
// Prepare test batch
Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
- batch.addTimestamp(FIRST_ST, null, mock(MonitoringContext.class));
+ batch.addTimestamp(FIRST_ST, null, mock(MonitoringContextImpl.class));
PersistBatchEvent batchEvent = new PersistBatchEvent();
PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
persistenceHandler.onEvent(batchEvent);
verify(persistenceHandler, times(1)).flush(eq(0));
verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch));
- verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContext.class));
+ verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContextImpl.class));
verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
assertEquals(batch.getNumEvents(), 1);
assertEquals(batch.get(0).getStartTimestamp(), FIRST_ST);
@@ -197,14 +197,14 @@ public class TestPersistenceProcessorHandler {
// Prepare test batch
Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
- batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContext.class));
+ batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContextImpl.class));
PersistBatchEvent batchEvent = new PersistBatchEvent();
PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
persistenceHandler.onEvent(batchEvent);
verify(persistenceHandler, times(1)).flush(eq(1));
verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(batch);
- verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContext.class));
+ verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContextImpl.class));
verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
assertEquals(batch.getNumEvents(), 1);
assertEquals(batch.get(0).getStartTimestamp(), FIRST_ST);
@@ -217,14 +217,14 @@ public class TestPersistenceProcessorHandler {
// Prepare test batch
Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
- batch.addAbort(FIRST_ST, null, mock(MonitoringContext.class));
+ batch.addAbort(FIRST_ST, null, mock(MonitoringContextImpl.class));
PersistBatchEvent batchEvent = new PersistBatchEvent();
PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
persistenceHandler.onEvent(batchEvent);
verify(persistenceHandler, times(1)).flush(eq(0));
verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(batch);
- verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContext.class));
+ verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContextImpl.class));
verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
assertEquals(batch.getNumEvents(), 1);
assertEquals(batch.get(0).getStartTimestamp(), FIRST_ST);
@@ -236,7 +236,7 @@ public class TestPersistenceProcessorHandler {
// Prepare test batch
Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
- batch.addCommitRetry(FIRST_ST, null, mock(MonitoringContext.class));
+ batch.addCommitRetry(FIRST_ST, null, mock(MonitoringContextImpl.class));
PersistBatchEvent batchEvent = new PersistBatchEvent();
PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
@@ -245,7 +245,7 @@ public class TestPersistenceProcessorHandler {
verify(persistenceHandler, times(1)).flush(eq(0));
verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(batch);
- verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(FIRST_ST), any(Channel.class), any(MonitoringContext.class));
+ verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(FIRST_ST), any(Channel.class), any(MonitoringContextImpl.class));
verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
assertEquals(batch.getNumEvents(), 0);
@@ -256,8 +256,8 @@ public class TestPersistenceProcessorHandler {
// Prepare test batch
Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
- batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContext.class));
- batch.addCommitRetry(SECOND_ST, null, mock(MonitoringContext.class));
+ batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContextImpl.class));
+ batch.addCommitRetry(SECOND_ST, null, mock(MonitoringContextImpl.class));
PersistBatchEvent batchEvent = new PersistBatchEvent();
PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
@@ -269,7 +269,7 @@ public class TestPersistenceProcessorHandler {
verify(persistenceHandler, times(1)).flush(eq(1));
verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch));
- verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(SECOND_ST), any(Channel.class), any(MonitoringContext.class));
+ verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(SECOND_ST), any(Channel.class), any(MonitoringContextImpl.class));
verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
assertEquals(batch.getNumEvents(), 1);
assertEquals(batch.get(0).getStartTimestamp(), FIRST_ST);
@@ -285,8 +285,8 @@ public class TestPersistenceProcessorHandler {
// Prepare test batch
Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
- batch.addCommitRetry(FIRST_ST, null, mock(MonitoringContext.class));
- batch.addCommit(SECOND_ST, SECOND_CT, null, mock(MonitoringContext.class));
+ batch.addCommitRetry(FIRST_ST, null, mock(MonitoringContextImpl.class));
+ batch.addCommit(SECOND_ST, SECOND_CT, null, mock(MonitoringContextImpl.class));
PersistBatchEvent batchEvent = new PersistBatchEvent();
PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
@@ -298,7 +298,7 @@ public class TestPersistenceProcessorHandler {
verify(persistenceHandler, times(1)).flush(eq(1));
verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch));
- verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(FIRST_ST), any(Channel.class), any(MonitoringContext.class));
+ verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(FIRST_ST), any(Channel.class), any(MonitoringContextImpl.class));
verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
assertEquals(batch.getNumEvents(), 1);
assertEquals(batch.get(0).getStartTimestamp(), SECOND_ST);
@@ -311,8 +311,8 @@ public class TestPersistenceProcessorHandler {
// Prepare test batch
Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
- batch.addCommitRetry(FIRST_ST, null, mock(MonitoringContext.class));
- batch.addCommitRetry(SECOND_ST, null, mock(MonitoringContext.class));
+ batch.addCommitRetry(FIRST_ST, null, mock(MonitoringContextImpl.class));
+ batch.addCommitRetry(SECOND_ST, null, mock(MonitoringContextImpl.class));
PersistBatchEvent batchEvent = new PersistBatchEvent();
PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
@@ -324,8 +324,8 @@ public class TestPersistenceProcessorHandler {
verify(persistenceHandler, times(1)).flush(eq(0));
verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch));
- verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(FIRST_ST), any(Channel.class), any(MonitoringContext.class));
- verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(SECOND_ST), any(Channel.class), any(MonitoringContext.class));
+ verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(FIRST_ST), any(Channel.class), any(MonitoringContextImpl.class));
+ verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(SECOND_ST), any(Channel.class), any(MonitoringContextImpl.class));
verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
assertEquals(batch.getNumEvents(), 0);
@@ -336,8 +336,8 @@ public class TestPersistenceProcessorHandler {
// Prepare test batch
Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
- batch.addAbort(FIRST_ST, null, mock(MonitoringContext.class));
- batch.addAbort(SECOND_ST, null, mock(MonitoringContext.class));
+ batch.addAbort(FIRST_ST, null, mock(MonitoringContextImpl.class));
+ batch.addAbort(SECOND_ST, null, mock(MonitoringContextImpl.class));
PersistBatchEvent batchEvent = new PersistBatchEvent();
PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
@@ -349,7 +349,7 @@ public class TestPersistenceProcessorHandler {
verify(persistenceHandler, times(1)).flush(eq(0));
verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch));
- verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContext.class));
+ verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContextImpl.class));
verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
assertEquals(batch.getNumEvents(), 2);
assertEquals(batch.get(0).getStartTimestamp(), FIRST_ST);
@@ -364,12 +364,12 @@ public class TestPersistenceProcessorHandler {
// Prepare test batch
Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
- batch.addTimestamp(FIRST_ST, null, mock(MonitoringContext.class));
- batch.addCommitRetry(SECOND_ST, null, mock(MonitoringContext.class));
- batch.addCommit(THIRD_ST, THIRD_CT, null, mock(MonitoringContext.class));
- batch.addAbort(FOURTH_ST, null, mock(MonitoringContext.class));
- batch.addCommit(FIFTH_ST, FIFTH_CT, null, mock(MonitoringContext.class));
- batch.addCommitRetry(SIXTH_ST, null, mock(MonitoringContext.class));
+ batch.addTimestamp(FIRST_ST, null, mock(MonitoringContextImpl.class));
+ batch.addCommitRetry(SECOND_ST, null, mock(MonitoringContextImpl.class));
+ batch.addCommit(THIRD_ST, THIRD_CT, null, mock(MonitoringContextImpl.class));
+ batch.addAbort(FOURTH_ST, null, mock(MonitoringContextImpl.class));
+ batch.addCommit(FIFTH_ST, FIFTH_CT, null, mock(MonitoringContextImpl.class));
+ batch.addCommitRetry(SIXTH_ST, null, mock(MonitoringContextImpl.class));
PersistBatchEvent batchEvent = new PersistBatchEvent();
PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
@@ -381,7 +381,7 @@ public class TestPersistenceProcessorHandler {
verify(persistenceHandler, times(1)).flush(2); // 2 commits to flush
verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch));
- verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(SECOND_ST), any(Channel.class), any(MonitoringContext.class));
+ verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(SECOND_ST), any(Channel.class), any(MonitoringContextImpl.class));
verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
assertEquals(batch.getNumEvents(), 4);
assertEquals(batch.get(0).getStartTimestamp(), FIRST_ST);
@@ -408,7 +408,7 @@ public class TestPersistenceProcessorHandler {
// Prepare test batch
Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
- batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContext.class));
+ batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContextImpl.class));
PersistBatchEvent batchEvent = new PersistBatchEvent();
PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
@@ -450,7 +450,7 @@ public class TestPersistenceProcessorHandler {
// Prepare test batch
Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
- batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContext.class));
+ batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContextImpl.class));
PersistBatchEvent batchEvent = new PersistBatchEvent();
PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
@@ -485,7 +485,7 @@ public class TestPersistenceProcessorHandler {
// Prepare test batch
batch = new Batch(BATCH_ID, BATCH_SIZE);
- batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContext.class));
+ batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContextImpl.class));
batchEvent = new PersistBatchEvent();
PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/08caa204/tso-server/src/test/java/org/apache/omid/tso/TestReplyProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestReplyProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestReplyProcessor.java
index 3ead24b..54d1e70 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestReplyProcessor.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestReplyProcessor.java
@@ -70,7 +70,7 @@ public class TestReplyProcessor {
private Panicker panicker;
@Mock
- private MonitoringContext monCtx;
+ private MonitoringContextImpl monCtx;
private MetricsRegistry metrics;
@@ -247,11 +247,11 @@ public class TestReplyProcessor {
inOrderReplyBatchEvents.verify(replyProcessor, times(1)).handleReplyBatchEvent(eq(thirdBatchEvent));
InOrder inOrderReplies = inOrder(replyProcessor, replyProcessor, replyProcessor, replyProcessor, replyProcessor);
- inOrderReplies.verify(replyProcessor, times(1)).sendAbortResponse(eq(FIFTH_ST), any(Channel.class));
- inOrderReplies.verify(replyProcessor, times(1)).sendTimestampResponse(eq(THIRD_ST), any(Channel.class));
- inOrderReplies.verify(replyProcessor, times(1)).sendCommitResponse(eq(FOURTH_ST), eq(FOURTH_CT), any(Channel.class));
- inOrderReplies.verify(replyProcessor, times(1)).sendTimestampResponse(eq(FIRST_ST), any(Channel.class));
- inOrderReplies.verify(replyProcessor, times(1)).sendCommitResponse(eq(SECOND_ST), eq(SECOND_CT), any(Channel.class));
+ inOrderReplies.verify(replyProcessor, times(1)).sendAbortResponse(eq(FIFTH_ST), any(Channel.class), eq(monCtx));
+ inOrderReplies.verify(replyProcessor, times(1)).sendTimestampResponse(eq(THIRD_ST), any(Channel.class), eq(monCtx));
+ inOrderReplies.verify(replyProcessor, times(1)).sendCommitResponse(eq(FOURTH_ST), eq(FOURTH_CT), any(Channel.class), eq(monCtx));
+ inOrderReplies.verify(replyProcessor, times(1)).sendTimestampResponse(eq(FIRST_ST), any(Channel.class), eq(monCtx));
+ inOrderReplies.verify(replyProcessor, times(1)).sendCommitResponse(eq(SECOND_ST), eq(SECOND_CT), any(Channel.class), eq(monCtx));
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/08caa204/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
index 1c44d05..645caa1 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
@@ -59,26 +59,32 @@ public class TestRequestProcessor {
// Request processor under test
private RequestProcessor requestProc;
+ private LowWatermarkWriter lowWatermarkWriter;
+ private TimestampOracleImpl timestampOracle;
+ private ReplyProcessor replyProcessor;
+
@BeforeMethod
public void beforeMethod() throws Exception {
// Build the required scaffolding for the test
MetricsRegistry metrics = new NullMetricsProvider();
- TSOServerConfig config = new TSOServerConfig();
- config.setConflictMapSize(CONFLICT_MAP_SIZE);
-
TimestampOracleImpl timestampOracle =
new TimestampOracleImpl(metrics, new TimestampOracleImpl.InMemoryTimestampStorage(), new MockPanicker());
stateManager = new TSOStateManagerImpl(timestampOracle);
-
+ lowWatermarkWriter = mock(LowWatermarkWriter.class);
persist = mock(PersistenceProcessor.class);
+ replyProcessor = mock(ReplyProcessor.class);
SettableFuture<Void> f = SettableFuture.create();
f.set(null);
- doReturn(f).when(persist).persistLowWatermark(any(Long.class));
+ doReturn(f).when(lowWatermarkWriter).persistLowWatermark(any(Long.class));
- requestProc = new RequestProcessorImpl(metrics, timestampOracle, persist, new MockPanicker(), config);
+ TSOServerConfig config = new TSOServerConfig();
+ config.setConflictMapSize(CONFLICT_MAP_SIZE);
+
+ requestProc = new RequestProcessorPersistCT(metrics, timestampOracle, persist, new MockPanicker(),
+ config, lowWatermarkWriter,replyProcessor);
// Initialize the state for the experiment
stateManager.register(requestProc);
@@ -89,15 +95,15 @@ public class TestRequestProcessor {
@Test(timeOut = 30_000)
public void testTimestamp() throws Exception {
- requestProc.timestampRequest(null, new MonitoringContext(metrics));
+ requestProc.timestampRequest(null, new MonitoringContextImpl(metrics));
ArgumentCaptor<Long> firstTScapture = ArgumentCaptor.forClass(Long.class);
verify(persist, timeout(100).times(1)).addTimestampToBatch(
- firstTScapture.capture(), any(Channel.class), any(MonitoringContext.class));
+ firstTScapture.capture(), any(Channel.class), any(MonitoringContextImpl.class));
long firstTS = firstTScapture.getValue();
// verify that timestamps increase monotonically
for (int i = 0; i < 100; i++) {
- requestProc.timestampRequest(null, new MonitoringContext(metrics));
+ requestProc.timestampRequest(null, new MonitoringContextImpl(metrics));
verify(persist, timeout(100).times(1)).addTimestampToBatch(eq(firstTS), any(Channel.class), any(MonitoringContext.class));
firstTS += AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN;
}
@@ -107,48 +113,48 @@ public class TestRequestProcessor {
@Test(timeOut = 30_000)
public void testCommit() throws Exception {
- requestProc.timestampRequest(null, new MonitoringContext(metrics));
+ requestProc.timestampRequest(null, new MonitoringContextImpl(metrics));
ArgumentCaptor<Long> TScapture = ArgumentCaptor.forClass(Long.class);
verify(persist, timeout(100).times(1)).addTimestampToBatch(
- TScapture.capture(), any(Channel.class), any(MonitoringContext.class));
+ TScapture.capture(), any(Channel.class), any(MonitoringContextImpl.class));
long firstTS = TScapture.getValue();
List<Long> writeSet = Lists.newArrayList(1L, 20L, 203L);
- requestProc.commitRequest(firstTS - AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContext(metrics));
+ requestProc.commitRequest(firstTS - AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContextImpl(metrics));
verify(persist, timeout(100).times(1)).addAbortToBatch(eq(firstTS - AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN), any(Channel.class), any(MonitoringContext.class));
- requestProc.commitRequest(firstTS, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContext(metrics));
+ requestProc.commitRequest(firstTS, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContextImpl(metrics));
ArgumentCaptor<Long> commitTScapture = ArgumentCaptor.forClass(Long.class);
verify(persist, timeout(100).times(1)).addCommitToBatch(eq(firstTS), commitTScapture.capture(), any(Channel.class), any(MonitoringContext.class));
assertTrue(commitTScapture.getValue() > firstTS, "Commit TS must be greater than start TS");
// test conflict
- requestProc.timestampRequest(null, new MonitoringContext(metrics));
+ requestProc.timestampRequest(null, new MonitoringContextImpl(metrics));
TScapture = ArgumentCaptor.forClass(Long.class);
verify(persist, timeout(100).times(2)).addTimestampToBatch(
- TScapture.capture(), any(Channel.class), any(MonitoringContext.class));
+ TScapture.capture(), any(Channel.class), any(MonitoringContextImpl.class));
long secondTS = TScapture.getValue();
- requestProc.timestampRequest(null, new MonitoringContext(metrics));
+ requestProc.timestampRequest(null, new MonitoringContextImpl(metrics));
TScapture = ArgumentCaptor.forClass(Long.class);
verify(persist, timeout(100).times(3)).addTimestampToBatch(
- TScapture.capture(), any(Channel.class), any(MonitoringContext.class));
+ TScapture.capture(), any(Channel.class), any(MonitoringContextImpl.class));
long thirdTS = TScapture.getValue();
- requestProc.commitRequest(thirdTS, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContext(metrics));
- verify(persist, timeout(100).times(1)).addCommitToBatch(eq(thirdTS), anyLong(), any(Channel.class), any(MonitoringContext.class));
- requestProc.commitRequest(secondTS, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContext(metrics));
- verify(persist, timeout(100).times(1)).addAbortToBatch(eq(secondTS), any(Channel.class), any(MonitoringContext.class));
+ requestProc.commitRequest(thirdTS, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContextImpl(metrics));
+ verify(persist, timeout(100).times(1)).addCommitToBatch(eq(thirdTS), anyLong(), any(Channel.class), any(MonitoringContextImpl.class));
+ requestProc.commitRequest(secondTS, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContextImpl(metrics));
+ verify(persist, timeout(100).times(1)).addAbortToBatch(eq(secondTS), any(Channel.class), any(MonitoringContextImpl.class));
}
@Test(timeOut = 30_000)
public void testFence() throws Exception {
- requestProc.fenceRequest(666L, null, new MonitoringContext(metrics));
+ requestProc.fenceRequest(666L, null, new MonitoringContextImpl(metrics));
ArgumentCaptor<Long> firstTScapture = ArgumentCaptor.forClass(Long.class);
- verify(persist, timeout(100).times(1)).addFenceToBatch(eq(666L),
+ verify(replyProcessor, timeout(100).times(1)).sendFenceResponse(eq(666L),
firstTScapture.capture(), any(Channel.class), any(MonitoringContext.class));
}
@@ -159,11 +165,11 @@ public class TestRequestProcessor {
List<Long> writeSet = Collections.emptyList();
// Start a transaction...
- requestProc.timestampRequest(null, new MonitoringContext(metrics));
+ requestProc.timestampRequest(null, new MonitoringContextImpl(metrics));
ArgumentCaptor<Long> capturedTS = ArgumentCaptor.forClass(Long.class);
verify(persist, timeout(100).times(1)).addTimestampToBatch(capturedTS.capture(),
any(Channel.class),
- any(MonitoringContext.class));
+ any(MonitoringContextImpl.class));
long startTS = capturedTS.getValue();
// ... simulate the reset of the RequestProcessor state (e.g. due to
@@ -171,8 +177,8 @@ public class TestRequestProcessor {
stateManager.initialize();
// ...check that the transaction is aborted when trying to commit
- requestProc.commitRequest(startTS, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContext(metrics));
- verify(persist, timeout(100).times(1)).addAbortToBatch(eq(startTS), any(Channel.class), any(MonitoringContext.class));
+ requestProc.commitRequest(startTS, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContextImpl(metrics));
+ verify(persist, timeout(100).times(1)).addAbortToBatch(eq(startTS), any(Channel.class), any(MonitoringContextImpl.class));
}
@@ -187,17 +193,17 @@ public class TestRequestProcessor {
for (long i = 0; i < CONFLICT_MAP_SIZE + CONFLICT_MAP_ASSOCIATIVITY; i++) {
long writeSetElementHash = i + 1; // This is to match the assigned CT: K/V in cache = WS Element Hash/CT
List<Long> writeSet = Lists.newArrayList(writeSetElementHash);
- requestProc.commitRequest(ANY_START_TS, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContext(metrics));
+ requestProc.commitRequest(ANY_START_TS, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContextImpl(metrics));
}
Thread.currentThread().sleep(3000); // Allow the Request processor to finish the request processing
// Check that first time its called is on init
- verify(persist, timeout(100).times(1)).persistLowWatermark(eq(0L));
+ verify(lowWatermarkWriter, timeout(100).times(1)).persistLowWatermark(eq(0L));
// Then, check it is called when cache is full and the first element is evicted (should be a AbstractTransactionManager.NUM_OF_CHECKPOINTS)
- verify(persist, timeout(100).times(1)).persistLowWatermark(eq(FIRST_COMMIT_TS_EVICTED));
+ verify(lowWatermarkWriter, timeout(100).times(1)).persistLowWatermark(eq(FIRST_COMMIT_TS_EVICTED));
// Finally it should never be called with the next element
- verify(persist, timeout(100).never()).persistLowWatermark(eq(NEXT_COMMIT_TS_THAT_SHOULD_BE_EVICTED));
+ verify(lowWatermarkWriter, timeout(100).never()).persistLowWatermark(eq(NEXT_COMMIT_TS_THAT_SHOULD_BE_EVICTED));
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/08caa204/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java
index ab17ecc..5476f90 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java
@@ -56,6 +56,8 @@ public class TestRetryProcessor {
private Panicker panicker;
@Mock
private MetricsRegistry metrics;
+ @Mock
+ private MonitoringContextImpl monCtx;
private CommitTable commitTable;
@@ -74,10 +76,10 @@ public class TestRetryProcessor {
RetryProcessor retryProc = new RetryProcessorImpl(new YieldingWaitStrategy(), metrics, commitTable, replyProc, panicker, batchPool);
// Test we'll reply with an abort for a retry request when the start timestamp IS NOT in the commit table
- retryProc.disambiguateRetryRequestHeuristically(NON_EXISTING_ST_TX, channel, new MonitoringContext(metrics));
+ retryProc.disambiguateRetryRequestHeuristically(NON_EXISTING_ST_TX, channel, monCtx);
ArgumentCaptor<Long> firstTSCapture = ArgumentCaptor.forClass(Long.class);
- verify(replyProc, timeout(100).times(1)).sendAbortResponse(firstTSCapture.capture(), any(Channel.class));
+ verify(replyProc, timeout(100).times(1)).sendAbortResponse(firstTSCapture.capture(), any(Channel.class), any(MonitoringContextImpl.class));
long startTS = firstTSCapture.getValue();
assertEquals(startTS, NON_EXISTING_ST_TX, "Captured timestamp should be the same as NON_EXISTING_ST_TX");
}
@@ -91,13 +93,13 @@ public class TestRetryProcessor {
// Test we'll reply with a commit for a retry request when the start timestamp IS in the commit table
commitTable.getWriter().addCommittedTransaction(ST_TX_1, CT_TX_1);
- retryProc.disambiguateRetryRequestHeuristically(ST_TX_1, channel, new MonitoringContext(metrics));
+ retryProc.disambiguateRetryRequestHeuristically(ST_TX_1, channel, new MonitoringContextImpl(metrics));
ArgumentCaptor<Long> firstTSCapture = ArgumentCaptor.forClass(Long.class);
ArgumentCaptor<Long> secondTSCapture = ArgumentCaptor.forClass(Long.class);
verify(replyProc, timeout(100).times(1)).sendCommitResponse(firstTSCapture.capture(),
secondTSCapture.capture(),
- any(Channel.class));
+ any(Channel.class), any(MonitoringContextImpl.class));
long startTS = firstTSCapture.getValue();
long commitTS = secondTSCapture.getValue();
@@ -124,9 +126,9 @@ public class TestRetryProcessor {
RetryProcessor retryProc = new RetryProcessorImpl(new YieldingWaitStrategy(), metrics, commitTable, replyProc, panicker, batchPool);
// Test we return an Abort to a retry request when the transaction id IS in the commit table BUT invalidated
- retryProc.disambiguateRetryRequestHeuristically(ST_TX_1, channel, new MonitoringContext(metrics));
+ retryProc.disambiguateRetryRequestHeuristically(ST_TX_1, channel, new MonitoringContextImpl(metrics));
ArgumentCaptor<Long> startTSCapture = ArgumentCaptor.forClass(Long.class);
- verify(replyProc, timeout(100).times(1)).sendAbortResponse(startTSCapture.capture(), any(Channel.class));
+ verify(replyProc, timeout(100).times(1)).sendAbortResponse(startTSCapture.capture(), any(Channel.class), any(MonitoringContextImpl.class));
long startTS = startTSCapture.getValue();
Assert.assertEquals(startTS, ST_TX_1, "Captured timestamp should be the same as NON_EXISTING_ST_TX");
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/08caa204/tso-server/src/test/java/org/apache/omid/tso/TestTSOChannelHandlerNetty.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestTSOChannelHandlerNetty.java b/tso-server/src/test/java/org/apache/omid/tso/TestTSOChannelHandlerNetty.java
index 157bb48..245f3b6 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestTSOChannelHandlerNetty.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestTSOChannelHandlerNetty.java
@@ -260,9 +260,9 @@ public class TestTSOChannelHandlerNetty {
tsBuilder.setTimestampRequest(tsRequestBuilder.build());
// Write into the channel
channel.write(tsBuilder.build()).await();
- verify(requestProcessor, timeout(100).times(1)).timestampRequest(any(Channel.class), any(MonitoringContext.class));
+ verify(requestProcessor, timeout(100).times(1)).timestampRequest(any(Channel.class), any(MonitoringContextImpl.class));
verify(requestProcessor, timeout(100).never())
- .commitRequest(anyLong(), anyCollectionOf(Long.class), anyCollectionOf(Long.class), anyBoolean(), any(Channel.class), any(MonitoringContext.class));
+ .commitRequest(anyLong(), anyCollectionOf(Long.class), anyCollectionOf(Long.class), anyBoolean(), any(Channel.class), any(MonitoringContextImpl.class));
}
private void testWritingCommitRequest(Channel channel) throws InterruptedException {
@@ -277,9 +277,9 @@ public class TestTSOChannelHandlerNetty {
assertTrue(r.hasCommitRequest());
// Write into the channel
channel.write(commitBuilder.build()).await();
- verify(requestProcessor, timeout(100).never()).timestampRequest(any(Channel.class), any(MonitoringContext.class));
+ verify(requestProcessor, timeout(100).never()).timestampRequest(any(Channel.class), any(MonitoringContextImpl.class));
verify(requestProcessor, timeout(100).times(1))
- .commitRequest(eq(666L), anyCollectionOf(Long.class), anyCollectionOf(Long.class), eq(false), any(Channel.class), any(MonitoringContext.class));
+ .commitRequest(eq(666L), anyCollectionOf(Long.class), anyCollectionOf(Long.class), eq(false), any(Channel.class), any(MonitoringContextImpl.class));
}
private void testWritingFenceRequest(Channel channel) throws InterruptedException {
@@ -293,9 +293,9 @@ public class TestTSOChannelHandlerNetty {
assertTrue(r.hasFenceRequest());
// Write into the channel
channel.write(fenceBuilder.build()).await();
- verify(requestProcessor, timeout(100).never()).timestampRequest(any(Channel.class), any(MonitoringContext.class));
+ verify(requestProcessor, timeout(100).never()).timestampRequest(any(Channel.class), any(MonitoringContextImpl.class));
verify(requestProcessor, timeout(100).times(1))
- .fenceRequest(eq(666L), any(Channel.class), any(MonitoringContext.class));
+ .fenceRequest(eq(666L), any(Channel.class), any(MonitoringContextImpl.class));
}
// ----------------------------------------------------------------------------------------------------------------