You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/10/29 20:21:47 UTC

[06/22] git commit: add config for inactive dev retrieval

add config for inactive dev retrieval


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/f3343f1f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/f3343f1f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/f3343f1f

Branch: refs/heads/two-dot-o-events
Commit: f3343f1f8561ddc36c632e665ef8706163bbbcd0
Parents: 9907863
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Oct 27 15:33:49 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Oct 27 15:33:49 2014 -0600

----------------------------------------------------------------------
 .../services/notifications/QueueListener.java       | 16 +++++++++++-----
 1 file changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f3343f1f/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
index 48c110b..286daf1 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
@@ -64,7 +64,7 @@ public class QueueListener  {
 
     private long sleepWhenNoneFound = 0;
 
-    private long sleepBetweenRuns = 5000;
+    private long sleepBetweenRuns = 0;
 
     private ExecutorService pool;
     private List<Future> futures;
@@ -73,6 +73,7 @@ public class QueueListener  {
     private Integer batchSize = 10;
     private String queueName;
     public QueueManager TEST_QUEUE_MANAGER;
+    private int consecutiveCallsToRemoveDevices;
 
     public QueueListener(ServiceManagerFactory smf, EntityManagerFactory emf, MetricsFactory metricsService, Properties props){
         this.queueManagerFactory = CpSetup.getInjector().getInstance(QueueManagerFactory.class);
@@ -93,12 +94,14 @@ public class QueueListener  {
             int threadCount = 0;
 
             try {
-                sleepBetweenRuns = new Long(properties.getProperty("usergrid.notifications.listener.sleep.between", "0")).longValue();
+                sleepBetweenRuns = new Long(properties.getProperty("usergrid.notifications.listener.sleep.between", ""+sleepBetweenRuns)).longValue();
                 sleepWhenNoneFound = new Long(properties.getProperty("usergrid.notifications.listener.sleep.after", ""+DEFAULT_SLEEP)).longValue();
                 batchSize = new Integer(properties.getProperty("usergrid.notifications.listener.batchSize", (""+batchSize)));
                 queueName = ApplicationQueueManager.getQueueNames(properties);
 
                 int maxThreads = new Integer(properties.getProperty("usergrid.notifications.listener.maxThreads", ""+MAX_THREADS));
+                consecutiveCallsToRemoveDevices = new Integer(properties.getProperty("usergrid.notifications.inactive.interval", ""+100));
+
                 futures = new ArrayList<Future>(maxThreads);
 
                 //create our thread pool based on our threadcount.
@@ -154,7 +157,6 @@ public class QueueListener  {
                 LOG.info("retrieved batch of {} messages from queue {} ", messages.size(),queueName);
 
                 if (messages.size() > 0) {
-                    runCount++;
                     HashMap<UUID, List<QueueMessage>> messageMap = new HashMap<>(messages.size());
                     //group messages into hash map by app id
                     for (QueueMessage message : messages) {
@@ -207,9 +209,13 @@ public class QueueListener  {
                         LOG.info("sleep between rounds...sleep...{}", sleepBetweenRuns);
                         Thread.sleep(sleepBetweenRuns);
                     }
-                    if(runCount % 100 == 0){
+                    if(runCount++ % consecutiveCallsToRemoveDevices == 0){
                         for(ApplicationQueueManager applicationQueueManager : queueManagerMap.values()){
-                            applicationQueueManager.asyncCheckForInactiveDevices();
+                            try {
+                                applicationQueueManager.asyncCheckForInactiveDevices();
+                            }catch (Exception inactiveDeviceException){
+                                LOG.error("Inactive Device Get failed",inactiveDeviceException);
+                            }
                         }
                         //clear everything
                         queueManagerMap.clear();