You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2016/04/17 19:04:33 UTC

[1/2] usergrid git commit: Parallel device fetching from users, need to update to support all PN use cases with this parallelism.

Repository: usergrid
Updated Branches:
  refs/heads/release-2.1.1 8cf782527 -> 06caa2509


Parallel device fetching from users. Still need to update the code to support all PN (push notification) use cases with this parallelism.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/cc3cbfee
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/cc3cbfee
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/cc3cbfee

Branch: refs/heads/release-2.1.1
Commit: cc3cbfee60f83e107269fe4fb558cb094b5c2032
Parents: 8cf7825
Author: Michael Russo <mr...@apigee.com>
Authored: Sun Apr 17 14:34:45 2016 +0100
Committer: Michael Russo <mr...@apigee.com>
Committed: Sun Apr 17 14:34:45 2016 +0100

----------------------------------------------------------------------
 .../corepersistence/CpRelationManager.java      |  21 ++++
 .../pipeline/builder/IdBuilder.java             |   6 ++
 .../pipeline/read/FilterFactory.java            |   8 +-
 .../pipeline/read/traverse/IdFilter.java        |  52 ++++++++++
 .../results/IdQueryExecutor.java                |  66 ++++++++++++
 .../service/CollectionSearch.java               |   9 ++
 .../service/CollectionService.java              |   5 +
 .../service/CollectionServiceImpl.java          |  23 ++++
 .../persistence/NotificationGraphIterator.java  |  79 +++++++++++++-
 .../apache/usergrid/persistence/Results.java    |  16 +++
 .../persistence/entities/Notification.java      |  16 ++-
 .../impl/ApplicationQueueManagerImpl.java       | 104 +++++++++++++++----
 12 files changed, 369 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/cc3cbfee/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 9ecf466..5596ab4 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -19,6 +19,7 @@ package org.apache.usergrid.corepersistence;
 
 import java.util.*;
 
+import org.apache.usergrid.corepersistence.results.IdQueryExecutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.util.Assert;
@@ -614,6 +615,24 @@ public class CpRelationManager implements RelationManager {
         final Optional<String> queryString = query.isGraphSearch()? Optional.<String>absent(): query.getQl();
         final Id ownerId = headEntity.asId();
 
+
+        if(query.getLevel() == Level.IDS ){
+
+            return new IdQueryExecutor( toExecute.getCursor() ) {
+                @Override
+                protected Observable<ResultsPage<Id>> buildNewResultsPage(
+                    final Optional<String> cursor ) {
+
+                    final CollectionSearch search =
+                        new CollectionSearch( applicationScope, ownerId, collectionName, collection.getType(), toExecute.getLimit(),
+                            queryString, cursor );
+
+                    return collectionService.searchCollectionIds( search );
+                }
+            }.next();
+
+        }
+
         //wire the callback so we can get each page
         return new EntityQueryExecutor( toExecute.getCursor() ) {
             @Override
@@ -989,6 +1008,8 @@ public class CpRelationManager implements RelationManager {
         //            }
         //        }
 
+
+
         return query;
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/cc3cbfee/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java
index 0f784a6..65cf7c1 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java
@@ -30,6 +30,7 @@ import org.apache.usergrid.corepersistence.pipeline.read.collect.ConnectionRefRe
 import org.apache.usergrid.corepersistence.pipeline.read.collect.IdResumeFilter;
 import org.apache.usergrid.corepersistence.pipeline.read.collect.ResultsPageCollector;
 import org.apache.usergrid.corepersistence.pipeline.read.search.Candidate;
+import org.apache.usergrid.corepersistence.pipeline.read.traverse.IdFilter;
 import org.apache.usergrid.persistence.ConnectionRef;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
@@ -148,4 +149,9 @@ public class IdBuilder {
         return new ConnectionRefBuilder(connectionRefFilter);
     }
 
+    public Observable<ResultsPage<Id>> build(){
+        //NOTE(review): no resume filter is added on this path, only IdFilter; the ids are then collected into results pages
+        return pipeline.withFilter( new IdFilter() ).withFilter(new ResultsPageCollector<>()).execute();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/cc3cbfee/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
index ca5695c..883fdc8 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
@@ -27,13 +27,7 @@ import org.apache.usergrid.corepersistence.pipeline.read.search.CandidateEntityF
 import org.apache.usergrid.corepersistence.pipeline.read.search.CandidateIdFilter;
 import org.apache.usergrid.corepersistence.pipeline.read.search.SearchCollectionFilter;
 import org.apache.usergrid.corepersistence.pipeline.read.search.SearchConnectionFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.traverse.EntityIdFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.traverse.EntityLoadVerifyFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.traverse.ReadGraphCollectionByIdFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.traverse.ReadGraphCollectionFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.traverse.ReadGraphConnectionByIdFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.traverse.ReadGraphConnectionByTypeFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.traverse.ReadGraphConnectionFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.traverse.*;
 import org.apache.usergrid.persistence.model.entity.Id;
 
 import com.google.common.base.Optional;

http://git-wip-us.apache.org/repos/asf/usergrid/blob/cc3cbfee/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/IdFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/IdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/IdFilter.java
new file mode 100644
index 0000000..9b3a05a
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/IdFilter.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.traverse;
+
+
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+import rx.Observable;
+
+import java.util.List;
+
+
+/**
+ * Pass-through filter over ids: each incoming FilterResult is re-emitted with the same value and path.
+ * Original note: a stopgap to ease migrating 1.0 code; remove once full traversal has been implemented.
+ */
+public class IdFilter extends AbstractFilter<FilterResult<Id>, FilterResult<Id>>{
+
+    @Inject
+    public IdFilter() {};
+
+    // Maps every upstream result to a new FilterResult that carries the same value and path.
+    // NOTE(review): FilterResult is constructed as a raw type below - confirm whether the diamond was intended.
+    @Override
+    public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> filterValueObservable ) {
+        //nothing is dropped or replaced here; the incoming id and its path are passed through unchanged
+        return filterValueObservable.map( idFilterResult ->  new FilterResult( idFilterResult.getValue(), idFilterResult.getPath() ));
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/cc3cbfee/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/IdQueryExecutor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/IdQueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/IdQueryExecutor.java
new file mode 100644
index 0000000..5a0bb3f
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/IdQueryExecutor.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.results;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
+import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
+import org.apache.usergrid.persistence.ConnectionRef;
+import org.apache.usergrid.persistence.EntityFactory;
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Optional;
+
+import rx.Observable;
+
+
+/**
+ * Query executor for pages of entity ids: each ResultsPage of Ids is turned into a Results built from their UUIDs.
+ */
+@Deprecated//Required for 1.0 compatibility
+public abstract class IdQueryExecutor extends ObservableQueryExecutor<Id> {
+
+    // startCursor is handed to ObservableQueryExecutor unchanged.
+    protected IdQueryExecutor( final Optional<String> startCursor ) {
+        super( startCursor );
+    }
+
+    // NOTE(review): resultsPage is a raw ResultsPage, so the List<Id> assignment below is unchecked.
+    @Override
+    protected Results createResults( final ResultsPage resultsPage ) {
+        final List<Id> ids = resultsPage.getEntityList();
+        // keep only the UUID of every Id on the page
+        List<UUID> uuids = ids.stream().map(id -> id.getUuid()).collect(Collectors.toList());
+        // wrap the UUID list via Results.fromIdList
+        final Results results = Results.fromIdList(uuids);
+
+        return results;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/cc3cbfee/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionSearch.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionSearch.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionSearch.java
index ab8a8bc..602a5b6 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionSearch.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionSearch.java
@@ -29,6 +29,10 @@ import com.google.common.base.Optional;
  */
 public class CollectionSearch {
 
+    public enum Level {
+        IDS, ALL // NOTE(review): presumably ids only vs. full results - confirm against the code that reads getResultsLevel()
+    }
+
     private final ApplicationScope applicationScope;
     private final Id collectionOwnerId;
     private final String collectionName;
@@ -36,6 +40,7 @@ public class CollectionSearch {
     private final int limit;
     private final Optional<String> query;
     private final Optional<String> cursor;
+    private Level level = Level.ALL;
 
 
     public CollectionSearch( final ApplicationScope applicationScope, final Id collectionOwnerId, final String
@@ -84,4 +89,8 @@ public class CollectionSearch {
     public Id getCollectionOwnerId() {
         return collectionOwnerId;
     }
+    // Overrides the results level; the field starts out as Level.ALL.
+    public void setResultsLevel(Level level){ this.level = level; }
+    // Returns the currently configured results level.
+    public Level getResultsLevel(){ return level; }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/cc3cbfee/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionService.java
index eef741a..6a46022 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionService.java
@@ -21,6 +21,7 @@ package org.apache.usergrid.corepersistence.service;
 import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
 import org.apache.usergrid.persistence.model.entity.Entity;
 
+import org.apache.usergrid.persistence.model.entity.Id;
 import rx.Observable;
 
 
@@ -35,4 +36,8 @@ public interface CollectionService {
      * @return An observable with results page entries for the stream
      */
     Observable<ResultsPage<Entity>> searchCollection(final CollectionSearch search);
+
+
+
+    Observable<ResultsPage<Id>> searchCollectionIds(final CollectionSearch search);
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/cc3cbfee/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionServiceImpl.java
index fa79d09..9244315 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionServiceImpl.java
@@ -29,6 +29,7 @@ import com.google.common.base.Optional;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 
+import org.apache.usergrid.persistence.model.entity.Id;
 import rx.Observable;
 
 
@@ -73,4 +74,26 @@ public class CollectionServiceImpl implements CollectionService {
 
         return results.build();
     }
+
+    @Override
+    public Observable<ResultsPage<Id>> searchCollectionIds(final CollectionSearch search ) {
+        // Builds an IdBuilder pipeline from the search's scope, cursor, limit and owner id,
+        // traverses the named collection and returns the resulting pages of Ids.
+        final ApplicationScope applicationScope = search.getApplicationScope();
+        final String collectionName = search.getCollectionName();
+        final Optional<String> query = search.getQuery();
+        // NOTE(review): 'query' is read above but never used below - the collection is always traversed, never searched.
+        final IdBuilder pipelineBuilder =
+            pipelineBuilderFactory.create( applicationScope ).withCursor( search.getCursor() )
+                .withLimit( search.getLimit() ).fromId( search.getCollectionOwnerId() );
+
+
+        final IdBuilder results;
+
+
+        results = pipelineBuilder.traverseCollection( collectionName );
+
+
+        return results.build();
+    }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/cc3cbfee/stack/core/src/main/java/org/apache/usergrid/persistence/NotificationGraphIterator.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/NotificationGraphIterator.java b/stack/core/src/main/java/org/apache/usergrid/persistence/NotificationGraphIterator.java
index b83f555..a1f3246 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/NotificationGraphIterator.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/NotificationGraphIterator.java
@@ -17,10 +17,16 @@
 package org.apache.usergrid.persistence;
 
 
+import org.apache.usergrid.persistence.entities.Group;
+import org.apache.usergrid.persistence.entities.User;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
 
 public class NotificationGraphIterator implements ResultsIterator, Iterable {
 
@@ -58,8 +64,19 @@ public class NotificationGraphIterator implements ResultsIterator, Iterable {
             return true;
         }
         while (source.hasNext()) {
-            EntityRef ref = source.next();
-            Results r = getResultsFor(ref);
+            Object next = source.next();
+            Results r;
+
+//            if(next instanceof UUID){
+//
+//                UUID id = (UUID) next;
+//                r = getResultsForId(id, "user");
+//
+//            }else {
+                EntityRef ref = (EntityRef) next;
+                r = getResultsFor(ref);
+           // }
+
             if (r.size() > 0) {
                 currentIterator = new PagingResultsIterator(r, query.getResultsLevel());
                 return currentIterator.hasNext();
@@ -90,13 +107,57 @@ public class NotificationGraphIterator implements ResultsIterator, Iterable {
 
         try {
 
+
+            query.setLimit(Query.MAX_LIMIT); // always fetch our MAX limit to reduce # of IO hops
             if (query.getCollection() != null) {
 
+                // make sure this results in graph traversal
+                query.setQl("select *");
+
                 if(logger.isTraceEnabled()) {
                     logger.trace("Fetching with refType: {}, collection: {} with no query",
                         ref.getType(), query.getCollection());
                 }
-                return entityManager.searchCollection(ref, query.getCollection(), null);
+
+                // if we're fetching devices through groups->users->devices, get only the IDs and don't load the entities
+                if( ref.getType().equals(Group.ENTITY_TYPE)){
+
+                    // query users using IDs as we don't need to load the full entities just to find their devices
+                    Query usersQuery = new Query();
+                    usersQuery.setCollection("users");
+                    usersQuery.setResultsLevel(Query.Level.IDS);
+                    usersQuery.setLimit(1000);
+
+
+                    // set the query level for the iterator temporarily to IDS
+                    query.setResultsLevel(Query.Level.IDS);
+
+                 return entityManager.searchCollection(ref, usersQuery.getCollection(), usersQuery);
+
+
+//                    List<EntityRef> refs =
+//                        results.getIds().stream()
+//                            .map( uuid -> new SimpleEntityRef( "user", uuid) ).collect(Collectors.toList());
+//
+//                    // set the query level for the iterator back to REFS after mapping our IDS
+//                    query.setResultsLevel(Query.Level.REFS);
+//                    return Results.fromRefList(refs);
+
+                }
+
+                if( ref.getType().equals(User.ENTITY_TYPE)){
+
+                    Query devicesQuery = new Query();
+                    devicesQuery.setCollection("devices");
+                    devicesQuery.setResultsLevel(Query.Level.CORE_PROPERTIES);
+
+                    //query.setCollection("devices");
+                    //query.setResultsLevel(Query.Level.CORE_PROPERTIES);
+                    return entityManager.searchCollection(ref, devicesQuery.getCollection(), devicesQuery);
+                }
+
+                return entityManager.searchCollection(ref, query.getCollection(), query);
+
 
             } else {
 
@@ -105,7 +166,7 @@ public class NotificationGraphIterator implements ResultsIterator, Iterable {
                         ref.getType(), query.getCollection());
                 }
 
-                query.setQl("select *");
+                query.setQl("select *"); // make sure this results in graph traversal
                 return entityManager.searchTargetEntities(ref, query);
 
             }
@@ -116,4 +177,14 @@ public class NotificationGraphIterator implements ResultsIterator, Iterable {
         }
     }
 
+
+    private Results getResultsForId(UUID uuid, String type) {
+        // Wraps the bare uuid in a SimpleEntityRef of the given type and delegates to getResultsFor.
+        EntityRef ref = new SimpleEntityRef(type, uuid);
+        return getResultsFor(ref);
+        // NOTE(review): the only call to this helper visible in this diff is commented out - confirm it is still needed.
+
+    }
+
+
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/cc3cbfee/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java b/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java
index e9a3251..2a84622 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java
@@ -708,6 +708,22 @@ public class Results implements Iterable<Entity> {
         }
     }
 
+    public void addEntities( Results results){
+        // Appends the entities of the given results to this instance's entity list.
+        if(entities == null){
+            // lazily create the list (the init() call is left disabled) and mark the level as CORE_PROPERTIES
+            entities = new ArrayList<>();
+            level = Level.CORE_PROPERTIES;
+        }
+        // NOTE(review): results.getEntities() is dereferenced without a null check - confirm it never returns null.
+        if( results.getEntities().size() > 0){
+
+            entities.addAll(results.getEntities());
+
+        }
+
+    }
+
 
     /** Remove the passed in results from the current results */
     public void subtract( Results results ) {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/cc3cbfee/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java
index aca10cf..d4f3529 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java
@@ -409,20 +409,28 @@ public class Notification extends TypedEntity {
 
                     if ( pathTokens.size() == 1 && collection.equals(InflectionUtils.pluralize(Group.ENTITY_TYPE) )){
 
-                        Query usersQuery = new Query();
+                        final Query usersQuery = new Query();
                         usersQuery.setQl("select *");
                         usersQuery.setCollection("users");
                         usersQuery.setLimit(100);
 
-                        Query devicesQuery = new Query();
+                        final Query devicesQuery = new Query();
                         devicesQuery.setQl("select *");
                         devicesQuery.setCollection("devices");
-                        usersQuery.setLimit(100);
+                        devicesQuery.setLimit(100);
 
 
                         // build up the chain so the proper iterators can be used later
-                        pathQuery = pathQuery.chain( usersQuery ).chain( devicesQuery );
+                        pathQuery = pathQuery.chain( usersQuery );//.chain( devicesQuery );
+
+                    }else if(pathTokens.size() == 1 && collection.equals(InflectionUtils.pluralize(User.ENTITY_TYPE))){
+
+                        final Query devicesQuery = new Query();
+                        devicesQuery.setQl("select *");
+                        devicesQuery.setCollection("devices");
+                        devicesQuery.setLimit(100);
 
+                        pathQuery = pathQuery.chain( devicesQuery );
                     }
 
                 } else {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/cc3cbfee/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
index 5ce1b93..1cbb2c6 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
@@ -19,10 +19,7 @@ package org.apache.usergrid.services.notifications.impl;
 import com.codahale.metrics.Meter;
 import org.apache.usergrid.batch.JobExecution;
 import org.apache.usergrid.persistence.*;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
-import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
 import org.apache.usergrid.persistence.entities.*;
 import org.apache.usergrid.persistence.Query;
 import org.apache.usergrid.persistence.queue.QueueManager;
@@ -108,6 +105,8 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
             if (logger.isTraceEnabled()) {
                 logger.trace("notification {} start query", notification.getUuid());
             }
+            logger.info("notification {} start query", notification.getUuid());
+
 
 
             // the main iterator can use graph traversal or index querying
@@ -185,30 +184,80 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
 
             final Map<String, Object> filters = notification.getFilters();
 
-            Observable processMessagesObservable = Observable.create(new IteratorObservable<Entity>(iterator))
-                .flatMap(entity -> {
 
-                    if(entity.getType().equals(Device.ENTITY_TYPE)){
-                        return Observable.from(Collections.singletonList(entity));
-                    }
 
-                    // if it's not a device, drill down and get them
-                    return Observable.from(getDevices(entity));
+            Observable processMessagesObservable = Observable.create(new IteratorObservable<UUID>(iterator))
+//                .flatMap(entity -> {
+//
+//                    if(entity.getType().equals(Device.ENTITY_TYPE)){
+//                        return Observable.from(Collections.singletonList(entity));
+//                    }
+//
+//                    // if it's not a device, drill down and get them
+//                    return Observable.from(getDevices(entity));
+//
+//                })
+                .distinct()
+                .flatMap( entityRef -> {
+
+                    return Observable.just(entityRef).flatMap(ref->{
+
+                        List<Entity> entities = new ArrayList<>();
+
+                                Query devicesQuery = new Query();
+                                devicesQuery.setCollection("devices");
+                                devicesQuery.setResultsLevel(Query.Level.CORE_PROPERTIES);
+
+                                try {
+
+                                   entities = em.searchCollection(new SimpleEntityRef("user", ref), devicesQuery.getCollection(), devicesQuery).getEntities();
+
+                                }catch (Exception e){
+
+                                    logger.error("Unable to load devices for user: {}", ref);
+                                    return Observable.empty();
+                                }
 
-                })
-                .distinct(ref -> ref.getUuid() )
-                .map( entityRef -> entityRef.getUuid() )
-                .buffer(10)
-                .flatMap( uuids -> {
 
-                    if(logger.isTraceEnabled()) {
-                        logger.trace("Processing batch of {} device(s)", uuids.size());
-                    }
 
 
-                    return Observable.from(em.getEntities(uuids, "device"))
+//                            if( ref.getType().equals(User.ENTITY_TYPE)){
+//
+//                                Query devicesQuery = new Query();
+//                                devicesQuery.setCollection("devices");
+//                                devicesQuery.setResultsLevel(Query.Level.CORE_PROPERTIES);
+//
+//                                try {
+//
+//                                   entities = em.searchCollection(new SimpleEntityRef("user", ref.getUuid()), devicesQuery.getCollection(), devicesQuery).getEntities();
+//
+//                                }catch (Exception e){
+//
+//                                    logger.error("Unable to load devices for user: {}", ref.getUuid());
+//                                    return Observable.empty();
+//                                }
+//
+//
+//                            }else if ( ref.getType().equals(Device.ENTITY_TYPE)){
+//
+//                                try{
+//                                    entities.add(em.get(ref));
+//
+//                                }catch(Exception e){
+//
+//                                    logger.error("Unable to load device: {}", ref.getUuid());
+//                                    return Observable.empty();
+//
+//                                }
+//
+//                            }
+                        return Observable.from(entities);
+
+                        })
                         .filter( device -> {
 
+                            logger.info("Filtering device: {}", device.getUuid());
+
                             if(logger.isTraceEnabled()) {
                                 logger.trace("Filtering device: {}", device.getUuid());
                             }
@@ -233,7 +282,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
                                 }
 
                                 if(logger.isTraceEnabled()) {
-                                    logger.trace("Push notification filter matched for notification {}, so removing from notification",
+                                    logger.trace("Push notification filter did not match for notification {}, so removing from notification",
                                         device.getUuid(), notification.getUuid());
                                 }
                                 return false;
@@ -271,7 +320,20 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
 
 
                         }).subscribeOn(Schedulers.io());
-                }, 10)
+
+                }, 100)
+                //.map( entityRef -> entityRef.getUuid() )
+                //.buffer(10)
+//                .flatMap( uuids -> {
+//
+//                    if(logger.isTraceEnabled()) {
+//                        logger.trace("Processing batch of {} device(s)", uuids.size());
+//                    }
+//
+//
+//                    return Observable.from(em.getEntities(uuids, "device")).subscribeOn(Schedulers.io());
+//
+//                }, 10)
 
                 .doOnError(throwable -> {
 


[2/2] usergrid git commit: Final changes to enhance parallel loading of devices for push notifications.

Posted by mr...@apache.org.
Final changes to enhance parallel loading of devices for push notifications.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/06caa250
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/06caa250
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/06caa250

Branch: refs/heads/release-2.1.1
Commit: 06caa2509407322498c025b1b3d39135d82777cc
Parents: cc3cbfe
Author: Michael Russo <mr...@apigee.com>
Authored: Sun Apr 17 18:02:32 2016 +0100
Committer: Michael Russo <mr...@apigee.com>
Committed: Sun Apr 17 18:02:32 2016 +0100

----------------------------------------------------------------------
 .../pipeline/builder/IdBuilder.java             |   2 +-
 .../persistence/MultiQueryIterator.java         |   2 +-
 .../persistence/NotificationGraphIterator.java  |  59 ++----
 .../persistence/PagingResultsIterator.java      |  25 ++-
 .../apache/usergrid/persistence/PathQuery.java  |  14 +-
 .../apache/usergrid/persistence/Results.java    |   4 +
 .../impl/ApplicationQueueManagerImpl.java       | 211 +++++--------------
 7 files changed, 102 insertions(+), 215 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/06caa250/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java
index 65cf7c1..781d7d5 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java
@@ -151,7 +151,7 @@ public class IdBuilder {
 
     public Observable<ResultsPage<Id>> build(){
         //we must add our resume filter so we drop our previous page first element if it's present
-        return pipeline.withFilter( new IdFilter() ).withFilter(new ResultsPageCollector<>()).execute();
+        return pipeline.withFilter( new IdResumeFilter() ).withFilter(new ResultsPageCollector<>()).execute();
     }
 
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/06caa250/stack/core/src/main/java/org/apache/usergrid/persistence/MultiQueryIterator.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/MultiQueryIterator.java b/stack/core/src/main/java/org/apache/usergrid/persistence/MultiQueryIterator.java
index c5de5c1..9e28204 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/MultiQueryIterator.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/MultiQueryIterator.java
@@ -62,7 +62,7 @@ public class MultiQueryIterator implements ResultsIterator {
             EntityRef ref = source.next();
             Results r = getResultsFor( ref );
             if ( r.size() > 0 ) {
-                currentIterator = new PagingResultsIterator( r, query.getResultsLevel() );
+                currentIterator = new PagingResultsIterator( r, query.getResultsLevel(), null);
                 return currentIterator.hasNext();
             }
         }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/06caa250/stack/core/src/main/java/org/apache/usergrid/persistence/NotificationGraphIterator.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/NotificationGraphIterator.java b/stack/core/src/main/java/org/apache/usergrid/persistence/NotificationGraphIterator.java
index a1f3246..a1b162d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/NotificationGraphIterator.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/NotificationGraphIterator.java
@@ -23,10 +23,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-import java.util.stream.Collectors;
 
 public class NotificationGraphIterator implements ResultsIterator, Iterable {
 
@@ -67,18 +63,20 @@ public class NotificationGraphIterator implements ResultsIterator, Iterable {
             Object next = source.next();
             Results r;
 
-//            if(next instanceof UUID){
-//
-//                UUID id = (UUID) next;
-//                r = getResultsForId(id, "user");
-//
-//            }else {
-                EntityRef ref = (EntityRef) next;
-                r = getResultsFor(ref);
-           // }
+            EntityRef ref = (EntityRef) next;
+            r = getResultsFor(ref);
 
             if (r.size() > 0) {
-                currentIterator = new PagingResultsIterator(r, query.getResultsLevel());
+
+
+                if(ref.getType().equals(Group.ENTITY_TYPE)) {
+
+                    currentIterator = new PagingResultsIterator(r, query.getResultsLevel(), Query.Level.REFS);
+                }else{
+                    currentIterator = new PagingResultsIterator(r, query.getResultsLevel(), null);
+
+                }
+
                 return currentIterator.hasNext();
             }
         }
@@ -122,26 +120,13 @@ public class NotificationGraphIterator implements ResultsIterator, Iterable {
                 // if we're fetching devices through groups->users->devices, get only the IDs and don't load the entities
                 if( ref.getType().equals(Group.ENTITY_TYPE)){
 
-                    // query users using IDs as we don't need to load the full entities just to find their devices
-                    Query usersQuery = new Query();
-                    usersQuery.setCollection("users");
-                    usersQuery.setResultsLevel(Query.Level.IDS);
-                    usersQuery.setLimit(1000);
+                    // groups->users is a passthrough to devices, load our max limit
+                    query.setLimit(Query.MAX_LIMIT);
 
-
-                    // set the query level for the iterator temporarily to IDS
+                    // set the query level to IDS when fetching users; we don't need the full entity
                     query.setResultsLevel(Query.Level.IDS);
 
-                 return entityManager.searchCollection(ref, usersQuery.getCollection(), usersQuery);
-
-
-//                    List<EntityRef> refs =
-//                        results.getIds().stream()
-//                            .map( uuid -> new SimpleEntityRef( "user", uuid) ).collect(Collectors.toList());
-//
-//                    // set the query level for the iterator back to REFS after mapping our IDS
-//                    query.setResultsLevel(Query.Level.REFS);
-//                    return Results.fromRefList(refs);
+                 return entityManager.searchCollection(ref, "users", query);
 
                 }
 
@@ -151,8 +136,6 @@ public class NotificationGraphIterator implements ResultsIterator, Iterable {
                     devicesQuery.setCollection("devices");
                     devicesQuery.setResultsLevel(Query.Level.CORE_PROPERTIES);
 
-                    //query.setCollection("devices");
-                    //query.setResultsLevel(Query.Level.CORE_PROPERTIES);
                     return entityManager.searchCollection(ref, devicesQuery.getCollection(), devicesQuery);
                 }
 
@@ -177,14 +160,4 @@ public class NotificationGraphIterator implements ResultsIterator, Iterable {
         }
     }
 
-
-    private Results getResultsForId(UUID uuid, String type) {
-
-        EntityRef ref = new SimpleEntityRef(type, uuid);
-        return getResultsFor(ref);
-
-
-    }
-
-
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/06caa250/stack/core/src/main/java/org/apache/usergrid/persistence/PagingResultsIterator.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/PagingResultsIterator.java b/stack/core/src/main/java/org/apache/usergrid/persistence/PagingResultsIterator.java
index a883e1b..640ee06 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/PagingResultsIterator.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/PagingResultsIterator.java
@@ -19,6 +19,8 @@ package org.apache.usergrid.persistence;
 
 import java.util.Iterator;
 import java.util.List;
+import java.util.stream.Collectors;
+
 import org.apache.usergrid.persistence.Query.Level;
 
 
@@ -28,20 +30,23 @@ public class PagingResultsIterator implements ResultsIterator, Iterable {
     private Results results;
     private Iterator currentPageIterator;
     private Level level;
+    private Level overrideLevel;
 
 
     public PagingResultsIterator( Results results ) {
-        this( results, results.level );
+        this( results, results.level, null);
     }
 
 
     /**
      * @param level overrides the default level from the Results - in case you want to return, say, UUIDs where the
      * Query was set for Entities
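+     * @param overrideLevel when non-null, used instead of {@code level} to pick the page contents; any IDs in the results are also mapped to "user" refs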
      */
-    public PagingResultsIterator( Results results, Level level ) {
+    public PagingResultsIterator(Results results, Level level, Level overrideLevel) {
         this.results = results;
         this.level = level;
+        this.overrideLevel = overrideLevel;
         initCurrentPageIterator();
     }
 
@@ -86,16 +91,32 @@ public class PagingResultsIterator implements ResultsIterator, Iterable {
      */
     private boolean initCurrentPageIterator() {
         List currentPage;
+        Level origLevel = level;
+        if(overrideLevel != null){
+            level=overrideLevel;
+            if(results.getIds()!=null){
+
+                List<EntityRef> userRefs = results.getIds().stream()
+                    .map( uuid -> new SimpleEntityRef("user", uuid)).collect(Collectors.toList());
+
+                results.setRefs(userRefs);
+
+            }
+        }
+
         if ( results != null ) {
             switch ( level ) {
                 case IDS:
                     currentPage = results.getIds();
+                    level = origLevel;
                     break;
                 case REFS:
                     currentPage = results.getRefs();
+                    level = origLevel;
                     break;
                 default:
                     currentPage = results.getEntities();
+                    level = origLevel;
             }
             if ( currentPage.size() > 0 ) {
                 currentPageIterator = currentPage.iterator();

http://git-wip-us.apache.org/repos/asf/usergrid/blob/06caa250/stack/core/src/main/java/org/apache/usergrid/persistence/PathQuery.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/PathQuery.java b/stack/core/src/main/java/org/apache/usergrid/persistence/PathQuery.java
index 215f6ac..30636ab 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/PathQuery.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/PathQuery.java
@@ -88,7 +88,7 @@ public class PathQuery<E> {
         try {
 
             if ( uuid != null && type != null ) {
-                return new PagingResultsIterator( getHeadResults( em ), query.getResultsLevel() );
+                return new PagingResultsIterator( getHeadResults( em ), query.getResultsLevel(), null);
             }
             else {
                 return new MultiQueryIterator( em, source.refIterator( em, false), query );
@@ -103,7 +103,7 @@ public class PathQuery<E> {
         try {
 
             if ( uuid != null && type != null ) {
-                return new PagingResultsIterator( getHeadResults( em ), query.getResultsLevel() );
+                return new PagingResultsIterator( getHeadResults( em ), query.getResultsLevel(), null);
             }else {
 
                 return new NotificationGraphIterator(em, source.refIterator(em, true), query);
@@ -130,6 +130,12 @@ public class PathQuery<E> {
 
             UUID entityId = em.getUniqueIdFromAlias( entityType, name );
 
+            if( entityId == null){
+                throw new
+                    IllegalArgumentException("Entity with name "+name+" not found. Unable to send push notification");
+            }
+
+
             return em.getEntities(Collections.singletonList(entityId), entityType);
         }
 
@@ -143,12 +149,12 @@ public class PathQuery<E> {
 
         if ( query.getQl() == null && query.getSingleNameOrEmailIdentifier() != null){
 
-            return new PagingResultsIterator( getHeadResults( em ), Level.REFS );
+            return new PagingResultsIterator( getHeadResults( em ), Level.REFS, null);
 
         }
 
         if ( type != null  && uuid != null) {
-            return new PagingResultsIterator( getHeadResults( em ), Level.REFS );
+            return new PagingResultsIterator( getHeadResults( em ), Level.REFS, null);
         }
         else {
             Query q = query;

http://git-wip-us.apache.org/repos/asf/usergrid/blob/06caa250/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java b/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java
index 2a84622..3502581 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java
@@ -436,6 +436,10 @@ public class Results implements Iterable<Entity> {
         level = Level.REFS;
     }
 
+    public void setRefsOnly( List<EntityRef> resultsRefs ) {
+        refs = resultsRefs;
+    }
+
 
     public Results withRefs( List<EntityRef> resultsRefs ) {
         setRefs( resultsRefs );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/06caa250/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
index 1cbb2c6..2f39ae4 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
@@ -52,6 +52,8 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
     private final Meter queueMeter;
     private final Meter sendMeter;
 
+    private final static String PUSH_PROCESSING_CONCURRENCY_PROP = "usergrid.push.async.processing.concurrency";
+
     HashMap<Object, ProviderAdapter> notifierHashMap; // only retrieve notifiers once
 
 
@@ -91,25 +93,22 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
             return;
         }
 
-        if (logger.isTraceEnabled()) {
-            logger.trace("notification {} start queuing", notification.getUuid());
-        }
-
         final PathQuery<Device> pathQuery = notification.getPathQuery().buildPathQuery(); //devices query
         final AtomicInteger deviceCount = new AtomicInteger(); //count devices so you can make a judgement on batching
         final ConcurrentLinkedQueue<String> errorMessages = new ConcurrentLinkedQueue<>(); //build up list of issues
 
-        //get devices in querystring, and make sure you have access
+        // Get devices in querystring, and make sure you have access
         if (pathQuery != null) {
             final HashMap<Object, ProviderAdapter> notifierMap = getAdapterMap();
             if (logger.isTraceEnabled()) {
                 logger.trace("notification {} start query", notification.getUuid());
             }
-            logger.info("notification {} start query", notification.getUuid());
+
+            logger.info("Notification {} started processing", notification.getUuid());
 
 
 
-            // the main iterator can use graph traversal or index querying
+            // The main iterator can use graph traversal or index querying based on payload property. Default is Index.
             final Iterator<Device> iterator;
             if( notification.getUseGraph()){
                 iterator = pathQuery.graphIterator(em);
@@ -117,15 +116,24 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
                 iterator = pathQuery.iterator(em);
             }
 
-//            //if there are more pages (defined by PAGE_SIZE) you probably want this to be async, also if this is already a job then don't reschedule
-//            if (iterator instanceof ResultsIterator && ((ResultsIterator) iterator).hasPages() && jobExecution == null) {
-//                if(logger.isTraceEnabled()){
-//                    logger.trace("Scheduling notification job as it has multiple pages of devices.");
-//                }
-//                jobScheduler.scheduleQueueJob(notification, true);
-//                em.update(notification);
-//                return;
-//            }
+            /**** Old code to schedule large sets of data; the processing is now fired off async in the background.
+                Leaving this only as a reference for how to do it, if needed in the future.
+
+                    //if there are more pages (defined by PAGE_SIZE) you probably want this to be async,
+                    //also if this is already a job then don't reschedule
+
+                    if (iterator instanceof ResultsIterator
+                                && ((ResultsIterator) iterator).hasPages() && jobExecution == null) {
+
+                        if(logger.isTraceEnabled()){
+                            logger.trace("Scheduling notification job as it has multiple pages of devices.");
+                        }
+                        jobScheduler.scheduleQueueJob(notification, true);
+                        em.update(notification);
+                        return;
+                     }
+             ****/
+
             final UUID appId = em.getApplication().getUuid();
             final Map<String, Object> payloads = notification.getPayloads();
 
@@ -182,87 +190,57 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
 
             };
 
-            final Map<String, Object> filters = notification.getFilters();
 
+            final Map<String, Object> filters = notification.getFilters();
 
+            Observable processMessagesObservable = Observable.create(new IteratorObservable<EntityRef>(iterator))
 
-            Observable processMessagesObservable = Observable.create(new IteratorObservable<UUID>(iterator))
-//                .flatMap(entity -> {
-//
-//                    if(entity.getType().equals(Device.ENTITY_TYPE)){
-//                        return Observable.from(Collections.singletonList(entity));
-//                    }
-//
-//                    // if it's not a device, drill down and get them
-//                    return Observable.from(getDevices(entity));
-//
-//                })
-                .distinct()
                 .flatMap( entityRef -> {
 
                     return Observable.just(entityRef).flatMap(ref->{
 
                         List<Entity> entities = new ArrayList<>();
 
+                            if( ref.getType().equals(User.ENTITY_TYPE)){
+
                                 Query devicesQuery = new Query();
                                 devicesQuery.setCollection("devices");
                                 devicesQuery.setResultsLevel(Query.Level.CORE_PROPERTIES);
 
                                 try {
 
-                                   entities = em.searchCollection(new SimpleEntityRef("user", ref), devicesQuery.getCollection(), devicesQuery).getEntities();
+                                   entities = em.searchCollection(new SimpleEntityRef("user", ref.getUuid()), devicesQuery.getCollection(), devicesQuery).getEntities();
 
                                 }catch (Exception e){
 
-                                    logger.error("Unable to load devices for user: {}", ref);
+                                    logger.error("Unable to load devices for user: {}", ref.getUuid());
                                     return Observable.empty();
                                 }
 
 
+                            }else if ( ref.getType().equals(Device.ENTITY_TYPE)){
 
+                                try{
+                                    entities.add(em.get(ref));
 
-//                            if( ref.getType().equals(User.ENTITY_TYPE)){
-//
-//                                Query devicesQuery = new Query();
-//                                devicesQuery.setCollection("devices");
-//                                devicesQuery.setResultsLevel(Query.Level.CORE_PROPERTIES);
-//
-//                                try {
-//
-//                                   entities = em.searchCollection(new SimpleEntityRef("user", ref.getUuid()), devicesQuery.getCollection(), devicesQuery).getEntities();
-//
-//                                }catch (Exception e){
-//
-//                                    logger.error("Unable to load devices for user: {}", ref.getUuid());
-//                                    return Observable.empty();
-//                                }
-//
-//
-//                            }else if ( ref.getType().equals(Device.ENTITY_TYPE)){
-//
-//                                try{
-//                                    entities.add(em.get(ref));
-//
-//                                }catch(Exception e){
-//
-//                                    logger.error("Unable to load device: {}", ref.getUuid());
-//                                    return Observable.empty();
-//
-//                                }
-//
-//                            }
+                                }catch(Exception e){
+
+                                    logger.error("Unable to load device: {}", ref.getUuid());
+                                    return Observable.empty();
+
+                                }
+
+                            }
                         return Observable.from(entities);
 
                         })
+                        .distinct( deviceRef -> deviceRef.getUuid())
                         .filter( device -> {
 
-                            logger.info("Filtering device: {}", device.getUuid());
-
                             if(logger.isTraceEnabled()) {
                                 logger.trace("Filtering device: {}", device.getUuid());
                             }
 
-
                             if(notification.getUseGraph() && filters.size() > 0 ) {
 
                                 for (Map.Entry<String, Object> entry : filters.entrySet()) {
@@ -280,7 +258,6 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
                                     }
 
                                 }
-
                                 if(logger.isTraceEnabled()) {
                                     logger.trace("Push notification filter did not match for notification {}, so removing from notification",
                                         device.getUuid(), notification.getUuid());
@@ -321,20 +298,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
 
                         }).subscribeOn(Schedulers.io());
 
-                }, 100)
-                //.map( entityRef -> entityRef.getUuid() )
-                //.buffer(10)
-//                .flatMap( uuids -> {
-//
-//                    if(logger.isTraceEnabled()) {
-//                        logger.trace("Processing batch of {} device(s)", uuids.size());
-//                    }
-//
-//
-//                    return Observable.from(em.getEntities(uuids, "device")).subscribeOn(Schedulers.io());
-//
-//                }, 10)
-
+                }, Integer.valueOf(System.getProperty(PUSH_PROCESSING_CONCURRENCY_PROP, "50")))
                 .doOnError(throwable -> {
 
                     logger.error("Error while processing devices for notification : {}", notification.getUuid());
@@ -355,7 +319,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
                         notification.setProcessingFinished(System.currentTimeMillis());
                         notification.setDeviceProcessedCount(deviceCount.get());
                         em.update(notification);
-                        logger.info("{} device(s) processed for notification {}", deviceCount.get(), notification.getUuid());
+                        logger.info("Notification {} finished processing {} device(s)", notification.getUuid(), deviceCount.get());
 
                     } catch (Exception e) {
                         logger.error("Unable to set processing finished timestamp for notification");
@@ -622,10 +586,13 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
             try {
                 while (!subscriber.isUnsubscribed() && input.hasNext()) {
                     //send our input to the next
+                    //logger.debug("calling next on iterator: {}", input.getClass().getSimpleName());
                     subscriber.onNext((T) input.next());
                 }
 
                 //tell the subscriber we don't have any more data
+                //logger.debug("finished iterator: {}", input.getClass().getSimpleName());
+
                 subscriber.onCompleted();
             } catch (Throwable t) {
                 logger.error("failed on subscriber", t);
@@ -678,90 +645,6 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
         return true;
     }
 
-    private List<EntityRef> getDevices(EntityRef ref) {
-
-        List<EntityRef> devices = new ArrayList<>();
-
-        final int LIMIT = Query.MID_LIMIT;
-
-        try {
-
-           if (User.ENTITY_TYPE.equals(ref.getType())) {
-
-                UUID start = null;
-                boolean initial = true;
-                int resultSize = 0;
-                while( initial || resultSize >= Query.DEFAULT_LIMIT) {
-
-                    initial = false;
-
-                    final List<EntityRef> mydevices = em.getCollection(ref, "devices", start, LIMIT,
-                        Query.Level.REFS, true).getRefs();
-
-                    resultSize = mydevices.size();
-
-                    if(mydevices.size() > 0){
-                        start = mydevices.get(mydevices.size() - 1 ).getUuid();
-                    }
-
-                    devices.addAll( mydevices  );
-
-                }
-
-            } else if (Group.ENTITY_TYPE.equals(ref.getType())) {
-
-                UUID start = null;
-                boolean initial = true;
-                int resultSize = 0;
-
-                while( initial || resultSize >= LIMIT){
-
-                    initial = false;
-
-                    final List<EntityRef> myusers =  em.getCollection(ref, "users", start,
-                        LIMIT, Query.Level.REFS, true).getRefs();
-                    resultSize = myusers.size();
-
-                    if(myusers.size() > 0){
-                        start = myusers.get(myusers.size() - 1 ).getUuid();
-                    }
-
-
-                    Observable.from(myusers).flatMap( user -> {
-
-                        try {
-                            devices.addAll(em.getCollection(user, "devices", null, 100,
-                                Query.Level.REFS, true).getRefs());
-                        }catch (Exception e){
-                            logger.error ("Unable to fetch devices for user: {}", user.getUuid());
-                        }
-                        return Observable.from(Collections.singletonList(user));
-
-                    }, 50).toBlocking().lastOrDefault(null);
-
-
-
-
-
-                }
-
-            }
-        } catch (Exception e) {
-
-            if (ref != null){
-                logger.error("Error while retrieving devices for entity type {} and uuid {}. Error: {}",
-                    ref.getType(), ref.getUuid(), e);
-            }else{
-                logger.error("Error while retrieving devices. Entity ref was null.");
-            }
-
-            throw new RuntimeException("Unable to retrieve devices for EntityRef", e);
-
-        }
-
-        return devices;
-    }
-
 
     private String getProviderId(EntityRef device, Notifier notifier) throws Exception {
         try {