You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@omid.apache.org by fp...@apache.org on 2016/05/11 18:06:43 UTC

[36/50] [abbrv] incubator-omid git commit: Send retried responses directly form Retry processor

Send retried responses directly form Retry processor

Bypass the reply processor as it just introduces unnecessary complexity in the
code (e.g. NO_ORDER code, etc.)

Change-Id: I9b7f36aba98caef655a8ba74d398691c9b57f043


Project: http://git-wip-us.apache.org/repos/asf/incubator-omid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-omid/commit/0afa8f16
Tree: http://git-wip-us.apache.org/repos/asf/incubator-omid/tree/0afa8f16
Diff: http://git-wip-us.apache.org/repos/asf/incubator-omid/diff/0afa8f16

Branch: refs/heads/master
Commit: 0afa8f16e1f4cadaf621bac67417e1cb6b8b81f0
Parents: 472c828
Author: Francisco Perez-Sorrosal <fp...@yahoo-inc.com>
Authored: Thu May 5 13:18:36 2016 -0700
Committer: Francisco Perez-Sorrosal <fp...@yahoo-inc.com>
Committed: Thu May 5 13:18:36 2016 -0700

----------------------------------------------------------------------
 .../org/apache/omid/tso/ReplyProcessor.java     | 36 +++++++++++--
 .../org/apache/omid/tso/ReplyProcessorImpl.java | 56 +++++++-------------
 .../org/apache/omid/tso/RetryProcessorImpl.java | 22 ++++----
 .../org/apache/omid/tso/TestRetryProcessor.java | 23 ++++----
 4 files changed, 68 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/0afa8f16/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessor.java b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessor.java
