You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2014/09/26 19:26:55 UTC

[1/2] git commit: pick random queue between 0 and queue size

Repository: incubator-usergrid
Updated Branches:
  refs/heads/two-dot-o 4cba3b830 -> 05caf4932


pick random queue between 0 and queue size


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/f2a93d7f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/f2a93d7f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/f2a93d7f

Branch: refs/heads/two-dot-o
Commit: f2a93d7f3283a148b2091821f2644506313def8f
Parents: 4cba3b8
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Sep 26 11:02:25 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Sep 26 11:02:25 2014 -0600

----------------------------------------------------------------------
 .../notifications/ApplicationQueueManager.java  | 27 +++++++++++---------
 .../services/notifications/QueueListener.java   | 15 +++++------
 .../services/notifications/QueueManager.java    |  4 ---
 .../notifications/SingleQueueTaskManager.java   |  8 +++---
 4 files changed, 26 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f2a93d7f/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
index d110b95..198d41a 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
@@ -18,15 +18,9 @@ package org.apache.usergrid.services.notifications;
 
 import com.clearspring.analytics.hash.MurmurHash;
 import com.clearspring.analytics.stream.frequency.CountMinSketch;
-import com.codahale.metrics.Histogram;
 import com.codahale.metrics.Meter;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
 import org.apache.usergrid.batch.JobExecution;
 import org.apache.usergrid.metrics.MetricsFactory;
-import org.apache.usergrid.mq.QueueQuery;
-import org.apache.usergrid.mq.QueueResults;
 import org.apache.usergrid.persistence.*;
 import org.apache.usergrid.persistence.entities.Device;
 import org.apache.usergrid.persistence.entities.Notification;
