You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/10/15 16:18:12 UTC
[22/50] [abbrv] usergrid git commit: buffer size fix
buffer size fix
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/f9f08253
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/f9f08253
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/f9f08253
Branch: refs/heads/asf-site
Commit: f9f0825315c22ecfa1375189bf70020cde868661
Parents: 79caa09
Author: Shawn Feldman <sf...@apache.org>
Authored: Wed Oct 7 15:14:22 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Wed Oct 7 15:14:22 2015 -0600
----------------------------------------------------------------------
.../asyncevents/AmazonAsyncEventService.java | 10 +++++++---
1 file changed, 7 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/f9f08253/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index a5342ea..a9e2459 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -255,6 +255,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
logger.debug("handleMessages with {} message", messages.size());
}
+ final int bufferSize = messages.size();
Observable<IndexEventResult> masterObservable = Observable.from(messages).flatMap(message -> {
AsyncEvent event = null;
try{
@@ -305,10 +306,8 @@ public class AmazonAsyncEventService implements AsyncEventService {
//filter for success, send to the index(optional), ack
return masterObservable
- //remove unsuccessful
- .filter(indexEventResult -> indexEventResult.getQueueMessage().isPresent())
//take the max
- .buffer( MAX_TAKE )
+ .buffer(250, TimeUnit.MILLISECONDS, bufferSize)
//map them to index results and return them
.flatMap(indexEventResults -> {
IndexOperationMessage combined = new IndexOperationMessage();
@@ -322,8 +321,13 @@ public class AmazonAsyncEventService implements AsyncEventService {
//ack after successful completion of the operation.
return indexProducer.put(combined)
+ //change observable type
.flatMap(indexOperationMessage -> Observable.from(indexEventResults))
+ //remove unsuccessful
+ .filter(indexEventResult -> indexEventResult.getQueueMessage().isPresent())
+ //measure
.doOnNext(indexEventResult -> messageCycle.update(System.currentTimeMillis() - indexEventResult.getCreationTime()))
+ //return the queue messages to ack
.map(result -> result.getQueueMessage().get());
});