You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/04/19 18:51:42 UTC

[45/50] usergrid git commit: Parallel device fetching from users, need to update to support all PN use cases with this parallelism.

Parallel device fetching from users, need to update to support all PN use cases with this parallelism.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/cc3cbfee
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/cc3cbfee
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/cc3cbfee

Branch: refs/heads/master
Commit: cc3cbfee60f83e107269fe4fb558cb094b5c2032
Parents: 8cf7825
Author: Michael Russo <mr...@apigee.com>
Authored: Sun Apr 17 14:34:45 2016 +0100
Committer: Michael Russo <mr...@apigee.com>
Committed: Sun Apr 17 14:34:45 2016 +0100

----------------------------------------------------------------------
 .../corepersistence/CpRelationManager.java      |  21 ++++
 .../pipeline/builder/IdBuilder.java             |   6 ++
 .../pipeline/read/FilterFactory.java            |   8 +-
 .../pipeline/read/traverse/IdFilter.java        |  52 ++++++++++
 .../results/IdQueryExecutor.java                |  66 ++++++++++++
 .../service/CollectionSearch.java               |   9 ++
 .../service/CollectionService.java              |   5 +
 .../service/CollectionServiceImpl.java          |  23 ++++
 .../persistence/NotificationGraphIterator.java  |  79 +++++++++++++-
 .../apache/usergrid/persistence/Results.java    |  16 +++
 .../persistence/entities/Notification.java      |  16 ++-
 .../impl/ApplicationQueueManagerImpl.java       | 104 +++++++++++++++----
 12 files changed, 369 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/cc3cbfee/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 9ecf466..5596ab4 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -19,6 +19,7 @@ package org.apache.usergrid.corepersistence;
 
 import java.util.*;
 
