You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2014/09/20 20:22:37 UTC

[4/4] git commit: adding properties for queue names and batch size

adding properties for queue names and batch size


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/7bcdf10b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/7bcdf10b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/7bcdf10b

Branch: refs/heads/two-dot-o
Commit: 7bcdf10b6147329d1eabb9d338783351709f7967
Parents: ee33dad
Author: Shawn Feldman <sf...@apache.org>
Authored: Sat Sep 20 12:05:02 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Sat Sep 20 12:05:02 2014 -0600

----------------------------------------------------------------------
 .../notifications/ApplicationQueueManager.java  | 27 +++++--------
 .../services/notifications/QueueListener.java   | 42 +++++++++++++-------
 .../services/notifications/QueueManager.java    |  2 +
 .../notifications/SingleQueueTaskManager.java   |  2 +-
 .../apns/NotificationsServiceIT.java            |  8 ++--
 .../gcm/NotificationsServiceIT.java             |  2 +-
 6 files changed, 47 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7bcdf10b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
index 29e3101..b54c9c6 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
@@ -52,10 +52,9 @@ import java.util.concurrent.atomic.AtomicInteger;
  * Created by ApigeeCorporation on 8/27/14.
  */
 public class ApplicationQueueManager implements QueueManager {
-    public static String QUEUE_NAME = "notifications/queuelistenerv1_11";
-    public static int BATCH_SIZE = 1000;
 
-    public static final long MESSAGE_TRANSACTION_TIMEOUT =  5 * 1000;
+    public static  String DEFAULT_QUEUE_NAME = "notifications/queuelistenerv1_12";
+    public static final String DEFAULT_QUEUE_PROPERTY = "usergrid.notifications.listener.queueName";
     private static final Logger LOG = LoggerFactory.getLogger(ApplicationQueueManager.class);
 
     //this is for tests, will not mark initial post complete, set to false for tests
@@ -67,7 +66,8 @@ public class ApplicationQueueManager implements QueueManager {
     private final org.apache.usergrid.mq.QueueManager qm;
     private final JobScheduler jobScheduler;
     private final MetricsFactory metricsFactory;
-    private final Properties properties;
+    private final String queueName;
+
     HashMap<Object, Notifier> notifierHashMap; // only retrieve notifiers once
 
     public final Map<String, ProviderAdapter> providerAdapters =   new HashMap<String, ProviderAdapter>(3);
@@ -82,22 +82,15 @@ public class ApplicationQueueManager implements QueueManager {
     public static ProviderAdapter TEST_ADAPTER = new TestAdapter();
 
 
-    public ApplicationQueueManager(JobScheduler jobScheduler, EntityManager entityManager, org.apache.usergrid.mq.QueueManager queueManager, MetricsFactory metricsFactory,Properties properties){
+    public ApplicationQueueManager(JobScheduler jobScheduler, EntityManager entityManager, org.apache.usergrid.mq.QueueManager queueManager, MetricsFactory metricsFactory, Properties properties){
         this.em = entityManager;
         this.qm = queueManager;
         this.jobScheduler = jobScheduler;
         this.metricsFactory = metricsFactory;
-        this.properties = properties;
+        this.queueName = properties.getProperty(DEFAULT_QUEUE_PROPERTY, DEFAULT_QUEUE_NAME);
     }
 
-    public static QueueResults getDeliveryBatch(org.apache.usergrid.mq.QueueManager queueManager) throws Exception {
-        QueueQuery qq = new QueueQuery();
-        qq.setLimit(BATCH_SIZE);
-        qq.setTimeout(MESSAGE_TRANSACTION_TIMEOUT);
-        QueueResults results = queueManager.getFromQueue(QUEUE_NAME, qq);
-        LOG.debug("got batch of {} devices", results.size());
-        return results;
-    }
+
 
     public boolean scheduleQueueJob(Notification notification) throws Exception{
         return jobScheduler.scheduleQueueJob(notification);
@@ -187,7 +180,7 @@ public class ApplicationQueueManager implements QueueManager {
                                 LOG.info("ApplicationQueueMessage: notification {} device {} queue time set. duration "+(System.currentTimeMillis()-now)+" ms", notification.getUuid(), deviceRef.getUuid());
                             }
                             now = System.currentTimeMillis();
-                            qm.postToQueue(QUEUE_NAME, message);
+                            qm.postToQueue(queueName, message);
                             LOG.info("ApplicationQueueMessage: notification {} post-queue to device {} duration " + (System.currentTimeMillis() - now) + " ms", notification.getUuid(), deviceRef.getUuid());
                             deviceCount.incrementAndGet();
                             queueMeter.mark();
@@ -373,7 +366,7 @@ public class ApplicationQueueManager implements QueueManager {
                         return messageObservable.map(func);
                     }
                 }, Schedulers.io())
-                .buffer(BATCH_SIZE)
+                .buffer(messages.size())
                 .map(new Func1<List<ApplicationQueueMessage>, HashMap<UUID, ApplicationQueueMessage>>() {
                     @Override
                     public HashMap<UUID, ApplicationQueueMessage> call(List<ApplicationQueueMessage> queueMessages) {
@@ -579,4 +572,6 @@ public class ApplicationQueueManager implements QueueManager {
         }
     }
 
+    public String getQueuePath(){return queueName;}
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7bcdf10b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
index 6821681..7a4cd26 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
@@ -20,17 +20,13 @@ import org.apache.usergrid.metrics.MetricsFactory;
 import org.apache.usergrid.mq.*;
 import org.apache.usergrid.persistence.EntityManager;
 import org.apache.usergrid.persistence.EntityManagerFactory;
+
 import org.apache.usergrid.services.ServiceManager;
 import org.apache.usergrid.services.ServiceManagerFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import rx.Observable;
-import rx.functions.Action1;
-import rx.functions.Func1;
-import rx.observables.GroupedObservable;
-import rx.schedulers.Schedulers;
-
 import javax.annotation.PostConstruct;
 import java.util.*;
 import java.util.concurrent.ExecutorService;
@@ -41,6 +37,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 public class QueueListener  {
     public static int MAX_CONSECUTIVE_FAILS = 10;
 
+    public static final long MESSAGE_TRANSACTION_TIMEOUT = 60 * 5 * 1000;
 
     private static final Logger LOG = LoggerFactory.getLogger(QueueListener.class);
 
@@ -68,6 +65,8 @@ public class QueueListener  {
     List<Future> futures;
 
     public static final String MAX_THREADS = "1";
+    private Integer batchSize = 1000;
+    private String queueName;
 
     public QueueListener() {
         pool = Executors.newFixedThreadPool(1);
@@ -95,6 +94,9 @@ public class QueueListener  {
             try {
                 sleepBetweenRuns = new Long(properties.getProperty("usergrid.notifications.listener.sleep.between", "0")).longValue();
                 sleepWhenNoneFound = new Long(properties.getProperty("usergrid.notifications.listener.sleep.after", "5000")).longValue();
+                batchSize = new Integer(properties.getProperty("usergrid.notifications.listener.batchSize", (""+batchSize)));
+                queueName = properties.getProperty(ApplicationQueueManager.DEFAULT_QUEUE_PROPERTY,ApplicationQueueManager.DEFAULT_QUEUE_NAME);
+
                 int maxThreads = new Integer(properties.getProperty("usergrid.notifications.listener.maxThreads", MAX_THREADS));
                 futures = new ArrayList<Future>(maxThreads);
                 while (threadCount++ < maxThreads) {
@@ -132,28 +134,28 @@ public class QueueListener  {
         // run until there are no more active jobs
         while ( true ) {
             try {
-                QueueResults results = ApplicationQueueManager.getDeliveryBatch(queueManager);
-                LOG.info("QueueListener: retrieved batch of {} messages",results.size());
+                QueueResults results = getDeliveryBatch(queueManager);
+                LOG.info("QueueListener: retrieved batch of {} messages", results.size());
 
                 List<Message> messages = results.getMessages();
-                if(messages.size()>0) {
-                    HashMap<UUID,List<ApplicationQueueMessage>> messageMap = new HashMap<>(messages.size());
+                if (messages.size() > 0) {
+                    HashMap<UUID, List<ApplicationQueueMessage>> messageMap = new HashMap<>(messages.size());
                     //group messages into hash map by app id
-                    for(Message message : messages){
+                    for (Message message : messages) {
                         ApplicationQueueMessage queueMessage = ApplicationQueueMessage.generate(message);
                         UUID applicationId = queueMessage.getApplicationId();
-                        if(!messageMap.containsKey(applicationId)){
+                        if (!messageMap.containsKey(applicationId)) {
                             List<ApplicationQueueMessage> applicationQueueMessages = new ArrayList<ApplicationQueueMessage>();
                             applicationQueueMessages.add(queueMessage);
-                            messageMap.put(applicationId,applicationQueueMessages);
-                        }else{
+                            messageMap.put(applicationId, applicationQueueMessages);
+                        } else {
                             messageMap.get(applicationId).add(queueMessage);
                         }
                     }
                     long now = System.currentTimeMillis();
                     Observable merge = null;
                     //send each set of app ids together
-                    for(Map.Entry<UUID,List<ApplicationQueueMessage>> entry : messageMap.entrySet()){
+                    for (Map.Entry<UUID, List<ApplicationQueueMessage>> entry : messageMap.entrySet()) {
                         UUID applicationId = entry.getKey();
                         EntityManager entityManager = emf.getEntityManager(applicationId);
                         ServiceManager serviceManager = smf.getServiceManager(applicationId);
@@ -206,6 +208,18 @@ public class QueueListener  {
         }
     }
 
+    private  QueueResults getDeliveryBatch(org.apache.usergrid.mq.QueueManager queueManager) throws Exception {
+        QueueQuery qq = new QueueQuery();
+        qq.setLimit(this.getBatchSize());
+        qq.setTimeout(MESSAGE_TRANSACTION_TIMEOUT);
+        QueueResults results = queueManager.getFromQueue(queueName, qq);
+        LOG.debug("got batch of {} devices", results.size());
+        return results;
+    }
 
+    public void setBatchSize(int batchSize){
+        this.batchSize = batchSize;
+    }
+    public int getBatchSize(){return batchSize;}
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7bcdf10b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueManager.java
index 3abe6b2..f92d463 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueManager.java
@@ -30,4 +30,6 @@ public interface QueueManager {
 
     public void asyncCheckForInactiveDevices(Set<Notifier> notifiers) throws Exception ;
 
+    public String getQueuePath();
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7bcdf10b/stack/services/src/main/java/org/apache/usergrid/services/notifications/SingleQueueTaskManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/SingleQueueTaskManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/SingleQueueTaskManager.java
index f6cd997..5af0493 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/SingleQueueTaskManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/SingleQueueTaskManager.java
@@ -48,7 +48,7 @@ public class SingleQueueTaskManager implements NotificationsTaskManager {
     public SingleQueueTaskManager(EntityManager em, org.apache.usergrid.mq.QueueManager qm, QueueManager proxy, Notification notification) {
         this.em = em;
         this.qm = qm;
-        this.path = ApplicationQueueManager.QUEUE_NAME;
+        this.path = proxy.getQueuePath();
         this.notification = notification;
         this.proxy = proxy;
         this.messageMap = new ConcurrentHashMap<UUID, ApplicationQueueMessage>();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7bcdf10b/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
index bbd0b52..ec7a14f 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
@@ -128,7 +128,7 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
 
         ns.TEST_PATH_QUERY = pathQuery;
 
-        ApplicationQueueManager.QUEUE_NAME = "notifications/test/" + UUID.randomUUID().toString();
+        ApplicationQueueManager.DEFAULT_QUEUE_NAME = "notifications/test/" + UUID.randomUUID().toString();
         listener = new QueueListener(ns.getServiceManagerFactory(),ns.getEntityManagerFactory(),ns.getMetricsFactory(), new Properties());
         listener.run();
     }
@@ -753,8 +753,8 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
 
         final int NUM_DEVICES = 50;
         // perform push //
-        int oldBatchSize = ApplicationQueueManager.BATCH_SIZE;
-        ApplicationQueueManager.BATCH_SIZE = 10;
+        int oldBatchSize = listener.getBatchSize();
+        listener.setBatchSize(10);
 
         app.clear();
         app.put("name", UUID.randomUUID().toString());
@@ -793,7 +793,7 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
         try {
             scheduleNotificationAndWait(notification);
         } finally {
-            ApplicationQueueManager.BATCH_SIZE = oldBatchSize;
+            listener.setBatchSize( oldBatchSize);
         }
 
         // check receipts //

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7bcdf10b/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
index d4db67e..09814f3 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
@@ -102,7 +102,7 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
         PathQuery pathQuery =  new PathQuery(new SimpleEntityRef(  app.getEm().getApplicationRef()), query);
 
         ns.TEST_PATH_QUERY = pathQuery;
-        ApplicationQueueManager.QUEUE_NAME = "notifications/test/" + UUID.randomUUID().toString();
+        ApplicationQueueManager.DEFAULT_QUEUE_NAME = "notifications/test/" + UUID.randomUUID().toString();
         listener = new QueueListener(ns.getServiceManagerFactory(),
                 ns.getEntityManagerFactory(),ns.getMetricsFactory(), new Properties());
         listener.run();