You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/10/15 16:18:14 UTC

[24/50] [abbrv] usergrid git commit: add configurable buffer time

add configurable buffer time


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/d57b4fe2
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/d57b4fe2
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/d57b4fe2

Branch: refs/heads/asf-site
Commit: d57b4fe2cc668f2b91bc26a83b216c846890c6b9
Parents: f9f0825
Author: Shawn Feldman <sf...@apache.org>
Authored: Wed Oct 7 16:43:22 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Wed Oct 7 16:43:22 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/asyncevents/AmazonAsyncEventService.java     | 4 ++--
 .../apache/usergrid/corepersistence/index/IndexProcessorFig.java | 4 ++++
 2 files changed, 6 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/d57b4fe2/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index a9e2459..957ee68 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -307,7 +307,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
         //filter for success, send to the index(optional), ack
         return masterObservable
             //take the max
-            .buffer(250, TimeUnit.MILLISECONDS, bufferSize)
+            .buffer(indexProcessorFig.getBufferTime(), TimeUnit.MILLISECONDS, bufferSize)
             //map them to index results and return them
             .flatMap(indexEventResults -> {
                 IndexOperationMessage combined = new IndexOperationMessage();
@@ -565,7 +565,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
                                 {
                                     final int bufferSize = messages.size();
                                     return handleMessages(messages)
-                                        .buffer(100, TimeUnit.MILLISECONDS, bufferSize) //TODO how to ack multiple messages via buffer
+                                        .buffer(indexProcessorFig.getBufferTime(), TimeUnit.MILLISECONDS, bufferSize) //TODO how to ack multiple messages via buffer
                                         .doOnNext(messagesToAck -> {
                                             if (messagesToAck.size() == 0) {
                                                 return;

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d57b4fe2/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
index 2ffa374..410f162 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
@@ -90,6 +90,10 @@ public interface IndexProcessorFig extends GuicyFig {
     @Key("elasticsearch.reindex.flush.interval")
     int getUpdateInterval();
 
+    @Default("100")
+    @Key("elasticsearch.buffer.time_ms")
+    int getBufferTime();
+
     @Default("1000")
     @Key( REINDEX_BUFFER_SIZE )
     int getReindexBufferSize();