You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/04/26 17:03:05 UTC
[41/50] [abbrv] usergrid git commit: Update notification processing to allow more parallel work.
Update notification processing to allow more parallel work.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/8cf78252
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/8cf78252
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/8cf78252
Branch: refs/heads/asf-site
Commit: 8cf782527f705109fdbedd8d8767e6074e42796a
Parents: 32ab5da
Author: Michael Russo <mr...@apigee.com>
Authored: Sat Apr 16 14:40:20 2016 +0100
Committer: Michael Russo <mr...@apigee.com>
Committed: Sat Apr 16 14:40:20 2016 +0100
----------------------------------------------------------------------
.../corepersistence/CpEntityManager.java | 47 ++++--
.../IllegalArgumentExceptionMapper.java | 4 +-
.../notifications/NotificationsService.java | 27 ++--
.../impl/ApplicationQueueManagerImpl.java | 145 ++++++++++---------
4 files changed, 128 insertions(+), 95 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/8cf78252/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index b2330f3..750cf7b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -34,7 +34,9 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.usergrid.persistence.collection.EntitySet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;
@@ -2511,23 +2513,42 @@ public class CpEntityManager implements EntityManager {
@Override
public Results getEntities( List<UUID> ids, String type ) {
- ArrayList<Entity> entities = new ArrayList<Entity>();
- for ( UUID uuid : ids ) {
- EntityRef ref = new SimpleEntityRef( type, uuid );
- Entity entity = null;
- try {
- entity = get( ref );
- }
- catch ( Exception ex ) {
- logger.warn( "Entity {}/{} not found", uuid, type );
- }
- if ( entity != null ) {
- entities.add( entity );
- }
+ List<Id> entityIds = new ArrayList<>();
+
+ for( UUID uuid : ids){
+
+ entityIds.add(new SimpleId( uuid, type ));
+
}
+ // leverage ecm.load so it's a batch fetch of all entities from Cassandra
+ EntitySet entitySet = ecm.load( entityIds ).toBlocking().last();
+
+ List<Entity> entities = entitySet.getEntities().stream().map( mvccEntity -> {
+
+ if( mvccEntity.getEntity().isPresent() ){
+
+ org.apache.usergrid.persistence.model.entity.Entity cpEntity = mvccEntity.getEntity().get();
+
+ Class clazz = Schema.getDefaultSchema().getEntityClass( mvccEntity.getId().getType() );
+
+ Entity entity = EntityFactory.newEntity( mvccEntity.getId().getUuid(), mvccEntity.getId().getType(), clazz );
+ entity.setProperties( cpEntity );
+
+ return entity;
+
+ }else{
+
+ logger.warn("Tried fetching entity with id: {} and type: but was not found",
+ mvccEntity.getId().getUuid(), mvccEntity.getId().getType() );
+
+ return null;
+ }
+ }).collect(Collectors.toList());
+
+
return Results.fromEntities( entities );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/8cf78252/stack/rest/src/main/java/org/apache/usergrid/rest/exceptions/IllegalArgumentExceptionMapper.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/exceptions/IllegalArgumentExceptionMapper.java b/stack/rest/src/main/java/org/apache/usergrid/rest/exceptions/IllegalArgumentExceptionMapper.java
index ff7b656..e6243e9 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/exceptions/IllegalArgumentExceptionMapper.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/exceptions/IllegalArgumentExceptionMapper.java
@@ -34,7 +34,9 @@ public class IllegalArgumentExceptionMapper extends AbstractExceptionMapper<Ille
@Override
public Response toResponse( IllegalArgumentException e ) {
- logger.error( "Illegal argument was passed, returning bad request to user", e );
+ if(logger.isTraceEnabled()) {
+ logger.trace("Illegal argument was passed, returning bad request to user", e.getMessage());
+ }
return toResponse( BAD_REQUEST, e );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/8cf78252/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
index 50eb883..f4fdb65 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
@@ -20,6 +20,7 @@ package org.apache.usergrid.services.notifications;
import java.util.*;
import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.services.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,15 +40,6 @@ import org.apache.usergrid.persistence.queue.QueueManager;
import org.apache.usergrid.persistence.queue.QueueManagerFactory;
import org.apache.usergrid.persistence.queue.QueueScope;
import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
-import org.apache.usergrid.services.AbstractCollectionService;
-import org.apache.usergrid.services.ServiceAction;
-import org.apache.usergrid.services.ServiceContext;
-import org.apache.usergrid.services.ServiceInfo;
-import org.apache.usergrid.services.ServiceManagerFactory;
-import org.apache.usergrid.services.ServiceParameter;
-import org.apache.usergrid.services.ServicePayload;
-import org.apache.usergrid.services.ServiceRequest;
-import org.apache.usergrid.services.ServiceResults;
import org.apache.usergrid.services.exceptions.ForbiddenServiceOperationException;
import org.apache.usergrid.services.notifications.impl.ApplicationQueueManagerImpl;
@@ -85,7 +77,6 @@ public class NotificationsService extends AbstractCollectionService {
private ServiceManagerFactory smf;
private EntityManagerFactory emf;
private QueueManagerFactory queueManagerFactory;
- private EntityCollectionManagerFactory ecmf;
public NotificationsService() {
if (logger.isTraceEnabled()) {
@@ -139,10 +130,20 @@ public class NotificationsService extends AbstractCollectionService {
Timer.Context timer = postTimer.time();
postMeter.mark();
try {
+
validate(null, context.getPayload());
- Notification.PathTokens pathTokens = getPathTokens(context.getRequest().getOriginalParameters());
- // default saving of receipts
+ // perform some input validates on useGraph payload property vs. ql= path query
+ final List<ServiceParameter> parameters = context.getRequest().getOriginalParameters();
+ for (ServiceParameter parameter : parameters){
+ if( parameter instanceof ServiceParameter.QueryParameter && context.getProperties().get("useGraph").equals(true)){
+ throw new IllegalArgumentException("Query ql parameter cannot be used with useGraph:true property value");
+ }
+ }
+
+ Notification.PathTokens pathTokens = getPathTokens(parameters);
+
+ // set defaults
context.getProperties().put("filters", context.getProperties().getOrDefault("filters", new HashMap<>()));
context.getProperties().put("useGraph", context.getProperties().getOrDefault("useGraph", false));
context.getProperties().put("saveReceipts", context.getProperties().getOrDefault("saveReceipts", true));
@@ -175,7 +176,7 @@ public class NotificationsService extends AbstractCollectionService {
// future: somehow return 202?
return results;
}catch (Exception e){
- logger.error("serialization failed",e);
+ logger.error(e.getMessage());
throw e;
}finally {
timer.stop();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/8cf78252/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
index 487ea1f..5ce1b93 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
@@ -58,7 +58,9 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
HashMap<Object, ProviderAdapter> notifierHashMap; // only retrieve notifiers once
- public ApplicationQueueManagerImpl(JobScheduler jobScheduler, EntityManager entityManager, QueueManager queueManager, MetricsFactory metricsFactory, Properties properties) {
+ public ApplicationQueueManagerImpl( JobScheduler jobScheduler, EntityManager entityManager,
+ QueueManager queueManager, MetricsFactory metricsFactory,
+ Properties properties) {
this.em = entityManager;
this.qm = queueManager;
this.jobScheduler = jobScheduler;
@@ -116,21 +118,24 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
iterator = pathQuery.iterator(em);
}
- //if there are more pages (defined by PAGE_SIZE) you probably want this to be async, also if this is already a job then don't reschedule
- if (iterator instanceof ResultsIterator && ((ResultsIterator) iterator).hasPages() && jobExecution == null) {
- if(logger.isTraceEnabled()){
- logger.trace("Scheduling notification job as it has multiple pages of devices.");
- }
- jobScheduler.scheduleQueueJob(notification, true);
- em.update(notification);
- return;
- }
+// //if there are more pages (defined by PAGE_SIZE) you probably want this to be async, also if this is already a job then don't reschedule
+// if (iterator instanceof ResultsIterator && ((ResultsIterator) iterator).hasPages() && jobExecution == null) {
+// if(logger.isTraceEnabled()){
+// logger.trace("Scheduling notification job as it has multiple pages of devices.");
+// }
+// jobScheduler.scheduleQueueJob(notification, true);
+// em.update(notification);
+// return;
+// }
final UUID appId = em.getApplication().getUuid();
final Map<String, Object> payloads = notification.getPayloads();
final Func1<EntityRef, Optional<ApplicationQueueMessage>> sendMessageFunction = deviceRef -> {
+
try {
+ //logger.info("Preparing notification queue message for device: {}", deviceRef.getUuid());
+
long now = System.currentTimeMillis();
String notifierId = null;
@@ -163,6 +168,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
notification.setQueued(System.currentTimeMillis());
}
+
deviceCount.incrementAndGet();
return Optional.of(message);
@@ -190,90 +196,95 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
return Observable.from(getDevices(entity));
})
- .distinct(ref -> ref.getUuid())
- .flatMap( entityRef -> {
+ .distinct(ref -> ref.getUuid() )
+ .map( entityRef -> entityRef.getUuid() )
+ .buffer(10)
+ .flatMap( uuids -> {
- return Observable.just(entityRef).flatMap( ref -> {
+ if(logger.isTraceEnabled()) {
+ logger.trace("Processing batch of {} device(s)", uuids.size());
+ }
- if(logger.isTraceEnabled()){
- logger.trace("Loading device: {}", ref.getUuid());
- }
- try {
- return Observable.just(em.get(ref, Device.class));
- }
- catch (Exception e){
-
- return Observable.empty();
+ return Observable.from(em.getEntities(uuids, "device"))
+ .filter( device -> {
+ if(logger.isTraceEnabled()) {
+ logger.trace("Filtering device: {}", device.getUuid());
}
- }).subscribeOn(Schedulers.io());
+ if(notification.getUseGraph() && filters.size() > 0 ) {
- }, 50)
- .filter( device -> {
+ for (Map.Entry<String, Object> entry : filters.entrySet()) {
- if(logger.isTraceEnabled()) {
- logger.trace("Filtering device: {}", device.getUuid());
- }
+ if ((device.getDynamicProperties().get(entry.getKey()) != null &&
+ device.getDynamicProperties().get(entry.getKey()).equals(entry.getValue())) ||
+ (device.getProperties().get(entry.getKey()) != null &&
+ device.getProperties().get(entry.getKey()).equals(entry.getValue()))
- if(notification.getUseGraph() && filters.size() > 0 ) {
+ ) {
- for (Map.Entry<String, Object> entry : filters.entrySet()) {
- if ((device.getDynamicProperties().get(entry.getKey()) != null &&
- device.getDynamicProperties().get(entry.getKey()).equals(entry.getValue())) ||
+ return true;
+ }
- (device.getProperties().get(entry.getKey()) != null &&
- device.getProperties().get(entry.getKey()).equals(entry.getValue()))
+ }
- ) {
+ if(logger.isTraceEnabled()) {
+ logger.trace("Push notification filter matched for notification {}, so removing from notification",
+ device.getUuid(), notification.getUuid());
+ }
+ return false;
- return true;
}
- }
+ return true;
- if(logger.isTraceEnabled()) {
- logger.trace("Push notification filter matched for notification {}, so removing from notification",
- device.getUuid(), notification.getUuid());
- }
- return false;
-
-
- }
+ })
+ .map(sendMessageFunction)
+ .doOnNext( message -> {
+ try {
- return true;
+ if(message.isPresent()){
- })
- .map(sendMessageFunction)
- .doOnNext( message -> {
- try {
+ if(logger.isTraceEnabled()) {
+ logger.trace("Queueing notification message for device: {}", message.get().getDeviceId());
+ }
+ qm.sendMessage( message.get() );
+ queueMeter.mark();
+ }
- if(message.isPresent()){
+ } catch (IOException e) {
- if(logger.isTraceEnabled()) {
- logger.trace("Queueing notification message for device: {}", message.get().getDeviceId());
+ if(message.isPresent()){
+ logger.error("Unable to queue notification for notification UUID {} and device UUID {} ",
+ message.get().getNotificationId(), message.get().getDeviceId());
+ }
+ else{
+ logger.error("Unable to queue notification as it's not present when trying to send to queue");
}
- qm.sendMessage( message.get() );
- queueMeter.mark();
+
}
- } catch (IOException e) {
- if(message.isPresent()){
- logger.error("Unable to queue notification for notification UUID {} and device UUID {} ",
- message.get().getNotificationId(), message.get().getDeviceId());
- }
- else{
- logger.error("Unable to queue notification as it's not present when trying to send to queue");
- }
+ }).subscribeOn(Schedulers.io());
+ }, 10)
- }
+ .doOnError(throwable -> {
+ logger.error("Error while processing devices for notification : {}", notification.getUuid());
+ notification.setProcessingFinished(-1L);
+ notification.setDeviceProcessedCount(deviceCount.get());
+ logger.warn("Partial notification. Only {} devices processed for notification {}",
+ deviceCount.get(), notification.getUuid());
+ try {
+ em.update(notification);
+ }catch (Exception e){
+ logger.error("Error updating negative processing status when processing failed.");
+ }
})
.doOnCompleted( () -> {
@@ -282,16 +293,14 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
notification.setProcessingFinished(System.currentTimeMillis());
notification.setDeviceProcessedCount(deviceCount.get());
em.update(notification);
- logger.info("{} devices processed for notification {}", deviceCount.get(), notification.getUuid());
+ logger.info("{} device(s) processed for notification {}", deviceCount.get(), notification.getUuid());
} catch (Exception e) {
logger.error("Unable to set processing finished timestamp for notification");
}
- })
- .doOnError(throwable -> logger.error("Failed while trying to send notification", throwable));
+ });
- //TODO verify error handling here
processMessagesObservable.subscribeOn(Schedulers.io()).subscribe(); // fire the queuing into the background
}