You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by gr...@apache.org on 2015/03/12 23:41:41 UTC

[34/38] incubator-usergrid git commit: change timer for buffer consumer

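change timer for buffer consumer

Also track consecutive dequeue failures in the consumer loop: an AtomicInteger
counter is reset to 0 after each pass that completes without an exception and
incremented on every InterruptedException. Once the count exceeds 200 an error
is logged; the loop exit itself is still commented out, so draining continues.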


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/0555622f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/0555622f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/0555622f

Branch: refs/heads/USERGRID-396
Commit: 0555622f586b389fc0ee2aecf217f90a8ee16584
Parents: 5cc521f
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Mar 9 13:00:35 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Mar 9 13:00:35 2015 -0600

----------------------------------------------------------------------
 .../persistence/index/impl/EsIndexBufferConsumerImpl.java | 10 ++++++++++
 1 file changed, 10 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0555622f/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index efde88f..94ea71e 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -50,6 +50,7 @@ import rx.schedulers.Schedulers;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Consumer for IndexOperationMessages
@@ -78,6 +79,8 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
         this.produceTimer = metricsFactory.getTimer(EsIndexBufferConsumerImpl.class,"index.buffer.consumer.messageFetch");
         final BlockingQueue<IndexOperationMessage> producerQueue = producer.getSource();
 
+
+        final AtomicInteger countFail = new AtomicInteger();
         //batch up sets of some size and send them in batch
         this.consumer = Observable.create(new Observable.OnSubscribe<IndexOperationMessage>() {
             @Override
@@ -99,8 +102,15 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
                                     drainList.clear();
                                     timer.stop();
                                 }
+                                countFail.set(0);
                             } catch (InterruptedException ie) {
+                                int count = countFail.incrementAndGet();
                                 log.error("failed to dequeue", ie);
+                                if(count > 200){
+                                    log.error("Shutting down index drain due to repetitive failures");
+                                    //break;
+                                }
+
                             }
                         } while (true);
                     }