You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/04/19 18:51:17 UTC
[20/50] usergrid git commit: Fix issues with notification segmenting to ensure it's more efficient.
Fix issues with notification segmenting to ensure it's more efficient.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/c1375bf6
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/c1375bf6
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/c1375bf6
Branch: refs/heads/master
Commit: c1375bf6caa60f36038571c3cd0c2a0b719e36f5
Parents: 8e4d7ee
Author: Michael Russo <mr...@apigee.com>
Authored: Tue Apr 12 00:25:14 2016 +0200
Committer: Michael Russo <mr...@apigee.com>
Committed: Tue Apr 12 00:25:14 2016 +0200
----------------------------------------------------------------------
.../main/resources/usergrid-default.properties | 14 +++++
.../org/apache/usergrid/persistence/Query.java | 2 +
.../services/notifications/QueueListener.java | 41 ++++++------
.../impl/ApplicationQueueManagerImpl.java | 65 +++++++++++---------
.../usergrid/services/queues/QueueListener.java | 2 +-
.../apns/NotificationsServiceIT.java | 6 +-
.../gcm/NotificationsServiceIT.java | 1 -
7 files changed, 72 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/c1375bf6/stack/config/src/main/resources/usergrid-default.properties
----------------------------------------------------------------------
diff --git a/stack/config/src/main/resources/usergrid-default.properties b/stack/config/src/main/resources/usergrid-default.properties
index 8b0174c..5cd7c7a 100644
--- a/stack/config/src/main/resources/usergrid-default.properties
+++ b/stack/config/src/main/resources/usergrid-default.properties
@@ -447,6 +447,20 @@ usergrid.scheduler.job.workers=4
usergrid.scheduler.job.queueName=/jobs
+############################### Usergrid Push Notifications #############################
+#
+# Usergrid processes individual push notifications asynchronously using a queue. Below are
+# settings that can be used to tune this processing.
+
+
+# Set the number of queue consumers to read from the in-region push notification queue.
+#
+usergrid.push.worker_count=8
+
+# Set the sleep time between queue polling (in milliseconds).
+#
+usergrid.push.sleep=100
+
############################### Usergrid Central SSO #############################
http://git-wip-us.apache.org/repos/asf/usergrid/blob/c1375bf6/stack/core/src/main/java/org/apache/usergrid/persistence/Query.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/Query.java b/stack/core/src/main/java/org/apache/usergrid/persistence/Query.java
index 52e3b4e..150a1b0 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/Query.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/Query.java
@@ -61,6 +61,8 @@ public class Query {
public static final int DEFAULT_LIMIT = 10;
+ public static final int MID_LIMIT = 500;
+
public static final int MAX_LIMIT = 1000;
public static final String PROPERTY_UUID = "uuid";
http://git-wip-us.apache.org/repos/asf/usergrid/blob/c1375bf6/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
index de9cf06..55d1491 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
@@ -45,10 +45,10 @@ import java.util.concurrent.atomic.AtomicLong;
* Singleton listens for notifications queue messages
*/
public class QueueListener {
- public final int MESSAGE_TRANSACTION_TIMEOUT = 25 * 1000;
+
private final QueueManagerFactory queueManagerFactory;
- public long DEFAULT_SLEEP = 5000;
+ public static long DEFAULT_SLEEP = 100;
private static final Logger logger = LoggerFactory.getLogger(QueueListener.class);
@@ -61,9 +61,6 @@ public class QueueListener {
private Properties properties;
-
- private ServiceManager svcMgr;
-
private long sleepWhenNoneFound = 0;
private long sleepBetweenRuns = 0;
@@ -71,8 +68,8 @@ public class QueueListener {
private ExecutorService pool;
private List<Future> futures;
- public final int MAX_THREADS = 2;
- private Integer batchSize = 10;
+ private static final int PUSH_CONSUMER_MAX_THREADS = 8;
+ public static final int MAX_TAKE = 10;
private String queueName;
private int consecutiveCallsToRemoveDevices;
@@ -99,15 +96,14 @@ public class QueueListener {
try {
- sleepBetweenRuns = new Long(properties.getProperty("usergrid.notifications.listener.sleep.between", ""+sleepBetweenRuns)).longValue();
- sleepWhenNoneFound = new Long(properties.getProperty("usergrid.notifications.listener.sleep.after", ""+DEFAULT_SLEEP)).longValue();
- batchSize = new Integer(properties.getProperty("usergrid.notifications.listener.batchSize", (""+batchSize)));
+ sleepBetweenRuns = new Long(properties.getProperty("usergrid.push.sleep", "" + DEFAULT_SLEEP));
+ sleepWhenNoneFound = new Long(properties.getProperty("usergrid.push.sleep", "" + DEFAULT_SLEEP));
consecutiveCallsToRemoveDevices = new Integer(properties.getProperty("usergrid.notifications.inactive.interval", ""+200));
queueName = ApplicationQueueManagerImpl.getQueueNames(properties);
- int maxThreads = new Integer(properties.getProperty("usergrid.notifications.listener.maxThreads", ""+MAX_THREADS));
+ int maxThreads = new Integer(properties.getProperty("usergrid.push.worker_count", ""+PUSH_CONSUMER_MAX_THREADS));
- futures = new ArrayList<Future>(maxThreads);
+ futures = new ArrayList<>(maxThreads);
//create our thread pool based on our threadcount.
@@ -144,33 +140,40 @@ public class QueueListener {
}
private void execute(int threadNumber){
+
if(Thread.currentThread().isDaemon()) {
Thread.currentThread().setDaemon(true);
}
+
Thread.currentThread().setName(getClass().getSimpleName()+"_PushNotifications-"+threadNumber);
final AtomicInteger consecutiveExceptions = new AtomicInteger();
+
if (logger.isTraceEnabled()) {
logger.trace("QueueListener: Starting execute process.");
}
+
Meter meter = metricsService.getMeter(QueueListener.class, "execute.commit");
com.codahale.metrics.Timer timer = metricsService.getTimer(QueueListener.class, "execute.dequeue");
- svcMgr = smf.getServiceManager(smf.getManagementAppId());
+
if (logger.isTraceEnabled()) {
logger.trace("getting from queue {} ", queueName);
}
+
QueueScope queueScope = new QueueScopeImpl( queueName, QueueScope.RegionImplementation.LOCAL);
QueueManager queueManager = queueManagerFactory.getQueueManager(queueScope);
+
// run until there are no more active jobs
final AtomicLong runCount = new AtomicLong(0);
+
//cache to retrieve push manager, cached per notifier, so many notifications will get same push manager
LoadingCache<UUID, ApplicationQueueManager> queueManagerMap = getQueueManagerCache(queueManager);
while ( true ) {
Timer.Context timerContext = timer.time();
- rx.Observable.from(queueManager.getMessages(getBatchSize(), ApplicationQueueMessage.class))
- .buffer(getBatchSize())
+ rx.Observable.from(queueManager.getMessages(MAX_TAKE, ApplicationQueueMessage.class))
+ .buffer(MAX_TAKE)
.doOnNext(messages -> {
try {
@@ -329,12 +332,4 @@ public class QueueListener {
pool.shutdownNow();
}
-
- public void setBatchSize(int batchSize){
- this.batchSize = batchSize;
- }
- public int getBatchSize(){return batchSize;}
-
-
-
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/c1375bf6/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
index 04e60b7..6c28d2f 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
@@ -120,7 +120,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
final UUID appId = em.getApplication().getUuid();
final Map<String, Object> payloads = notification.getPayloads();
- final Func1<EntityRef, ApplicationQueueMessage> sendMessageFunction = deviceRef -> {
+ final Func1<EntityRef, Optional<ApplicationQueueMessage>> sendMessageFunction = deviceRef -> {
try {
long now = System.currentTimeMillis();
@@ -145,7 +145,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
if (notifierId == null) {
//TODO need to leverage optional here
- //return deviceRef;
+ return Optional.empty();
}
ApplicationQueueMessage message = new ApplicationQueueMessage(appId, notification.getUuid(), deviceRef.getUuid(), notifierKey, notifierId);
@@ -157,15 +157,14 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
}
deviceCount.incrementAndGet();
- return message;
+ return Optional.of(message);
} catch (Exception deviceLoopException) {
logger.error("Failed to add device", deviceLoopException);
errorMessages.add("Failed to add device: " + deviceRef.getUuid() + ", error:" + deviceLoopException);
- //TODO need an optional here
- return new ApplicationQueueMessage(appId, notification.getUuid(), deviceRef.getUuid(), "test", "test");
+ return Optional.empty();
}
};
@@ -185,13 +184,21 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
applicationQueueMessages.forEach( message -> {
try {
-
- qm.sendMessage( message );
- queueMeter.mark();
+ if(message.isPresent()){
+ qm.sendMessage( message.get() );
+ queueMeter.mark();
+ }
} catch (IOException e) {
- logger.error("Unable to queue notification for notification UUID {} and device UUID {} ",
- message.getNotificationId(), message.getDeviceId());
+
+ if(message.isPresent()){
+ logger.error("Unable to queue notification for notification UUID {} and device UUID {} ",
+ message.get().getNotificationId(), message.get().getDeviceId());
+ }
+ else{
+ logger.error("Unable to queue notification as it's not present when trying to send to queue");
+ }
+
}
});
@@ -200,7 +207,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
})
.doOnError(throwable -> logger.error("Failed while trying to send notification", throwable));
- processMessagesObservable.toBlocking(); // let this run and block the async thread, messages are queued
+ processMessagesObservable.toBlocking().last(); // let this run and block the async thread, messages are queued
}
@@ -513,6 +520,8 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
List<EntityRef> devices = new ArrayList<>();
+ final int LIMIT = Query.MID_LIMIT;
+
try {
@@ -529,52 +538,48 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
initial = false;
- final List<EntityRef> mydevices = em.getCollection(ref, "devices", start, Query.DEFAULT_LIMIT,
+ final List<EntityRef> mydevices = em.getCollection(ref, "devices", start, LIMIT,
Query.Level.REFS, true).getRefs();
resultSize = mydevices.size();
+
if(mydevices.size() > 0){
start = mydevices.get(mydevices.size() - 1 ).getUuid();
}
-
devices.addAll( mydevices );
-
}
} else if ("group".equals(ref.getType())) {
- //devices = new ArrayList<>();
UUID start = null;
boolean initial = true;
int resultSize = 0;
- while( initial || resultSize >= Query.DEFAULT_LIMIT){
+ while( initial || resultSize >= LIMIT){
- initial = false;
- final List<EntityRef> myusers = em.getCollection(ref, "users", start,
- Query.DEFAULT_LIMIT, Query.Level.REFS, true).getRefs();
+ initial = false;
+ final List<EntityRef> myusers = em.getCollection(ref, "users", start,
+ LIMIT, Query.Level.REFS, true).getRefs();
- resultSize = myusers.size();
- if(myusers.size() > 0){
- start = myusers.get(myusers.size() - 1 ).getUuid();
- }
+ resultSize = myusers.size();
+ if(myusers.size() > 0){
+ start = myusers.get(myusers.size() - 1 ).getUuid();
+ }
- // don't allow a single user to have more than 100 devices?
- for (EntityRef user : myusers) {
- devices.addAll( em.getCollection(user, "devices", null, 100,
- Query.Level.REFS, true).getRefs() );
+ // don't allow a single user to have more than 100 devices?
+ for (EntityRef user : myusers) {
+ devices.addAll( em.getCollection(user, "devices", null, 100,
+ Query.Level.REFS, true).getRefs() );
- }
+ }
}
-
-
}
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/c1375bf6/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java
index 5895d38..9d95d87 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java
@@ -110,7 +110,7 @@ public abstract class QueueListener {
try {
sleepBetweenRuns = new Long(properties.getProperty("usergrid.queues.listener.sleep.between", ""+sleepBetweenRuns)).longValue();
sleepWhenNoneFound = new Long(properties.getProperty("usergrid.queues.listener.sleep.after", ""+DEFAULT_SLEEP)).longValue();
- batchSize = new Integer(properties.getProperty("usergrid.queues.listener.batchSize", (""+batchSize)));
+ batchSize = new Integer(properties.getProperty("usergrid.queues.listener.MAX_TAKE", (""+batchSize)));
consecutiveCallsToRemoveDevices = new Integer(properties.getProperty("usergrid.queues.inactive.interval", ""+200));
queueName = getQueueName();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/c1375bf6/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
index dea4e49..3923827 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
@@ -129,7 +129,6 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
public void after() throws Exception {
if(listener != null) {
listener.stop();
- listener = null;
}
}
@@ -683,8 +682,7 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
final int NUM_DEVICES = 50;
// perform push //
- int oldBatchSize = listener.getBatchSize();
- listener.setBatchSize(10);
+ int oldBatchSize = QueueListener.MAX_TAKE;
app.clear();
app.put("name", UUID.randomUUID().toString());
@@ -724,7 +722,7 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
try {
notificationWaitForComplete(notification);
} finally {
- listener.setBatchSize( oldBatchSize);
+ //noop
}
// check receipts //
http://git-wip-us.apache.org/repos/asf/usergrid/blob/c1375bf6/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
index 65cc54a..1c7915a 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
@@ -106,7 +106,6 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
public void after() {
if (listener != null) {
listener.stop();
- listener = null;
}
}