+import org.apache.usergrid.corepersistence.results.IdQueryExecutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.util.Assert;
@@ -614,6 +615,24 @@ public class CpRelationManager implements RelationManager {
         final Optional<String> queryString = query.isGraphSearch()? Optional.<String>absent(): query.getQl();
         final Id ownerId = headEntity.asId();
 
+
+        if(query.getLevel() == Level.IDS ){
+
+            return new IdQueryExecutor( toExecute.getCursor() ) {
+                @Override
+                protected Observable<ResultsPage<Id>> buildNewResultsPage(
+                    final Optional<String> cursor ) {
+
+                    final CollectionSearch search =
+                        new CollectionSearch( applicationScope, ownerId, collectionName, collection.getType(), toExecute.getLimit(),
+                            queryString, cursor );
+
+                    return collectionService.searchCollectionIds( search );
+                }
+            }.next();
+
+        }
+
         //wire the callback so we can get each page
         return new EntityQueryExecutor( toExecute.getCursor() ) {
             @Override
@@ -989,6 +1008,8 @@ public class CpRelationManager implements RelationManager {
         //            }
         //        }
 
+
+
         return query;
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/cc3cbfee/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java
index 0f784a6..65cf7c1 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java
@@ -30,6 +30,7 @@ import org.apache.usergrid.corepersistence.pipeline.read.collect.ConnectionRefRe
 import org.apache.usergrid.corepersistence.pipeline.read.collect.IdResumeFilter;
 import org.apache.usergrid.corepersistence.pipeline.read.collect.ResultsPageCollector;
 import org.apache.usergrid.corepersistence.pipeline.read.search.Candidate;
+import org.apache.usergrid.corepersistence.pipeline.read.traverse.IdFilter;
 import org.apache.usergrid.persistence.ConnectionRef;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
@@ -148,4 +149,9 @@ public class IdBuilder {
         return new ConnectionRefBuilder(connectionRefFilter);
     }
 
+    public Observable<ResultsPage<Id>> build(){
+        //pass each id through the IdFilter, then collect the ids into a results page
+        return pipeline.withFilter( new IdFilter() ).withFilter(new ResultsPageCollector<>()).execute();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/cc3cbfee/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
index ca5695c..883fdc8 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
@@ -27,13 +27,7 @@ import org.apache.usergrid.corepersistence.pipeline.read.search.CandidateEntityF
 import org.apache.usergrid.corepersistence.pipeline.read.search.CandidateIdFilter;
 import org.apache.usergrid.corepersistence.pipeline.read.search.SearchCollectionFilter;
 import org.apache.usergrid.corepersistence.pipeline.read.search.SearchConnectionFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.traverse.EntityIdFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.traverse.EntityLoadVerifyFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.traverse.ReadGraphCollectionByIdFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.traverse.ReadGraphCollectionFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.traverse.ReadGraphConnectionByIdFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.traverse.ReadGraphConnectionByTypeFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.traverse.ReadGraphConnectionFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.traverse.*;
 import org.apache.usergrid.persistence.model.entity.Id;
 
 import com.google.common.base.Optional;

http://git-wip-us.apache.org/repos/asf/usergrid/blob/cc3cbfee/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/IdFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/IdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/IdFilter.java
new file mode 100644
index 0000000..9b3a05a
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/IdFilter.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.traverse;
+
+
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+import rx.Observable;
+
+import java.util.List;
+
+
+/**
+ * This command is a stopgap to make migrating 1.0 code easier.  Once full traversal has been implemented, this should
+ * be removed
+ */
+public class IdFilter extends AbstractFilter<FilterResult<Id>, FilterResult<Id>>{
+
+    @Inject
+    public IdFilter() {};
+
+
+
+    @Override
+    public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> filterValueObservable ) {
+        //re-wrap each incoming id and its path in a new FilterResult, leaving the values unchanged
+        return filterValueObservable.map( idFilterResult ->  new FilterResult( idFilterResult.getValue(), idFilterResult.getPath() ));
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/cc3cbfee/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/IdQueryExecutor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/IdQueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/IdQueryExecutor.java
new file mode 100644
index 0000000..5a0bb3f
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/IdQueryExecutor.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.results;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
+import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
+import org.apache.usergrid.persistence.ConnectionRef;
+import org.apache.usergrid.persistence.EntityFactory;
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Optional;
+
+import rx.Observable;
+
+
+/**
+ * Processes our results of ids, converting each page of ids into a Results built from their UUIDs
+ */
+@Deprecated//Required for 1.0 compatibility
+public abstract class IdQueryExecutor extends ObservableQueryExecutor<Id> {
+
+
+    protected IdQueryExecutor( final Optional<String> startCursor ) {
+        super( startCursor );
+    }
+
+
+    @Override
+    protected Results createResults( final ResultsPage resultsPage ) {
+        final List<Id> ids = resultsPage.getEntityList();
+
+        List<UUID> uuids = ids.stream().map(id -> id.getUuid()).collect(Collectors.toList());
+
+        final Results results = Results.fromIdList(uuids);
+
+        return results;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/cc3cbfee/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionSearch.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionSearch.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionSearch.java
index ab8a8bc..602a5b6 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionSearch.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionSearch.java
@@ -29,6 +29,10 @@ import com.google.common.base.Optional;
  */
 public class CollectionSearch {
 
+    public enum Level {
+        IDS, ALL
+    }
+
     private final ApplicationScope applicationScope;
     private final Id collectionOwnerId;
     private final String collectionName;
@@ -36,6 +40,7 @@ public class CollectionSearch {
     private final int limit;
     private final Optional<String> query;
     private final Optional<String> cursor;
+    private Level level = Level.ALL;
 
 
     public CollectionSearch( final ApplicationScope applicationScope, final Id collectionOwnerId, final String
@@ -84,4 +89,8 @@ public class CollectionSearch {
     public Id getCollectionOwnerId() {
         return collectionOwnerId;
     }
+
+    public void setResultsLevel(Level level){ this.level = level; }
+
+    public Level getResultsLevel(){ return level; }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/cc3cbfee/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionService.java
index eef741a..6a46022 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionService.java
@@ -21,6 +21,7 @@ package org.apache.usergrid.corepersistence.service;
 import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
 import org.apache.usergrid.persistence.model.entity.Entity;
 
+import org.apache.usergrid.persistence.model.entity.Id;
 import rx.Observable;
 
 
@@ -35,4 +36,8 @@ public interface CollectionService {
      * @return An observable with results page entries for the stream
      */
     Observable<ResultsPage<Entity>> searchCollection(final CollectionSearch search);
+
+
+
+    Observable<ResultsPage<Id>> searchCollectionIds(final CollectionSearch search);
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/cc3cbfee/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionServiceImpl.java
index fa79d09..9244315 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionServiceImpl.java
@@ -29,6 +29,7 @@ import com.google.common.base.Optional;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 
+import org.apache.usergrid.persistence.model.entity.Id;
 import rx.Observable;
 
 
@@ -73,4 +74,26 @@ public class CollectionServiceImpl implements CollectionService {
 
         return results.build();
     }
+
+    @Override
+    public Observable<ResultsPage<Id>> searchCollectionIds(final CollectionSearch search ) {
+
+
+        final ApplicationScope applicationScope = search.getApplicationScope();
+        final String collectionName = search.getCollectionName();
+        final Optional<String> query = search.getQuery();
+
+        final IdBuilder pipelineBuilder =
+            pipelineBuilderFactory.create( applicationScope ).withCursor( search.getCursor() )
+                .withLimit( search.getLimit() ).fromId( search.getCollectionOwnerId() );
+
+
+        final IdBuilder results;
+
+
+        results = pipelineBuilder.traverseCollection( collectionName );
+
+
+        return results.build();
+    }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/cc3cbfee/stack/core/src/main/java/org/apache/usergrid/persistence/NotificationGraphIterator.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/NotificationGraphIterator.java b/stack/core/src/main/java/org/apache/usergrid/persistence/NotificationGraphIterator.java
index b83f555..a1f3246 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/NotificationGraphIterator.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/NotificationGraphIterator.java
@@ -17,10 +17,16 @@
 package org.apache.usergrid.persistence;
 
 
+import org.apache.usergrid.persistence.entities.Group;
+import org.apache.usergrid.persistence.entities.User;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
 
 public class NotificationGraphIterator implements ResultsIterator, Iterable {
 
@@ -58,8 +64,19 @@ public class NotificationGraphIterator implements ResultsIterator, Iterable {
             return true;
         }
         while (source.hasNext()) {
-            EntityRef ref = source.next();
-            Results r = getResultsFor(ref);
+            Object next = source.next();
+            Results r;
+
+//            if(next instanceof UUID){
+//
+//                UUID id = (UUID) next;
+//                r = getResultsForId(id, "user");
+//
+//            }else {
+                EntityRef ref = (EntityRef) next;
+                r = getResultsFor(ref);
+           // }
+
             if (r.size() > 0) {
                 currentIterator = new PagingResultsIterator(r, query.getResultsLevel());
                 return currentIterator.hasNext();
@@ -90,13 +107,57 @@ public class NotificationGraphIterator implements ResultsIterator, Iterable {
 
         try {
 
+
+            query.setLimit(Query.MAX_LIMIT); // always fetch our MAX limit to reduce # of IO hops
             if (query.getCollection() != null) {
 
+                // make sure this results in graph traversal
+                query.setQl("select *");
+
                 if(logger.isTraceEnabled()) {
                     logger.trace("Fetching with refType: {}, collection: {} with no query",
                         ref.getType(), query.getCollection());
                 }
-                return entityManager.searchCollection(ref, query.getCollection(), null);
+
+                // if we're fetching devices through groups->users->devices, get only the IDs and don't load the entities
+                if( ref.getType().equals(Group.ENTITY_TYPE)){
+
+                    // query users using IDs as we don't need to load the full entities just to find their devices
+                    Query usersQuery = new Query();
+                    usersQuery.setCollection("users");
+                    usersQuery.setResultsLevel(Query.Level.IDS);
+                    usersQuery.setLimit(1000);
+
+
+                    // set the query level for the iterator temporarily to IDS
+                    query.setResultsLevel(Query.Level.IDS);
+
+                 return entityManager.searchCollection(ref, usersQuery.getCollection(), usersQuery);
+
+
+//                    List<EntityRef> refs =
+//                        results.getIds().stream()
+//                            .map( uuid -> new SimpleEntityRef( "user", uuid) ).collect(Collectors.toList());
+//
+//                    // set the query level for the iterator back to REFS after mapping our IDS
+//                    query.setResultsLevel(Query.Level.REFS);
+//                    return Results.fromRefList(refs);
+
+                }
+
+                if( ref.getType().equals(User.ENTITY_TYPE)){
+
+                    Query devicesQuery = new Query();
+                    devicesQuery.setCollection("devices");
+                    devicesQuery.setResultsLevel(Query.Level.CORE_PROPERTIES);
+
+                    //query.setCollection("devices");
+                    //query.setResultsLevel(Query.Level.CORE_PROPERTIES);
+                    return entityManager.searchCollection(ref, devicesQuery.getCollection(), devicesQuery);
+                }
+
+                return entityManager.searchCollection(ref, query.getCollection(), query);
+
 
             } else {
 
@@ -105,7 +166,7 @@ public class NotificationGraphIterator implements ResultsIterator, Iterable {
                         ref.getType(), query.getCollection());
                 }
 
-                query.setQl("select *");
+                query.setQl("select *"); // make sure this results in graph traversal
                 return entityManager.searchTargetEntities(ref, query);
 
             }
@@ -116,4 +177,14 @@ public class NotificationGraphIterator implements ResultsIterator, Iterable {
         }
     }
 
+
+    private Results getResultsForId(UUID uuid, String type) {
+
+        EntityRef ref = new SimpleEntityRef(type, uuid);
+        return getResultsFor(ref);
+
+
+    }
+
+
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/cc3cbfee/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java b/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java
index e9a3251..2a84622 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java
@@ -708,6 +708,22 @@ public class Results implements Iterable<Entity> {
         }
     }
 
+    public void addEntities( Results results){
+
+        if(entities == null){
+            //init();
+            entities = new ArrayList<>();
+            level = Level.CORE_PROPERTIES;
+        }
+
+        if( results.getEntities().size() > 0){
+
+            entities.addAll(results.getEntities());
+
+        }
+
+    }
+
 
     /** Remove the passed in results from the current results */
     public void subtract( Results results ) {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/cc3cbfee/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java
index aca10cf..d4f3529 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java
@@ -409,20 +409,28 @@ public class Notification extends TypedEntity {
 
                     if ( pathTokens.size() == 1 && collection.equals(InflectionUtils.pluralize(Group.ENTITY_TYPE) )){
 
-                        Query usersQuery = new Query();
+                        final Query usersQuery = new Query();
                         usersQuery.setQl("select *");
                         usersQuery.setCollection("users");
                         usersQuery.setLimit(100);
 
-                        Query devicesQuery = new Query();
+                        final Query devicesQuery = new Query();
                         devicesQuery.setQl("select *");
                         devicesQuery.setCollection("devices");
-                        usersQuery.setLimit(100);
+                        devicesQuery.setLimit(100);
 
 
                         // build up the chain so the proper iterators can be used later
-                        pathQuery = pathQuery.chain( usersQuery ).chain( devicesQuery );
+                        pathQuery = pathQuery.chain( usersQuery );//.chain( devicesQuery );
+
+                    }else if(pathTokens.size() == 1 && collection.equals(InflectionUtils.pluralize(User.ENTITY_TYPE))){
+
+                        final Query devicesQuery = new Query();
+                        devicesQuery.setQl("select *");
+                        devicesQuery.setCollection("devices");
+                        devicesQuery.setLimit(100);
 
+                        pathQuery = pathQuery.chain( devicesQuery );
                     }
 
                 } else {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/cc3cbfee/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
index 5ce1b93..1cbb2c6 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
@@ -19,10 +19,7 @@ package org.apache.usergrid.services.notifications.impl;
 import com.codahale.metrics.Meter;
 import org.apache.usergrid.batch.JobExecution;
 import org.apache.usergrid.persistence.*;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
-import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
 import org.apache.usergrid.persistence.entities.*;
 import org.apache.usergrid.persistence.Query;
 import org.apache.usergrid.persistence.queue.QueueManager;
@@ -108,6 +105,8 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
             if (logger.isTraceEnabled()) {
                 logger.trace("notification {} start query", notification.getUuid());
             }
+            logger.info("notification {} start query", notification.getUuid());
+
 
 
             // the main iterator can use graph traversal or index querying
@@ -185,30 +184,80 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
 
             final Map<String, Object> filters = notification.getFilters();
 
-            Observable processMessagesObservable = Observable.create(new IteratorObservable<Entity>(iterator))
-                .flatMap(entity -> {
 
-                    if(entity.getType().equals(Device.ENTITY_TYPE)){
-                        return Observable.from(Collections.singletonList(entity));
-                    }
 
-                    // if it's not a device, drill down and get them
-                    return Observable.from(getDevices(entity));
+            Observable processMessagesObservable = Observable.create(new IteratorObservable<UUID>(iterator))
+//                .flatMap(entity -> {
+//
+//                    if(entity.getType().equals(Device.ENTITY_TYPE)){
+//                        return Observable.from(Collections.singletonList(entity));
+//                    }
+//
+//                    // if it's not a device, drill down and get them
+//                    return Observable.from(getDevices(entity));
+//
+//                })
+                .distinct()
+                .flatMap( entityRef -> {
+
+                    return Observable.just(entityRef).flatMap(ref->{
+
+                        List<Entity> entities = new ArrayList<>();
+
+                                Query devicesQuery = new Query();
+                                devicesQuery.setCollection("devices");
+                                devicesQuery.setResultsLevel(Query.Level.CORE_PROPERTIES);
+
+                                try {
+
+                                   entities = em.searchCollection(new SimpleEntityRef("user", ref), devicesQuery.getCollection(), devicesQuery).getEntities();
+
+                                }catch (Exception e){
+
+                                    logger.error("Unable to load devices for user: {}", ref);
+                                    return Observable.empty();
+                                }
 
-                })
-                .distinct(ref -> ref.getUuid() )
-                .map( entityRef -> entityRef.getUuid() )
-                .buffer(10)
-                .flatMap( uuids -> {
 
-                    if(logger.isTraceEnabled()) {
-                        logger.trace("Processing batch of {} device(s)", uuids.size());
-                    }
 
 
-                    return Observable.from(em.getEntities(uuids, "device"))
+//                            if( ref.getType().equals(User.ENTITY_TYPE)){
+//
+//                                Query devicesQuery = new Query();
+//                                devicesQuery.setCollection("devices");
+//                                devicesQuery.setResultsLevel(Query.Level.CORE_PROPERTIES);
+//
+//                                try {
+//
+//                                   entities = em.searchCollection(new SimpleEntityRef("user", ref.getUuid()), devicesQuery.getCollection(), devicesQuery).getEntities();
+//
+//                                }catch (Exception e){
+//
+//                                    logger.error("Unable to load devices for user: {}", ref.getUuid());
+//                                    return Observable.empty();
+//                                }
+//
+//
+//                            }else if ( ref.getType().equals(Device.ENTITY_TYPE)){
+//
+//                                try{
+//                                    entities.add(em.get(ref));
+//
+//                                }catch(Exception e){
+//
+//                                    logger.error("Unable to load device: {}", ref.getUuid());
+//                                    return Observable.empty();
+//
+//                                }
+//
+//                            }
+                        return Observable.from(entities);
+
+                        })
                         .filter( device -> {
 
+                            logger.info("Filtering device: {}", device.getUuid());
+
                             if(logger.isTraceEnabled()) {
                                 logger.trace("Filtering device: {}", device.getUuid());
                             }
@@ -233,7 +282,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
                                 }
 
                                 if(logger.isTraceEnabled()) {
-                                    logger.trace("Push notification filter matched for notification {}, so removing from notification",
+                                    logger.trace("Push notification filter did not match for notification {}, so removing from notification",
                                         device.getUuid(), notification.getUuid());
                                 }
                                 return false;
@@ -271,7 +320,20 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
 
 
                         }).subscribeOn(Schedulers.io());
-                }, 10)
+
+                }, 100)
+                //.map( entityRef -> entityRef.getUuid() )
+                //.buffer(10)
+//                .flatMap( uuids -> {
+//
+//                    if(logger.isTraceEnabled()) {
+//                        logger.trace("Processing batch of {} device(s)", uuids.size());
+//                    }
+//
+//
+//                    return Observable.from(em.getEntities(uuids, "device")).subscribeOn(Schedulers.io());
+//
+//                }, 10)
 
                 .doOnError(throwable -> {