You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/03/25 23:46:51 UTC

incubator-usergrid git commit: Refactored search to encapsulate repair and load logic

Repository: incubator-usergrid
Updated Branches:
  refs/heads/USERGRID-511 [created] c7fa864f7


Refactored search to encapsulate repair and load logic

Loaders and verifiers are separate for connections and collections

Fixes broken test in GraphManager if writes occur in the same millisecond

Reduces fork count to 1 and uses parallel threads


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/c7fa864f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/c7fa864f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/c7fa864f

Branch: refs/heads/USERGRID-511
Commit: c7fa864f73e742c8dd37071ea57d5b4e2ec31422
Parents: 5937f9f
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Mar 25 12:38:58 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Mar 25 16:45:31 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/CpRelationManager.java      | 316 ++++++-------------
 .../results/CollectionRefsVerifier.java         |  44 +++
 .../CollectionResultsLoaderFactoryImpl.java     |  60 ++++
 .../results/ConnectionRefsVerifier.java         |  61 ++++
 .../ConnectionResultsLoaderFactoryImpl.java     |  65 ++++
 .../results/ElasticSearchQueryExecutor.java     | 211 +++++++++++++
 .../corepersistence/results/QueryExecutor.java  |  37 +++
 .../corepersistence/results/RefsVerifier.java   |  42 ---
 .../results/ResultsLoaderFactory.java           |   3 +-
 .../results/ResultsLoaderFactoryImpl.java       |  62 ----
 .../persistence/MultiQueryIterator.java         |   6 +-
 .../apache/usergrid/persistence/Results.java    |  26 +-
 .../cassandra/QueryProcessorImpl.java           |  12 +-
 .../usergrid/persistence/PathQueryIT.java       |  46 ++-
 stack/core/src/test/resources/log4j.properties  |   2 +
 .../persistence/graph/GraphManagerIT.java       |   9 +-
 .../usergrid/persistence/index/IndexFig.java    |  16 +-
 .../index/impl/BufferQueueInMemoryImpl.java     |  10 +-
 .../index/impl/EsIndexBufferConsumerImpl.java   |  61 ++--
 .../usergrid/persistence/index/query/Query.java |  58 ++--
 .../persistence/index/utils/ListUtils.java      |   2 -
 stack/pom.xml                                   |   2 +-
 22 files changed, 729 insertions(+), 422 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c7fa864f/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 2eeee28..7179baf 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -29,14 +29,14 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 
-import org.apache.usergrid.persistence.core.future.BetterFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.util.Assert;
 
-import org.apache.usergrid.corepersistence.results.ResultsLoader;
-import org.apache.usergrid.corepersistence.results.ResultsLoaderFactory;
-import org.apache.usergrid.corepersistence.results.ResultsLoaderFactoryImpl;
+import org.apache.usergrid.corepersistence.results.ConnectionResultsLoaderFactoryImpl;
+import org.apache.usergrid.corepersistence.results.ElasticSearchQueryExecutor;
+import org.apache.usergrid.corepersistence.results.QueryExecutor;
+import org.apache.usergrid.corepersistence.results.CollectionResultsLoaderFactoryImpl;
 import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.persistence.ConnectedEntityRef;
@@ -61,6 +61,7 @@ import org.apache.usergrid.persistence.cassandra.index.IndexBucketScanner;
 import org.apache.usergrid.persistence.cassandra.index.IndexScanner;
 import org.apache.usergrid.persistence.cassandra.index.NoOpIndexScanner;
 import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.core.future.BetterFuture;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.entities.Group;
@@ -80,8 +81,6 @@ import org.apache.usergrid.persistence.index.EntityIndexBatch;
 import org.apache.usergrid.persistence.index.IndexScope;
 import org.apache.usergrid.persistence.index.SearchTypes;
 import org.apache.usergrid.persistence.index.impl.IndexScopeImpl;
-import org.apache.usergrid.persistence.index.query.CandidateResult;
-import org.apache.usergrid.persistence.index.query.CandidateResults;
 import org.apache.usergrid.persistence.index.query.Identifier;
 import org.apache.usergrid.persistence.index.query.Query;
 import org.apache.usergrid.persistence.index.query.Query.Level;
@@ -115,6 +114,7 @@ import me.prettyprint.hector.api.beans.HColumn;
 import me.prettyprint.hector.api.mutation.Mutator;
 import rx.Observable;
 import rx.functions.Action1;
+import rx.functions.Action2;
 import rx.functions.Func1;
 
 import static java.util.Arrays.asList;
