You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2014/08/25 19:22:10 UTC
git commit: start pool
Repository: incubator-usergrid
Updated Branches:
refs/heads/two-dot-o-notifications-queue 8a8f7e6b0 -> 5b3948b0d
start pool
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/5b3948b0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/5b3948b0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/5b3948b0
Branch: refs/heads/two-dot-o-notifications-queue
Commit: 5b3948b0d4e8ee5a1c3f50df23ec4f1007dc6fc0
Parents: 8a8f7e6
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Aug 25 11:21:46 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Aug 25 11:21:46 2014 -0600
----------------------------------------------------------------------
.../services/notifications/QueueListener.java | 28 +++++++++++++++-----
1 file changed, 21 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5b3948b0/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
index a1ab0db..d24aa29 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
@@ -35,6 +35,7 @@ import javax.annotation.PostConstruct;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.RunnableFuture;
import java.util.concurrent.atomic.AtomicInteger;
@Component( "notificationsQueueListener" )
@@ -57,18 +58,26 @@ public class QueueListener {
private ServiceManager svcMgr;
ExecutorService pool;
public QueueListener() {
- // pool = Executors.newFixedThreadPool(1);
+ pool = Executors.newFixedThreadPool(1);
}
@PostConstruct
void init() {
- svcMgr = smf.getServiceManager(smf.getManagementAppId());
- queueManager = svcMgr.getQueueManager();
- // run();
+ pool.submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ execute();
+ }catch (Exception e){
+ LOG.error("failed to start push",e);
+ }
+ }
+ });
}
- public void run(){
-
+ private void execute(){
+ svcMgr = smf.getServiceManager(smf.getManagementAppId());
+ queueManager = svcMgr.getQueueManager();
AtomicInteger consecutiveExceptions = new AtomicInteger();
// run until there are no more active jobs
while ( true ) {
@@ -110,8 +119,13 @@ public class QueueListener {
first = Observable.merge(first, o);
}
}
- first.toBlocking().lastOrDefault(null);
+ if(first!=null) {
+ first.toBlocking().lastOrDefault(null);
+ }
consecutiveExceptions.set(0);
+ if(messages.size()<=0) {
+ Thread.sleep(5000);
+ }
}catch (Exception ex){
LOG.error("failed to dequeue",ex);
if(consecutiveExceptions.getAndIncrement() > MAX_CONSECUTIVE_FAILS){