You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/04/19 18:51:17 UTC

[20/50] usergrid git commit: Fix issues with notification segmenting to ensure it's more efficient.

Fix issues with notification segmenting to ensure it's more efficient.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/c1375bf6
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/c1375bf6
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/c1375bf6

Branch: refs/heads/master
Commit: c1375bf6caa60f36038571c3cd0c2a0b719e36f5
Parents: 8e4d7ee
Author: Michael Russo <mr...@apigee.com>
Authored: Tue Apr 12 00:25:14 2016 +0200
Committer: Michael Russo <mr...@apigee.com>
Committed: Tue Apr 12 00:25:14 2016 +0200

----------------------------------------------------------------------
 .../main/resources/usergrid-default.properties  | 14 +++++
 .../org/apache/usergrid/persistence/Query.java  |  2 +
 .../services/notifications/QueueListener.java   | 41 ++++++------
 .../impl/ApplicationQueueManagerImpl.java       | 65 +++++++++++---------
 .../usergrid/services/queues/QueueListener.java |  2 +-
 .../apns/NotificationsServiceIT.java            |  6 +-
 .../gcm/NotificationsServiceIT.java             |  1 -
 7 files changed, 72 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/c1375bf6/stack/config/src/main/resources/usergrid-default.properties
----------------------------------------------------------------------
diff --git a/stack/config/src/main/resources/usergrid-default.properties b/stack/config/src/main/resources/usergrid-default.properties
index 8b0174c..5cd7c7a 100644
--- a/stack/config/src/main/resources/usergrid-default.properties
+++ b/stack/config/src/main/resources/usergrid-default.properties
@@ -447,6 +447,20 @@ usergrid.scheduler.job.workers=4
 usergrid.scheduler.job.queueName=/jobs
 
 
+###############################  Usergrid Push Notifications  #############################
+#
+# Usergrid processes individual push notifications asynchronously using a queue.  Below are
+# settings that can be used to tune this processing.
+
+
+# Set the number of queue consumers to read from the in-region push notification queue.
+#
+usergrid.push.worker_count=8
+
+# Set the sleep time between queue polling ( in milliseconds)
+#
+usergrid.push.sleep=100
+
 
 
 ###############################  Usergrid Central SSO  #############################

