You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2014/10/30 04:39:37 UTC
[11/45] git commit: add config for inactive dev retrieval
Add config for inactive device retrieval (interval read from usergrid.notifications.inactive.interval, default 100)
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/f3343f1f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/f3343f1f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/f3343f1f
Branch: refs/heads/key-row-sharding
Commit: f3343f1f8561ddc36c632e665ef8706163bbbcd0
Parents: 9907863
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Oct 27 15:33:49 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Oct 27 15:33:49 2014 -0600
----------------------------------------------------------------------
.../services/notifications/QueueListener.java | 16 +++++++++++-----
1 file changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f3343f1f/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
index 48c110b..286daf1 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
@@ -64,7 +64,7 @@ public class QueueListener {
private long sleepWhenNoneFound = 0;
- private long sleepBetweenRuns = 5000;
+ private long sleepBetweenRuns = 0;
private ExecutorService pool;
private List<Future> futures;
@@ -73,6 +73,7 @@ public class QueueListener {
private Integer batchSize = 10;
private String queueName;
public QueueManager TEST_QUEUE_MANAGER;
+ private int consecutiveCallsToRemoveDevices;
public QueueListener(ServiceManagerFactory smf, EntityManagerFactory emf, MetricsFactory metricsService, Properties props){
this.queueManagerFactory = CpSetup.getInjector().getInstance(QueueManagerFactory.class);
@@ -93,12 +94,14 @@ public class QueueListener {
int threadCount = 0;
try {
- sleepBetweenRuns = new Long(properties.getProperty("usergrid.notifications.listener.sleep.between", "0")).longValue();
+ sleepBetweenRuns = new Long(properties.getProperty("usergrid.notifications.listener.sleep.between", ""+sleepBetweenRuns)).longValue();
sleepWhenNoneFound = new Long(properties.getProperty("usergrid.notifications.listener.sleep.after", ""+DEFAULT_SLEEP)).longValue();
batchSize = new Integer(properties.getProperty("usergrid.notifications.listener.batchSize", (""+batchSize)));
queueName = ApplicationQueueManager.getQueueNames(properties);
int maxThreads = new Integer(properties.getProperty("usergrid.notifications.listener.maxThreads", ""+MAX_THREADS));
+ consecutiveCallsToRemoveDevices = new Integer(properties.getProperty("usergrid.notifications.inactive.interval", ""+100));
+
futures = new ArrayList<Future>(maxThreads);
//create our thread pool based on our threadcount.
@@ -154,7 +157,6 @@ public class QueueListener {
LOG.info("retrieved batch of {} messages from queue {} ", messages.size(),queueName);
if (messages.size() > 0) {
- runCount++;
HashMap<UUID, List<QueueMessage>> messageMap = new HashMap<>(messages.size());
//group messages into hash map by app id
for (QueueMessage message : messages) {
@@ -207,9 +209,13 @@ public class QueueListener {
LOG.info("sleep between rounds...sleep...{}", sleepBetweenRuns);
Thread.sleep(sleepBetweenRuns);
}
- if(runCount % 100 == 0){
+ if(runCount++ % consecutiveCallsToRemoveDevices == 0){
for(ApplicationQueueManager applicationQueueManager : queueManagerMap.values()){
- applicationQueueManager.asyncCheckForInactiveDevices();
+ try {
+ applicationQueueManager.asyncCheckForInactiveDevices();
+ }catch (Exception inactiveDeviceException){
+ LOG.error("Inactive Device Get failed",inactiveDeviceException);
+ }
}
//clear everything
queueManagerMap.clear();