You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2014/08/25 22:01:58 UTC
git commit: moving responsibilities for queue management
Repository: incubator-usergrid
Updated Branches:
refs/heads/two-dot-o-notifications-queue 6cbd52353 -> b712e39a9
moving responsibilities for queue management
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/b712e39a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/b712e39a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/b712e39a
Branch: refs/heads/two-dot-o-notifications-queue
Commit: b712e39a92019b6c1a95115cbb4adc5525c7efba
Parents: 6cbd523
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Aug 25 14:01:40 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Aug 25 14:01:40 2014 -0600
----------------------------------------------------------------------
.../notifications/NotificationsQueueManager.java | 13 ++++++++++++-
.../services/notifications/QueueListener.java | 14 +++-----------
.../notifications/apns/NotificationsServiceIT.java | 6 +++---
3 files changed, 18 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b712e39a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsQueueManager.java
index 70a7770..0674d43 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsQueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsQueueManager.java
@@ -51,7 +51,9 @@ import java.util.concurrent.atomic.AtomicInteger;
public class NotificationsQueueManager implements NotificationServiceProxy {
private static final String NOTIFICATION_CONCURRENT_BATCHES = "notification.concurrent.batches";
public static final String QUEUE_NAME = "notifications/queuelistener";
+ public static int BATCH_SIZE = 1000;
+ public static final long MESSAGE_TRANSACTION_TIMEOUT = 5 * 60 * 1000;
private static final Logger LOG = LoggerFactory.getLogger(NotificationsQueueManager.class);
//this is for tests, will not mark initial post complete, set to false for tests
@@ -108,6 +110,15 @@ public class NotificationsQueueManager implements NotificationServiceProxy {
this.queueSize = metricsFactory.getHistogram(NotificationsService.class, "queue_size");
}
+ public static QueueResults getDeliveryBatch(QueueManager queueManager) throws Exception {
+ QueueQuery qq = new QueueQuery();
+ qq.setLimit(BATCH_SIZE);
+ qq.setTimeout(MESSAGE_TRANSACTION_TIMEOUT);
+ QueueResults results = queueManager.getFromQueue(NotificationsQueueManager.QUEUE_NAME, qq);
+ LOG.debug("got batch of {} devices", results.size());
+ return results;
+ }
+
public boolean scheduleQueueJob(Notification notification) throws Exception{
return jobScheduler.scheduleQueueJob(notification);
}
@@ -338,7 +349,7 @@ public class NotificationsQueueManager implements NotificationServiceProxy {
});
}
}, Schedulers.io())
- .buffer(QueueListener.BATCH_SIZE)
+ .buffer(BATCH_SIZE)
.map(new Func1<List<QueueMessage>, HashMap<UUID, Notification>>() {
@Override
public HashMap<UUID, Notification> call(List<QueueMessage> queueMessages) {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b712e39a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
index d24aa29..5c4f5a8 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
@@ -40,9 +40,8 @@ import java.util.concurrent.atomic.AtomicInteger;
@Component( "notificationsQueueListener" )
public class QueueListener {
- public static int BATCH_SIZE = 1000;
public static int MAX_CONSECUTIVE_FAILS = 10;
- public static final long MESSAGE_TRANSACTION_TIMEOUT = 5 * 60 * 1000;
+
private static final Logger LOG = LoggerFactory.getLogger(QueueListener.class);
@@ -82,7 +81,7 @@ public class QueueListener {
// run until there are no more active jobs
while ( true ) {
try {
- QueueResults results = getDeliveryBatch();
+ QueueResults results = NotificationsQueueManager.getDeliveryBatch(queueManager);
List<Message> messages = results.getMessages();
HashMap<UUID,List<QueueMessage>> queueMap = new HashMap<>();
for(Message message : messages){
@@ -136,13 +135,6 @@ public class QueueListener {
}
}
- private QueueResults getDeliveryBatch() throws Exception {
- QueueQuery qq = new QueueQuery();
- qq.setLimit(BATCH_SIZE);
- qq.setTimeout(this.MESSAGE_TRANSACTION_TIMEOUT);
- QueueResults results = queueManager.getFromQueue(NotificationsQueueManager.QUEUE_NAME, qq);
- LOG.debug("got batch of {} devices", results.size());
- return results;
- }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b712e39a/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
index a7c91e2..49e828e 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
@@ -723,8 +723,8 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
}
// perform push //
- int oldBatchSize = QueueListener.BATCH_SIZE;
- QueueListener.BATCH_SIZE = 10;
+ int oldBatchSize = NotificationsQueueManager.BATCH_SIZE;
+ NotificationsQueueManager.BATCH_SIZE = 10;
try {
ExecutorService pool = Executors
.newFixedThreadPool(APNsAdapter.MAX_CONNECTION_POOL_SIZE);
@@ -736,7 +736,7 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
}catch (Exception e){}
}});
} finally {
- QueueListener.BATCH_SIZE = oldBatchSize;
+ NotificationsQueueManager.BATCH_SIZE = oldBatchSize;
}
// check receipts //