You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text version.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/03/09 21:38:09 UTC
[1/2] incubator-usergrid git commit: change timer for buffer consumer
Repository: incubator-usergrid
Updated Branches:
refs/heads/USERGRID-461 [created] 0555622f5
change timer for buffer consumer
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/5cc521f4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/5cc521f4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/5cc521f4
Branch: refs/heads/USERGRID-461
Commit: 5cc521f472699633a8a8cb3006c79ab5da15f632
Parents: d901d38
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Mar 9 12:46:48 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Mar 9 12:46:48 2015 -0600
----------------------------------------------------------------------
.../persistence/index/impl/EsIndexBufferConsumerImpl.java | 5 ++---
1 file changed, 2 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5cc521f4/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index d952d78..efde88f 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -88,18 +88,17 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
List<IndexOperationMessage> drainList = new ArrayList<>(config.getIndexBufferSize() + 1);
do {
try {
- Timer.Context timer = produceTimer.time();
IndexOperationMessage polled = producerQueue.poll(config.getIndexBufferTimeout(), TimeUnit.MILLISECONDS);
if(polled!=null) {
+ Timer.Context timer = produceTimer.time();
drainList.add(polled);
producerQueue.drainTo(drainList, config.getIndexBufferSize());
for(IndexOperationMessage drained : drainList){
subscriber.onNext(drained);
}
drainList.clear();
+ timer.stop();
}
- timer.stop();
-
} catch (InterruptedException ie) {
log.error("failed to dequeue", ie);
}
[2/2] incubator-usergrid git commit: change timer for buffer consumer
Posted by sf...@apache.org.
change timer for buffer consumer
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/0555622f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/0555622f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/0555622f
Branch: refs/heads/USERGRID-461
Commit: 0555622f586b389fc0ee2aecf217f90a8ee16584
Parents: 5cc521f
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Mar 9 13:00:35 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Mar 9 13:00:35 2015 -0600
----------------------------------------------------------------------
.../persistence/index/impl/EsIndexBufferConsumerImpl.java | 10 ++++++++++
1 file changed, 10 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0555622f/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index efde88f..94ea71e 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -50,6 +50,7 @@ import rx.schedulers.Schedulers;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* Consumer for IndexOperationMessages
@@ -78,6 +79,8 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
this.produceTimer = metricsFactory.getTimer(EsIndexBufferConsumerImpl.class,"index.buffer.consumer.messageFetch");
final BlockingQueue<IndexOperationMessage> producerQueue = producer.getSource();
+
+ final AtomicInteger countFail = new AtomicInteger();
//batch up sets of some size and send them in batch
this.consumer = Observable.create(new Observable.OnSubscribe<IndexOperationMessage>() {
@Override
@@ -99,8 +102,15 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
drainList.clear();
timer.stop();
}
+ countFail.set(0);
} catch (InterruptedException ie) {
+ int count = countFail.incrementAndGet();
log.error("failed to dequeue", ie);
+ if(count > 200){
+ log.error("Shutting down index drain due to repetitive failures");
+ //break;
+ }
+
}
} while (true);
}