You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/10/15 16:18:12 UTC

[22/50] [abbrv] usergrid git commit: buffer size fix

buffer size fix


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/f9f08253
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/f9f08253
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/f9f08253

Branch: refs/heads/asf-site
Commit: f9f0825315c22ecfa1375189bf70020cde868661
Parents: 79caa09
Author: Shawn Feldman <sf...@apache.org>
Authored: Wed Oct 7 15:14:22 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Wed Oct 7 15:14:22 2015 -0600

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java              | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/f9f08253/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index a5342ea..a9e2459 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -255,6 +255,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
             logger.debug("handleMessages with {} message", messages.size());
         }
 
+        final int bufferSize = messages.size();
         Observable<IndexEventResult> masterObservable = Observable.from(messages).flatMap(message -> {
             AsyncEvent event = null;
             try{
@@ -305,10 +306,8 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
         //filter for success, send to the index(optional), ack
         return masterObservable
-            //remove unsuccessful
-            .filter(indexEventResult -> indexEventResult.getQueueMessage().isPresent())
             //take the max
-            .buffer( MAX_TAKE )
+            .buffer(250, TimeUnit.MILLISECONDS, bufferSize)
             //map them to index results and return them
             .flatMap(indexEventResults -> {
                 IndexOperationMessage combined = new IndexOperationMessage();
@@ -322,8 +321,13 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
                 //ack after successful completion of the operation.
                 return indexProducer.put(combined)
+                    //change observable type
                     .flatMap(indexOperationMessage -> Observable.from(indexEventResults))
+                    //remove unsuccessful
+                    .filter(indexEventResult -> indexEventResult.getQueueMessage().isPresent())
+                    //measure the elapsed time since each event's creation time
                     .doOnNext(indexEventResult -> messageCycle.update(System.currentTimeMillis() - indexEventResult.getCreationTime()))
+                    //return the queue messages to ack
                     .map(result -> result.getQueueMessage().get());
 
             });