You are viewing a plain-text version of this content. The canonical link to the original was not preserved in this copy.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2014/08/25 19:22:10 UTC

git commit: start pool

Repository: incubator-usergrid
Updated Branches:
  refs/heads/two-dot-o-notifications-queue 8a8f7e6b0 -> 5b3948b0d


start pool


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/5b3948b0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/5b3948b0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/5b3948b0

Branch: refs/heads/two-dot-o-notifications-queue
Commit: 5b3948b0d4e8ee5a1c3f50df23ec4f1007dc6fc0
Parents: 8a8f7e6
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Aug 25 11:21:46 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Aug 25 11:21:46 2014 -0600

----------------------------------------------------------------------
 .../services/notifications/QueueListener.java   | 28 +++++++++++++++-----
 1 file changed, 21 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5b3948b0/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
index a1ab0db..d24aa29 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
@@ -35,6 +35,7 @@ import javax.annotation.PostConstruct;
 import java.util.*;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.RunnableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 
 @Component( "notificationsQueueListener" )
@@ -57,18 +58,26 @@ public class QueueListener  {
     private ServiceManager svcMgr;
     ExecutorService pool;
     public QueueListener() {
-      //  pool = Executors.newFixedThreadPool(1);
+        pool = Executors.newFixedThreadPool(1);
     }
 
     @PostConstruct
     void init() {
-        svcMgr = smf.getServiceManager(smf.getManagementAppId());
-        queueManager = svcMgr.getQueueManager();
-      //  run();
+        pool.submit(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    execute();
+                }catch (Exception e){
+                    LOG.error("failed to start push",e);
+                }
+            }
+        });
     }
 
-    public void run(){
-
+    private void execute(){
+        svcMgr = smf.getServiceManager(smf.getManagementAppId());
+        queueManager = svcMgr.getQueueManager();
         AtomicInteger consecutiveExceptions = new AtomicInteger();
         // run until there are no more active jobs
         while ( true ) {
@@ -110,8 +119,13 @@ public class QueueListener  {
                         first = Observable.merge(first, o);
                     }
                 }
-                first.toBlocking().lastOrDefault(null);
+                if(first!=null) {
+                    first.toBlocking().lastOrDefault(null);
+                }
                 consecutiveExceptions.set(0);
+                if(messages.size()<=0) {
+                    Thread.sleep(5000);
+                }
             }catch (Exception ex){
                 LOG.error("failed to dequeue",ex);
                 if(consecutiveExceptions.getAndIncrement() > MAX_CONSECUTIVE_FAILS){