http://git-wip-us.apache.org/repos/asf/usergrid/blob/c1375bf6/stack/core/src/main/java/org/apache/usergrid/persistence/Query.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/Query.java b/stack/core/src/main/java/org/apache/usergrid/persistence/Query.java
index 52e3b4e..150a1b0 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/Query.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/Query.java
@@ -61,6 +61,8 @@ public class Query {
 
     public static final int DEFAULT_LIMIT = 10;
 
+    public static final int MID_LIMIT = 500;
+
     public static final int MAX_LIMIT = 1000;
 
     public static final String PROPERTY_UUID = "uuid";

http://git-wip-us.apache.org/repos/asf/usergrid/blob/c1375bf6/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
index de9cf06..55d1491 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
@@ -45,10 +45,10 @@ import java.util.concurrent.atomic.AtomicLong;
  * Singleton listens for notifications queue messages
  */
 public class QueueListener  {
-    public  final int MESSAGE_TRANSACTION_TIMEOUT =  25 * 1000;
+
     private final QueueManagerFactory queueManagerFactory;
 
-    public   long DEFAULT_SLEEP = 5000;
+    public static long DEFAULT_SLEEP = 100;
 
     private static final Logger logger = LoggerFactory.getLogger(QueueListener.class);
 
@@ -61,9 +61,6 @@ public class QueueListener  {
 
     private Properties properties;
 
-
-    private ServiceManager svcMgr;
-
     private long sleepWhenNoneFound = 0;
 
     private long sleepBetweenRuns = 0;
@@ -71,8 +68,8 @@ public class QueueListener  {
     private ExecutorService pool;
     private List<Future> futures;
 
-    public  final int MAX_THREADS = 2;
-    private Integer batchSize = 10;
+    private static final int PUSH_CONSUMER_MAX_THREADS = 8;
+    public static final int MAX_TAKE = 10;
     private String queueName;
     private int consecutiveCallsToRemoveDevices;
 
@@ -99,15 +96,14 @@ public class QueueListener  {
 
             try {
 
-                sleepBetweenRuns = new Long(properties.getProperty("usergrid.notifications.listener.sleep.between", ""+sleepBetweenRuns)).longValue();
-                sleepWhenNoneFound = new Long(properties.getProperty("usergrid.notifications.listener.sleep.after", ""+DEFAULT_SLEEP)).longValue();
-                batchSize = new Integer(properties.getProperty("usergrid.notifications.listener.batchSize", (""+batchSize)));
+                sleepBetweenRuns = new Long(properties.getProperty("usergrid.push.sleep", "" + DEFAULT_SLEEP));
+                sleepWhenNoneFound = new Long(properties.getProperty("usergrid.push.sleep", "" + DEFAULT_SLEEP));
                 consecutiveCallsToRemoveDevices = new Integer(properties.getProperty("usergrid.notifications.inactive.interval", ""+200));
                 queueName = ApplicationQueueManagerImpl.getQueueNames(properties);
 
-                int maxThreads = new Integer(properties.getProperty("usergrid.notifications.listener.maxThreads", ""+MAX_THREADS));
+                int maxThreads = new Integer(properties.getProperty("usergrid.push.worker_count", ""+PUSH_CONSUMER_MAX_THREADS));
 
-                futures = new ArrayList<Future>(maxThreads);
+                futures = new ArrayList<>(maxThreads);
 
                 //create our thread pool based on our threadcount.
 
@@ -144,33 +140,40 @@ public class QueueListener  {
     }
 
     private void execute(int threadNumber){
+
         if(Thread.currentThread().isDaemon()) {
             Thread.currentThread().setDaemon(true);
         }
+
         Thread.currentThread().setName(getClass().getSimpleName()+"_PushNotifications-"+threadNumber);
 
         final AtomicInteger consecutiveExceptions = new AtomicInteger();
+
         if (logger.isTraceEnabled()) {
             logger.trace("QueueListener: Starting execute process.");
         }
+
         Meter meter = metricsService.getMeter(QueueListener.class, "execute.commit");
         com.codahale.metrics.Timer timer = metricsService.getTimer(QueueListener.class, "execute.dequeue");
-        svcMgr = smf.getServiceManager(smf.getManagementAppId());
+
         if (logger.isTraceEnabled()) {
             logger.trace("getting from queue {} ", queueName);
         }
+
         QueueScope queueScope = new QueueScopeImpl( queueName, QueueScope.RegionImplementation.LOCAL);
         QueueManager queueManager = queueManagerFactory.getQueueManager(queueScope);
+
         // run until there are no more active jobs
         final AtomicLong runCount = new AtomicLong(0);
+
         //cache to retrieve push manager, cached per notifier, so many notifications will get same push manager
         LoadingCache<UUID, ApplicationQueueManager> queueManagerMap = getQueueManagerCache(queueManager);
 
         while ( true ) {
 
                 Timer.Context timerContext = timer.time();
-                rx.Observable.from(queueManager.getMessages(getBatchSize(), ApplicationQueueMessage.class))
-                    .buffer(getBatchSize())
+                rx.Observable.from(queueManager.getMessages(MAX_TAKE, ApplicationQueueMessage.class))
+                    .buffer(MAX_TAKE)
                     .doOnNext(messages -> {
 
                         try {
@@ -329,12 +332,4 @@ public class QueueListener  {
         pool.shutdownNow();
     }
 
-
-    public void setBatchSize(int batchSize){
-        this.batchSize = batchSize;
-    }
-    public int getBatchSize(){return batchSize;}
-
-
-
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/c1375bf6/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
index 04e60b7..6c28d2f 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
@@ -120,7 +120,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
             final UUID appId = em.getApplication().getUuid();
             final Map<String, Object> payloads = notification.getPayloads();
 
-            final Func1<EntityRef, ApplicationQueueMessage> sendMessageFunction = deviceRef -> {
+            final Func1<EntityRef, Optional<ApplicationQueueMessage>> sendMessageFunction = deviceRef -> {
                 try {
 
                     long now = System.currentTimeMillis();
@@ -145,7 +145,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
 
                     if (notifierId == null) {
                         //TODO need to leverage optional here
-                        //return deviceRef;
+                        return Optional.empty();
                     }
 
                     ApplicationQueueMessage message = new ApplicationQueueMessage(appId, notification.getUuid(), deviceRef.getUuid(), notifierKey, notifierId);
@@ -157,15 +157,14 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
                     }
                     deviceCount.incrementAndGet();
 
-                    return message;
+                    return Optional.of(message);
 
 
                 } catch (Exception deviceLoopException) {
                     logger.error("Failed to add device", deviceLoopException);
                     errorMessages.add("Failed to add device: " + deviceRef.getUuid() + ", error:" + deviceLoopException);
 
-                    //TODO need an optional here
-                    return new ApplicationQueueMessage(appId, notification.getUuid(), deviceRef.getUuid(), "test", "test");
+                    return Optional.empty();
                 }
 
             };
@@ -185,13 +184,21 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
                     applicationQueueMessages.forEach( message -> {
 
                         try {
-
-                            qm.sendMessage( message );
-                            queueMeter.mark();
+                            if(message.isPresent()){
+                                qm.sendMessage( message.get() );
+                                queueMeter.mark();
+                            }
 
                         } catch (IOException e) {
-                           logger.error("Unable to queue notification for notification UUID {} and device UUID {} ",
-                               message.getNotificationId(), message.getDeviceId());
+
+                            if(message.isPresent()){
+                                logger.error("Unable to queue notification for notification UUID {} and device UUID {} ",
+                                    message.get().getNotificationId(), message.get().getDeviceId());
+                            }
+                            else{
+                                logger.error("Unable to queue notification as it's not present when trying to send to queue");
+                            }
+
                         }
 
                     });
@@ -200,7 +207,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
                 })
                 .doOnError(throwable -> logger.error("Failed while trying to send notification", throwable));
 
-            processMessagesObservable.toBlocking(); // let this run and block the async thread, messages are queued
+            processMessagesObservable.toBlocking().last(); // let this run and block the async thread, messages are queued
 
         }
 
@@ -513,6 +520,8 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
 
         List<EntityRef> devices = new ArrayList<>();
 
+        final int LIMIT = Query.MID_LIMIT;
+
 
         try {
 
@@ -529,52 +538,48 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
 
                     initial = false;
 
-                    final List<EntityRef> mydevices = em.getCollection(ref, "devices", start, Query.DEFAULT_LIMIT,
+                    final List<EntityRef> mydevices = em.getCollection(ref, "devices", start, LIMIT,
                         Query.Level.REFS, true).getRefs();
 
                     resultSize = mydevices.size();
+
                     if(mydevices.size() > 0){
                         start = mydevices.get(mydevices.size() - 1 ).getUuid();
                     }
 
-
                     devices.addAll( mydevices  );
 
-
                 }
 
             } else if ("group".equals(ref.getType())) {
 
-                //devices = new ArrayList<>();
                 UUID start = null;
                 boolean initial = true;
                 int resultSize = 0;
 
-                while( initial || resultSize >= Query.DEFAULT_LIMIT){
+                while( initial || resultSize >= LIMIT){
 
-                        initial = false;
-                        final List<EntityRef> myusers =  em.getCollection(ref, "users", start,
-                            Query.DEFAULT_LIMIT, Query.Level.REFS, true).getRefs();
+                    initial = false;
+                    final List<EntityRef> myusers =  em.getCollection(ref, "users", start,
+                        LIMIT, Query.Level.REFS, true).getRefs();
 
-                        resultSize = myusers.size();
-                        if(myusers.size() > 0){
-                            start = myusers.get(myusers.size() - 1 ).getUuid();
-                        }
+                    resultSize = myusers.size();
 
+                    if(myusers.size() > 0){
+                        start = myusers.get(myusers.size() - 1 ).getUuid();
+                    }
 
-                        // don't allow a single user to have more than 100 devices?
-                        for (EntityRef user : myusers) {
 
-                            devices.addAll( em.getCollection(user, "devices", null, 100,
-                                Query.Level.REFS, true).getRefs() );
+                    // don't allow a single user to have more than 100 devices?
+                    for (EntityRef user : myusers) {
 
+                        devices.addAll( em.getCollection(user, "devices", null, 100,
+                            Query.Level.REFS, true).getRefs() );
 
-                        }
+                    }
 
                 }
 
-
-
             }
         } catch (Exception e) {
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/c1375bf6/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java
index 5895d38..9d95d87 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java
@@ -110,7 +110,7 @@ public abstract class QueueListener  {
             try {
                 sleepBetweenRuns = new Long(properties.getProperty("usergrid.queues.listener.sleep.between", ""+sleepBetweenRuns)).longValue();
                 sleepWhenNoneFound = new Long(properties.getProperty("usergrid.queues.listener.sleep.after", ""+DEFAULT_SLEEP)).longValue();
-                batchSize = new Integer(properties.getProperty("usergrid.queues.listener.batchSize", (""+batchSize)));
+                batchSize = new Integer(properties.getProperty("usergrid.queues.listener.MAX_TAKE", (""+batchSize)));
                 consecutiveCallsToRemoveDevices = new Integer(properties.getProperty("usergrid.queues.inactive.interval", ""+200));
                 queueName = getQueueName();
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/c1375bf6/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
index dea4e49..3923827 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
@@ -129,7 +129,6 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
     public void after() throws Exception {
         if(listener != null) {
             listener.stop();
-            listener = null;
         }
     }
 
@@ -683,8 +682,7 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
 
         final int NUM_DEVICES = 50;
         // perform push //
-        int oldBatchSize = listener.getBatchSize();
-        listener.setBatchSize(10);
+        int oldBatchSize = QueueListener.MAX_TAKE;
 
         app.clear();
         app.put("name", UUID.randomUUID().toString());
@@ -724,7 +722,7 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
         try {
             notificationWaitForComplete(notification);
         } finally {
-            listener.setBatchSize( oldBatchSize);
+            //noop
         }
 
         // check receipts //

http://git-wip-us.apache.org/repos/asf/usergrid/blob/c1375bf6/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
index 65cc54a..1c7915a 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
@@ -106,7 +106,6 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
     public void after() {
         if (listener != null) {
             listener.stop();
-            listener = null;
         }
     }