You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@omid.apache.org by fp...@apache.org on 2016/05/11 18:06:41 UTC
[34/50] [abbrv] incubator-omid git commit: Move from Batch to
Persistence proc. the responsiblity of adding sendReplies to Reply proc.
Move from Batch to Persistence proc. the responsiblity of adding sendReplies to Reply proc.
Change-Id: I754f7189a166420652fcfec4fa4c1497212f8d7c
Project: http://git-wip-us.apache.org/repos/asf/incubator-omid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-omid/commit/aa2651a0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-omid/tree/aa2651a0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-omid/diff/aa2651a0
Branch: refs/heads/master
Commit: aa2651a0a866e60e49504d6b9b6d4b47062a1c5a
Parents: 1d60f21
Author: Francisco Perez-Sorrosal <fp...@yahoo-inc.com>
Authored: Wed May 4 10:04:28 2016 -0700
Committer: Francisco Perez-Sorrosal <fp...@yahoo-inc.com>
Committed: Wed May 4 10:04:28 2016 -0700
----------------------------------------------------------------------
.../main/java/org/apache/omid/tso/Batch.java | 38 ++++++-------------
.../omid/tso/PersistenceProcessorHandler.java | 40 ++++++++++++++++----
.../java/org/apache/omid/tso/TestBatch.java | 8 ++--
3 files changed, 48 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/aa2651a0/tso-server/src/main/java/org/apache/omid/tso/Batch.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/Batch.java b/tso-server/src/main/java/org/apache/omid/tso/Batch.java
index 2b17f23..b3b9eef 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/Batch.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/Batch.java
@@ -91,6 +91,18 @@ public class Batch {
}
+ PersistEvent get(int idx) {
+ return events[idx];
+ }
+
+ void set(int idx, PersistEvent event) {
+ events[idx] = event;
+ }
+
+ void decreaseNumEvents() {
+ numEvents--;
+ }
+
void addCommit(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext context) {
Preconditions.checkState(!isFull(), "batch is full");
int index = numEvents++;
@@ -123,32 +135,6 @@ public class Batch {
}
- void sendReply(ReplyProcessor reply, RetryProcessor retryProc, long batchID) {
-
- int i = 0;
- while (i < numEvents) {
- PersistEvent e = events[i];
- if (e.getType() == Type.ABORT && e.isRetry()) {
- retryProc.disambiguateRetryRequestHeuristically(e.getStartTimestamp(), e.getChannel(), e.getMonCtx());
- PersistEvent tmp = events[i];
- //TODO: why assign it?
- events[i] = events[numEvents - 1];
- events[numEvents - 1] = tmp;
- if (numEvents == 1) {
- clear();
- reply.manageResponsesBatch(batchID, null);
- return;
- }
- numEvents--;
- continue;
- }
- i++;
- }
-
- reply.manageResponsesBatch(batchID, this);
-
- }
-
@Override
public String toString() {
return Objects.toStringHelper(this)
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/aa2651a0/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java
index 84890b9..099cf88 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java
@@ -17,7 +17,6 @@
*/
package org.apache.omid.tso;
-import com.lmax.disruptor.LifecycleAware;
import com.lmax.disruptor.WorkHandler;
import org.apache.omid.committable.CommitTable;
import org.apache.omid.metrics.Histogram;
@@ -94,25 +93,24 @@ public class PersistenceProcessorHandler implements WorkHandler<PersistenceProce
throw new RuntimeException("Unknown event type: " + localEvent.getType().name());
}
}
- flush(batch, event.getBatchSequence());
-
+ if (batch.getNumEvents() > 0) {
+ flush(batch.getNumEvents());
+ sendReplies(batch, event.getBatchSequence());
+ }
}
- private void flush(Batch batch, long batchSequence) {
+ private void flush(int numBatchedEvents) {
- if (batch.getNumEvents() > 0) {
commitSuicideIfNotMaster();
try {
long startFlushTimeInNs = System.nanoTime();
writer.flush();
flushTimer.update(System.nanoTime() - startFlushTimeInNs);
- batchSizeHistogram.update(batch.getNumEvents());
+ batchSizeHistogram.update(numBatchedEvents);
} catch (IOException e) {
panicker.panic("Error persisting commit batch", e);
}
commitSuicideIfNotMaster(); // TODO Here, we can return the client responses before committing suicide
- batch.sendReply(replyProcessor, retryProc, batchSequence);
- }
}
@@ -122,4 +120,30 @@ public class PersistenceProcessorHandler implements WorkHandler<PersistenceProce
}
}
+ private void sendReplies(Batch batch, long batchSequence) {
+
+ int i = 0;
+ while (i < batch.getNumEvents()) {
+ PersistEvent e = batch.get(i);
+ if (e.getType() == PersistEvent.Type.ABORT && e.isRetry()) {
+ retryProc.disambiguateRetryRequestHeuristically(e.getStartTimestamp(), e.getChannel(), e.getMonCtx());
+ PersistEvent tmp = batch.get(i);
+ //TODO: why assign it?
+ batch.set(i, batch.get(batch.getNumEvents() - 1));
+ batch.set(batch.getNumEvents() - 1, tmp);
+ if (batch.getNumEvents() == 1) {
+ batch.clear();
+ replyProcessor.manageResponsesBatch(batchSequence, null);
+ return;
+ }
+ batch.decreaseNumEvents();
+ continue;
+ }
+ i++;
+ }
+
+ replyProcessor.manageResponsesBatch(batchSequence, batch);
+
+ }
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/aa2651a0/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java b/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
index c003f34..2b8b318 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
@@ -110,8 +110,8 @@ public class TestBatch {
// assertFalse(batch.isFull(), "Batch shouldn't be full");
// assertEquals(batch.getNumEvents(), 0, "Num events should be 0");
//=======
- batch.sendReply(replyProcessor, retryProcessor, (-1));
- verify(replyProcessor, timeout(100).times(1)).manageResponsesBatch((-1), batch);
+// batch.sendReply(replyProcessor, retryProcessor, (-1));
+ //verify(replyProcessor, timeout(100).times(1)).manageResponsesBatch((-1), batch);
assertTrue(batch.isFull(), "Batch shouldn't be empty");
}
@@ -135,8 +135,8 @@ public class TestBatch {
// Test that sending replies empties the batch also when the replica is NOT master and calls the
// ambiguousCommitResponse() method on the reply processor
- batch.sendReply(replyProcessor, retryProcessor, (-1));
- verify(replyProcessor, timeout(100).times(1)).manageResponsesBatch((-1), batch);
+ //batch.sendReply(replyProcessor, retryProcessor, (-1));
+ //verify(replyProcessor, timeout(100).times(1)).manageResponsesBatch((-1), batch);
assertTrue(batch.isFull(), "Batch should be full");
}