You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2014/09/20 20:22:37 UTC
[4/4] git commit: adding properties for queue names and batch size
adding properties for queue names and batch size
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/7bcdf10b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/7bcdf10b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/7bcdf10b
Branch: refs/heads/two-dot-o
Commit: 7bcdf10b6147329d1eabb9d338783351709f7967
Parents: ee33dad
Author: Shawn Feldman <sf...@apache.org>
Authored: Sat Sep 20 12:05:02 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Sat Sep 20 12:05:02 2014 -0600
----------------------------------------------------------------------
.../notifications/ApplicationQueueManager.java | 27 +++++--------
.../services/notifications/QueueListener.java | 42 +++++++++++++-------
.../services/notifications/QueueManager.java | 2 +
.../notifications/SingleQueueTaskManager.java | 2 +-
.../apns/NotificationsServiceIT.java | 8 ++--
.../gcm/NotificationsServiceIT.java | 2 +-
6 files changed, 47 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7bcdf10b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
index 29e3101..b54c9c6 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
@@ -52,10 +52,9 @@ import java.util.concurrent.atomic.AtomicInteger;
* Created by ApigeeCorporation on 8/27/14.
*/
public class ApplicationQueueManager implements QueueManager {
- public static String QUEUE_NAME = "notifications/queuelistenerv1_11";
- public static int BATCH_SIZE = 1000;
- public static final long MESSAGE_TRANSACTION_TIMEOUT = 5 * 1000;
+ public static String DEFAULT_QUEUE_NAME = "notifications/queuelistenerv1_12";
+ public static final String DEFAULT_QUEUE_PROPERTY = "usergrid.notifications.listener.queueName";
private static final Logger LOG = LoggerFactory.getLogger(ApplicationQueueManager.class);
//this is for tests, will not mark initial post complete, set to false for tests
@@ -67,7 +66,8 @@ public class ApplicationQueueManager implements QueueManager {
private final org.apache.usergrid.mq.QueueManager qm;
private final JobScheduler jobScheduler;
private final MetricsFactory metricsFactory;
- private final Properties properties;
+ private final String queueName;
+
HashMap<Object, Notifier> notifierHashMap; // only retrieve notifiers once
public final Map<String, ProviderAdapter> providerAdapters = new HashMap<String, ProviderAdapter>(3);
@@ -82,22 +82,15 @@ public class ApplicationQueueManager implements QueueManager {
public static ProviderAdapter TEST_ADAPTER = new TestAdapter();
- public ApplicationQueueManager(JobScheduler jobScheduler, EntityManager entityManager, org.apache.usergrid.mq.QueueManager queueManager, MetricsFactory metricsFactory,Properties properties){
+ public ApplicationQueueManager(JobScheduler jobScheduler, EntityManager entityManager, org.apache.usergrid.mq.QueueManager queueManager, MetricsFactory metricsFactory, Properties properties){
this.em = entityManager;
this.qm = queueManager;
this.jobScheduler = jobScheduler;
this.metricsFactory = metricsFactory;
- this.properties = properties;
+ this.queueName = properties.getProperty(DEFAULT_QUEUE_PROPERTY, DEFAULT_QUEUE_NAME);
}
- public static QueueResults getDeliveryBatch(org.apache.usergrid.mq.QueueManager queueManager) throws Exception {
- QueueQuery qq = new QueueQuery();
- qq.setLimit(BATCH_SIZE);
- qq.setTimeout(MESSAGE_TRANSACTION_TIMEOUT);
- QueueResults results = queueManager.getFromQueue(QUEUE_NAME, qq);
- LOG.debug("got batch of {} devices", results.size());
- return results;
- }
+
public boolean scheduleQueueJob(Notification notification) throws Exception{
return jobScheduler.scheduleQueueJob(notification);
@@ -187,7 +180,7 @@ public class ApplicationQueueManager implements QueueManager {
LOG.info("ApplicationQueueMessage: notification {} device {} queue time set. duration "+(System.currentTimeMillis()-now)+" ms", notification.getUuid(), deviceRef.getUuid());
}
now = System.currentTimeMillis();
- qm.postToQueue(QUEUE_NAME, message);
+ qm.postToQueue(queueName, message);
LOG.info("ApplicationQueueMessage: notification {} post-queue to device {} duration " + (System.currentTimeMillis() - now) + " ms", notification.getUuid(), deviceRef.getUuid());
deviceCount.incrementAndGet();
queueMeter.mark();
@@ -373,7 +366,7 @@ public class ApplicationQueueManager implements QueueManager {
return messageObservable.map(func);
}
}, Schedulers.io())
- .buffer(BATCH_SIZE)
+ .buffer(messages.size())
.map(new Func1<List<ApplicationQueueMessage>, HashMap<UUID, ApplicationQueueMessage>>() {
@Override
public HashMap<UUID, ApplicationQueueMessage> call(List<ApplicationQueueMessage> queueMessages) {
@@ -579,4 +572,6 @@ public class ApplicationQueueManager implements QueueManager {
}
}
+ public String getQueuePath(){return queueName;}
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7bcdf10b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
index 6821681..7a4cd26 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
@@ -20,17 +20,13 @@ import org.apache.usergrid.metrics.MetricsFactory;
import org.apache.usergrid.mq.*;
import org.apache.usergrid.persistence.EntityManager;
import org.apache.usergrid.persistence.EntityManagerFactory;
+
import org.apache.usergrid.services.ServiceManager;
import org.apache.usergrid.services.ServiceManagerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import rx.Observable;
-import rx.functions.Action1;
-import rx.functions.Func1;
-import rx.observables.GroupedObservable;
-import rx.schedulers.Schedulers;
-
import javax.annotation.PostConstruct;
import java.util.*;
import java.util.concurrent.ExecutorService;
@@ -41,6 +37,7 @@ import java.util.concurrent.atomic.AtomicInteger;
public class QueueListener {
public static int MAX_CONSECUTIVE_FAILS = 10;
+ public static final long MESSAGE_TRANSACTION_TIMEOUT = 60 * 5 * 1000;
private static final Logger LOG = LoggerFactory.getLogger(QueueListener.class);
@@ -68,6 +65,8 @@ public class QueueListener {
List<Future> futures;
public static final String MAX_THREADS = "1";
+ private Integer batchSize = 1000;
+ private String queueName;
public QueueListener() {
pool = Executors.newFixedThreadPool(1);
@@ -95,6 +94,9 @@ public class QueueListener {
try {
sleepBetweenRuns = new Long(properties.getProperty("usergrid.notifications.listener.sleep.between", "0")).longValue();
sleepWhenNoneFound = new Long(properties.getProperty("usergrid.notifications.listener.sleep.after", "5000")).longValue();
+ batchSize = new Integer(properties.getProperty("usergrid.notifications.listener.batchSize", (""+batchSize)));
+ queueName = properties.getProperty(ApplicationQueueManager.DEFAULT_QUEUE_PROPERTY,ApplicationQueueManager.DEFAULT_QUEUE_NAME);
+
int maxThreads = new Integer(properties.getProperty("usergrid.notifications.listener.maxThreads", MAX_THREADS));
futures = new ArrayList<Future>(maxThreads);
while (threadCount++ < maxThreads) {
@@ -132,28 +134,28 @@ public class QueueListener {
// run until there are no more active jobs
while ( true ) {
try {
- QueueResults results = ApplicationQueueManager.getDeliveryBatch(queueManager);
- LOG.info("QueueListener: retrieved batch of {} messages",results.size());
+ QueueResults results = getDeliveryBatch(queueManager);
+ LOG.info("QueueListener: retrieved batch of {} messages", results.size());
List<Message> messages = results.getMessages();
- if(messages.size()>0) {
- HashMap<UUID,List<ApplicationQueueMessage>> messageMap = new HashMap<>(messages.size());
+ if (messages.size() > 0) {
+ HashMap<UUID, List<ApplicationQueueMessage>> messageMap = new HashMap<>(messages.size());
//group messages into hash map by app id
- for(Message message : messages){
+ for (Message message : messages) {
ApplicationQueueMessage queueMessage = ApplicationQueueMessage.generate(message);
UUID applicationId = queueMessage.getApplicationId();
- if(!messageMap.containsKey(applicationId)){
+ if (!messageMap.containsKey(applicationId)) {
List<ApplicationQueueMessage> applicationQueueMessages = new ArrayList<ApplicationQueueMessage>();
applicationQueueMessages.add(queueMessage);
- messageMap.put(applicationId,applicationQueueMessages);
- }else{
+ messageMap.put(applicationId, applicationQueueMessages);
+ } else {
messageMap.get(applicationId).add(queueMessage);
}
}
long now = System.currentTimeMillis();
Observable merge = null;
//send each set of app ids together
- for(Map.Entry<UUID,List<ApplicationQueueMessage>> entry : messageMap.entrySet()){
+ for (Map.Entry<UUID, List<ApplicationQueueMessage>> entry : messageMap.entrySet()) {
UUID applicationId = entry.getKey();
EntityManager entityManager = emf.getEntityManager(applicationId);
ServiceManager serviceManager = smf.getServiceManager(applicationId);
@@ -206,6 +208,18 @@ public class QueueListener {
}
}
+ private QueueResults getDeliveryBatch(org.apache.usergrid.mq.QueueManager queueManager) throws Exception {
+ QueueQuery qq = new QueueQuery();
+ qq.setLimit(this.getBatchSize());
+ qq.setTimeout(MESSAGE_TRANSACTION_TIMEOUT);
+ QueueResults results = queueManager.getFromQueue(queueName, qq);
+ LOG.debug("got batch of {} devices", results.size());
+ return results;
+ }
+ public void setBatchSize(int batchSize){
+ this.batchSize = batchSize;
+ }
+ public int getBatchSize(){return batchSize;}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7bcdf10b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueManager.java
index 3abe6b2..f92d463 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueManager.java
@@ -30,4 +30,6 @@ public interface QueueManager {
public void asyncCheckForInactiveDevices(Set<Notifier> notifiers) throws Exception ;
+ public String getQueuePath();
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7bcdf10b/stack/services/src/main/java/org/apache/usergrid/services/notifications/SingleQueueTaskManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/SingleQueueTaskManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/SingleQueueTaskManager.java
index f6cd997..5af0493 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/SingleQueueTaskManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/SingleQueueTaskManager.java
@@ -48,7 +48,7 @@ public class SingleQueueTaskManager implements NotificationsTaskManager {
public SingleQueueTaskManager(EntityManager em, org.apache.usergrid.mq.QueueManager qm, QueueManager proxy, Notification notification) {
this.em = em;
this.qm = qm;
- this.path = ApplicationQueueManager.QUEUE_NAME;
+ this.path = proxy.getQueuePath();
this.notification = notification;
this.proxy = proxy;
this.messageMap = new ConcurrentHashMap<UUID, ApplicationQueueMessage>();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7bcdf10b/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
index bbd0b52..ec7a14f 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
@@ -128,7 +128,7 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
ns.TEST_PATH_QUERY = pathQuery;
- ApplicationQueueManager.QUEUE_NAME = "notifications/test/" + UUID.randomUUID().toString();
+ ApplicationQueueManager.DEFAULT_QUEUE_NAME = "notifications/test/" + UUID.randomUUID().toString();
listener = new QueueListener(ns.getServiceManagerFactory(),ns.getEntityManagerFactory(),ns.getMetricsFactory(), new Properties());
listener.run();
}
@@ -753,8 +753,8 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
final int NUM_DEVICES = 50;
// perform push //
- int oldBatchSize = ApplicationQueueManager.BATCH_SIZE;
- ApplicationQueueManager.BATCH_SIZE = 10;
+ int oldBatchSize = listener.getBatchSize();
+ listener.setBatchSize(10);
app.clear();
app.put("name", UUID.randomUUID().toString());
@@ -793,7 +793,7 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
try {
scheduleNotificationAndWait(notification);
} finally {
- ApplicationQueueManager.BATCH_SIZE = oldBatchSize;
+ listener.setBatchSize( oldBatchSize);
}
// check receipts //
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7bcdf10b/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
index d4db67e..09814f3 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
@@ -102,7 +102,7 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
PathQuery pathQuery = new PathQuery(new SimpleEntityRef( app.getEm().getApplicationRef()), query);
ns.TEST_PATH_QUERY = pathQuery;
- ApplicationQueueManager.QUEUE_NAME = "notifications/test/" + UUID.randomUUID().toString();
+ ApplicationQueueManager.DEFAULT_QUEUE_NAME = "notifications/test/" + UUID.randomUUID().toString();
listener = new QueueListener(ns.getServiceManagerFactory(),
ns.getEntityManagerFactory(),ns.getMetricsFactory(), new Properties());
listener.run();