You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2014/09/26 19:26:55 UTC
[1/2] git commit: pick random queue between 0 and queue size
Repository: incubator-usergrid
Updated Branches:
refs/heads/two-dot-o 4cba3b830 -> 05caf4932
pick random queue between 0 and queue size
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/f2a93d7f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/f2a93d7f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/f2a93d7f
Branch: refs/heads/two-dot-o
Commit: f2a93d7f3283a148b2091821f2644506313def8f
Parents: 4cba3b8
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Sep 26 11:02:25 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Sep 26 11:02:25 2014 -0600
----------------------------------------------------------------------
.../notifications/ApplicationQueueManager.java | 27 +++++++++++---------
.../services/notifications/QueueListener.java | 15 +++++------
.../services/notifications/QueueManager.java | 4 ---
.../notifications/SingleQueueTaskManager.java | 8 +++---
4 files changed, 26 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f2a93d7f/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
index d110b95..198d41a 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
@@ -18,15 +18,9 @@ package org.apache.usergrid.services.notifications;
import com.clearspring.analytics.hash.MurmurHash;
import com.clearspring.analytics.stream.frequency.CountMinSketch;
-import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
import org.apache.usergrid.batch.JobExecution;
import org.apache.usergrid.metrics.MetricsFactory;
-import org.apache.usergrid.mq.QueueQuery;
-import org.apache.usergrid.mq.QueueResults;
import org.apache.usergrid.persistence.*;
import org.apache.usergrid.persistence.entities.Device;
import org.apache.usergrid.persistence.entities.Notification;
@@ -66,7 +60,7 @@ public class ApplicationQueueManager implements QueueManager {
private final org.apache.usergrid.mq.QueueManager qm;
private final JobScheduler jobScheduler;
private final MetricsFactory metricsFactory;
- private final String queueName;
+ private final String[] queueNames;
HashMap<Object, Notifier> notifierHashMap; // only retrieve notifiers once
@@ -87,7 +81,7 @@ public class ApplicationQueueManager implements QueueManager {
this.qm = queueManager;
this.jobScheduler = jobScheduler;
this.metricsFactory = metricsFactory;
- this.queueName = properties.getProperty(DEFAULT_QUEUE_PROPERTY, DEFAULT_QUEUE_NAME);
+ this.queueNames = getQueueNames(properties);
}
@@ -115,6 +109,7 @@ public class ApplicationQueueManager implements QueueManager {
final ConcurrentLinkedQueue<String> errorMessages = new ConcurrentLinkedQueue<String>(); //build up list of issues
final HashMap<Object,Notifier> notifierMap = getNotifierMap();
+ final String queueName = getRandomQueue(queueNames);
//get devices in querystring, and make sure you have access
if (pathQuery != null) {
@@ -235,7 +230,7 @@ public class ApplicationQueueManager implements QueueManager {
//do i have devices, and have i already started batching.
if (deviceCount.get() <= 0) {
- SingleQueueTaskManager taskManager = new SingleQueueTaskManager(em, qm, this, notification);
+ SingleQueueTaskManager taskManager = new SingleQueueTaskManager(em, qm, this, notification,queueName);
//if i'm in a test value will be false, do not mark finished for test orchestration, not ideal need real tests
taskManager.finishedBatch();
}
@@ -288,7 +283,7 @@ public class ApplicationQueueManager implements QueueManager {
* @param messages
* @throws Exception
*/
- public Observable sendBatchToProviders( final List<ApplicationQueueMessage> messages) {
+ public Observable sendBatchToProviders( final List<ApplicationQueueMessage> messages, final String queuePath) {
LOG.info("sending batch of {} notifications.", messages.size());
final Meter sendMeter = metricsFactory.getMeter(NotificationsService.class, "send");
@@ -313,7 +308,7 @@ public class ApplicationQueueManager implements QueueManager {
SingleQueueTaskManager taskManager;
taskManager = taskMap.get(message.getNotificationId());
if (taskManager == null) {
- taskManager = new SingleQueueTaskManager(em, qm, proxy, notification);
+ taskManager = new SingleQueueTaskManager(em, qm, proxy, notification,queuePath);
taskMap.putIfAbsent(message.getNotificationId(), taskManager);
taskManager = taskMap.get(message.getNotificationId());
}
@@ -427,6 +422,15 @@ public class ApplicationQueueManager implements QueueManager {
return translatedPayloads;
}
+ public static String[] getQueueNames(Properties properties) {
+ String[] names = properties.getProperty(ApplicationQueueManager.DEFAULT_QUEUE_PROPERTY,ApplicationQueueManager.DEFAULT_QUEUE_NAME).split(";");
+ return names;
+ }
+ public static String getRandomQueue(String[] queueNames) {
+ int size = queueNames.length;
+ Random random = new Random();
+ return queueNames[random.nextInt(size)];
+ }
private static final class IteratorObservable<T> implements rx.Observable.OnSubscribe<T> {
private final Iterator<T> input;
@@ -571,6 +575,5 @@ public class ApplicationQueueManager implements QueueManager {
}
}
- public String getQueuePath(){return queueName;}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f2a93d7f/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
index 6e9a7ef..1d4c5cb 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
@@ -35,8 +35,6 @@ import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
public class QueueListener {
- public static int MAX_CONSECUTIVE_FAILS = 10000;
-
public static final long MESSAGE_TRANSACTION_TIMEOUT = 60 * 5 * 1000;
private static final Logger LOG = LoggerFactory.getLogger(QueueListener.class);
@@ -66,7 +64,7 @@ public class QueueListener {
public static final String MAX_THREADS = "1";
private Integer batchSize = 1000;
- private String queueName;
+ private String[] queueNames;
public QueueListener() {
pool = Executors.newFixedThreadPool(1);
@@ -95,7 +93,7 @@ public class QueueListener {
sleepBetweenRuns = new Long(properties.getProperty("usergrid.notifications.listener.sleep.between", "0")).longValue();
sleepWhenNoneFound = new Long(properties.getProperty("usergrid.notifications.listener.sleep.after", "5000")).longValue();
batchSize = new Integer(properties.getProperty("usergrid.notifications.listener.batchSize", (""+batchSize)));
- queueName = properties.getProperty(ApplicationQueueManager.DEFAULT_QUEUE_PROPERTY,ApplicationQueueManager.DEFAULT_QUEUE_NAME);
+ queueNames = ApplicationQueueManager.getQueueNames(properties);
int maxThreads = new Integer(properties.getProperty("usergrid.notifications.listener.maxThreads", MAX_THREADS));
futures = new ArrayList<Future>(maxThreads);
@@ -133,7 +131,8 @@ public class QueueListener {
// run until there are no more active jobs
while ( true ) {
try {
- QueueResults results = getDeliveryBatch(queueManager);
+ String queueName = ApplicationQueueManager.getRandomQueue(queueNames);
+ QueueResults results = getDeliveryBatch(queueManager,queueName);
LOG.info("QueueListener: retrieved batch of {} messages", results.size());
List<Message> messages = results.getMessages();
@@ -167,7 +166,7 @@ public class QueueListener {
);
LOG.info("QueueListener: send batch for app {} of {} messages", entry.getKey(), entry.getValue().size());
- Observable current = manager.sendBatchToProviders(entry.getValue());
+ Observable current = manager.sendBatchToProviders(entry.getValue(),queueName);
if(merge == null)
merge = current;
else {
@@ -211,11 +210,11 @@ public class QueueListener {
}
}
- private QueueResults getDeliveryBatch(org.apache.usergrid.mq.QueueManager queueManager) throws Exception {
+ private QueueResults getDeliveryBatch(org.apache.usergrid.mq.QueueManager queueManager,String queuePath) throws Exception {
QueueQuery qq = new QueueQuery();
qq.setLimit(this.getBatchSize());
qq.setTimeout(MESSAGE_TRANSACTION_TIMEOUT);
- QueueResults results = queueManager.getFromQueue(queueName, qq);
+ QueueResults results = queueManager.getFromQueue(queuePath, qq);
LOG.debug("got batch of {} devices", results.size());
return results;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f2a93d7f/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueManager.java
index f92d463..0024417 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueManager.java
@@ -26,10 +26,6 @@ import java.util.Set;
*/
public interface QueueManager {
- public HashMap<Object,Notifier> getNotifierMap();
-
public void asyncCheckForInactiveDevices(Set<Notifier> notifiers) throws Exception ;
- public String getQueuePath();
-
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f2a93d7f/stack/services/src/main/java/org/apache/usergrid/services/notifications/SingleQueueTaskManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/SingleQueueTaskManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/SingleQueueTaskManager.java
index 8b4866f..f87f497 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/SingleQueueTaskManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/SingleQueueTaskManager.java
@@ -34,8 +34,8 @@ public class SingleQueueTaskManager implements NotificationsTaskManager {
private static final Logger LOG = LoggerFactory
.getLogger(SingleQueueTaskManager.class);
- private final String path;
private final QueueManager proxy;
+ private final String queuePath;
private Notification notification;
private AtomicLong successes = new AtomicLong();
@@ -45,14 +45,14 @@ public class SingleQueueTaskManager implements NotificationsTaskManager {
private ConcurrentHashMap<UUID, ApplicationQueueMessage> messageMap;
private boolean hasFinished;
- public SingleQueueTaskManager(EntityManager em, org.apache.usergrid.mq.QueueManager qm, QueueManager proxy, Notification notification) {
+ public SingleQueueTaskManager(EntityManager em, org.apache.usergrid.mq.QueueManager qm, QueueManager proxy, Notification notification,String queuePath) {
this.em = em;
this.qm = qm;
- this.path = proxy.getQueuePath();
this.notification = notification;
this.proxy = proxy;
this.messageMap = new ConcurrentHashMap<UUID, ApplicationQueueMessage>();
hasFinished = false;
+ this.queuePath = queuePath;
}
public void addMessage(UUID deviceId, ApplicationQueueMessage message) {
@@ -72,7 +72,7 @@ public class SingleQueueTaskManager implements NotificationsTaskManager {
}
LOG.debug("notification {} removing device {} from remaining", notification.getUuid(), deviceUUID);
- qm.commitTransaction(path, messageMap.get(deviceUUID).getTransaction(), null);
+ qm.commitTransaction(queuePath, messageMap.get(deviceUUID).getTransaction(), null);
if (newProviderId != null) {
LOG.debug("notification {} replacing device {} notifierId", notification.getUuid(), deviceUUID);
replaceProviderId(deviceRef, notifier, newProviderId);
[2/2] git commit: setup default sleep window
Posted by sf...@apache.org.
setup default sleep window
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/05caf493
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/05caf493
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/05caf493
Branch: refs/heads/two-dot-o
Commit: 05caf49325e52f806b3fdc22f18b70512f722d8c
Parents: f2a93d7
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Sep 26 11:23:19 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Sep 26 11:23:19 2014 -0600
----------------------------------------------------------------------
.../services/notifications/ApplicationQueueManager.java | 5 +++--
.../apache/usergrid/services/notifications/QueueListener.java | 3 ++-
.../services/notifications/apns/NotificationsServiceIT.java | 4 +++-
.../services/notifications/gcm/NotificationsServiceIT.java | 4 +++-
4 files changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/05caf493/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
index 198d41a..766cb75 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
@@ -47,7 +47,7 @@ import java.util.concurrent.atomic.AtomicInteger;
*/
public class ApplicationQueueManager implements QueueManager {
- public static String DEFAULT_QUEUE_NAME = "notifications/queuelistenerv1_12";
+ public static String DEFAULT_QUEUE_NAME = "notifications/queuelistenerv1_20;notifications/queuelistenerv1_21";
public static final String DEFAULT_QUEUE_PROPERTY = "usergrid.notifications.listener.queueName";
private static final Logger LOG = LoggerFactory.getLogger(ApplicationQueueManager.class);
@@ -429,7 +429,8 @@ public class ApplicationQueueManager implements QueueManager {
public static String getRandomQueue(String[] queueNames) {
int size = queueNames.length;
Random random = new Random();
- return queueNames[random.nextInt(size)];
+ String name = queueNames[random.nextInt(size)];
+ return name;
}
private static final class IteratorObservable<T> implements rx.Observable.OnSubscribe<T> {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/05caf493/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
index 1d4c5cb..234821b 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
@@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicInteger;
public class QueueListener {
public static final long MESSAGE_TRANSACTION_TIMEOUT = 60 * 5 * 1000;
+ public static long DEFAULT_SLEEP = 5000;
private static final Logger LOG = LoggerFactory.getLogger(QueueListener.class);
@@ -91,7 +92,7 @@ public class QueueListener {
try {
sleepBetweenRuns = new Long(properties.getProperty("usergrid.notifications.listener.sleep.between", "0")).longValue();
- sleepWhenNoneFound = new Long(properties.getProperty("usergrid.notifications.listener.sleep.after", "5000")).longValue();
+ sleepWhenNoneFound = new Long(properties.getProperty("usergrid.notifications.listener.sleep.after", ""+DEFAULT_SLEEP)).longValue();
batchSize = new Integer(properties.getProperty("usergrid.notifications.listener.batchSize", (""+batchSize)));
queueNames = ApplicationQueueManager.getQueueNames(properties);
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/05caf493/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
index ec7a14f..a47dc99 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
@@ -69,6 +69,9 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
@Override
@Before
public void before() throws Exception {
+ ApplicationQueueManager.DEFAULT_QUEUE_NAME = "notifications/test/" + UUID.randomUUID().toString()+";"+"notifications/test/" + UUID.randomUUID().toString();
+
+ QueueListener.DEFAULT_SLEEP = 200;
super.before();
// create apns notifier //
app.clear();
@@ -128,7 +131,6 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
ns.TEST_PATH_QUERY = pathQuery;
- ApplicationQueueManager.DEFAULT_QUEUE_NAME = "notifications/test/" + UUID.randomUUID().toString();
listener = new QueueListener(ns.getServiceManagerFactory(),ns.getEntityManagerFactory(),ns.getMetricsFactory(), new Properties());
listener.run();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/05caf493/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
index 09814f3..eb3c8b8 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
@@ -63,6 +63,9 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
@Override
@Before
public void before() throws Exception {
+ ApplicationQueueManager.DEFAULT_QUEUE_NAME = "notifications/test/" + UUID.randomUUID().toString()+";"+"notifications/test/" + UUID.randomUUID().toString();
+ QueueListener.DEFAULT_SLEEP = 200;
+
super.before();
// create gcm notifier //
@@ -102,7 +105,6 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
PathQuery pathQuery = new PathQuery(new SimpleEntityRef( app.getEm().getApplicationRef()), query);
ns.TEST_PATH_QUERY = pathQuery;
- ApplicationQueueManager.DEFAULT_QUEUE_NAME = "notifications/test/" + UUID.randomUUID().toString();
listener = new QueueListener(ns.getServiceManagerFactory(),
ns.getEntityManagerFactory(),ns.getMetricsFactory(), new Properties());
listener.run();