You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/04/26 17:03:05 UTC

[41/50] [abbrv] usergrid git commit: Update notification processing to allow more parallel work.

Update notification processing to allow more parallel work.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/8cf78252
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/8cf78252
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/8cf78252

Branch: refs/heads/asf-site
Commit: 8cf782527f705109fdbedd8d8767e6074e42796a
Parents: 32ab5da
Author: Michael Russo <mr...@apigee.com>
Authored: Sat Apr 16 14:40:20 2016 +0100
Committer: Michael Russo <mr...@apigee.com>
Committed: Sat Apr 16 14:40:20 2016 +0100

----------------------------------------------------------------------
 .../corepersistence/CpEntityManager.java        |  47 ++++--
 .../IllegalArgumentExceptionMapper.java         |   4 +-
 .../notifications/NotificationsService.java     |  27 ++--
 .../impl/ApplicationQueueManagerImpl.java       | 145 ++++++++++---------
 4 files changed, 128 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/8cf78252/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index b2330f3..750cf7b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -34,7 +34,9 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.UUID;
+import java.util.stream.Collectors;
 
+import org.apache.usergrid.persistence.collection.EntitySet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.util.Assert;
@@ -2511,23 +2513,42 @@ public class CpEntityManager implements EntityManager {
     @Override
     public Results getEntities( List<UUID> ids, String type ) {
 
-        ArrayList<Entity> entities = new ArrayList<Entity>();
 
-        for ( UUID uuid : ids ) {
-            EntityRef ref = new SimpleEntityRef( type, uuid );
-            Entity entity = null;
-            try {
-                entity = get( ref );
-            }
-            catch ( Exception ex ) {
-                logger.warn( "Entity {}/{} not found", uuid, type );
-            }
 
-            if ( entity != null ) {
-                entities.add( entity );
-            }
+        List<Id> entityIds = new ArrayList<>();
+
+        for( UUID uuid : ids){
+
+            entityIds.add(new SimpleId( uuid, type ));
+
         }
 
+        // leverage ecm.load so it's a batch fetch of all entities from Cassandra
+        EntitySet entitySet = ecm.load( entityIds ).toBlocking().last();
+
+        List<Entity> entities = entitySet.getEntities().stream().map( mvccEntity -> {
+
+            if( mvccEntity.getEntity().isPresent() ){
+
+                org.apache.usergrid.persistence.model.entity.Entity cpEntity = mvccEntity.getEntity().get();
+
+                Class clazz = Schema.getDefaultSchema().getEntityClass( mvccEntity.getId().getType() );
+
+                Entity entity = EntityFactory.newEntity( mvccEntity.getId().getUuid(), mvccEntity.getId().getType(), clazz );
+                entity.setProperties(  cpEntity  );
+
+                return entity;
+
+            }else{
+
+                logger.warn("Tried fetching entity with id: {} and type: but was not found",
+                    mvccEntity.getId().getUuid(), mvccEntity.getId().getType() );
+
+                return null;
+            }
+        }).collect(Collectors.toList());
+
+
         return Results.fromEntities( entities );
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/8cf78252/stack/rest/src/main/java/org/apache/usergrid/rest/exceptions/IllegalArgumentExceptionMapper.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/exceptions/IllegalArgumentExceptionMapper.java b/stack/rest/src/main/java/org/apache/usergrid/rest/exceptions/IllegalArgumentExceptionMapper.java
index ff7b656..e6243e9 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/exceptions/IllegalArgumentExceptionMapper.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/exceptions/IllegalArgumentExceptionMapper.java
@@ -34,7 +34,9 @@ public class IllegalArgumentExceptionMapper extends AbstractExceptionMapper<Ille
     @Override
     public Response toResponse( IllegalArgumentException e ) {
 
-        logger.error( "Illegal argument was passed, returning bad request to user", e );
+        if(logger.isTraceEnabled()) {
+            logger.trace("Illegal argument was passed, returning bad request to user", e.getMessage());
+        }
 
         return toResponse( BAD_REQUEST, e );
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/8cf78252/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
index 50eb883..f4fdb65 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
@@ -20,6 +20,7 @@ package org.apache.usergrid.services.notifications;
 import java.util.*;
 
 import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.services.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,15 +40,6 @@ import org.apache.usergrid.persistence.queue.QueueManager;
 import org.apache.usergrid.persistence.queue.QueueManagerFactory;
 import org.apache.usergrid.persistence.queue.QueueScope;
 import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
-import org.apache.usergrid.services.AbstractCollectionService;
-import org.apache.usergrid.services.ServiceAction;
-import org.apache.usergrid.services.ServiceContext;
-import org.apache.usergrid.services.ServiceInfo;
-import org.apache.usergrid.services.ServiceManagerFactory;
-import org.apache.usergrid.services.ServiceParameter;
-import org.apache.usergrid.services.ServicePayload;
-import org.apache.usergrid.services.ServiceRequest;
-import org.apache.usergrid.services.ServiceResults;
 import org.apache.usergrid.services.exceptions.ForbiddenServiceOperationException;
 import org.apache.usergrid.services.notifications.impl.ApplicationQueueManagerImpl;
 
@@ -85,7 +77,6 @@ public class NotificationsService extends AbstractCollectionService {
     private ServiceManagerFactory smf;
     private EntityManagerFactory emf;
     private QueueManagerFactory queueManagerFactory;
-    private EntityCollectionManagerFactory ecmf;
 
     public NotificationsService() {
         if (logger.isTraceEnabled()) {
@@ -139,10 +130,20 @@ public class NotificationsService extends AbstractCollectionService {
         Timer.Context timer = postTimer.time();
         postMeter.mark();
         try {
+
             validate(null, context.getPayload());
-            Notification.PathTokens pathTokens = getPathTokens(context.getRequest().getOriginalParameters());
 
-            // default saving of receipts
+            // perform some input validates on useGraph payload property vs. ql= path query
+            final List<ServiceParameter> parameters = context.getRequest().getOriginalParameters();
+            for (ServiceParameter parameter : parameters){
+                if( parameter instanceof ServiceParameter.QueryParameter && context.getProperties().get("useGraph").equals(true)){
+                    throw new IllegalArgumentException("Query ql parameter cannot be used with useGraph:true property value");
+                }
+            }
+
+            Notification.PathTokens pathTokens = getPathTokens(parameters);
+
+            // set defaults
             context.getProperties().put("filters", context.getProperties().getOrDefault("filters", new HashMap<>()));
             context.getProperties().put("useGraph", context.getProperties().getOrDefault("useGraph", false));
             context.getProperties().put("saveReceipts", context.getProperties().getOrDefault("saveReceipts", true));
@@ -175,7 +176,7 @@ public class NotificationsService extends AbstractCollectionService {
             // future: somehow return 202?
             return results;
         }catch (Exception e){
-            logger.error("serialization failed",e);
+            logger.error(e.getMessage());
             throw e;
         }finally {
             timer.stop();

http://git-wip-us.apache.org/repos/asf/usergrid/blob/8cf78252/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
index 487ea1f..5ce1b93 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
@@ -58,7 +58,9 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
     HashMap<Object, ProviderAdapter> notifierHashMap; // only retrieve notifiers once
 
 
-    public ApplicationQueueManagerImpl(JobScheduler jobScheduler, EntityManager entityManager, QueueManager queueManager, MetricsFactory metricsFactory, Properties properties) {
+    public ApplicationQueueManagerImpl( JobScheduler jobScheduler, EntityManager entityManager,
+                                        QueueManager queueManager, MetricsFactory metricsFactory,
+                                        Properties properties) {
         this.em = entityManager;
         this.qm = queueManager;
         this.jobScheduler = jobScheduler;
@@ -116,21 +118,24 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
                 iterator = pathQuery.iterator(em);
             }
 
-            //if there are more pages (defined by PAGE_SIZE) you probably want this to be async, also if this is already a job then don't reschedule
-            if (iterator instanceof ResultsIterator && ((ResultsIterator) iterator).hasPages() && jobExecution == null) {
-                if(logger.isTraceEnabled()){
-                    logger.trace("Scheduling notification job as it has multiple pages of devices.");
-                }
-                jobScheduler.scheduleQueueJob(notification, true);
-                em.update(notification);
-                return;
-            }
+//            //if there are more pages (defined by PAGE_SIZE) you probably want this to be async, also if this is already a job then don't reschedule
+//            if (iterator instanceof ResultsIterator && ((ResultsIterator) iterator).hasPages() && jobExecution == null) {
+//                if(logger.isTraceEnabled()){
+//                    logger.trace("Scheduling notification job as it has multiple pages of devices.");
+//                }
+//                jobScheduler.scheduleQueueJob(notification, true);
+//                em.update(notification);
+//                return;
+//            }
             final UUID appId = em.getApplication().getUuid();
             final Map<String, Object> payloads = notification.getPayloads();
 
             final Func1<EntityRef, Optional<ApplicationQueueMessage>> sendMessageFunction = deviceRef -> {
+
                 try {
 
+                    //logger.info("Preparing notification queue message for device: {}", deviceRef.getUuid());
+
                     long now = System.currentTimeMillis();
 
                     String notifierId = null;
@@ -163,6 +168,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
                         notification.setQueued(System.currentTimeMillis());
 
                     }
+
                     deviceCount.incrementAndGet();
 
                     return Optional.of(message);
@@ -190,90 +196,95 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
                     return Observable.from(getDevices(entity));
 
                 })
-                .distinct(ref -> ref.getUuid())
-                .flatMap( entityRef -> {
+                .distinct(ref -> ref.getUuid() )
+                .map( entityRef -> entityRef.getUuid() )
+                .buffer(10)
+                .flatMap( uuids -> {
 
-                    return Observable.just(entityRef).flatMap( ref -> {
+                    if(logger.isTraceEnabled()) {
+                        logger.trace("Processing batch of {} device(s)", uuids.size());
+                    }
 
-                        if(logger.isTraceEnabled()){
-                            logger.trace("Loading device: {}", ref.getUuid());
 
-                        }
-                            try {
-                                return Observable.just(em.get(ref, Device.class));
-                            }
-                            catch (Exception e){
-
-                                return Observable.empty();
+                    return Observable.from(em.getEntities(uuids, "device"))
+                        .filter( device -> {
 
+                            if(logger.isTraceEnabled()) {
+                                logger.trace("Filtering device: {}", device.getUuid());
                             }
 
-                        }).subscribeOn(Schedulers.io());
 
+                            if(notification.getUseGraph() && filters.size() > 0 ) {
 
-                }, 50)
-                .filter( device -> {
+                                for (Map.Entry<String, Object> entry : filters.entrySet()) {
 
-                    if(logger.isTraceEnabled()) {
-                        logger.trace("Filtering device: {}", device.getUuid());
-                    }
+                                    if ((device.getDynamicProperties().get(entry.getKey()) != null &&
+                                        device.getDynamicProperties().get(entry.getKey()).equals(entry.getValue())) ||
 
+                                        (device.getProperties().get(entry.getKey()) != null &&
+                                            device.getProperties().get(entry.getKey()).equals(entry.getValue()))
 
-                    if(notification.getUseGraph() && filters.size() > 0 ) {
+                                        ) {
 
-                        for (Map.Entry<String, Object> entry : filters.entrySet()) {
 
-                            if ((device.getDynamicProperties().get(entry.getKey()) != null &&
-                                device.getDynamicProperties().get(entry.getKey()).equals(entry.getValue())) ||
+                                        return true;
+                                    }
 
-                                (device.getProperties().get(entry.getKey()) != null &&
-                                    device.getProperties().get(entry.getKey()).equals(entry.getValue()))
+                                }
 
-                                ) {
+                                if(logger.isTraceEnabled()) {
+                                    logger.trace("Push notification filter matched for notification {}, so removing from notification",
+                                        device.getUuid(), notification.getUuid());
+                                }
+                                return false;
 
 
-                                return true;
                             }
 
-                        }
+                            return true;
 
-                        if(logger.isTraceEnabled()) {
-                            logger.trace("Push notification filter matched for notification {}, so removing from notification",
-                                device.getUuid(), notification.getUuid());
-                        }
-                        return false;
-
-
-                    }
+                        })
+                        .map(sendMessageFunction)
+                        .doOnNext( message -> {
+                            try {
 
-                    return true;
+                                if(message.isPresent()){
 
-                })
-                .map(sendMessageFunction)
-                .doOnNext( message -> {
-                        try {
+                                    if(logger.isTraceEnabled()) {
+                                        logger.trace("Queueing notification message for device: {}", message.get().getDeviceId());
+                                    }
+                                    qm.sendMessage( message.get() );
+                                    queueMeter.mark();
+                                }
 
-                            if(message.isPresent()){
+                            } catch (IOException e) {
 
-                                if(logger.isTraceEnabled()) {
-                                    logger.trace("Queueing notification message for device: {}", message.get().getDeviceId());
+                                if(message.isPresent()){
+                                    logger.error("Unable to queue notification for notification UUID {} and device UUID {} ",
+                                        message.get().getNotificationId(), message.get().getDeviceId());
+                                }
+                                else{
+                                    logger.error("Unable to queue notification as it's not present when trying to send to queue");
                                 }
-                                qm.sendMessage( message.get() );
-                                queueMeter.mark();
+
                             }
 
-                        } catch (IOException e) {
 
-                            if(message.isPresent()){
-                                logger.error("Unable to queue notification for notification UUID {} and device UUID {} ",
-                                    message.get().getNotificationId(), message.get().getDeviceId());
-                            }
-                            else{
-                                logger.error("Unable to queue notification as it's not present when trying to send to queue");
-                            }
+                        }).subscribeOn(Schedulers.io());
+                }, 10)
 
-                        }
+                .doOnError(throwable -> {
 
+                    logger.error("Error while processing devices for notification : {}", notification.getUuid());
+                    notification.setProcessingFinished(-1L);
+                    notification.setDeviceProcessedCount(deviceCount.get());
+                    logger.warn("Partial notification. Only {} devices processed for notification {}",
+                        deviceCount.get(), notification.getUuid());
+                    try {
+                        em.update(notification);
+                    }catch (Exception e){
+                        logger.error("Error updating negative processing status when processing failed.");
+                    }
 
                 })
                 .doOnCompleted( () -> {
@@ -282,16 +293,14 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
                         notification.setProcessingFinished(System.currentTimeMillis());
                         notification.setDeviceProcessedCount(deviceCount.get());
                         em.update(notification);
-                        logger.info("{} devices processed for notification {}", deviceCount.get(), notification.getUuid());
+                        logger.info("{} device(s) processed for notification {}", deviceCount.get(), notification.getUuid());
 
                     } catch (Exception e) {
                         logger.error("Unable to set processing finished timestamp for notification");
                     }
 
-                })
-                .doOnError(throwable -> logger.error("Failed while trying to send notification", throwable));
+                });
 
-            //TODO verify error handling here
             processMessagesObservable.subscribeOn(Schedulers.io()).subscribe(); // fire the queuing into the background
 
         }