You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@omid.apache.org by fp...@apache.org on 2016/05/11 18:06:43 UTC
[36/50] [abbrv] incubator-omid git commit: Send retried responses
directly form Retry processor
Send retried responses directly form Retry processor
Bypass the reply processor as it just introduces unnecessary complexity in the
code (e.g. NO_ORDER code, etc.)
Change-Id: I9b7f36aba98caef655a8ba74d398691c9b57f043
Project: http://git-wip-us.apache.org/repos/asf/incubator-omid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-omid/commit/0afa8f16
Tree: http://git-wip-us.apache.org/repos/asf/incubator-omid/tree/0afa8f16
Diff: http://git-wip-us.apache.org/repos/asf/incubator-omid/diff/0afa8f16
Branch: refs/heads/master
Commit: 0afa8f16e1f4cadaf621bac67417e1cb6b8b81f0
Parents: 472c828
Author: Francisco Perez-Sorrosal <fp...@yahoo-inc.com>
Authored: Thu May 5 13:18:36 2016 -0700
Committer: Francisco Perez-Sorrosal <fp...@yahoo-inc.com>
Committed: Thu May 5 13:18:36 2016 -0700
----------------------------------------------------------------------
.../org/apache/omid/tso/ReplyProcessor.java | 36 +++++++++++--
.../org/apache/omid/tso/ReplyProcessorImpl.java | 56 +++++++-------------
.../org/apache/omid/tso/RetryProcessorImpl.java | 22 ++++----
.../org/apache/omid/tso/TestRetryProcessor.java | 23 ++++----
4 files changed, 68 insertions(+), 69 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/0afa8f16/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessor.java b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessor.java
index 04f40cc..820630b 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessor.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessor.java
@@ -32,10 +32,38 @@ interface ReplyProcessor {
*/
void manageResponsesBatch(long batchSequence, Batch batch);
- // TODO This method can be removed if we return the responses from the retry processor
- void addAbort(Batch batch, long startTimestamp, Channel c, MonitoringContext context);
- // TODO This method can be removed if we return the responses from the retry processor
- void addCommit(Batch batch, long startTimestamp, long commitTimestamp, Channel c, MonitoringContext context);
+ /**
+ * Allows to send a commit response back to the client.
+ *
+ * @param startTimestamp
+ * the start timestamp representing the tx identifier that is going to receive the commit response
+ * @param commitTimestamp
+ * the commit timestamp
+ * @param channel
+ * the channel used to send the response back to the client
+ */
+ void sendCommitResponse(long startTimestamp, long commitTimestamp, Channel channel);
+
+ /**
+ * Allows to send an abort response back to the client.
+ *
+ * @param startTimestamp
+ * the start timestamp representing the tx identifier that is going to receive the abort response
+ * @param channel
+ * the channel used to send the response back to the client
+ */
+ void sendAbortResponse(long startTimestamp, Channel channel);
+
+ /**
+ * Allow to send a timestamp response back to the client.
+ *
+ * @param startTimestamp
+ * the start timestamp to return that will represent the tx identifier for the created transaction
+ * @param channel
+ * the channel used to send the response back to the client
+ */
+
+ void sendTimestampResponse(long startTimestamp, Channel channel);
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/0afa8f16/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
index a37f90f..a4e2ce1 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
@@ -46,8 +46,6 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
private static final Logger LOG = LoggerFactory.getLogger(ReplyProcessorImpl.class);
- private static final int NO_ORDER = (-1);
-
private final ObjectPool<Batch> batchPool;
private final RingBuffer<ReplyBatchEvent> replyRing;
@@ -82,7 +80,7 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
this.futureEvents = new PriorityQueue<>(10, new Comparator<ReplyBatchEvent>() {
public int compare(ReplyBatchEvent replyBatchEvent1, ReplyBatchEvent replyBatchEvent2) {
- return Long.compare(replyBatchEvent1.getBatchID(), replyBatchEvent2.getBatchID());
+ return Long.compare(replyBatchEvent1.getBatchSequence(), replyBatchEvent2.getBatchSequence());
}
});
@@ -96,28 +94,29 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
String name;
Batch batch = event.getBatch();
- for (int i=0; batch != null && i < batch.getNumEvents(); ++i) {
+ for (int i = 0; batch != null && i < batch.getNumEvents(); ++i) {
PersistEvent localEvent = batch.get(i);
switch (localEvent.getType()) {
case COMMIT:
name = "commitReplyProcessor";
localEvent.getMonCtx().timerStart(name);
- handleCommitResponse(localEvent.getStartTimestamp(), localEvent.getCommitTimestamp(), localEvent.getChannel());
+ sendCommitResponse(localEvent.getStartTimestamp(), localEvent.getCommitTimestamp(), localEvent.getChannel());
localEvent.getMonCtx().timerStop(name);
break;
case ABORT:
name = "abortReplyProcessor";
localEvent.getMonCtx().timerStart(name);
- handleAbortResponse(localEvent.getStartTimestamp(), localEvent.getChannel());
+ sendAbortResponse(localEvent.getStartTimestamp(), localEvent.getChannel());
localEvent.getMonCtx().timerStop(name);
break;
case TIMESTAMP:
name = "timestampReplyProcessor";
localEvent.getMonCtx().timerStart(name);
- handleTimestampResponse(localEvent.getStartTimestamp(), localEvent.getChannel());
+ sendTimestampResponse(localEvent.getStartTimestamp(), localEvent.getChannel());
localEvent.getMonCtx().timerStop(name);
break;
+ // TODO Check if we still need this
case LOW_WATERMARK:
break;
default:
@@ -135,7 +134,7 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
private void processWaitingEvents() throws Exception {
- while (!futureEvents.isEmpty() && futureEvents.peek().getBatchID() == nextIDToHandle.get()) {
+ while (!futureEvents.isEmpty() && futureEvents.peek().getBatchSequence() == nextIDToHandle.get()) {
ReplyBatchEvent e = futureEvents.poll();
handleReplyBatchEvent(e);
nextIDToHandle.incrementAndGet();
@@ -150,17 +149,13 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
// while commit smaller than still does not appear in the commit table.
// If previous events were not processed yet (events contain smaller id)
- if (event.getBatchID() > nextIDToHandle.get()) {
+ if (event.getBatchSequence() > nextIDToHandle.get()) {
futureEvents.add(event);
return;
}
handleReplyBatchEvent(event);
- if (event.getBatchID() == NO_ORDER) {
- return;
- }
-
nextIDToHandle.incrementAndGet();
// Process events that arrived before and kept in futureEvents.
@@ -178,25 +173,8 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
}
- // TODO This method can be removed if we return the responses from the retry processor
@Override
- public void addAbort(Batch batch, long startTimestamp, Channel c, MonitoringContext context) {
-
- batch.addAbort(startTimestamp, true, c, context);
- manageResponsesBatch(NO_ORDER, batch);
-
- }
-
- // TODO This method can be removed if we return the responses from the retry processor
- @Override
- public void addCommit(Batch batch, long startTimestamp, long commitTimestamp, Channel c, MonitoringContext context) {
-
- batch.addCommit(startTimestamp, commitTimestamp, c, context);
- manageResponsesBatch(NO_ORDER, batch);
-
- }
-
- private void handleCommitResponse(long startTimestamp, long commitTimestamp, Channel c) {
+ public void sendCommitResponse(long startTimestamp, long commitTimestamp, Channel c) {
TSOProto.Response.Builder builder = TSOProto.Response.newBuilder();
TSOProto.CommitResponse.Builder commitBuilder = TSOProto.CommitResponse.newBuilder();
@@ -210,7 +188,8 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
}
- private void handleAbortResponse(long startTimestamp, Channel c) {
+ @Override
+ public void sendAbortResponse(long startTimestamp, Channel c) {
TSOProto.Response.Builder builder = TSOProto.Response.newBuilder();
TSOProto.CommitResponse.Builder commitBuilder = TSOProto.CommitResponse.newBuilder();
@@ -223,7 +202,8 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
}
- private void handleTimestampResponse(long startTimestamp, Channel c) {
+ @Override
+ public void sendTimestampResponse(long startTimestamp, Channel c) {
TSOProto.Response.Builder builder = TSOProto.Response.newBuilder();
TSOProto.TimestampResponse.Builder respBuilder = TSOProto.TimestampResponse.newBuilder();
@@ -238,19 +218,19 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
final static class ReplyBatchEvent {
private Batch batch;
- private long batchID;
+ private long batchSequence;
- static void makeReplyBatch(ReplyBatchEvent e, Batch batch, long batchID) {
+ static void makeReplyBatch(ReplyBatchEvent e, Batch batch, long batchSequence) {
e.batch = batch;
- e.batchID = batchID;
+ e.batchSequence = batchSequence;
}
Batch getBatch() {
return batch;
}
- long getBatchID() {
- return batchID;
+ long getBatchSequence() {
+ return batchSequence;
}
final static EventFactory<ReplyBatchEvent> EVENT_FACTORY = new EventFactory<ReplyBatchEvent>() {
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/0afa8f16/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java
index 7b67674..5ac12ad 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java
@@ -24,7 +24,6 @@ import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
-import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.YieldingWaitStrategy;
import org.apache.commons.pool2.ObjectPool;
import org.apache.omid.committable.CommitTable;
@@ -48,7 +47,8 @@ import java.util.concurrent.ThreadFactory;
import static com.codahale.metrics.MetricRegistry.name;
/**
- * Manages the retry requests that clients can send when they did not received the response in the specified timeout.
+ * Manages the disambiguation of the retry requests that clients send when they did not received a response in the
+ * specified timeout. It replies directly to the client with the outcome identified.
*/
class RetryProcessorImpl implements EventHandler<RetryProcessorImpl.RetryEvent>, RetryProcessor {
@@ -59,7 +59,6 @@ class RetryProcessorImpl implements EventHandler<RetryProcessorImpl.RetryEvent>,
final RingBuffer<RetryEvent> retryRing;
final CommitTable.Client commitTableClient;
- final CommitTable.Writer writer; // TODO This is not used. Remove
final ObjectPool<Batch> batchPool;
// Metrics
@@ -74,7 +73,6 @@ class RetryProcessorImpl implements EventHandler<RetryProcessorImpl.RetryEvent>,
throws InterruptedException, ExecutionException, IOException {
this.commitTableClient = commitTable.getClient();
- this.writer = commitTable.getWriter();
this.replyProc = replyProc;
this.batchPool = batchPool;
@@ -109,24 +107,22 @@ class RetryProcessorImpl implements EventHandler<RetryProcessorImpl.RetryEvent>,
}
- // TODO Reply to the client directly here instead of adding new workload to the reply processor. This avoids a lot
- // of complexity
private void handleCommitRetry(RetryEvent event) throws Exception {
+
long startTimestamp = event.getStartTimestamp();
try {
Optional<CommitTimestamp> commitTimestamp = commitTableClient.getCommitTimestamp(startTimestamp).get();
- Batch batch = batchPool.borrowObject();
if(commitTimestamp.isPresent()) {
if (commitTimestamp.get().isValid()) {
- LOG.trace("Valid commit TS found in Commit Table");
- replyProc.addCommit(batch, startTimestamp, commitTimestamp.get().getValue(), event.getChannel(), event.getMonCtx());
+ LOG.trace("Valid commit TS found in Commit Table. Replying Commit to the client...");
+ replyProc.sendCommitResponse(startTimestamp, commitTimestamp.get().getValue(), event.getChannel());
} else {
- LOG.trace("Invalid commit TS found in Commit Table");
- replyProc.addAbort(batch, startTimestamp, event.getChannel(), event.getMonCtx());
+ LOG.trace("Invalid commit TS found in Commit Table. Replying Abort to the client...");
+ replyProc.sendAbortResponse(startTimestamp, event.getChannel());
}
} else {
- LOG.trace("No commit TS found in Commit Table");
- replyProc.addAbort(batch, startTimestamp, event.getChannel(), event.getMonCtx());
+ LOG.trace("No commit TS found in Commit Table. Replying Abort to the client..");
+ replyProc.sendAbortResponse(startTimestamp, event.getChannel());
}
} catch (InterruptedException e) {
LOG.error("Interrupted reading from commit table");
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/0afa8f16/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java
index 0a4fd44..eecab7a 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java
@@ -43,8 +43,6 @@ public class TestRetryProcessor {
private static final Logger LOG = LoggerFactory.getLogger(TestRetryProcessor.class);
- private MetricsRegistry metrics = new NullMetricsProvider();
-
private static long NON_EXISTING_ST_TX = 1000;
private static long ST_TX_1 = 0;
private static long CT_TX_1 = 1;
@@ -55,6 +53,8 @@ public class TestRetryProcessor {
private ReplyProcessor replyProc;
@Mock
private Panicker panicker;
+ @Mock
+ private MetricsRegistry metrics;
private CommitTable commitTable;
@@ -63,7 +63,6 @@ public class TestRetryProcessor {
MockitoAnnotations.initMocks(this);
// Init components
commitTable = new InMemoryCommitTable();
- metrics = new NullMetricsProvider();
}
@Test(timeOut = 10_000)
@@ -77,8 +76,7 @@ public class TestRetryProcessor {
retryProc.disambiguateRetryRequestHeuristically(NON_EXISTING_ST_TX, channel, new MonitoringContext(metrics));
ArgumentCaptor<Long> firstTSCapture = ArgumentCaptor.forClass(Long.class);
- verify(replyProc, timeout(100).times(1))
- .addAbort(any(Batch.class), firstTSCapture.capture(), any(Channel.class), any(MonitoringContext.class));
+ verify(replyProc, timeout(100).times(1)).sendAbortResponse(firstTSCapture.capture(), any(Channel.class));
long startTS = firstTSCapture.getValue();
assertEquals(startTS, NON_EXISTING_ST_TX, "Captured timestamp should be the same as NON_EXISTING_ST_TX");
}
@@ -96,16 +94,15 @@ public class TestRetryProcessor {
ArgumentCaptor<Long> firstTSCapture = ArgumentCaptor.forClass(Long.class);
ArgumentCaptor<Long> secondTSCapture = ArgumentCaptor.forClass(Long.class);
- verify(replyProc, timeout(100).times(1)).addCommit(any(Batch.class),
- firstTSCapture.capture(),
- secondTSCapture.capture(),
- any(Channel.class),
- any(MonitoringContext.class));
+ verify(replyProc, timeout(100).times(1)).sendCommitResponse(firstTSCapture.capture(),
+ secondTSCapture.capture(),
+ any(Channel.class));
long startTS = firstTSCapture.getValue();
long commitTS = secondTSCapture.getValue();
assertEquals(startTS, ST_TX_1, "Captured timestamp should be the same as ST_TX_1");
assertEquals(commitTS, CT_TX_1, "Captured timestamp should be the same as CT_TX_1");
+
}
@Test(timeOut = 10_000)
@@ -125,12 +122,10 @@ public class TestRetryProcessor {
// The element to test
RetryProcessor retryProc = new RetryProcessorImpl(metrics, commitTable, replyProc, panicker, batchPool);
- // Test we'll reply with an abort for a retry request when the
- // transaction id IS in the commit table BUT invalidated
+ // Test we return an Abort to a retry request when the transaction id IS in the commit table BUT invalidated
retryProc.disambiguateRetryRequestHeuristically(ST_TX_1, channel, new MonitoringContext(metrics));
ArgumentCaptor<Long> startTSCapture = ArgumentCaptor.forClass(Long.class);
- verify(replyProc, timeout(100).times(1))
- .addAbort(any(Batch.class), startTSCapture.capture(), any(Channel.class), any(MonitoringContext.class));
+ verify(replyProc, timeout(100).times(1)).sendAbortResponse(startTSCapture.capture(), any(Channel.class));
long startTS = startTSCapture.getValue();
Assert.assertEquals(startTS, ST_TX_1, "Captured timestamp should be the same as NON_EXISTING_ST_TX");