You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/03/09 21:38:09 UTC

[1/2] incubator-usergrid git commit: change timer for buffer consumer

Repository: incubator-usergrid
Updated Branches:
  refs/heads/USERGRID-461 [created] 0555622f5


change timer for buffer consumer


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/5cc521f4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/5cc521f4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/5cc521f4

Branch: refs/heads/USERGRID-461
Commit: 5cc521f472699633a8a8cb3006c79ab5da15f632
Parents: d901d38
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Mar 9 12:46:48 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Mar 9 12:46:48 2015 -0600

----------------------------------------------------------------------
 .../persistence/index/impl/EsIndexBufferConsumerImpl.java       | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5cc521f4/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index d952d78..efde88f 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -88,18 +88,17 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
                         List<IndexOperationMessage> drainList = new ArrayList<>(config.getIndexBufferSize() + 1);
                         do {
                             try {
-                                Timer.Context timer = produceTimer.time();
                                 IndexOperationMessage polled = producerQueue.poll(config.getIndexBufferTimeout(), TimeUnit.MILLISECONDS);
                                 if(polled!=null) {
+                                    Timer.Context timer = produceTimer.time();
                                     drainList.add(polled);
                                     producerQueue.drainTo(drainList, config.getIndexBufferSize());
                                     for(IndexOperationMessage drained : drainList){
                                         subscriber.onNext(drained);
                                     }
                                     drainList.clear();
+                                    timer.stop();
                                 }
-                                timer.stop();
-
                             } catch (InterruptedException ie) {
                                 log.error("failed to dequeue", ie);
                             }


[2/2] incubator-usergrid git commit: change timer for buffer consumer

Posted by sf...@apache.org.
change timer for buffer consumer


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/0555622f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/0555622f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/0555622f

Branch: refs/heads/USERGRID-461
Commit: 0555622f586b389fc0ee2aecf217f90a8ee16584
Parents: 5cc521f
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Mar 9 13:00:35 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Mar 9 13:00:35 2015 -0600

----------------------------------------------------------------------
 .../persistence/index/impl/EsIndexBufferConsumerImpl.java | 10 ++++++++++
 1 file changed, 10 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0555622f/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index efde88f..94ea71e 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -50,6 +50,7 @@ import rx.schedulers.Schedulers;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Consumer for IndexOperationMessages
@@ -78,6 +79,8 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
         this.produceTimer = metricsFactory.getTimer(EsIndexBufferConsumerImpl.class,"index.buffer.consumer.messageFetch");
         final BlockingQueue<IndexOperationMessage> producerQueue = producer.getSource();
 
+
+        final AtomicInteger countFail = new AtomicInteger();
         //batch up sets of some size and send them in batch
         this.consumer = Observable.create(new Observable.OnSubscribe<IndexOperationMessage>() {
             @Override
@@ -99,8 +102,15 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
                                     drainList.clear();
                                     timer.stop();
                                 }
+                                countFail.set(0);
                             } catch (InterruptedException ie) {
+                                int count = countFail.incrementAndGet();
                                 log.error("failed to dequeue", ie);
+                                if(count > 200){
+                                    log.error("Shutting down index drain due to repetitive failures");
+                                    //break;
+                                }
+
                             }
                         } while (true);
                     }