You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/10/15 16:18:14 UTC
[24/50] [abbrv] usergrid git commit: add configurable buffer size
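add configurable buffer size (more precisely: the buffer time window is now configurable via elasticsearch.buffer.time_ms, default 100 ms; the buffer size argument is unchanged)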
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/d57b4fe2
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/d57b4fe2
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/d57b4fe2
Branch: refs/heads/asf-site
Commit: d57b4fe2cc668f2b91bc26a83b216c846890c6b9
Parents: f9f0825
Author: Shawn Feldman <sf...@apache.org>
Authored: Wed Oct 7 16:43:22 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Wed Oct 7 16:43:22 2015 -0600
----------------------------------------------------------------------
.../corepersistence/asyncevents/AmazonAsyncEventService.java | 4 ++--
.../apache/usergrid/corepersistence/index/IndexProcessorFig.java | 4 ++++
2 files changed, 6 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d57b4fe2/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index a9e2459..957ee68 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -307,7 +307,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
//filter for success, send to the index(optional), ack
return masterObservable
//take the max
- .buffer(250, TimeUnit.MILLISECONDS, bufferSize)
+ .buffer(indexProcessorFig.getBufferTime(), TimeUnit.MILLISECONDS, bufferSize)
//map them to index results and return them
.flatMap(indexEventResults -> {
IndexOperationMessage combined = new IndexOperationMessage();
@@ -565,7 +565,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
{
final int bufferSize = messages.size();
return handleMessages(messages)
- .buffer(100, TimeUnit.MILLISECONDS, bufferSize) //TODO how to ack multiple messages via buffer
+ .buffer(indexProcessorFig.getBufferTime(), TimeUnit.MILLISECONDS, bufferSize) //TODO how to ack multiple messages via buffer
.doOnNext(messagesToAck -> {
if (messagesToAck.size() == 0) {
return;
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d57b4fe2/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
index 2ffa374..410f162 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
@@ -90,6 +90,10 @@ public interface IndexProcessorFig extends GuicyFig {
@Key("elasticsearch.reindex.flush.interval")
int getUpdateInterval();
+ @Default("100")
+ @Key("elasticsearch.buffer.time_ms")
+ int getBufferTime();
+
@Default("1000")
@Key( REINDEX_BUFFER_SIZE )
int getReindexBufferSize();