You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/03/09 21:38:10 UTC
[2/2] incubator-usergrid git commit: change timer for buffer consumer
change timer for buffer consumer
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/0555622f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/0555622f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/0555622f
Branch: refs/heads/USERGRID-461
Commit: 0555622f586b389fc0ee2aecf217f90a8ee16584
Parents: 5cc521f
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Mar 9 13:00:35 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Mar 9 13:00:35 2015 -0600
----------------------------------------------------------------------
.../persistence/index/impl/EsIndexBufferConsumerImpl.java | 10 ++++++++++
1 file changed, 10 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0555622f/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index efde88f..94ea71e 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -50,6 +50,7 @@ import rx.schedulers.Schedulers;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* Consumer for IndexOperationMessages
@@ -78,6 +79,8 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
this.produceTimer = metricsFactory.getTimer(EsIndexBufferConsumerImpl.class,"index.buffer.consumer.messageFetch");
final BlockingQueue<IndexOperationMessage> producerQueue = producer.getSource();
+
+ final AtomicInteger countFail = new AtomicInteger();
//batch up sets of some size and send them in batch
this.consumer = Observable.create(new Observable.OnSubscribe<IndexOperationMessage>() {
@Override
@@ -99,8 +102,15 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
drainList.clear();
timer.stop();
}
+ countFail.set(0);
} catch (InterruptedException ie) {
+ int count = countFail.incrementAndGet();
log.error("failed to dequeue", ie);
+ if(count > 200){
+ log.error("Shutting down index drain due to repetitive failures");
+ //break;
+ }
+
}
} while (true);
}