@@ -186,8 +186,6 @@ public class CpRelationManager implements RelationManager {
 
     private IndexBucketLocator indexBucketLocator;
 
-    private ResultsLoaderFactory resultsLoaderFactory;
-
     private MetricsFactory metricsFactory;
     private Timer updateCollectionTimer;
     private Timer createConnectionTimer;
@@ -258,7 +256,6 @@ public class CpRelationManager implements RelationManager {
         // commented out because it is possible that CP entity has not been created yet
         Assert.notNull( cpHeadEntity, "cpHeadEntity cannot be null" );
 
-        this.resultsLoaderFactory = new ResultsLoaderFactoryImpl( managerCache );
 
         return this;
     }
@@ -278,7 +275,7 @@ public class CpRelationManager implements RelationManager {
         });
 
         Observable<String> types= gm.getEdgeTypesFromSource(
-            new SimpleSearchEdgeType( cpHeadEntity.getId(), edgeTypePrefix,  null ));
+            new SimpleSearchEdgeType( cpHeadEntity.getId(), edgeTypePrefix, null ) );
 
         Iterator<String> iter = types.toBlockingObservable().getIterator();
         while ( iter.hasNext() ) {
@@ -316,63 +313,56 @@ public class CpRelationManager implements RelationManager {
 
     /**
      * Gets containing collections and/or connections depending on the edge type you pass in
+     *
      * @param limit Max number to return
      * @param edgeType Edge type, edge type prefix or null to allow any edge type
      * @param fromEntityType Only consider edges from entities of this type
      */
-    Map<EntityRef, Set<String>> getContainers( int limit, String edgeType, String fromEntityType ) {
+    Map<EntityRef, Set<String>> getContainers( final int limit, final String edgeType, final String fromEntityType ) {
 
         Map<EntityRef, Set<String>> results = new LinkedHashMap<EntityRef, Set<String>>();
 
-        GraphManager gm = managerCache.getGraphManager(applicationScope);
-
-        Iterator<String> edgeTypes = gm.getEdgeTypesToTarget( new SimpleSearchEdgeType(
-            cpHeadEntity.getId(), edgeType, null) ).toBlocking().getIterator();
-
-        logger.debug("getContainers(): "
-                + "Searched for edges of type {}\n   to target {}:{}\n   in scope {}\n   found: {}",
-            new Object[] {
-                edgeType,
-                cpHeadEntity.getId().getType(),
-                cpHeadEntity.getId().getUuid(),
-                applicationScope.getApplication(),
-                edgeTypes.hasNext()
-        });
+        final GraphManager gm = managerCache.getGraphManager( applicationScope );
 
-        while ( edgeTypes.hasNext() ) {
+        Observable<Edge> edges =
+            gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( cpHeadEntity.getId(), edgeType, null ) )
+              .flatMap( new Func1<String, Observable<Edge>>() {
+                  @Override
+                  public Observable<Edge> call( final String edgeType ) {
+                      return gm.loadEdgesToTarget(
+                          new SimpleSearchByEdgeType( cpHeadEntity.getId(), edgeType, Long.MAX_VALUE,
+                              SearchByEdgeType.Order.DESCENDING, null ) );
 
-            String etype = edgeTypes.next();
+                  }
+              } );
 
-            Observable<Edge> edges = gm.loadEdgesToTarget( new SimpleSearchByEdgeType(
-                cpHeadEntity.getId(), etype, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, null ));
+        //if our limit is set, take them.  Note this logic is still borked, we can't possibly fit everything in memmory
+        if ( limit > -1 ) {
+            edges = edges.take( limit );
+        }
 
-            Iterator<Edge> iter = edges.toBlockingObservable().getIterator();
-            while ( iter.hasNext() ) {
-                Edge edge = iter.next();
 
-                if ( fromEntityType != null && !fromEntityType.equals( edge.getSourceNode().getType() )) {
-                    logger.debug("Ignoring edge from entity type {}", edge.getSourceNode().getType());
-                    continue;
+        return edges.collect( results, new Action2<Map<EntityRef, Set<String>>, Edge>() {
+            @Override
+            public void call( final Map<EntityRef, Set<String>> entityRefSetMap, final Edge edge ) {
+                if ( fromEntityType != null && !fromEntityType.equals( edge.getSourceNode().getType() ) ) {
+                    logger.debug( "Ignoring edge from entity type {}", edge.getSourceNode().getType() );
+                    return;
                 }
 
-                EntityRef eref = new SimpleEntityRef(
-                        edge.getSourceNode().getType(), edge.getSourceNode().getUuid() );
+                final EntityRef eref =
+                    new SimpleEntityRef( edge.getSourceNode().getType(), edge.getSourceNode().getUuid() );
 
-                String name = null;
-                if ( CpNamingUtils.isConnectionEdgeType( edge.getType() )) {
+                String name;
+                if ( CpNamingUtils.isConnectionEdgeType( edge.getType() ) ) {
                     name = CpNamingUtils.getConnectionType( edge.getType() );
-                } else {
+                }
+                else {
                     name = CpNamingUtils.getCollectionName( edge.getType() );
                 }
-                addMapSet( results, eref, name );
-            }
-
-            if ( limit > 0 && results.keySet().size() >= limit ) {
-                break;
+                addMapSet( entityRefSetMap, eref, name );
             }
-        }
-
-        return results;
+        } ).toBlocking().last();
     }
 
 
@@ -660,9 +650,6 @@ public class CpRelationManager implements RelationManager {
         CollectionScope memberScope = getCollectionScopeNameFromEntityType(
                 applicationScope.getApplication(), itemRef.getType());
 
-        //TODO, this double load should disappear once events are in
-        Id entityId = new SimpleId( itemRef.getUuid(), itemRef.getType() );
-
         if ( memberEntity == null ) {
             throw new RuntimeException(
                     "Unable to load entity uuid=" + itemRef.getUuid() + " type=" + itemRef.getType() );
@@ -694,34 +681,34 @@ public class CpRelationManager implements RelationManager {
         GraphManager gm = managerCache.getGraphManager( applicationScope );
         gm.writeEdge( edge ).toBlockingObservable().last();
 
-        logger.debug( "Wrote edgeType {}\n   from {}:{}\n   to {}:{}\n   scope {}:{}",
-            new Object[] {
-                edgeType,
-                cpHeadEntity.getId().getType(),
-                cpHeadEntity.getId().getUuid(),
-                memberEntity.getId().getType(),
-                memberEntity.getId().getUuid(),
-                applicationScope.getApplication().getType(),
+
+        if(logger.isDebugEnabled()) {
+            logger.debug( "Wrote edgeType {}\n   from {}:{}\n   to {}:{}\n   scope {}:{}", new Object[] {
+                edgeType, cpHeadEntity.getId().getType(), cpHeadEntity.getId().getUuid(), memberEntity.getId().getType(),
+                memberEntity.getId().getUuid(), applicationScope.getApplication().getType(),
                 applicationScope.getApplication().getUuid()
-        } );
+            } );
+        }
 
         ( ( CpEntityManager ) em ).indexEntityIntoCollection( cpHeadEntity, memberEntity, collName );
 
-        logger.debug( "Added entity {}:{} to collection {}", new Object[] {
+        if(logger.isDebugEnabled()) {
+            logger.debug( "Added entity {}:{} to collection {}", new Object[] {
                 itemRef.getUuid().toString(), itemRef.getType(), collName
-        } );
-
+            } );
+        }
         //        logger.debug("With head entity scope is {}:{}:{}", new Object[] {
         //            headEntityScope.getApplication().toString(),
         //            headEntityScope.getOwner().toString(),
         //            headEntityScope.getName()});
 
-        if ( connectBack && collection != null && collection.getLinkedCollection() != null ) {
-            getRelationManager( itemEntity ).addToCollection(
-                    collection.getLinkedCollection(), headEntity, cpHeadEntity, false );
-            getRelationManager( itemEntity ).addToCollection(
-                    collection.getLinkedCollection(), headEntity, false );
-        }
+        //TODO T.N.  This should even be neccessary any longer, graph maintains 2 edges.  .
+//        if ( connectBack && collection != null && collection.getLinkedCollection() != null ) {
+//            getRelationManager( itemEntity ).addToCollection(
+//                    collection.getLinkedCollection(), headEntity, cpHeadEntity, false );
+//            getRelationManager( itemEntity ).addToCollection(
+//                    collection.getLinkedCollection(), headEntity, false );
+//        }
 
         return itemEntity;
     }
@@ -951,65 +938,19 @@ public class CpRelationManager implements RelationManager {
 
         logger.debug( "Searching scope {}:{}",
 
-                indexScope.getOwner().toString(), indexScope.getName() );
+            indexScope.getOwner().toString(), indexScope.getName() );
 
         query.setEntityType( collection.getType() );
         query = adjustQuery( query );
 
-        // Because of possible stale entities, which are filtered out by buildResults(),
-        // we loop until the we've got enough results to satisfy the query limit.
 
-        int maxQueries = 10; // max re-queries to satisfy query limit
-
-        final int originalLimit = query.getLimit();
-
-        Results results = null;
-        int queryCount = 0;
-
-        boolean satisfied = false;
+        final CollectionResultsLoaderFactoryImpl resultsLoaderFactory = new CollectionResultsLoaderFactoryImpl( managerCache );
 
 
+        //execute the query and return our next result
+        final QueryExecutor executor = new ElasticSearchQueryExecutor( resultsLoaderFactory, ei, applicationScope, indexScope, types, query );
 
-        while ( !satisfied && queryCount++ < maxQueries ) {
-
-            CandidateResults crs = ei.search( indexScope, types, query );
-
-            if ( results == null ) {
-                logger.debug( "Calling build results 1" );
-                results = buildResults( indexScope, query, crs, collName );
-            }
-            else {
-                logger.debug( "Calling build results 2" );
-                Results newResults = buildResults(indexScope,  query, crs, collName );
-                results.merge( newResults );
-            }
-
-            if ( crs.isEmpty() || !crs.hasCursor() ) { // no results, no cursor, can't get more
-                satisfied = true;
-            }
-
-            /**
-             * In an edge case where we delete stale entities, we could potentially get more results than expected.  This will only occur once during the repair phase.
-             * We need to ensure that we short circuit if we overflow the requested limit during a repair.
-             */
-            else if ( results.size() >= originalLimit ) { // got what we need
-                satisfied = true;
-            }
-            else if ( crs.hasCursor() ) {
-                satisfied = false;
-
-                // need to query for more
-                // ask for just what we need to satisfy, don't want to exceed limit
-                query.setCursor( results.getCursor() );
-                query.setLimit( originalLimit - results.size() );
-
-                logger.warn( "Satisfy query limit {}, new limit {} query count {}", new Object[] {
-                        originalLimit, query.getLimit(), queryCount
-                } );
-            }
-        }
-
-        return results;
+        return executor.next();
     }
 
 
@@ -1350,6 +1291,7 @@ public class CpRelationManager implements RelationManager {
     public Results getConnectedEntities(
             String connectionType, String connectedEntityType, Level level ) throws Exception {
 
+        //until this is refactored properly, we will delegate to a search by query
         Results raw = null;
 
         Preconditions.checkNotNull( connectionType, "connectionType cannot be null" );
@@ -1357,50 +1299,11 @@ public class CpRelationManager implements RelationManager {
         Query query = new Query();
         query.setConnectionType( connectionType );
         query.setEntityType( connectedEntityType );
+        query.setResultsLevel( level );
 
-        if ( connectionType == null ) {
-            raw = searchConnectedEntities( query );
-        }
-        else {
-
-            headEntity = em.validate( headEntity );
-
-
-            IndexScope indexScope = new IndexScopeImpl(
-                    cpHeadEntity.getId(), CpNamingUtils.getConnectionScopeName( connectionType ) );
-
-            final SearchTypes searchTypes = SearchTypes.fromNullableTypes( connectedEntityType );
-
-
-            final EntityIndex ei = managerCache.getEntityIndex( applicationScope );
-
-
-            logger.debug("Searching connected entities from scope {}:{}",
-                indexScope.getOwner().toString(),
-                indexScope.getName());
-
-            query = adjustQuery( query );
-            CandidateResults crs = ei.search( indexScope, searchTypes,  query );
-
-            raw = buildResults( indexScope, query, crs, query.getConnectionType() );
-        }
-
-        if ( Level.ALL_PROPERTIES.equals( level ) ) {
-            List<Entity> entities = new ArrayList<Entity>();
-            for ( EntityRef ref : raw.getEntities() ) {
-                Entity entity = em.get( ref );
-                entities.add( entity );
-            }
-            return Results.fromEntities( entities );
-        }
+        return searchConnectedEntities( query );
 
-        List<ConnectionRef> crefs = new ArrayList<ConnectionRef>();
-        for ( Entity e : raw.getEntities() ) {
-            ConnectionRef cref = new ConnectionRefImpl( headEntity, connectionType, e );
-            crefs.add( cref );
-        }
 
-        return Results.fromConnections( crefs );
     }
 
 
@@ -1472,9 +1375,16 @@ public class CpRelationManager implements RelationManager {
                 } );
 
         query = adjustQuery( query );
-        CandidateResults crs = ei.search( indexScope, searchTypes, query );
 
-        return buildConnectionResults( indexScope, query, crs, connection );
+        final ConnectionResultsLoaderFactoryImpl resultsLoaderFactory = new ConnectionResultsLoaderFactoryImpl( managerCache,
+            headEntity, connection );
+
+        final QueryExecutor executor = new ElasticSearchQueryExecutor(resultsLoaderFactory, ei, applicationScope, indexScope, searchTypes, query);
+
+        return executor.next();
+//        CandidateResults crs = ei.search( indexScope, searchTypes, query );
+
+//        return buildConnectionResults( indexScope, query, crs, connection );
     }
 
 
@@ -1564,63 +1474,33 @@ public class CpRelationManager implements RelationManager {
         return entity;
     }
 
+//
+//    private Results buildConnectionResults( final IndexScope indexScope,
+//            final Query query, final CandidateResults crs, final String connectionType ) {
+//
+//        if ( query.getLevel().equals( Level.ALL_PROPERTIES ) ) {
+//            return buildResults( indexScope, query, crs, connectionType );
+//        }
+//
+//        final EntityRef sourceRef = new SimpleEntityRef( headEntity.getType(), headEntity.getUuid() );
+//
+//        List<ConnectionRef> refs = new ArrayList<ConnectionRef>( crs.size() );
+//
+//        for ( CandidateResult cr : crs ) {
+//
+//            SimpleEntityRef targetRef =
+//                    new SimpleEntityRef( cr.getId().getType(), cr.getId().getUuid() );
+//
+//            final ConnectionRef ref =
+//                    new ConnectionRefImpl( sourceRef, connectionType, targetRef );
+//
+//            refs.add( ref );
+//        }
+//
+//        return Results.fromConnections( refs );
+//    }
 
-    private Results buildConnectionResults( final IndexScope indexScope,
-            final Query query, final CandidateResults crs, final String connectionType ) {
-
-        if ( query.getLevel().equals( Level.ALL_PROPERTIES ) ) {
-            return buildResults( indexScope, query, crs, connectionType );
-        }
-
-        final EntityRef sourceRef = new SimpleEntityRef( headEntity.getType(), headEntity.getUuid() );
-
-        List<ConnectionRef> refs = new ArrayList<ConnectionRef>( crs.size() );
-
-        for ( CandidateResult cr : crs ) {
-
-            SimpleEntityRef targetRef =
-                    new SimpleEntityRef( cr.getId().getType(), cr.getId().getUuid() );
-
-            final ConnectionRef ref =
-                    new ConnectionRefImpl( sourceRef, connectionType, targetRef );
-
-            refs.add( ref );
-        }
-
-        return Results.fromConnections( refs );
-    }
-
-
-    /**
-     * Build results from a set of candidates, and discard those that represent stale indexes.
-     *
-     * @param query Query that was executed
-     * @param crs Candidates to be considered for results
-     * @param collName Name of collection or null if querying all types
-     */
-    private Results buildResults( final IndexScope indexScope, final Query query,
-            final CandidateResults crs, final String collName ) {
-
-        logger.debug( "buildResults() for {} from {} candidates", collName, crs.size() );
-
-        //get an instance of our results loader
-        final ResultsLoader resultsLoader = this.resultsLoaderFactory.getLoader(
-                applicationScope, indexScope, query.getResultsLevel() );
-
-        //load the results
-        final Results results = resultsLoader.loadResults( crs );
-
-        //signal for post processing
-        resultsLoader.postProcess();
-
-
-        results.setCursor( crs.getCursor() );
-        results.setQueryProcessor( new CpQueryProcessor( em, query, headEntity, collName ) );
-
-        logger.debug( "Returning results size {}", results.size() );
 
-        return results;
-    }
 
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c7fa864f/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionRefsVerifier.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionRefsVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionRefsVerifier.java
new file mode 100644
index 0000000..b74f433
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionRefsVerifier.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.corepersistence.results;
+
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.usergrid.persistence.EntityRef;
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.SimpleEntityRef;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+public class CollectionRefsVerifier extends VersionVerifier {
+
+
+
+    @Override
+    public Results getResults( final Collection<Id> ids ) {
+        List<EntityRef> refs = new ArrayList<EntityRef>(ids.size());
+        for ( Id id : ids ) {
+            refs.add( new SimpleEntityRef( id.getType(), id.getUuid() ) );
+        }
+        return Results.fromRefList( refs );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c7fa864f/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionResultsLoaderFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionResultsLoaderFactoryImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionResultsLoaderFactoryImpl.java
new file mode 100644
index 0000000..b79700b
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionResultsLoaderFactoryImpl.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.results;
+
+
+import org.apache.usergrid.corepersistence.ManagerCache;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.index.IndexScope;
+import org.apache.usergrid.persistence.index.query.Query;
+
+
+/**
+ * Factory for creating results
+ */
+public class CollectionResultsLoaderFactoryImpl implements ResultsLoaderFactory {
+
+    private final ManagerCache managerCache;
+
+
+    public CollectionResultsLoaderFactoryImpl( final ManagerCache managerCache ) {
+        this.managerCache = managerCache;
+    }
+
+
+    @Override
+    public ResultsLoader getLoader( final ApplicationScope applicationScope, final IndexScope scope, final Query.Level resultsLevel ) {
+
+        ResultsVerifier verifier;
+
+        if ( resultsLevel == Query.Level.REFS ) {
+            verifier = new CollectionRefsVerifier();
+        }
+        else if ( resultsLevel == Query.Level.IDS ) {
+//            verifier = new RefsVerifier();
+            verifier = new IdsVerifier();
+        }
+        else {
+            verifier = new EntityVerifier(Query.MAX_LIMIT);
+        }
+
+        return new FilteringLoader( managerCache, verifier, applicationScope, scope );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c7fa864f/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionRefsVerifier.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionRefsVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionRefsVerifier.java
new file mode 100644
index 0000000..408edd3
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionRefsVerifier.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.corepersistence.results;
+
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.usergrid.persistence.ConnectedEntityRef;
+import org.apache.usergrid.persistence.ConnectionRef;
+import org.apache.usergrid.persistence.EntityRef;
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.cassandra.ConnectedEntityRefImpl;
+import org.apache.usergrid.persistence.cassandra.ConnectionRefImpl;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import static org.apache.usergrid.persistence.SimpleEntityRef.ref;
+
+
+/**
+ * Verifier for creating connections
+ */
+public class ConnectionRefsVerifier extends VersionVerifier {
+
+
+    private final EntityRef ownerId;
+    private final String connectionType;
+
+
+    public ConnectionRefsVerifier( final EntityRef ownerId, final String connectionType ) {
+        this.ownerId = ownerId;
+        this.connectionType = connectionType;
+    }
+
+    @Override
+    public Results getResults( final Collection<Id> ids ) {
+        List<ConnectionRef> refs = new ArrayList<>();
+        for ( Id id : ids ) {
+            refs.add( new ConnectionRefImpl( ownerId, connectionType, ref(id.getType(), id.getUuid())  ));
+        }
+
+        return Results.fromConnections( refs );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c7fa864f/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionResultsLoaderFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionResultsLoaderFactoryImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionResultsLoaderFactoryImpl.java
new file mode 100644
index 0000000..ba8eb2c
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionResultsLoaderFactoryImpl.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.results;
+
+
+import org.apache.usergrid.corepersistence.ManagerCache;
+import org.apache.usergrid.persistence.EntityRef;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.index.IndexScope;
+import org.apache.usergrid.persistence.index.query.Query;
+
+
+/**
+ * Factory for creating results
+ */
+public class ConnectionResultsLoaderFactoryImpl implements ResultsLoaderFactory {
+
+    private final ManagerCache managerCache;
+    private final EntityRef ownerId;
+    private final String connectionType;
+
+
+    public ConnectionResultsLoaderFactoryImpl( final ManagerCache managerCache, final EntityRef ownerId,
+                                               final String connectionType ) {
+        this.managerCache = managerCache;
+        this.ownerId = ownerId;
+        this.connectionType = connectionType;
+    }
+
+
+    @Override
+    public ResultsLoader getLoader( final ApplicationScope applicationScope, final IndexScope scope, final Query.Level resultsLevel ) {
+
+        ResultsVerifier verifier;
+
+        if ( resultsLevel == Query.Level.REFS ) {
+            verifier = new ConnectionRefsVerifier( ownerId, connectionType );
+        }
+        else if ( resultsLevel == Query.Level.IDS ) {
+            verifier = new ConnectionRefsVerifier( ownerId, connectionType );;
+        }
+        else {
+            verifier = new EntityVerifier(Query.MAX_LIMIT);
+        }
+
+        return new FilteringLoader( managerCache, verifier, applicationScope, scope );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c7fa864f/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ElasticSearchQueryExecutor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ElasticSearchQueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ElasticSearchQueryExecutor.java
new file mode 100644
index 0000000..535af36
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ElasticSearchQueryExecutor.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.results;
+
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.index.EntityIndex;
+import org.apache.usergrid.persistence.index.IndexScope;
+import org.apache.usergrid.persistence.index.SearchTypes;
+import org.apache.usergrid.persistence.index.query.CandidateResults;
+import org.apache.usergrid.persistence.index.query.Query;
+
+
+public class ElasticSearchQueryExecutor implements QueryExecutor {
+
+    private static final Logger logger = LoggerFactory.getLogger( ElasticSearchQueryExecutor.class );
+
+    private final ResultsLoaderFactory resultsLoaderFactory;
+
+    private final ApplicationScope applicationScope;
+
+    private final EntityIndex entityIndex;
+
+    private final IndexScope indexScope;
+
+    private final SearchTypes types;
+
+    private final Query query;
+
+
+    private Results currentResults;
+
+    private boolean moreToLoad = true;
+
+
+    public ElasticSearchQueryExecutor( final ResultsLoaderFactory resultsLoaderFactory, final EntityIndex entityIndex,
+                                       final ApplicationScope applicationScope, final IndexScope indexScope,
+                                       final SearchTypes types, final Query query ) {
+        this.resultsLoaderFactory = resultsLoaderFactory;
+        this.applicationScope = applicationScope;
+        this.entityIndex = entityIndex;
+        this.indexScope = indexScope;
+        this.types = types;
+
+        //we must deep copy the query passed.  Otherwise we will modify it's state with cursors.  Won't fix, not relevant
+        //once we start subscribing to streams.
+        this.query = new Query(query);
+    }
+
+
+    @Override
+    public Iterator<Results> iterator() {
+        return this;
+    }
+
+
+    private void loadNextPage() {
+        // Because of possible stale entities, which are filtered out by buildResults(),
+        // we loop until the we've got enough results to satisfy the query limit.
+
+        final int maxQueries = 10; // max re-queries to satisfy query limit
+
+        final int originalLimit = query.getLimit();
+
+        Results results = null;
+        int queryCount = 0;
+
+        boolean satisfied = false;
+
+
+        while ( !satisfied && queryCount++ < maxQueries ) {
+
+            CandidateResults crs = entityIndex.search( indexScope, types, query );
+
+            logger.debug( "Calling build results 1" );
+            results = buildResults( indexScope, query, crs );
+
+            if ( crs.isEmpty() || !crs.hasCursor() ) { // no results, no cursor, can't get more
+                satisfied = true;
+            }
+
+            /**
+             * In an edge case where we delete stale entities, we could potentially get less results than expected.
+             * This will only occur once during the repair phase.
+             * We need to ensure that we short circuit before we overflow the requested limit during a repair.
+             */
+            else if ( results.size() > 0 ) { // got what we need
+                satisfied = true;
+            }
+            //we didn't load anything, but there was a cursor, this means a read repair occured.  We have to short
+            //circuit to avoid over returning the result set
+            else if ( crs.hasCursor() ) {
+                satisfied = false;
+
+                // need to query for more
+                // ask for just what we need to satisfy, don't want to exceed limit
+                query.setCursor( results.getCursor() );
+                query.setLimit( originalLimit - results.size() );
+
+                logger.warn( "Satisfy query limit {}, new limit {} query count {}", new Object[] {
+                    originalLimit, query.getLimit(), queryCount
+                } );
+            }
+        }
+
+        //now set our cursor if we have one for the next iteration
+        if ( results.hasCursor() ) {
+            query.setCursor( results.getCursor() );
+            moreToLoad = true;
+        }
+
+        else {
+            moreToLoad = false;
+        }
+
+
+        //set our current results and the flag
+        this.currentResults = results;
+    }
+
+
+    /**
+     * Build results from a set of candidates, and discard those that represent stale indexes.
+     *
+     * @param query Query that was executed
+     * @param crs Candidates to be considered for results
+     */
+    private Results buildResults( final IndexScope indexScope, final Query query, final CandidateResults crs ) {
+
+        logger.debug( "buildResults()  from {} candidates", crs.size() );
+
+        //get an instance of our results loader
+        final ResultsLoader resultsLoader =
+            this.resultsLoaderFactory.getLoader( applicationScope, indexScope, query.getResultsLevel() );
+
+        //load the results
+        final Results results = resultsLoader.loadResults( crs );
+
+        //signal for post processing
+        resultsLoader.postProcess();
+
+
+        results.setCursor( crs.getCursor() );
+
+        //ugly and tight coupling, but we don't have a choice until we finish some refactoring
+        results.setQueryExecutor( this );
+
+        logger.debug( "Returning results size {}", results.size() );
+
+        return results;
+    }
+
+
+    @Override
+    public boolean hasNext() {
+
+        //we've tried to load and it's empty and we have more to load, load the next page
+        if ( currentResults == null ) {
+            //there's nothing left to load, nothing to do
+            if ( !moreToLoad ) {
+                return false;
+            }
+
+            //load the page
+
+            loadNextPage();
+        }
+
+
+        //see if our current results are not null
+        return currentResults != null;
+    }
+
+
+    @Override
+    public Results next() {
+        if ( !hasNext() ) {
+            throw new NoSuchElementException( "No more results present" );
+        }
+
+        final Results toReturn = currentResults;
+
+        currentResults = null;
+
+        return toReturn;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c7fa864f/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/QueryExecutor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/QueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/QueryExecutor.java
new file mode 100644
index 0000000..3afb77f
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/QueryExecutor.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.results;
+
+
+import java.util.Iterator;
+
+import org.apache.usergrid.persistence.Results;
+
+
+/**
+ * A strategy interface for executing queries.  Each item in the iterator is a single collection of results
+ * Each implementation should always return 1 element of Results, even if the results are empty.
+ *
+ * QueryExecutor.next() should always return a non-null Results object
+ */
+public interface QueryExecutor extends Iterable<Results>, Iterator<Results> {
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c7fa864f/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/RefsVerifier.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/RefsVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/RefsVerifier.java
deleted file mode 100644
index 096c271..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/RefsVerifier.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.corepersistence.results;
-
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.usergrid.persistence.EntityRef;
-import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.SimpleEntityRef;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-
-public class RefsVerifier extends VersionVerifier {
-
-    @Override
-    public Results getResults( final Collection<Id> ids ) {
-        List<EntityRef> refs = new ArrayList<EntityRef>();
-        for ( Id id : ids ) {
-            refs.add( new SimpleEntityRef( id.getType(), id.getUuid() ) );
-        }
-        return Results.fromRefList( refs );
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c7fa864f/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactory.java
index 14fbf88..3fbfff9 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactory.java
@@ -37,5 +37,6 @@ public interface ResultsLoaderFactory {
      * @param indexScope The index scope used in the search
      * @param
      */
-    public ResultsLoader getLoader( final ApplicationScope applicationScope, final IndexScope indexScope, final Query.Level resultsLevel );
+    ResultsLoader getLoader( final ApplicationScope applicationScope, final IndexScope indexScope,
+                             final Query.Level resultsLevel );
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c7fa864f/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactoryImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactoryImpl.java
deleted file mode 100644
index 892b736..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactoryImpl.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.results;
-
-
-import org.apache.usergrid.corepersistence.ManagerCache;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.index.IndexScope;
-import org.apache.usergrid.persistence.index.query.Query;
-
-import com.google.inject.Inject;
-
-
-/**
- * Factory for creating results
- */
-public class ResultsLoaderFactoryImpl implements ResultsLoaderFactory {
-
-    private final ManagerCache managerCache;
-
-
-    @Inject
-    public ResultsLoaderFactoryImpl( final ManagerCache managerCache ) {
-        this.managerCache = managerCache;
-    }
-
-
-    @Override
-    public ResultsLoader getLoader( final ApplicationScope applicationScope, final IndexScope scope, final Query.Level resultsLevel ) {
-
-        ResultsVerifier verifier;
-
-        if ( resultsLevel == Query.Level.REFS ) {
-            verifier = new RefsVerifier();
-        }
-        else if ( resultsLevel == Query.Level.IDS ) {
-            verifier = new RefsVerifier();
-        }
-        else {
-            verifier = new EntityVerifier(Query.MAX_LIMIT);
-        }
-
-        return new FilteringLoader( managerCache, verifier, applicationScope, scope );
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c7fa864f/stack/core/src/main/java/org/apache/usergrid/persistence/MultiQueryIterator.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/MultiQueryIterator.java b/stack/core/src/main/java/org/apache/usergrid/persistence/MultiQueryIterator.java
index 5b64d0b..52e235c 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/MultiQueryIterator.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/MultiQueryIterator.java
@@ -24,7 +24,7 @@ import org.apache.usergrid.persistence.index.query.Query.Level;
 
 
 /**
- * For each in a set of source refs executes a sub-query and provides a unified iterator over 
+ * For each in a set of source refs executes a sub-query and provides a unified iterator over
  * the union of all results. Honors page sizes for the Query to ensure memory isn't blown out.
  */
 public class MultiQueryIterator implements ResultsIterator {
@@ -36,10 +36,6 @@ public class MultiQueryIterator implements ResultsIterator {
     private Iterator currentIterator;
 
 
-    public MultiQueryIterator( Results results, Query query ) {
-        this( results.getQueryProcessor().getEntityManager(), 
-                new PagingResultsIterator( results, Level.IDS ), query );
-    }
 
 
     public MultiQueryIterator( EntityManager entityManager, Iterator<EntityRef> source, Query query ) {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c7fa864f/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java b/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java
index 388b29e..f179000 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java
@@ -30,6 +30,7 @@ import java.util.UUID;
 
 import javax.xml.bind.annotation.XmlRootElement;
 
+import org.apache.usergrid.corepersistence.results.QueryExecutor;
 import org.apache.usergrid.persistence.cassandra.QueryProcessor;
 import org.apache.usergrid.persistence.index.query.Query;
 import org.apache.usergrid.persistence.index.query.Query.Level;
@@ -81,9 +82,7 @@ public class Results implements Iterable<Entity> {
     Query query;
     Object data;
     String dataName;
-
-    private QueryProcessor queryProcessor;
-    private SearchVisitor searchVisitor;
+    private QueryExecutor queryExecutor;
 
 
     public Results() {
@@ -1268,31 +1267,18 @@ public class Results implements Iterable<Entity> {
     }
 
 
-    protected QueryProcessor getQueryProcessor() {
-        return queryProcessor;
-    }
-
-
-    public void setQueryProcessor( QueryProcessor queryProcessor ) {
-        this.queryProcessor = queryProcessor;
-    }
-
-
-    public void setSearchVisitor( SearchVisitor searchVisitor ) {
-        this.searchVisitor = searchVisitor;
+    public void setQueryExecutor(final QueryExecutor queryExecutor){
+        this.queryExecutor = queryExecutor;
     }
 
 
     /** uses cursor to get next batch of Results (returns null if no cursor) */
     public Results getNextPageResults() throws Exception {
-        if ( !hasCursor() ) {
+        if ( queryExecutor == null || !queryExecutor.hasNext() ) {
             return null;
         }
 
-        Query q = new Query( query );
-        q.setCursor( getCursor() );
-        queryProcessor.setQuery( q );
 
-        return queryProcessor.getResults( searchVisitor );
+        return queryExecutor.next();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c7fa864f/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/QueryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/QueryProcessorImpl.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/QueryProcessorImpl.java
index 874ff88..94569d5 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/QueryProcessorImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/QueryProcessorImpl.java
@@ -292,7 +292,7 @@ public class QueryProcessorImpl implements QueryProcessor {
             }
         }
         if (logger.isDebugEnabled()) {
-        	logger.debug("Getting result for query: [{}],  returning entityIds size: {}", 
+        	logger.debug("Getting result for query: [{}],  returning entityIds size: {}",
                     getQuery(), entityIds.size());
         }
 
@@ -307,8 +307,8 @@ public class QueryProcessorImpl implements QueryProcessor {
         results.setCursor( resultsCursor.asString() );
 
         results.setQuery( query );
-        results.setQueryProcessor( this );
-        results.setSearchVisitor( visitor );
+//        results.setQueryProcessor( this );
+//        results.setSearchVisitor( visitor );
 
         return results;
     }
@@ -621,12 +621,12 @@ public class QueryProcessorImpl implements QueryProcessor {
 
         @Override
         public QueryBuilder getQueryBuilder() {
-            throw new UnsupportedOperationException("Not supported by this vistor implementation."); 
+            throw new UnsupportedOperationException("Not supported by this vistor implementation.");
         }
 
         @Override
         public FilterBuilder getFilterBuilder() {
-            throw new UnsupportedOperationException("Not supported by this vistor implementation."); 
+            throw new UnsupportedOperationException("Not supported by this vistor implementation.");
         }
     }
 
@@ -724,4 +724,4 @@ public class QueryProcessorImpl implements QueryProcessor {
     public EntityManager getEntityManager() {
         return em;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c7fa864f/stack/core/src/test/java/org/apache/usergrid/persistence/PathQueryIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/PathQueryIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/PathQueryIT.java
index c56c07f..6ab672a 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/PathQueryIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/PathQueryIT.java
@@ -27,8 +27,6 @@ import java.util.UUID;
 
 import org.junit.Test;
 
-import org.apache.commons.lang3.RandomStringUtils;
-
 import org.apache.usergrid.AbstractCoreIT;
 import org.apache.usergrid.persistence.index.query.Query;
 import org.apache.usergrid.persistence.index.query.Query.Level;
@@ -181,15 +179,45 @@ public class PathQueryIT extends AbstractCoreIT {
         deviceQuery.addFilter( "index >= 4" );
         int expectedDeviceQuerySize = 3;
 
-        PathQuery groupsPQ = new PathQuery(new SimpleEntityRef( em.getApplicationRef() ), groupQuery );
-        PathQuery usersPQ = groupsPQ.chain( userQuery );
-        PathQuery<Entity> devicesPQ = usersPQ.chain( deviceQuery );
 
-        HashSet set = new HashSet( expectedGroupQuerySize * expectedUserQuerySize * expectedDeviceQuerySize );
-        Iterator<Entity> i = devicesPQ.iterator( em );
+        final PathQuery groupsPQ = new PathQuery(new SimpleEntityRef( em.getApplicationRef() ), groupQuery );
+
+
+        //test 1 level deep
+        HashSet groupSet = new HashSet( expectedGroupQuerySize );
+        Iterator<Entity> groupsIterator = groupsPQ.iterator( em );
+
+        while ( groupsIterator.hasNext() ) {
+            groupSet.add( groupsIterator.next() );
+        }
+
+        assertEquals( expectedGroupQuerySize, groupSet.size() );
+
+        //test 2 levels
+
+        final PathQuery groupsPQ1 = new PathQuery(new SimpleEntityRef( em.getApplicationRef() ), groupQuery );
+        PathQuery usersPQ1 = groupsPQ1.chain( userQuery );
+        final Iterator<Entity> userIterator  = usersPQ1.iterator( em );
+
+        final HashSet userSet = new HashSet( expectedGroupQuerySize * expectedUserQuerySize );
+
+        while ( userIterator.hasNext() ) {
+            userSet.add( userIterator.next() );
+        }
+
+        assertEquals( expectedGroupQuerySize * expectedUserQuerySize, userSet.size() );
+
+
+// ORIGINAL TEST, restore
+        PathQuery groupsPQ2 = new PathQuery(new SimpleEntityRef( em.getApplicationRef() ), groupQuery );
+        PathQuery usersPQ2 = groupsPQ2.chain( userQuery );
+        PathQuery<Entity> devicesPQ2 = usersPQ2.chain( deviceQuery );
+
+        final HashSet deviceSet = new HashSet( expectedGroupQuerySize * expectedUserQuerySize * expectedDeviceQuerySize );
+        Iterator<Entity> i = devicesPQ2.iterator( em );
         while ( i.hasNext() ) {
-            set.add( i.next() );
+            deviceSet.add( i.next() );
         }
-        assertEquals( expectedGroupQuerySize * expectedUserQuerySize * expectedDeviceQuerySize, set.size() );
+        assertEquals( expectedGroupQuerySize * expectedUserQuerySize * expectedDeviceQuerySize, deviceSet.size() );
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c7fa864f/stack/core/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/stack/core/src/test/resources/log4j.properties b/stack/core/src/test/resources/log4j.properties
index 2b156b3..0ba16ea 100644
--- a/stack/core/src/test/resources/log4j.properties
+++ b/stack/core/src/test/resources/log4j.properties
@@ -44,6 +44,8 @@ log4j.logger.org.apache.usergrid.persistence.PerformanceEntityRebuildIndexTest=D
 #log4j.logger.org.apache.usergrid.persistence=INFO
 
 log4j.logger.org.apache.usergrid.corepersistence.migration=WARN
+
+log4j.logger.org.apache.usergrid.persistence.index.impl=DEBUG
 #log4j.logger.org.apache.usergrid.corepersistence.CpSetup=INFO
 #log4j.logger.org.apache.usergrid.corepersistence=DEBUG
 #log4j.logger.org.apache.usergrid.corepersistence.CpEntityManagerFactory=DEBUG

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c7fa864f/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
index 69e0422..e129209 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
@@ -963,16 +963,19 @@ public abstract class GraphManagerIT {
         Id targetId2 = new SimpleId( "target2" );
 
 
-        Edge edge1 = createEdge( sourceId, "test", targetId1, System.currentTimeMillis() );
+        final long edge1Time = System.currentTimeMillis();
+        final long edge2Time = edge1Time+1;
+
+        Edge edge1 = createEdge( sourceId, "test", targetId1, edge1Time );
 
         gm.writeEdge( edge1 ).toBlocking().singleOrDefault( null );
 
-        Edge edge2 = createEdge( sourceId, "test", targetId2, System.currentTimeMillis() );
+        Edge edge2 = createEdge( sourceId, "test", targetId2, edge2Time );
 
         gm.writeEdge( edge2 ).toBlocking().singleOrDefault( null );
 
 
-        final long maxVersion = System.currentTimeMillis();
+        final long maxVersion = edge2Time+1;
 
 
         assertTrue( Long.compare( maxVersion, edge2.getTimestamp() ) > 0 );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c7fa864f/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
index 6be8234..c7be79d 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
@@ -24,8 +24,6 @@ import org.safehaus.guicyfig.FigSingleton;
 import org.safehaus.guicyfig.GuicyFig;
 import org.safehaus.guicyfig.Key;
 
-import org.apache.usergrid.persistence.index.guice.QueueProvider;
-
 
 @FigSingleton
 public interface IndexFig extends GuicyFig {
@@ -93,6 +91,13 @@ public interface IndexFig extends GuicyFig {
      */
     public static final String ELASTICSEARCH_QUEUE_IMPL = "elasticsearch.queue_impl";
 
+
+    /**
+     * The queue implementation to use.  Values come from <class>QueueProvider.Implementations</class>
+     */
+    public static final String ELASTICSEARCH_QUEUE_OFFER_TIMEOUT = "elasticsearch.queue.offer_timeout";
+
+
     public static final String QUERY_LIMIT_DEFAULT = "index.query.limit.default";
 
     @Default( "127.0.0.1" )
@@ -115,7 +120,7 @@ public interface IndexFig extends GuicyFig {
     @Key( ELASTICSEARCH_ALIAS_POSTFIX )
     String getAliasPostfix();
 
-    @Default( "1" ) // TODO: does this timeout get extended on each query?
+    @Default( "5" ) // TODO: does this timeout get extended on each query?
     @Key( QUERY_CURSOR_TIMEOUT_MINUTES )
     int getQueryCursorTimeout();
 
@@ -193,7 +198,7 @@ public interface IndexFig extends GuicyFig {
     @Key( INDEX_QUEUE_READ_TIMEOUT )
     int getIndexQueueTimeout();
 
-    @Default("2")
+    @Default( "2" )
     @Key( ELASTICSEARCH_WORKER_COUNT )
     int getWorkerCount();
 
@@ -201,4 +206,7 @@ public interface IndexFig extends GuicyFig {
     @Key( ELASTICSEARCH_QUEUE_IMPL )
     String getQueueImplementation();
 
+    @Default( "1000" )
+    @Key( ELASTICSEARCH_QUEUE_OFFER_TIMEOUT )
+    long getQueueOfferTimeout();
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c7fa864f/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemoryImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemoryImpl.java
index 998c086..b6eaf89 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemoryImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemoryImpl.java
@@ -36,18 +36,26 @@ import com.google.inject.Singleton;
 @Singleton
 public class BufferQueueInMemoryImpl implements BufferQueue {
 
+
+    private final IndexFig fig;
     private final ArrayBlockingQueue<IndexOperationMessage> messages;
 
 
     @Inject
     public BufferQueueInMemoryImpl( final IndexFig fig ) {
+        this.fig = fig;
         messages = new ArrayBlockingQueue<>( fig.getIndexQueueSize() );
     }
 
 
     @Override
     public void offer( final IndexOperationMessage operation ) {
-        messages.offer( operation );
+        try {
+            messages.offer( operation, fig.getQueueOfferTimeout(), TimeUnit.MILLISECONDS );
+        }
+        catch ( InterruptedException e ) {
+            throw new RuntimeException("Unable to offer message to queue", e);
+        }
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c7fa864f/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index b31cf39..7e64de3 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -19,18 +19,12 @@
  */
 package org.apache.usergrid.persistence.index.impl;
 
-import com.codahale.metrics.Counter;
-import com.codahale.metrics.Gauge;
-import com.codahale.metrics.Meter;
-import com.codahale.metrics.Timer;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
 
-import org.apache.usergrid.persistence.core.future.BetterFuture;
-import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
-import org.apache.usergrid.persistence.index.IndexBufferConsumer;
-import org.apache.usergrid.persistence.index.IndexFig;
-import org.apache.usergrid.persistence.index.IndexOperationMessage;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.elasticsearch.action.WriteConsistencyLevel;
 import org.elasticsearch.action.bulk.BulkItemResponse;
@@ -39,6 +33,19 @@ import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.client.Client;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.index.IndexBufferConsumer;
+import org.apache.usergrid.persistence.index.IndexFig;
+import org.apache.usergrid.persistence.index.IndexOperationMessage;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Timer;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
 import rx.Observable;
 import rx.Subscriber;
 import rx.Subscription;
@@ -47,11 +54,6 @@ import rx.functions.Func1;
 import rx.functions.Func2;
 import rx.schedulers.Schedulers;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicLong;
-
 
 /**
  * Consumer for IndexOperationMessages
@@ -189,7 +191,7 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
                         }
                         while ( true );
                     }
-                } ).subscribeOn( Schedulers.newThread() ).doOnNext( new Action1<List<IndexOperationMessage>>() {
+                } ).doOnNext( new Action1<List<IndexOperationMessage>>() {
                 @Override
                 public void call( List<IndexOperationMessage> containerList ) {
                     if ( containerList.size() == 0 ) {
@@ -203,7 +205,6 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
                     execute( containerList );
 
                     time.stop();
-
                 }
             } )
                 //ack after we process
@@ -214,15 +215,29 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
                         //release  so we know we've done processing
                         inFlight.addAndGet( -1 * indexOperationMessages.size() );
                     }
-                } ).doOnError( new Action1<Throwable>() {
+                } )
+                //catch an unexpected error, then emit an empty list to ensure our subscriber doesn't die
+                .onErrorReturn( new Func1<Throwable, List<IndexOperationMessage>>() {
                     @Override
-                    public void call( final Throwable throwable ) {
+                    public List<IndexOperationMessage> call( final Throwable throwable ) {
+                        final long sleepTime = config.getFailureRetryTime();
+
+                        log.error( "Failed to dequeue.  Sleeping for {} milliseconds", sleepTime, throwable );
+
+                        try {
+                            Thread.sleep( sleepTime );
+                        }
+                        catch ( InterruptedException ie ) {
+                            //swallow
+                        }
 
-                        log.error( "An exception occurred when trying to deque and write to elasticsearch.  Ignoring",
-                            throwable );
                         indexErrorCounter.inc();
+
+                        return Collections.EMPTY_LIST;
                     }
-                } );
+                } )
+
+                .subscribeOn( Schedulers.newThread() );
 
             //start in the background
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c7fa864f/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Query.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Query.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Query.java
index da68772..9a7a867 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Query.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Query.java
@@ -113,33 +113,41 @@ public class Query {
     }
 
 
+    /**
+     * Creates a deep copy of a query from another query
+     * @param q
+     */
     public Query( Query q ) {
-        if ( q != null ) {
-            type = q.type;
-            sortPredicates = q.sortPredicates != null
-                    ? new ArrayList<SortPredicate>( q.sortPredicates ) : null;
-            startResult = q.startResult;
-            cursor = q.cursor;
-            limit = q.limit;
-            selectAssignments = q.selectAssignments != null
-                    ? new LinkedHashMap<String, String>( q.selectAssignments ) : null;
-            mergeSelectResults = q.mergeSelectResults;
-            //level = q.level;
-            connectionType = q.connectionType;
-            permissions = q.permissions != null ? new ArrayList<String>( q.permissions ) : null;
-            reversed = q.reversed;
-            reversedSet = q.reversedSet;
-            startTime = q.startTime;
-            finishTime = q.finishTime;
-            resolution = q.resolution;
-            pad = q.pad;
-            rootOperand = q.rootOperand;
-            identifiers = q.identifiers != null
-                    ? new ArrayList<Identifier>( q.identifiers ) : null;
-            counterFilters = q.counterFilters != null
-                    ? new ArrayList<CounterFilterPredicate>( q.counterFilters ) : null;
-            collection = q.collection;
+        if ( q == null ) {
+            return;
         }
+
+        type = q.type;
+        sortPredicates = q.sortPredicates != null
+                ? new ArrayList<>( q.sortPredicates ) : null;
+        startResult = q.startResult;
+        cursor = q.cursor;
+        limit = q.limit;
+        selectAssignments = q.selectAssignments != null
+                ? new LinkedHashMap<>( q.selectAssignments ) : null;
+        mergeSelectResults = q.mergeSelectResults;
+        //level = q.level;
+        connectionType = q.connectionType;
+        permissions = q.permissions != null ? new ArrayList<>( q.permissions ) : null;
+        reversed = q.reversed;
+        reversedSet = q.reversedSet;
+        startTime = q.startTime;
+        finishTime = q.finishTime;
+        resolution = q.resolution;
+        pad = q.pad;
+        rootOperand = q.rootOperand;
+        identifiers = q.identifiers != null
+                ? new ArrayList<>( q.identifiers ) : null;
+        counterFilters = q.counterFilters != null
+                ? new ArrayList<>( q.counterFilters ) : null;
+        collection = q.collection;
+        level = q.level;
+
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c7fa864f/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/ListUtils.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/ListUtils.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/ListUtils.java
index 6c7b480..9c768bf 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/ListUtils.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/ListUtils.java
@@ -23,8 +23,6 @@ import java.util.List;
 import java.util.UUID;
 
 import org.apache.commons.lang.math.NumberUtils;
-import org.apache.usergrid.persistence.collection.util.EntityUtils;
-import org.apache.usergrid.persistence.model.entity.Id;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c7fa864f/stack/pom.xml
----------------------------------------------------------------------
diff --git a/stack/pom.xml b/stack/pom.xml
index ee3f9cd..aea9d49 100644
--- a/stack/pom.xml
+++ b/stack/pom.xml
@@ -121,7 +121,7 @@
       <!-- only use half the cores on the machine for testing -->
       <usergrid.it.parallel>methods</usergrid.it.parallel>
       <usergrid.it.reuseForks>true</usergrid.it.reuseForks>
-      <usergrid.it.forkCount>4</usergrid.it.forkCount>
+      <usergrid.it.forkCount>1</usergrid.it.forkCount>
       <usergrid.it.threads>8</usergrid.it.threads>
 
       <metrics.version>3.0.0</metrics.version>