index 04f40cc..820630b 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessor.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessor.java
@@ -32,10 +32,38 @@ interface ReplyProcessor {
      */
     void manageResponsesBatch(long batchSequence, Batch batch);
 
-    // TODO This method can be removed if we return the responses from the retry processor
-    void addAbort(Batch batch, long startTimestamp, Channel c, MonitoringContext context);
-    // TODO This method can be removed if we return the responses from the retry processor
-    void addCommit(Batch batch, long startTimestamp, long commitTimestamp, Channel c, MonitoringContext context);
+    /**
+     * Allows to send a commit response back to the client.
+     *
+     * @param startTimestamp
+     *            the start timestamp representing the tx identifier that is going to receive the commit response
+     * @param commitTimestamp
+     *            the commit timestamp
+     * @param channel
+     *            the channel used to send the response back to the client
+     */
+    void sendCommitResponse(long startTimestamp, long commitTimestamp, Channel channel);
+
+    /**
+     * Allows to send an abort response back to the client.
+     *
+     * @param startTimestamp
+     *            the start timestamp representing the tx identifier that is going to receive the abort response
+     * @param channel
+     *            the channel used to send the response back to the client
+     */
+    void sendAbortResponse(long startTimestamp, Channel channel);
+
+    /**
+     * Allow to send a timestamp response back to the client.
+     *
+     * @param startTimestamp
+     *            the start timestamp to return that will represent the tx identifier for the created transaction
+     * @param channel
+     *            the channel used to send the response back to the client
+     */
+
+    void sendTimestampResponse(long startTimestamp, Channel channel);
 
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/0afa8f16/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
index a37f90f..a4e2ce1 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
@@ -46,8 +46,6 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
 
     private static final Logger LOG = LoggerFactory.getLogger(ReplyProcessorImpl.class);
 
-    private static final int NO_ORDER = (-1);
-
     private final ObjectPool<Batch> batchPool;
 
     private final RingBuffer<ReplyBatchEvent> replyRing;
@@ -82,7 +80,7 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
 
         this.futureEvents = new PriorityQueue<>(10, new Comparator<ReplyBatchEvent>() {
             public int compare(ReplyBatchEvent replyBatchEvent1, ReplyBatchEvent replyBatchEvent2) {
-                return Long.compare(replyBatchEvent1.getBatchID(), replyBatchEvent2.getBatchID());
+                return Long.compare(replyBatchEvent1.getBatchSequence(), replyBatchEvent2.getBatchSequence());
             }
         });
 
@@ -96,28 +94,29 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
 
         String name;
         Batch batch = event.getBatch();
-        for (int i=0; batch != null && i < batch.getNumEvents(); ++i) {
+        for (int i = 0; batch != null && i < batch.getNumEvents(); ++i) {
             PersistEvent localEvent = batch.get(i);
 
             switch (localEvent.getType()) {
             case COMMIT:
                 name = "commitReplyProcessor";
                 localEvent.getMonCtx().timerStart(name);
-                handleCommitResponse(localEvent.getStartTimestamp(), localEvent.getCommitTimestamp(), localEvent.getChannel());
+                sendCommitResponse(localEvent.getStartTimestamp(), localEvent.getCommitTimestamp(), localEvent.getChannel());
                 localEvent.getMonCtx().timerStop(name);
                  break;
             case ABORT:
                 name = "abortReplyProcessor";
                 localEvent.getMonCtx().timerStart(name);
-                handleAbortResponse(localEvent.getStartTimestamp(), localEvent.getChannel());
+                sendAbortResponse(localEvent.getStartTimestamp(), localEvent.getChannel());
                 localEvent.getMonCtx().timerStop(name);
                 break;
             case TIMESTAMP:
                 name = "timestampReplyProcessor";
                 localEvent.getMonCtx().timerStart(name);
-                handleTimestampResponse(localEvent.getStartTimestamp(), localEvent.getChannel());
+                sendTimestampResponse(localEvent.getStartTimestamp(), localEvent.getChannel());
                 localEvent.getMonCtx().timerStop(name);
                 break;
+            // TODO Check if we still need this
             case LOW_WATERMARK:
                 break;
             default:
@@ -135,7 +134,7 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
 
     private void processWaitingEvents() throws Exception {
 
-        while (!futureEvents.isEmpty() && futureEvents.peek().getBatchID() == nextIDToHandle.get()) {
+        while (!futureEvents.isEmpty() && futureEvents.peek().getBatchSequence() == nextIDToHandle.get()) {
             ReplyBatchEvent e = futureEvents.poll();
             handleReplyBatchEvent(e);
             nextIDToHandle.incrementAndGet();
@@ -150,17 +149,13 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
         // while commit smaller than still does not appear in the commit table.
 
         // If previous events were not processed yet (events contain smaller id)
-        if (event.getBatchID() > nextIDToHandle.get()) {
+        if (event.getBatchSequence() > nextIDToHandle.get()) {
             futureEvents.add(event);
             return;
         }
 
         handleReplyBatchEvent(event);
 
-        if (event.getBatchID() == NO_ORDER) {
-            return;
-        }
-
         nextIDToHandle.incrementAndGet();
 
         // Process events that arrived before and kept in futureEvents.
@@ -178,25 +173,8 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
 
     }
 
-    // TODO This method can be removed if we return the responses from the retry processor
     @Override
-    public void addAbort(Batch batch, long startTimestamp, Channel c, MonitoringContext context) {
-
-        batch.addAbort(startTimestamp, true, c, context);
-        manageResponsesBatch(NO_ORDER, batch);
-
-    }
-
-    // TODO This method can be removed if we return the responses from the retry processor
-    @Override
-    public void addCommit(Batch batch, long startTimestamp, long commitTimestamp, Channel c, MonitoringContext context) {
-
-        batch.addCommit(startTimestamp, commitTimestamp, c, context);
-        manageResponsesBatch(NO_ORDER, batch);
-
-    }
-
-    private void handleCommitResponse(long startTimestamp, long commitTimestamp, Channel c) {
+    public void sendCommitResponse(long startTimestamp, long commitTimestamp, Channel c) {
 
         TSOProto.Response.Builder builder = TSOProto.Response.newBuilder();
         TSOProto.CommitResponse.Builder commitBuilder = TSOProto.CommitResponse.newBuilder();
@@ -210,7 +188,8 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
 
     }
 
-    private void handleAbortResponse(long startTimestamp, Channel c) {
+    @Override
+    public void sendAbortResponse(long startTimestamp, Channel c) {
 
         TSOProto.Response.Builder builder = TSOProto.Response.newBuilder();
         TSOProto.CommitResponse.Builder commitBuilder = TSOProto.CommitResponse.newBuilder();
@@ -223,7 +202,8 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
 
     }
 
-    private void handleTimestampResponse(long startTimestamp, Channel c) {
+    @Override
+    public void sendTimestampResponse(long startTimestamp, Channel c) {
 
         TSOProto.Response.Builder builder = TSOProto.Response.newBuilder();
         TSOProto.TimestampResponse.Builder respBuilder = TSOProto.TimestampResponse.newBuilder();
@@ -238,19 +218,19 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
     final static class ReplyBatchEvent {
 
         private Batch batch;
-        private long batchID;
+        private long batchSequence;
 
-        static void makeReplyBatch(ReplyBatchEvent e, Batch batch, long batchID) {
+        static void makeReplyBatch(ReplyBatchEvent e, Batch batch, long batchSequence) {
             e.batch = batch;
-            e.batchID = batchID;
+            e.batchSequence = batchSequence;
         }
 
         Batch getBatch() {
             return batch;
         }
 
-        long getBatchID() {
-            return batchID;
+        long getBatchSequence() {
+            return batchSequence;
         }
 
         final static EventFactory<ReplyBatchEvent> EVENT_FACTORY = new EventFactory<ReplyBatchEvent>() {

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/0afa8f16/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java
index 7b67674..5ac12ad 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java
@@ -24,7 +24,6 @@ import com.lmax.disruptor.EventFactory;
 import com.lmax.disruptor.EventHandler;
 import com.lmax.disruptor.RingBuffer;
 import com.lmax.disruptor.SequenceBarrier;
-import com.lmax.disruptor.WaitStrategy;
 import com.lmax.disruptor.YieldingWaitStrategy;
 import org.apache.commons.pool2.ObjectPool;
 import org.apache.omid.committable.CommitTable;
@@ -48,7 +47,8 @@ import java.util.concurrent.ThreadFactory;
 import static com.codahale.metrics.MetricRegistry.name;
 
 /**
- * Manages the retry requests that clients can send when they did not received the response in the specified timeout.
+ * Manages the disambiguation of the retry requests that clients send when they did not received a response in the
+ * specified timeout. It replies directly to the client with the outcome identified.
  */
 class RetryProcessorImpl implements EventHandler<RetryProcessorImpl.RetryEvent>, RetryProcessor {
 
@@ -59,7 +59,6 @@ class RetryProcessorImpl implements EventHandler<RetryProcessorImpl.RetryEvent>,
     final RingBuffer<RetryEvent> retryRing;
 
     final CommitTable.Client commitTableClient;
-    final CommitTable.Writer writer; // TODO This is not used. Remove
     final ObjectPool<Batch> batchPool;
 
     // Metrics
@@ -74,7 +73,6 @@ class RetryProcessorImpl implements EventHandler<RetryProcessorImpl.RetryEvent>,
             throws InterruptedException, ExecutionException, IOException {
 
         this.commitTableClient = commitTable.getClient();
-        this.writer = commitTable.getWriter();
         this.replyProc = replyProc;
         this.batchPool = batchPool;
 
@@ -109,24 +107,22 @@ class RetryProcessorImpl implements EventHandler<RetryProcessorImpl.RetryEvent>,
 
     }
 
-    // TODO Reply to the client directly here instead of adding new workload to the reply processor. This avoids a lot
-    // of complexity
     private void handleCommitRetry(RetryEvent event) throws Exception {
+
         long startTimestamp = event.getStartTimestamp();
         try {
             Optional<CommitTimestamp> commitTimestamp = commitTableClient.getCommitTimestamp(startTimestamp).get();
-            Batch batch = batchPool.borrowObject();
             if(commitTimestamp.isPresent()) {
                 if (commitTimestamp.get().isValid()) {
-                    LOG.trace("Valid commit TS found in Commit Table");
-                    replyProc.addCommit(batch, startTimestamp, commitTimestamp.get().getValue(), event.getChannel(), event.getMonCtx());
+                    LOG.trace("Valid commit TS found in Commit Table. Replying Commit to the client...");
+                    replyProc.sendCommitResponse(startTimestamp, commitTimestamp.get().getValue(), event.getChannel());
                 } else {
-                    LOG.trace("Invalid commit TS found in Commit Table");
-                    replyProc.addAbort(batch, startTimestamp, event.getChannel(), event.getMonCtx());
+                    LOG.trace("Invalid commit TS found in Commit Table. Replying Abort to the client...");
+                    replyProc.sendAbortResponse(startTimestamp, event.getChannel());
                 }
             } else {
-                LOG.trace("No commit TS found in Commit Table");
-                replyProc.addAbort(batch, startTimestamp, event.getChannel(), event.getMonCtx());
+                LOG.trace("No commit TS found in Commit Table. Replying Abort to the client..");
+                replyProc.sendAbortResponse(startTimestamp, event.getChannel());
             }
         } catch (InterruptedException e) {
             LOG.error("Interrupted reading from commit table");

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/0afa8f16/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java
index 0a4fd44..eecab7a 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java
@@ -43,8 +43,6 @@ public class TestRetryProcessor {
 
     private static final Logger LOG = LoggerFactory.getLogger(TestRetryProcessor.class);
 
-    private MetricsRegistry metrics = new NullMetricsProvider();
-
     private static long NON_EXISTING_ST_TX = 1000;
     private static long ST_TX_1 = 0;
     private static long CT_TX_1 = 1;
@@ -55,6 +53,8 @@ public class TestRetryProcessor {
     private ReplyProcessor replyProc;
     @Mock
     private Panicker panicker;
+    @Mock
+    private MetricsRegistry metrics;
 
     private CommitTable commitTable;
 
@@ -63,7 +63,6 @@ public class TestRetryProcessor {
         MockitoAnnotations.initMocks(this);
         // Init components
         commitTable = new InMemoryCommitTable();
-        metrics = new NullMetricsProvider();
     }
 
     @Test(timeOut = 10_000)
@@ -77,8 +76,7 @@ public class TestRetryProcessor {
         retryProc.disambiguateRetryRequestHeuristically(NON_EXISTING_ST_TX, channel, new MonitoringContext(metrics));
         ArgumentCaptor<Long> firstTSCapture = ArgumentCaptor.forClass(Long.class);
 
-        verify(replyProc, timeout(100).times(1))
-                .addAbort(any(Batch.class), firstTSCapture.capture(), any(Channel.class), any(MonitoringContext.class));
+        verify(replyProc, timeout(100).times(1)).sendAbortResponse(firstTSCapture.capture(), any(Channel.class));
         long startTS = firstTSCapture.getValue();
         assertEquals(startTS, NON_EXISTING_ST_TX, "Captured timestamp should be the same as NON_EXISTING_ST_TX");
     }
@@ -96,16 +94,15 @@ public class TestRetryProcessor {
         ArgumentCaptor<Long> firstTSCapture = ArgumentCaptor.forClass(Long.class);
         ArgumentCaptor<Long> secondTSCapture = ArgumentCaptor.forClass(Long.class);
 
-        verify(replyProc, timeout(100).times(1)).addCommit(any(Batch.class),
-                                                           firstTSCapture.capture(),
-                                                           secondTSCapture.capture(),
-                                                           any(Channel.class),
-                                                           any(MonitoringContext.class));
+        verify(replyProc, timeout(100).times(1)).sendCommitResponse(firstTSCapture.capture(),
+                                                                    secondTSCapture.capture(),
+                                                                    any(Channel.class));
 
         long startTS = firstTSCapture.getValue();
         long commitTS = secondTSCapture.getValue();
         assertEquals(startTS, ST_TX_1, "Captured timestamp should be the same as ST_TX_1");
         assertEquals(commitTS, CT_TX_1, "Captured timestamp should be the same as CT_TX_1");
+
     }
 
     @Test(timeOut = 10_000)
@@ -125,12 +122,10 @@ public class TestRetryProcessor {
         // The element to test
         RetryProcessor retryProc = new RetryProcessorImpl(metrics, commitTable, replyProc, panicker, batchPool);
 
-        // Test we'll reply with an abort for a retry request when the
-        // transaction id IS in the commit table BUT invalidated
+        // Test we return an Abort to a retry request when the transaction id IS in the commit table BUT invalidated
         retryProc.disambiguateRetryRequestHeuristically(ST_TX_1, channel, new MonitoringContext(metrics));
         ArgumentCaptor<Long> startTSCapture = ArgumentCaptor.forClass(Long.class);
-        verify(replyProc, timeout(100).times(1))
-                .addAbort(any(Batch.class), startTSCapture.capture(), any(Channel.class), any(MonitoringContext.class));
+        verify(replyProc, timeout(100).times(1)).sendAbortResponse(startTSCapture.capture(), any(Channel.class));
         long startTS = startTSCapture.getValue();
         Assert.assertEquals(startTS, ST_TX_1, "Captured timestamp should be the same as NON_EXISTING_ST_TX");