@@ -66,7 +60,7 @@ public class ApplicationQueueManager implements QueueManager {
     private final org.apache.usergrid.mq.QueueManager qm;
     private final JobScheduler jobScheduler;
     private final MetricsFactory metricsFactory;
-    private final String queueName;
+    private final String[] queueNames;
 
     HashMap<Object, Notifier> notifierHashMap; // only retrieve notifiers once
 
@@ -87,7 +81,7 @@ public class ApplicationQueueManager implements QueueManager {
         this.qm = queueManager;
         this.jobScheduler = jobScheduler;
         this.metricsFactory = metricsFactory;
-        this.queueName = properties.getProperty(DEFAULT_QUEUE_PROPERTY, DEFAULT_QUEUE_NAME);
+        this.queueNames = getQueueNames(properties);
     }
 
 
@@ -115,6 +109,7 @@ public class ApplicationQueueManager implements QueueManager {
         final ConcurrentLinkedQueue<String> errorMessages = new ConcurrentLinkedQueue<String>(); //build up list of issues
 
         final HashMap<Object,Notifier> notifierMap =  getNotifierMap();
+        final String queueName = getRandomQueue(queueNames);
 
         //get devices in querystring, and make sure you have access
         if (pathQuery != null) {
@@ -235,7 +230,7 @@ public class ApplicationQueueManager implements QueueManager {
 
         //do i have devices, and have i already started batching.
         if (deviceCount.get() <= 0) {
-            SingleQueueTaskManager taskManager = new SingleQueueTaskManager(em, qm, this, notification);
+            SingleQueueTaskManager taskManager = new SingleQueueTaskManager(em, qm, this, notification,queueName);
             //if i'm in a test value will be false, do not mark finished for test orchestration, not ideal need real tests
             taskManager.finishedBatch();
         }
@@ -288,7 +283,7 @@ public class ApplicationQueueManager implements QueueManager {
      * @param messages
      * @throws Exception
      */
-    public Observable sendBatchToProviders( final List<ApplicationQueueMessage> messages) {
+    public Observable sendBatchToProviders( final List<ApplicationQueueMessage> messages, final String queuePath) {
         LOG.info("sending batch of {} notifications.", messages.size());
         final Meter sendMeter = metricsFactory.getMeter(NotificationsService.class, "send");
 
@@ -313,7 +308,7 @@ public class ApplicationQueueManager implements QueueManager {
                     SingleQueueTaskManager taskManager;
                     taskManager = taskMap.get(message.getNotificationId());
                     if (taskManager == null) {
-                        taskManager = new SingleQueueTaskManager(em, qm, proxy, notification);
+                        taskManager = new SingleQueueTaskManager(em, qm, proxy, notification,queuePath);
                         taskMap.putIfAbsent(message.getNotificationId(), taskManager);
                         taskManager = taskMap.get(message.getNotificationId());
                     }
@@ -427,6 +422,15 @@ public class ApplicationQueueManager implements QueueManager {
         return translatedPayloads;
     }
 
+    public static String[] getQueueNames(Properties properties) {
+        String[] names = properties.getProperty(ApplicationQueueManager.DEFAULT_QUEUE_PROPERTY,ApplicationQueueManager.DEFAULT_QUEUE_NAME).split(";");
+        return names;
+    }
+    public static String getRandomQueue(String[] queueNames) {
+        int size = queueNames.length;
+        Random random = new Random();
+        return queueNames[random.nextInt(size)];
+    }
 
     private static final class IteratorObservable<T> implements rx.Observable.OnSubscribe<T> {
         private final Iterator<T> input;
@@ -571,6 +575,5 @@ public class ApplicationQueueManager implements QueueManager {
         }
     }
 
-    public String getQueuePath(){return queueName;}
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f2a93d7f/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
index 6e9a7ef..1d4c5cb 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
@@ -35,8 +35,6 @@ import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class QueueListener  {
-    public static int MAX_CONSECUTIVE_FAILS = 10000;
-
     public static final long MESSAGE_TRANSACTION_TIMEOUT = 60 * 5 * 1000;
 
     private static final Logger LOG = LoggerFactory.getLogger(QueueListener.class);
@@ -66,7 +64,7 @@ public class QueueListener  {
 
     public static final String MAX_THREADS = "1";
     private Integer batchSize = 1000;
-    private String queueName;
+    private String[] queueNames;
 
     public QueueListener() {
         pool = Executors.newFixedThreadPool(1);
@@ -95,7 +93,7 @@ public class QueueListener  {
                 sleepBetweenRuns = new Long(properties.getProperty("usergrid.notifications.listener.sleep.between", "0")).longValue();
                 sleepWhenNoneFound = new Long(properties.getProperty("usergrid.notifications.listener.sleep.after", "5000")).longValue();
                 batchSize = new Integer(properties.getProperty("usergrid.notifications.listener.batchSize", (""+batchSize)));
-                queueName = properties.getProperty(ApplicationQueueManager.DEFAULT_QUEUE_PROPERTY,ApplicationQueueManager.DEFAULT_QUEUE_NAME);
+                queueNames = ApplicationQueueManager.getQueueNames(properties);
 
                 int maxThreads = new Integer(properties.getProperty("usergrid.notifications.listener.maxThreads", MAX_THREADS));
                 futures = new ArrayList<Future>(maxThreads);
@@ -133,7 +131,8 @@ public class QueueListener  {
         // run until there are no more active jobs
         while ( true ) {
             try {
-                QueueResults results = getDeliveryBatch(queueManager);
+                String queueName = ApplicationQueueManager.getRandomQueue(queueNames);
+                QueueResults results = getDeliveryBatch(queueManager,queueName);
                 LOG.info("QueueListener: retrieved batch of {} messages", results.size());
 
                 List<Message> messages = results.getMessages();
@@ -167,7 +166,7 @@ public class QueueListener  {
                         );
 
                         LOG.info("QueueListener: send batch for app {} of {} messages", entry.getKey(), entry.getValue().size());
-                        Observable current = manager.sendBatchToProviders(entry.getValue());
+                        Observable current = manager.sendBatchToProviders(entry.getValue(),queueName);
                         if(merge == null)
                             merge = current;
                         else {
@@ -211,11 +210,11 @@ public class QueueListener  {
         }
     }
 
-    private  QueueResults getDeliveryBatch(org.apache.usergrid.mq.QueueManager queueManager) throws Exception {
+    private  QueueResults getDeliveryBatch(org.apache.usergrid.mq.QueueManager queueManager,String queuePath) throws Exception {
         QueueQuery qq = new QueueQuery();
         qq.setLimit(this.getBatchSize());
         qq.setTimeout(MESSAGE_TRANSACTION_TIMEOUT);
-        QueueResults results = queueManager.getFromQueue(queueName, qq);
+        QueueResults results = queueManager.getFromQueue(queuePath, qq);
         LOG.debug("got batch of {} devices", results.size());
         return results;
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f2a93d7f/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueManager.java
index f92d463..0024417 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueManager.java
@@ -26,10 +26,6 @@ import java.util.Set;
  */
 public interface QueueManager {
 
-    public HashMap<Object,Notifier> getNotifierMap();
-
     public void asyncCheckForInactiveDevices(Set<Notifier> notifiers) throws Exception ;
 
-    public String getQueuePath();
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f2a93d7f/stack/services/src/main/java/org/apache/usergrid/services/notifications/SingleQueueTaskManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/SingleQueueTaskManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/SingleQueueTaskManager.java
index 8b4866f..f87f497 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/SingleQueueTaskManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/SingleQueueTaskManager.java
@@ -34,8 +34,8 @@ public class SingleQueueTaskManager implements NotificationsTaskManager {
 
     private static final Logger LOG = LoggerFactory
             .getLogger(SingleQueueTaskManager.class);
-    private final String path;
     private final QueueManager proxy;
+    private final String queuePath;
 
     private Notification notification;
     private AtomicLong successes = new AtomicLong();
@@ -45,14 +45,14 @@ public class SingleQueueTaskManager implements NotificationsTaskManager {
     private ConcurrentHashMap<UUID, ApplicationQueueMessage> messageMap;
     private boolean hasFinished;
 
-    public SingleQueueTaskManager(EntityManager em, org.apache.usergrid.mq.QueueManager qm, QueueManager proxy, Notification notification) {
+    public SingleQueueTaskManager(EntityManager em, org.apache.usergrid.mq.QueueManager qm, QueueManager proxy, Notification notification,String queuePath) {
         this.em = em;
         this.qm = qm;
-        this.path = proxy.getQueuePath();
         this.notification = notification;
         this.proxy = proxy;
         this.messageMap = new ConcurrentHashMap<UUID, ApplicationQueueMessage>();
         hasFinished = false;
+        this.queuePath = queuePath;
     }
 
     public void addMessage(UUID deviceId, ApplicationQueueMessage message) {
@@ -72,7 +72,7 @@ public class SingleQueueTaskManager implements NotificationsTaskManager {
             }
 
             LOG.debug("notification {} removing device {} from remaining", notification.getUuid(), deviceUUID);
-            qm.commitTransaction(path, messageMap.get(deviceUUID).getTransaction(), null);
+            qm.commitTransaction(queuePath, messageMap.get(deviceUUID).getTransaction(), null);
             if (newProviderId != null) {
                 LOG.debug("notification {} replacing device {} notifierId", notification.getUuid(), deviceUUID);
                 replaceProviderId(deviceRef, notifier, newProviderId);


[2/2] git commit: setup default sleep window

Posted by sf...@apache.org.
setup default sleep window


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/05caf493
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/05caf493
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/05caf493

Branch: refs/heads/two-dot-o
Commit: 05caf49325e52f806b3fdc22f18b70512f722d8c
Parents: f2a93d7
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Sep 26 11:23:19 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Sep 26 11:23:19 2014 -0600

----------------------------------------------------------------------
 .../services/notifications/ApplicationQueueManager.java         | 5 +++--
 .../apache/usergrid/services/notifications/QueueListener.java   | 3 ++-
 .../services/notifications/apns/NotificationsServiceIT.java     | 4 +++-
 .../services/notifications/gcm/NotificationsServiceIT.java      | 4 +++-
 4 files changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/05caf493/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
index 198d41a..766cb75 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
@@ -47,7 +47,7 @@ import java.util.concurrent.atomic.AtomicInteger;
  */
 public class ApplicationQueueManager implements QueueManager {
 
-    public static  String DEFAULT_QUEUE_NAME = "notifications/queuelistenerv1_12";
+    public static  String DEFAULT_QUEUE_NAME = "notifications/queuelistenerv1_20;notifications/queuelistenerv1_21";
     public static final String DEFAULT_QUEUE_PROPERTY = "usergrid.notifications.listener.queueName";
     private static final Logger LOG = LoggerFactory.getLogger(ApplicationQueueManager.class);
 
@@ -429,7 +429,8 @@ public class ApplicationQueueManager implements QueueManager {
     public static String getRandomQueue(String[] queueNames) {
         int size = queueNames.length;
         Random random = new Random();
-        return queueNames[random.nextInt(size)];
+        String name = queueNames[random.nextInt(size)];
+        return name;
     }
 
     private static final class IteratorObservable<T> implements rx.Observable.OnSubscribe<T> {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/05caf493/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
index 1d4c5cb..234821b 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
@@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 public class QueueListener  {
     public static final long MESSAGE_TRANSACTION_TIMEOUT = 60 * 5 * 1000;
+    public static  long DEFAULT_SLEEP = 5000;
 
     private static final Logger LOG = LoggerFactory.getLogger(QueueListener.class);
 
@@ -91,7 +92,7 @@ public class QueueListener  {
 
             try {
                 sleepBetweenRuns = new Long(properties.getProperty("usergrid.notifications.listener.sleep.between", "0")).longValue();
-                sleepWhenNoneFound = new Long(properties.getProperty("usergrid.notifications.listener.sleep.after", "5000")).longValue();
+                sleepWhenNoneFound = new Long(properties.getProperty("usergrid.notifications.listener.sleep.after", ""+DEFAULT_SLEEP)).longValue();
                 batchSize = new Integer(properties.getProperty("usergrid.notifications.listener.batchSize", (""+batchSize)));
                 queueNames = ApplicationQueueManager.getQueueNames(properties);
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/05caf493/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
index ec7a14f..a47dc99 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
@@ -69,6 +69,9 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
     @Override
     @Before
     public void before() throws Exception {
+        ApplicationQueueManager.DEFAULT_QUEUE_NAME = "notifications/test/" + UUID.randomUUID().toString()+";"+"notifications/test/" + UUID.randomUUID().toString();
+
+        QueueListener.DEFAULT_SLEEP = 200;
         super.before();
         // create apns notifier //
         app.clear();
@@ -128,7 +131,6 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
 
         ns.TEST_PATH_QUERY = pathQuery;
 
-        ApplicationQueueManager.DEFAULT_QUEUE_NAME = "notifications/test/" + UUID.randomUUID().toString();
         listener = new QueueListener(ns.getServiceManagerFactory(),ns.getEntityManagerFactory(),ns.getMetricsFactory(), new Properties());
         listener.run();
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/05caf493/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
index 09814f3..eb3c8b8 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
@@ -63,6 +63,9 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
     @Override
     @Before
     public void before() throws Exception {
+        ApplicationQueueManager.DEFAULT_QUEUE_NAME = "notifications/test/" + UUID.randomUUID().toString()+";"+"notifications/test/" + UUID.randomUUID().toString();
+        QueueListener.DEFAULT_SLEEP = 200;
+
         super.before();
 
         // create gcm notifier //
@@ -102,7 +105,6 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
         PathQuery pathQuery =  new PathQuery(new SimpleEntityRef(  app.getEm().getApplicationRef()), query);
 
         ns.TEST_PATH_QUERY = pathQuery;
-        ApplicationQueueManager.DEFAULT_QUEUE_NAME = "notifications/test/" + UUID.randomUUID().toString();
         listener = new QueueListener(ns.getServiceManagerFactory(),
                 ns.getEntityManagerFactory(),ns.getMetricsFactory(), new Properties());
         listener.run();