You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2014/08/25 22:01:58 UTC

git commit: moving responsibilities for queue management

Repository: incubator-usergrid
Updated Branches:
  refs/heads/two-dot-o-notifications-queue 6cbd52353 -> b712e39a9


moving responsibilities for queue management


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/b712e39a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/b712e39a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/b712e39a

Branch: refs/heads/two-dot-o-notifications-queue
Commit: b712e39a92019b6c1a95115cbb4adc5525c7efba
Parents: 6cbd523
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Aug 25 14:01:40 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Aug 25 14:01:40 2014 -0600

----------------------------------------------------------------------
 .../notifications/NotificationsQueueManager.java      | 13 ++++++++++++-
 .../services/notifications/QueueListener.java         | 14 +++-----------
 .../notifications/apns/NotificationsServiceIT.java    |  6 +++---
 3 files changed, 18 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b712e39a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsQueueManager.java
index 70a7770..0674d43 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsQueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsQueueManager.java
@@ -51,7 +51,9 @@ import java.util.concurrent.atomic.AtomicInteger;
 public class NotificationsQueueManager implements NotificationServiceProxy {
     private static final String NOTIFICATION_CONCURRENT_BATCHES = "notification.concurrent.batches";
     public static final String QUEUE_NAME = "notifications/queuelistener";
+    public static int BATCH_SIZE = 1000;
 
+    public static final long MESSAGE_TRANSACTION_TIMEOUT =  5 * 60 * 1000;
     private static final Logger LOG = LoggerFactory.getLogger(NotificationsQueueManager.class);
 
     //this is for tests, will not mark initial post complete, set to false for tests
@@ -108,6 +110,15 @@ public class NotificationsQueueManager implements NotificationServiceProxy {
         this.queueSize = metricsFactory.getHistogram(NotificationsService.class, "queue_size");
     }
 
+    public static QueueResults getDeliveryBatch(QueueManager queueManager) throws Exception {
+        final QueueQuery batchQuery = new QueueQuery(); // fresh query object for every call
+        batchQuery.setLimit(BATCH_SIZE); // BATCH_SIZE is a mutable static, read at call time
+        batchQuery.setTimeout(MESSAGE_TRANSACTION_TIMEOUT); // 5 * 60 * 1000, presumably milliseconds
+        final QueueResults batch = queueManager.getFromQueue(QUEUE_NAME, batchQuery);
+        LOG.debug("got batch of {} devices", batch.size());
+        return batch; // handed back as received from the queue manager
+    }
+
     public boolean scheduleQueueJob(Notification notification) throws Exception{
         return jobScheduler.scheduleQueueJob(notification);
     }
@@ -338,7 +349,7 @@ public class NotificationsQueueManager implements NotificationServiceProxy {
                             });
                         }
                     }, Schedulers.io())
-                    .buffer(QueueListener.BATCH_SIZE)
+                    .buffer(BATCH_SIZE)
                     .map(new Func1<List<QueueMessage>, HashMap<UUID, Notification>>() {
                         @Override
                         public HashMap<UUID, Notification> call(List<QueueMessage> queueMessages) {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b712e39a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
index d24aa29..5c4f5a8 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
@@ -40,9 +40,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 @Component( "notificationsQueueListener" )
 public class QueueListener  {
-    public static int BATCH_SIZE = 1000;
     public static int MAX_CONSECUTIVE_FAILS = 10;
-    public static final long MESSAGE_TRANSACTION_TIMEOUT =  5 * 60 * 1000;
+
 
     private static final Logger LOG = LoggerFactory.getLogger(QueueListener.class);
 
@@ -82,7 +81,7 @@ public class QueueListener  {
         // run until there are no more active jobs
         while ( true ) {
             try {
-                QueueResults results = getDeliveryBatch();
+                QueueResults results = NotificationsQueueManager.getDeliveryBatch(queueManager);
                 List<Message> messages = results.getMessages();
                 HashMap<UUID,List<QueueMessage>> queueMap = new HashMap<>();
                 for(Message message : messages){
@@ -136,13 +135,6 @@ public class QueueListener  {
         }
     }
 
-    private QueueResults getDeliveryBatch() throws Exception {
-        QueueQuery qq = new QueueQuery();
-        qq.setLimit(BATCH_SIZE);
-        qq.setTimeout(this.MESSAGE_TRANSACTION_TIMEOUT);
-        QueueResults results = queueManager.getFromQueue(NotificationsQueueManager.QUEUE_NAME, qq);
-        LOG.debug("got batch of {} devices", results.size());
-        return results;
-    }
+
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b712e39a/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
index a7c91e2..49e828e 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
@@ -723,8 +723,8 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
         }
 
         // perform push //
-        int oldBatchSize = QueueListener.BATCH_SIZE;
-        QueueListener.BATCH_SIZE = 10;
+        int oldBatchSize = NotificationsQueueManager.BATCH_SIZE;
+        NotificationsQueueManager.BATCH_SIZE = 10;
         try {
             ExecutorService pool = Executors
                     .newFixedThreadPool(APNsAdapter.MAX_CONNECTION_POOL_SIZE);
@@ -736,7 +736,7 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
                         }catch (Exception e){}
                 }});
         } finally {
-            QueueListener.BATCH_SIZE = oldBatchSize;
+            NotificationsQueueManager.BATCH_SIZE = oldBatchSize;
         }
 
         // check receipts //