You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/05/04 19:27:52 UTC

[01/11] incubator-usergrid git commit: Updated mapping to fix missing doc_values and disable norms since we use external sorting

Repository: incubator-usergrid
Updated Branches:
  refs/heads/two-dot-o-dev d18b8eea3 -> 9b939d15b


Updated mapping to fix missing doc_values and disable norms  since we use external sorting


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/6d4847a8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/6d4847a8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/6d4847a8

Branch: refs/heads/two-dot-o-dev
Commit: 6d4847a817aefa65075970a54af85faf159f9176
Parents: fd6305c
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Apr 30 10:16:10 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Apr 30 10:16:10 2015 -0600

----------------------------------------------------------------------
 .../org/apache/usergrid/persistence/index/usergrid-mappings.json | 4 ++++
 1 file changed, 4 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d4847a8/stack/corepersistence/queryindex/src/main/resources/org/apache/usergrid/persistence/index/usergrid-mappings.json
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/resources/org/apache/usergrid/persistence/index/usergrid-mappings.json b/stack/corepersistence/queryindex/src/main/resources/org/apache/usergrid/persistence/index/usergrid-mappings.json
index f12b66b..bee84a2 100644
--- a/stack/corepersistence/queryindex/src/main/resources/org/apache/usergrid/persistence/index/usergrid-mappings.json
+++ b/stack/corepersistence/queryindex/src/main/resources/org/apache/usergrid/persistence/index/usergrid-mappings.json
@@ -73,6 +73,10 @@
                     },
                     "string": {
                         "type": "string",
+                        "doc_values": true,
+                        "norms": {
+                            "enabled": false
+                        },
                         "fields": {
                             "exact": {
                                 "type": "string",


[07/11] incubator-usergrid git commit: Fixes graph cursor resume state.

Posted by to...@apache.org.
Fixes graph cursor resume state.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/118711ae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/118711ae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/118711ae

Branch: refs/heads/two-dot-o-dev
Commit: 118711aee83875f72a0b058b784218b211748d4e
Parents: 413f023
Author: Todd Nine <tn...@apigee.com>
Authored: Mon May 4 09:59:00 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon May 4 09:59:00 2015 -0600

----------------------------------------------------------------------
 .../read/graph/AbstractReadGraphFilter.java     | 52 ++++++++++++++++++--
 1 file changed, 49 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/118711ae/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java
index 503fcf9..303bc5b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java
@@ -22,6 +22,7 @@ package org.apache.usergrid.corepersistence.pipeline.read.graph;
 
 import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
 import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
 import org.apache.usergrid.corepersistence.pipeline.read.Filter;
 import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
 import org.apache.usergrid.persistence.graph.Edge;
@@ -62,6 +63,7 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
 
 
         final String edgeName = getEdgeTypeName();
+        final EdgeState edgeCursorState = new EdgeState();
 
 
         //return all ids that are emitted from this edge
@@ -80,15 +82,30 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
              * TODO, pass a message with pointers to our cursor values to be generated later
              */
             return graphManager.loadEdgesFromSource( search )
-                //set our cursor every edge we traverse
+                //set the edge state for cursors
+                .doOnNext( edge -> edgeCursorState.update( edge ) )
 
-                    //map our id from the target edge
-                .map( edge -> createFilterResult( edge.getTargetNode(), edge, previousFilterValue.getPath() ) );
+                    //map our id from the target edge and set our cursor on every edge we traverse
+                .map( edge -> createFilterResult( edge.getTargetNode(), edgeCursorState.getCursorEdge(),
+                    previousFilterValue.getPath() ) );
         } );
     }
 
 
     @Override
+    protected FilterResult<Id> createFilterResult( final Id emit, final Edge cursorValue,
+                                                   final Optional<EdgePath> parent ) {
+
+        //if it's our first pass, there's no cursor to generate
+        if(cursorValue == null){
+            return new FilterResult<>( emit, parent );
+        }
+
+        return super.createFilterResult( emit, cursorValue, parent );
+    }
+
+
+    @Override
     protected CursorSerializer<Edge> getCursorSerializer() {
         return EdgeCursorSerializer.INSTANCE;
     }
@@ -98,4 +115,33 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
      * Get the edge type name we should use when traversing
      */
     protected abstract String getEdgeTypeName();
+
+
+    /**
+     * Wrapper class. Because edges seek > the last returned, we need to keep our n-1 value. This will be our cursor.
+     * We always try to seek to the same position as we ended.  Since we don't deal with a persistent read result, if
+     * we seek to a value equal to our last, we may skip data.
+     */
+    private final class EdgeState {
+
+        private Edge cursorEdge = null;
+        private Edge currentEdge = null;
+
+
+        /**
+         * Update the pointers
+         */
+        private void update( final Edge newEdge ) {
+            cursorEdge = currentEdge;
+            currentEdge = newEdge;
+        }
+
+
+        /**
+         * Get the edge to use in cursors for resume
+         */
+        private Edge getCursorEdge() {
+            return cursorEdge;
+        }
+    }
 }


[11/11] incubator-usergrid git commit: Merge branch 'USERGRID-587' into two-dot-o-dev

Posted by to...@apache.org.
Merge branch 'USERGRID-587' into two-dot-o-dev


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/9b939d15
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/9b939d15
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/9b939d15

Branch: refs/heads/two-dot-o-dev
Commit: 9b939d15ba398c1aa759f17d647bd85ee232ad9e
Parents: 4107ef3 5bb7798
Author: Todd Nine <tn...@apigee.com>
Authored: Mon May 4 11:27:46 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon May 4 11:27:46 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/CpRelationManager.java      |   5 +-
 .../corepersistence/pipeline/Pipeline.java      |  12 +-
 .../pipeline/PipelineContext.java               |  16 +-
 .../pipeline/PipelineOperation.java             |  39 ++++
 .../pipeline/PipelineResult.java                |  57 -----
 .../pipeline/cursor/ResponseCursor.java         |  81 ++++---
 .../pipeline/read/AbstractFilter.java           |  45 ++++
 .../pipeline/read/AbstractPathFilter.java       | 109 +++++++++
 .../read/AbstractPipelineOperation.java         |  44 ----
 .../pipeline/read/AbstractSeekingFilter.java    | 102 --------
 .../pipeline/read/CandidateResultsFilter.java   |  31 ---
 .../pipeline/read/Collector.java                |  13 +-
 .../pipeline/read/CollectorFactory.java         |  12 +-
 .../corepersistence/pipeline/read/EdgePath.java |  79 +++++++
 .../corepersistence/pipeline/read/Filter.java   |   9 +-
 .../pipeline/read/FilterFactory.java            |  40 +++-
 .../pipeline/read/FilterResult.java             |  56 +++++
 .../pipeline/read/PipelineOperation.java        |  38 ---
 .../pipeline/read/ReadPipelineBuilder.java      |   5 +-
 .../pipeline/read/ReadPipelineBuilderImpl.java  |  80 ++++---
 .../pipeline/read/ResultsPage.java              |  26 ++-
 .../read/collect/AbstractCollector.java         |  46 ++++
 .../pipeline/read/collect/EntityFilter.java     |  68 ++++++
 .../read/collect/IdCursorSerializer.java        |  41 ++++
 .../read/collect/ResultsPageCollector.java      |  80 +++++++
 .../AbstractElasticSearchFilter.java            |  45 ++--
 .../pipeline/read/elasticsearch/Candidate.java  |  55 +++++
 .../elasticsearch/CandidateEntityFilter.java    | 234 +++++++++++++++++++
 .../read/elasticsearch/CandidateIdFilter.java   | 201 ++++++++++++++++
 .../CandidateResultsEntityResultsCollector.java | 217 -----------------
 .../CandidateResultsIdVerifyFilter.java         | 193 ---------------
 .../impl/CollectionRefsVerifier.java            |  44 ----
 .../CollectionResultsLoaderFactoryImpl.java     |  65 ------
 .../impl/ConnectionRefsVerifier.java            |  59 -----
 .../ConnectionResultsLoaderFactoryImpl.java     |  73 ------
 .../impl/ElasticSearchQueryExecutor.java        | 224 ------------------
 .../read/elasticsearch/impl/EntityVerifier.java | 127 ----------
 .../elasticsearch/impl/FilteringLoader.java     | 219 -----------------
 .../read/elasticsearch/impl/IdsVerifier.java    |  46 ----
 .../read/elasticsearch/impl/ResultsLoader.java  |  43 ----
 .../impl/ResultsLoaderFactory.java              |  41 ----
 .../elasticsearch/impl/ResultsVerifier.java     |  52 -----
 .../elasticsearch/impl/VersionVerifier.java     |  85 -------
 .../pipeline/read/entity/EntityIdFilter.java    |  53 -----
 .../read/entity/EntityLoadCollector.java        |  94 --------
 .../graph/AbstractReadGraphEdgeByIdFilter.java  |  12 +-
 .../read/graph/AbstractReadGraphFilter.java     |  65 +++++-
 .../pipeline/read/graph/EntityIdFilter.java     |  54 +++++
 .../pipeline/read/graph/EntityLoadFilter.java   | 155 ++++++++++++
 .../graph/ReadGraphConnectionByTypeFilter.java  |  20 +-
 .../results/ObservableQueryExecutor.java        |  24 +-
 .../apache/usergrid/persistence/Results.java    |   2 +-
 .../pipeline/cursor/CursorTest.java             |  20 +-
 .../persistence/index/CandidateResults.java     |  11 +-
 .../impl/EsApplicationEntityIndexImpl.java      |   7 +-
 .../persistence/index/usergrid-mappings.json    |   1 -
 56 files changed, 1575 insertions(+), 2100 deletions(-)
----------------------------------------------------------------------



[08/11] incubator-usergrid git commit: Merge branch 'two-dot-o-dev' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into USERGRID-587

Posted by to...@apache.org.
Merge branch 'two-dot-o-dev' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into USERGRID-587


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/a673c81b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/a673c81b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/a673c81b

Branch: refs/heads/two-dot-o-dev
Commit: a673c81bf7c51ce48f2b8d052fcba990931e1977
Parents: 118711a d18b8ee
Author: Todd Nine <tn...@apigee.com>
Authored: Mon May 4 09:59:09 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon May 4 09:59:09 2015 -0600

----------------------------------------------------------------------
 .../persistence/index/impl/EntityIndexTest.java | 121 ++++++++++---------
 1 file changed, 61 insertions(+), 60 deletions(-)
----------------------------------------------------------------------



[03/11] incubator-usergrid git commit: Massive refactor. Paths for cursor generation are now part of our I/O results. This allows the collector to take until satisfied, then generate a serializable path.

Posted by to...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateIdFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateIdFilter.java
new file mode 100644
index 0000000..56e1c1c
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateIdFilter.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
+import org.apache.usergrid.corepersistence.pipeline.PipelineOperation;
+import org.apache.usergrid.corepersistence.pipeline.read.Filter;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.collection.MvccLogEntry;
+import org.apache.usergrid.persistence.collection.VersionSet;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
+import org.apache.usergrid.persistence.index.CandidateResult;
+import org.apache.usergrid.persistence.index.EntityIndexBatch;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.index.SearchEdge;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.fasterxml.uuid.UUIDComparator;
+import com.google.inject.Inject;
+
+import rx.Observable;
+
+
+/**
+ * Responsible for verifying candidate result versions, then emitting the Ids of these versions
+ * Input is a batch of candidate results, output is a stream of validated Ids
+ */
+public class CandidateIdFilter extends AbstractFilter<Candidate, Id>
+    implements Filter<Candidate, Id> {
+
+    private final EntityCollectionManagerFactory entityCollectionManagerFactory;
+    private final EntityIndexFactory entityIndexFactory;
+
+
+    @Inject
+    public CandidateIdFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory,
+                              final EntityIndexFactory entityIndexFactory ) {
+        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+        this.entityIndexFactory = entityIndexFactory;
+    }
+
+
+
+    @Override
+      public Observable<FilterResult<Id>> call( final Observable<FilterResult<Candidate>> filterResultObservable ) {
+
+
+        /**
+         * A bit kludgy from old 1.0 -> 2.0 apis.  Refactor this as we clean up our lower levels and create new results
+         * objects
+         */
+
+        final ApplicationScope applicationScope = pipelineContext.getApplicationScope();
+
+        final EntityCollectionManager entityCollectionManager =
+            entityCollectionManagerFactory.createCollectionManager( applicationScope );
+
+
+        final ApplicationEntityIndex applicationIndex =
+            entityIndexFactory.createApplicationEntityIndex( applicationScope );
+
+        final Observable<FilterResult<Id>> searchIdSetObservable = filterResultObservable.buffer( pipelineContext.getLimit() ).flatMap(
+            candidateResults -> {
+                //flatten to a list of ids to load
+                final Observable<List<Id>> candidateIds =
+                    Observable.from( candidateResults ).map( candidate -> candidate.getValue().getCandidateResult().getId() )
+                              .toList();
+
+                //load the ids
+                final Observable<VersionSet> versionSetObservable =
+                    candidateIds.flatMap( ids -> entityCollectionManager.getLatestVersion( ids ) );
+
+                //now we have a collection, validate that our candidate set is correct.
+
+                return versionSetObservable.map(
+                    entitySet -> new EntityCollector( applicationIndex.createBatch(), entitySet, candidateResults ) )
+                                           .doOnNext( entityCollector -> entityCollector.merge() ).flatMap(
+                        entityCollector -> Observable.from( entityCollector.collectResults() ) );
+        } );
+
+        return searchIdSetObservable;
+    }
+
+
+
+
+
+    /**
+     * Validates candidates against the loaded version set and collects the resulting ids.
+     */
+    private static final class EntityCollector {
+
+        private static final Logger logger = LoggerFactory.getLogger( EntityCollector.class );
+        private List<FilterResult<Id>> results = new ArrayList<>();
+
+        private final EntityIndexBatch batch;
+        private final List<FilterResult<Candidate>> candidateResults;
+        private final VersionSet versionSet;
+
+
+        public EntityCollector( final EntityIndexBatch batch, final VersionSet versionSet,
+                                final List<FilterResult<Candidate>> candidateResults ) {
+            this.batch = batch;
+            this.versionSet = versionSet;
+            this.candidateResults = candidateResults;
+            this.results = new ArrayList<>( versionSet.size() );
+        }
+
+
+        /**
+         * Merge our candidates and our entity set into results
+         */
+        public void merge() {
+
+            for ( final FilterResult<Candidate> candidateResult : candidateResults ) {
+                validate( candidateResult );
+            }
+
+            batch.execute();
+        }
+
+
+        public List<FilterResult<Id>> collectResults() {
+            return results;
+        }
+
+
+        /**
+         * Validate a single candidate result against the version data loaded from Cassandra
+         * @param filterCandidate the candidate result (with its path) to validate
+         */
+        private void validate( final FilterResult<Candidate> filterCandidate ) {
+
+            final CandidateResult candidateResult = filterCandidate.getValue().getCandidateResult();
+
+            final SearchEdge searchEdge = filterCandidate.getValue().getSearchEdge();
+
+            final MvccLogEntry logEntry = versionSet.getMaxVersion( candidateResult.getId() );
+
+            final UUID candidateVersion = candidateResult.getVersion();
+
+            final UUID entityVersion = logEntry.getVersion();
+
+            final Id entityId = logEntry.getEntityId();
+
+            //entity is newer than ES version
+            if ( UUIDComparator.staticCompare( entityVersion, candidateVersion ) > 0 ) {
+
+                logger.warn( "Deindexing stale entity on edge {} for entityId {} and version {}",
+                    new Object[] { searchEdge, entityId, entityVersion } );
+                batch.deindex( searchEdge, entityId, entityVersion );
+                return;
+            }
+
+            //ES is newer than cass, it means we haven't repaired the record in Cass, we don't want to
+            //remove the ES record, since the read in cass should cause a read repair, just ignore
+            if ( UUIDComparator.staticCompare( candidateVersion, entityVersion ) > 0 ) {
+
+                logger.warn(
+                    "Found a newer version in ES over cassandra for edge {} for entityId {} and version {}.  Repair "
+                        + "should be run", new Object[] { searchEdge, entityId, entityVersion } );
+            }
+
+            //not stale (versions match, or ES is newer and was only logged above), so add it
+
+            final FilterResult<Id> result = new FilterResult<>( entityId, filterCandidate.getPath()  );
+
+            results.add( result );
+        }
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateResultsEntityResultsCollector.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateResultsEntityResultsCollector.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateResultsEntityResultsCollector.java
deleted file mode 100644
index 465ff22..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateResultsEntityResultsCollector.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch;
-
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.corepersistence.pipeline.read.AbstractPipelineOperation;
-import org.apache.usergrid.corepersistence.pipeline.read.Collector;
-import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
-import org.apache.usergrid.persistence.collection.EntitySet;
-import org.apache.usergrid.persistence.collection.MvccEntity;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
-import org.apache.usergrid.persistence.index.CandidateResult;
-import org.apache.usergrid.persistence.index.CandidateResults;
-import org.apache.usergrid.persistence.index.EntityIndexBatch;
-import org.apache.usergrid.persistence.index.EntityIndexFactory;
-import org.apache.usergrid.persistence.index.SearchEdge;
-import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.fasterxml.uuid.UUIDComparator;
-import com.google.inject.Inject;
-
-import rx.Observable;
-
-
-/**
- * Loads entities from an incoming CandidateResults object and return them as results
- */
-public class CandidateResultsEntityResultsCollector extends AbstractPipelineOperation<CandidateResults, ResultsPage>
-    implements Collector<CandidateResults, ResultsPage> {
-
-    private final EntityCollectionManagerFactory entityCollectionManagerFactory;
-    private final EntityIndexFactory entityIndexFactory;
-
-
-    @Inject
-    public CandidateResultsEntityResultsCollector( final EntityCollectionManagerFactory entityCollectionManagerFactory,
-                                                   final EntityIndexFactory entityIndexFactory ) {
-        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
-        this.entityIndexFactory = entityIndexFactory;
-    }
-
-
-    @Override
-    public Observable<ResultsPage> call( final Observable<CandidateResults> candidateResultsObservable ) {
-
-
-        /**
-         * A bit kludgy from old 1.0 -> 2.0 apis.  Refactor this as we clean up our lower levels and create new results
-         * objects
-         */
-
-        final ApplicationScope applicationScope = pipelineContext.getApplicationScope();
-
-        final EntityCollectionManager entityCollectionManager =
-            entityCollectionManagerFactory.createCollectionManager( applicationScope );
-
-
-        final ApplicationEntityIndex applicationIndex =
-            entityIndexFactory.createApplicationEntityIndex( applicationScope );
-
-        final Observable<ResultsPage> searchIdSetObservable = candidateResultsObservable.flatMap( candidateResults -> {
-            //flatten toa list of ids to load
-            final Observable<List<Id>> candidateIds =
-                Observable.from( candidateResults ).map( candidate -> candidate.getId() ).toList();
-
-            //load the ids
-            final Observable<EntitySet> entitySetObservable =
-                candidateIds.flatMap( ids -> entityCollectionManager.load( ids ) );
-
-            //now we have a collection, validate our canidate set is correct.
-
-            return entitySetObservable
-                .map( entitySet -> new EntityCollector( applicationIndex.createBatch(), entitySet, candidateResults ) )
-                .doOnNext( entityCollector -> entityCollector.merge() )
-                .map( entityCollector -> entityCollector.getResults() );
-        } );
-
-        //if we filter all our results, we want to continue to try the next page
-        return searchIdSetObservable;
-    }
-
-
-    /**
-     * Our collector to collect entities.  Not quite a true collector, but works within our operational flow as this state is mutable and difficult to represent functionally
-     */
-    private static final class EntityCollector {
-
-        private static final Logger logger = LoggerFactory.getLogger( EntityCollector.class );
-        private List<Entity> results = new ArrayList<>();
-
-        private final EntityIndexBatch batch;
-        private final CandidateResults candidateResults;
-        private final EntitySet entitySet;
-
-
-        public EntityCollector( final EntityIndexBatch batch, final EntitySet entitySet,
-                                final CandidateResults candidateResults ) {
-            this.batch = batch;
-            this.entitySet = entitySet;
-            this.candidateResults = candidateResults;
-            this.results = new ArrayList<>( entitySet.size() );
-        }
-
-
-        /**
-         * Merge our candidates and our entity set into results
-         */
-        public void merge() {
-
-            for ( final CandidateResult candidateResult : candidateResults ) {
-                validate( candidateResult );
-            }
-
-            batch.execute();
-        }
-
-
-        public ResultsPage getResults() {
-            return new ResultsPage( results );
-        }
-
-
-        public EntityIndexBatch getBatch() {
-            return batch;
-        }
-
-
-        private void validate( final CandidateResult candidateResult ) {
-
-            final Id candidateId = candidateResult.getId();
-            final UUID candidateVersion = candidateResult.getVersion();
-
-
-            final MvccEntity entity = entitySet.getEntity( candidateId );
-
-
-            //doesn't exist warn and drop
-            if ( entity == null ) {
-                logger.warn(
-                    "Searched and received candidate with entityId {} and version {}, yet was not found in cassandra."
-                        + "  Ignoring since this could be a region sync issue",
-                    candidateId, candidateVersion );
-
-
-                //TODO trigger an audit after a fail count where we explicitly try to repair from other regions
-
-                return;
-
-            }
-
-
-            final UUID entityVersion = entity.getVersion();
-
-
-            //entity is newer than ES version, could be an update or the entity is marked as deleted
-            if ( UUIDComparator.staticCompare( entityVersion, candidateVersion ) > 0) {
-
-                final Id entityId = entity.getId();
-                final SearchEdge searchEdge = candidateResults.getSearchEdge();
-
-                logger.warn( "Deindexing stale entity on edge {} for entityId {} and version {}",
-                    new Object[] { searchEdge, entityId, entityVersion } );
-                batch.deindex( searchEdge, entityId, entityVersion );
-                return;
-            }
-
-            //ES is newer than cass, it means we haven't repaired the record in Cass, we don't want to
-            //remove the ES record, since the read in cass should cause a read repair, just ignore
-            if ( UUIDComparator.staticCompare( candidateVersion, entityVersion ) > 0 ) {
-
-                final Id entityId = entity.getId();
-                final SearchEdge searchEdge = candidateResults.getSearchEdge();
-
-                logger.warn(
-                    "Found a newer version in ES over cassandra for edge {} for entityId {} and version {}.  Repair "
-                        + "should be run", new Object[] { searchEdge, entityId, entityVersion } );
-
-                  //TODO trigger an audit after a fail count where we explicitly try to repair from other regions
-
-                return;
-            }
-
-            //they're the same add it
-
-
-            results.add( entity.getEntity().get() );
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateResultsIdVerifyFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateResultsIdVerifyFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateResultsIdVerifyFilter.java
deleted file mode 100644
index bb9ab76..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateResultsIdVerifyFilter.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch;
-
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.corepersistence.pipeline.read.AbstractPipelineOperation;
-import org.apache.usergrid.corepersistence.pipeline.read.PipelineOperation;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
-import org.apache.usergrid.persistence.collection.MvccLogEntry;
-import org.apache.usergrid.persistence.collection.VersionSet;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
-import org.apache.usergrid.persistence.index.CandidateResult;
-import org.apache.usergrid.persistence.index.CandidateResults;
-import org.apache.usergrid.persistence.index.EntityIndexBatch;
-import org.apache.usergrid.persistence.index.EntityIndexFactory;
-import org.apache.usergrid.persistence.index.SearchEdge;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.fasterxml.uuid.UUIDComparator;
-import com.google.inject.Inject;
-
-import rx.Observable;
-
-
-/**
- * Responsible for verifying candidate result versions, then emitting the Ids of these versions
- * Input is a batch of candidate results, output is a stream of validated Ids
- */
-public class CandidateResultsIdVerifyFilter extends AbstractPipelineOperation<CandidateResults, Id>
-    implements PipelineOperation<CandidateResults, Id> {
-
-    private final EntityCollectionManagerFactory entityCollectionManagerFactory;
-    private final EntityIndexFactory entityIndexFactory;
-
-
-    @Inject
-    public CandidateResultsIdVerifyFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory,
-                                           final EntityIndexFactory entityIndexFactory ) {
-        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
-        this.entityIndexFactory = entityIndexFactory;
-    }
-
-
-
-    @Override
-    public Observable<Id> call( final Observable<CandidateResults> observable ) {
-
-
-        /**
-         * A bit kludgy from old 1.0 -> 2.0 apis.  Refactor this as we clean up our lower levels and create new results
-         * objects
-         */
-
-        final ApplicationScope applicationScope = pipelineContext.getApplicationScope();
-
-        final EntityCollectionManager entityCollectionManager =
-            entityCollectionManagerFactory.createCollectionManager( applicationScope );
-
-
-        final ApplicationEntityIndex applicationIndex =
-            entityIndexFactory.createApplicationEntityIndex( applicationScope );
-
-        final Observable<Id> searchIdSetObservable = observable.flatMap( candidateResults -> {
-            //flatten toa list of ids to load
-            final Observable<List<Id>> candidateIds =
-                Observable.from( candidateResults ).map( candidate -> candidate.getId() ).toList();
-
-            //load the ids
-            final Observable<VersionSet> versionSetObservable =
-                candidateIds.flatMap( ids -> entityCollectionManager.getLatestVersion( ids ) );
-
-            //now we have a collection, validate our canidate set is correct.
-
-            return versionSetObservable
-                .map( entitySet -> new EntityCollector( applicationIndex.createBatch(), entitySet, candidateResults ) )
-                .doOnNext( entityCollector -> entityCollector.merge() )
-                .flatMap( entityCollector -> Observable.from(  entityCollector.collectResults() ) );
-        } );
-
-        return searchIdSetObservable;
-    }
-
-
-    /**
-     * Map a new cp entity to an old entity.  May be null if not present
-     */
-    private static final class EntityCollector {
-
-        private static final Logger logger = LoggerFactory.getLogger( EntityCollector.class );
-        private List<Id> results = new ArrayList<>();
-
-        private final EntityIndexBatch batch;
-        private final CandidateResults candidateResults;
-        private final VersionSet versionSet;
-
-
-        public EntityCollector( final EntityIndexBatch batch, final VersionSet versionSet,
-                                final CandidateResults candidateResults ) {
-            this.batch = batch;
-            this.versionSet = versionSet;
-            this.candidateResults = candidateResults;
-            this.results = new ArrayList<>( versionSet.size() );
-        }
-
-
-        /**
-         * Merge our candidates and our entity set into results
-         */
-        public void merge() {
-
-            for ( final CandidateResult candidateResult : candidateResults ) {
-                validate( candidateResult );
-            }
-
-            batch.execute();
-        }
-
-
-        public List<Id> collectResults() {
-            return results;
-        }
-
-
-        /**
-         * Validate each candidate results vs the data loaded from cass
-         * @param candidateResult
-         */
-        private void validate( final CandidateResult candidateResult ) {
-
-            final MvccLogEntry logEntry = versionSet.getMaxVersion( candidateResult.getId() );
-
-            final UUID candidateVersion = candidateResult.getVersion();
-
-            final UUID entityVersion = logEntry.getVersion();
-
-            final Id entityId = logEntry.getEntityId();
-
-            //entity is newer than ES version
-            if ( UUIDComparator.staticCompare( entityVersion, candidateVersion ) > 0 ) {
-
-                final SearchEdge searchEdge = candidateResults.getSearchEdge();
-
-                logger.warn( "Deindexing stale entity on edge {} for entityId {} and version {}",
-                    new Object[] { searchEdge, entityId, entityVersion } );
-                batch.deindex( searchEdge, entityId, entityVersion );
-                return;
-            }
-
-            //ES is newer than cass, it means we haven't repaired the record in Cass, we don't want to
-            //remove the ES record, since the read in cass should cause a read repair, just ignore
-            if ( UUIDComparator.staticCompare( candidateVersion, entityVersion ) > 0 ) {
-
-                final SearchEdge searchEdge = candidateResults.getSearchEdge();
-
-                logger.warn(
-                    "Found a newer version in ES over cassandra for edge {} for entityId {} and version {}.  Repair "
-                        + "should be run", new Object[] { searchEdge, entityId, entityVersion } );
-            }
-
-            //they're the same add it
-
-            results.add( entityId );
-        }
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/CollectionRefsVerifier.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/CollectionRefsVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/CollectionRefsVerifier.java
deleted file mode 100644
index cc96633..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/CollectionRefsVerifier.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
-
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.usergrid.persistence.EntityRef;
-import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.SimpleEntityRef;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-
-public class CollectionRefsVerifier extends VersionVerifier {
-
-
-
-    @Override
-    public Results getResults( final Collection<Id> ids ) {
-        List<EntityRef> refs = new ArrayList<EntityRef>(ids.size());
-        for ( Id id : ids ) {
-            refs.add( new SimpleEntityRef( id.getType(), id.getUuid() ) );
-        }
-        return Results.fromRefList( refs );
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/CollectionResultsLoaderFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/CollectionResultsLoaderFactoryImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/CollectionResultsLoaderFactoryImpl.java
deleted file mode 100644
index 94c91d9..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/CollectionResultsLoaderFactoryImpl.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
-
-
-import org.apache.usergrid.persistence.Query;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
-import org.apache.usergrid.persistence.index.SearchEdge;
-
-
-/**
- * Factory for creating results
- */
-public class CollectionResultsLoaderFactoryImpl implements ResultsLoaderFactory {
-
-    private final EntityCollectionManager entityCollectionManager;
-    private final ApplicationEntityIndex applicationEntityIndex;
-
-
-    public CollectionResultsLoaderFactoryImpl( final EntityCollectionManager entityCollectionManager,
-        final ApplicationEntityIndex applicationEntityIndex ) {
-        this.entityCollectionManager = entityCollectionManager;
-        this.applicationEntityIndex = applicationEntityIndex;
-    }
-
-
-    @Override
-    public ResultsLoader getLoader( final ApplicationScope applicationScope, final SearchEdge scope,
-                                    final Query.Level resultsLevel ) {
-
-        ResultsVerifier verifier;
-
-        if ( resultsLevel == Query.Level.REFS ) {
-            verifier = new CollectionRefsVerifier();
-        }
-        else if ( resultsLevel == Query.Level.IDS ) {
-            verifier = new IdsVerifier();
-        }
-        else {
-            verifier = new EntityVerifier( Query.MAX_LIMIT );
-        }
-
-        return new FilteringLoader( entityCollectionManager, applicationEntityIndex, verifier, applicationScope,
-            scope );
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ConnectionRefsVerifier.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ConnectionRefsVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ConnectionRefsVerifier.java
deleted file mode 100644
index 6b7bdde..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ConnectionRefsVerifier.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
-
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.usergrid.persistence.ConnectionRef;
-import org.apache.usergrid.persistence.EntityRef;
-import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.cassandra.ConnectionRefImpl;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import static org.apache.usergrid.persistence.SimpleEntityRef.ref;
-
-
-/**
- * Verifier for creating connections
- */
-public class ConnectionRefsVerifier extends VersionVerifier {
-
-
-    private final EntityRef ownerId;
-    private final String connectionType;
-
-
-    public ConnectionRefsVerifier( final EntityRef ownerId, final String connectionType ) {
-        this.ownerId = ownerId;
-        this.connectionType = connectionType;
-    }
-
-    @Override
-    public Results getResults( final Collection<Id> ids ) {
-        List<ConnectionRef> refs = new ArrayList<>();
-        for ( Id id : ids ) {
-            refs.add( new ConnectionRefImpl( ownerId, connectionType, ref(id.getType(), id.getUuid())  ));
-        }
-
-        return Results.fromConnections( refs );
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ConnectionResultsLoaderFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ConnectionResultsLoaderFactoryImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ConnectionResultsLoaderFactoryImpl.java
deleted file mode 100644
index 55b95a9..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ConnectionResultsLoaderFactoryImpl.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
-
-
-import org.apache.usergrid.persistence.EntityRef;
-import org.apache.usergrid.persistence.Query;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
-import org.apache.usergrid.persistence.index.EntityIndexFactory;
-import org.apache.usergrid.persistence.index.SearchEdge;
-
-
-/**
- * Factory for creating results
- */
-public class ConnectionResultsLoaderFactoryImpl implements ResultsLoaderFactory {
-
-    private final EntityCollectionManager entityCollectionManager;
-    private final ApplicationEntityIndex applicationEntityIndex;
-    private final EntityRef ownerId;
-    private final String connectionType;
-
-
-    public ConnectionResultsLoaderFactoryImpl( final EntityCollectionManager entityCollectionManager,
-                                               final ApplicationEntityIndex applicationEntityIndex, final EntityRef ownerId,
-                                               final String connectionType ) {
-        this.entityCollectionManager = entityCollectionManager;
-        this.applicationEntityIndex = applicationEntityIndex;
-        this.ownerId = ownerId;
-        this.connectionType = connectionType;
-    }
-
-
-    @Override
-    public ResultsLoader getLoader( final ApplicationScope applicationScope, final SearchEdge scope,
-                                    final Query.Level resultsLevel ) {
-
-        ResultsVerifier verifier;
-
-        if ( resultsLevel == Query.Level.REFS ) {
-            verifier = new ConnectionRefsVerifier( ownerId, connectionType );
-        }
-        else if ( resultsLevel == Query.Level.IDS ) {
-            verifier = new ConnectionRefsVerifier( ownerId, connectionType );
-            ;
-        }
-        else {
-            verifier = new EntityVerifier( Query.MAX_LIMIT );
-        }
-
-        return new FilteringLoader( entityCollectionManager, applicationEntityIndex, verifier, applicationScope, scope );
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ElasticSearchQueryExecutor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ElasticSearchQueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ElasticSearchQueryExecutor.java
deleted file mode 100644
index 6e170f8..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ElasticSearchQueryExecutor.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
-
-
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-import org.apache.usergrid.persistence.index.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.Query;
-import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-
-import com.google.common.base.Optional;
-
-
-public class ElasticSearchQueryExecutor implements Iterable<Results>, Iterator<Results> {
-
-    private static final Logger logger = LoggerFactory.getLogger( ElasticSearchQueryExecutor.class );
-
-    private final ResultsLoaderFactory resultsLoaderFactory;
-
-    private final ApplicationScope applicationScope;
-
-    private final ApplicationEntityIndex entityIndex;
-
-    private final SearchEdge indexScope;
-
-    private final SearchTypes types;
-
-    private final String query;
-
-    private final Optional<Integer> setOffsetFromCursor;
-
-    private final int limit;
-
-    private int offset;
-
-
-    private Results currentResults;
-
-    private boolean moreToLoad = true;
-
-
-
-
-    public ElasticSearchQueryExecutor( final ResultsLoaderFactory resultsLoaderFactory, final ApplicationEntityIndex entityIndex,
-                                       final ApplicationScope applicationScope, final SearchEdge indexScope,
-                                       final SearchTypes types, final String query, final Optional<Integer> setOffsetFromCursor, final int limit ) {
-        this.resultsLoaderFactory = resultsLoaderFactory;
-        this.applicationScope = applicationScope;
-        this.entityIndex = entityIndex;
-        this.indexScope = indexScope;
-        this.types = types;
-        this.setOffsetFromCursor = setOffsetFromCursor;
-
-        //we must deep copy the query passed.  Otherwise we will modify it's state with cursors.  Won't fix, not relevant
-        //once we start subscribing to streams.
-        this.query = query;
-        this.limit = limit;
-    }
-
-
-    @Override
-    public Iterator<Results> iterator() {
-        return this;
-    }
-
-
-    private void loadNextPage() {
-        // Because of possible stale entities, which are filtered out by buildResults(),
-        // we loop until the we've got enough results to satisfy the query limit.
-
-        final int maxQueries = 10; // max re-queries to satisfy query limit
-
-
-        Results results = null;
-        int queryCount = 0;
-
-
-        CandidateResults crs = null;
-
-        int newLimit = limit;
-
-        while ( queryCount++ < maxQueries ) {
-
-            crs = entityIndex.search( indexScope, types, query, newLimit , offset);
-
-
-            logger.debug( "Calling build results with crs {}", crs );
-            results = buildResults( indexScope, crs );
-
-            /**
-             * In an edge case where we delete stale entities, we could potentially get less results than expected.
-             * This will only occur once during the repair phase.
-             * We need to ensure that we short circuit before we overflow the requested limit during a repair.
-             */
-            if ( crs.isEmpty() || !crs.hasOffset() || results.size() > 0 ) { // no results, no cursor, can't get more
-                break;
-            }
-
-
-            //we didn't load anything, but there was a cursor, this means a read repair occured.  We have to short
-            //circuit to avoid over returning the result set
-
-
-            // need to query for more
-            // ask for just what we need to satisfy, don't want to exceed limit
-            newLimit =  newLimit - results.size();
-
-            logger.warn( "Satisfy query limit {}, new limit {} query count {}", new Object[] {
-                limit, newLimit, queryCount
-            } );
-        }
-
-        //now set our cursor if we have one for the next iteration
-        if ( results.hasCursor() ) {
-            moreToLoad = true;
-        }
-
-        else {
-            moreToLoad = false;
-        }
-
-//
-//        //set our select subjects into our query if provided
-//        if(crs != null){
-//            query.setSelectSubjects( crs.getGetFieldMappings() );
-//        }
-//
-
-        //set our current results and the flag
-        this.currentResults = results;
-    }
-
-
-
-    /**
-     * Build results from a set of candidates, and discard those that represent stale indexes.
-     *
-     * @param indexScope The index scope to execute the search on
-     * @param crs Candidates to be considered for results
-     */
-    private Results buildResults( final SearchEdge indexScope, final CandidateResults crs ) {
-
-        logger.debug( "buildResults()  from {} candidates", crs.size() );
-
-        //get an instance of our results loader
-        final ResultsLoader resultsLoader =
-            this.resultsLoaderFactory.getLoader( applicationScope, indexScope, Query.Level.ALL_PROPERTIES );
-
-        //load the results
-        final Results results = resultsLoader.loadResults(crs);
-
-        //signal for post processing
-        resultsLoader.postProcess();
-
-        //set offset into query
-
-        logger.debug( "Returning results size {}", results.size() );
-
-        return results;
-    }
-
-
-    @Override
-    public boolean hasNext() {
-
-        //we've tried to load and it's empty and we have more to load, load the next page
-        if ( currentResults == null ) {
-            //there's nothing left to load, nothing to do
-            if ( !moreToLoad ) {
-                return false;
-            }
-
-            //load the page
-
-            loadNextPage();
-        }
-
-
-        //see if our current results are not null
-        return currentResults != null;
-    }
-
-
-    @Override
-    public Results next() {
-        if ( !hasNext() ) {
-            throw new NoSuchElementException( "No more results present" );
-        }
-
-        final Results toReturn = currentResults;
-
-        currentResults = null;
-
-        return toReturn;
-    }
-
-    @Override
-    public void remove() {
-        throw new RuntimeException("Remove not implemented!!");
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/EntityVerifier.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/EntityVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/EntityVerifier.java
deleted file mode 100644
index d73c731..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/EntityVerifier.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  *  contributor license agreements.  The ASF licenses this file to You
- *  * under the Apache License, Version 2.0 (the "License"); you may not
- *  * use this file except in compliance with the License.
- *  * You may obtain a copy of the License at
- *  *
- *  *     http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.  For additional information regarding
- *  * copyright in this work, please see the NOTICE file in the top level
- *  * directory of this distribution.
- *
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
-
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
-import org.apache.usergrid.persistence.Entity;
-import org.apache.usergrid.persistence.EntityFactory;
-import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.collection.EntitySet;
-import org.apache.usergrid.persistence.collection.MvccEntity;
-import org.apache.usergrid.persistence.index.CandidateResult;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.fasterxml.uuid.UUIDComparator;
-import com.google.common.base.Optional;
-
-
-/**
- * A loader that verifies versions are correct in cassandra and match elasticsearch
- */
-public class EntityVerifier implements ResultsVerifier {
-
-    private static final Logger logger = LoggerFactory.getLogger( EntityVerifier.class );
-
-    private EntitySet ids;
-
-    private Map<Id, org.apache.usergrid.persistence.model.entity.Entity> entityMapping;
-
-
-    public EntityVerifier( final int maxSize ) {
-        this.entityMapping = new HashMap<>( maxSize );
-    }
-
-
-    @Override
-    public void loadResults( final Collection<Id> idsToLoad, final EntityCollectionManager ecm ) {
-        ids = ecm.load( idsToLoad ).toBlocking().last();
-        logger.debug("loadResults() asked for {} ids and got {}", idsToLoad.size(), ids.size());
-    }
-
-
-    @Override
-    public boolean isValid( final CandidateResult candidateResult ) {
-        final Id entityId = candidateResult.getId();
-
-        final MvccEntity savedEntity = ids.getEntity( entityId );
-
-        //version wasn't found deindex
-        if ( savedEntity == null ) {
-            logger.warn( "Version for Entity {}:{} not found", entityId.getType(), entityId.getUuid() );
-            return false;
-        }
-
-        final UUID candidateVersion = candidateResult.getVersion();
-        final UUID savedVersion = savedEntity.getVersion();
-
-        if ( UUIDComparator.staticCompare( savedVersion, candidateVersion ) > 0 ) {
-            logger.warn( "Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}", new Object[] {
-                    entityId.getUuid(), entityId.getType(), candidateVersion, savedEntity
-            } );
-
-            return false;
-        }
-
-
-        final Optional<org.apache.usergrid.persistence.model.entity.Entity> entity = savedEntity.getEntity();
-
-        if ( !entity.isPresent() ) {
-            logger.warn( "Entity uuid:{} version v:{} is deleted but indexed, this is a bug ",
-                    entityId.getUuid(), savedEntity.getEntity() );
-            return false;
-        }
-
-        entityMapping.put( entityId, entity.get() );
-
-        return true;
-    }
-
-
-    @Override
-    public Results getResults( final Collection<Id> ids ) {
-
-        final List<Entity> ugEntities = new ArrayList<>( ids.size() );
-
-        for ( final Id id : ids ) {
-            final org.apache.usergrid.persistence.model.entity.Entity cpEntity = entityMapping.get( id );
-
-            Entity entity = EntityFactory.newEntity( id.getUuid(), id.getType() );
-
-            Map<String, Object> entityMap = CpEntityMapUtils.toMap( cpEntity );
-            entity.addProperties( entityMap );
-            ugEntities.add( entity );
-        }
-
-        return Results.fromEntities( ugEntities );
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/FilteringLoader.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/FilteringLoader.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/FilteringLoader.java
deleted file mode 100644
index ade64a2..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/FilteringLoader.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
-
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.UUID;
-
-import javax.annotation.Nullable;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.corepersistence.ManagerCache;
-import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
-import org.apache.usergrid.persistence.index.EntityIndexBatch;
-import org.apache.usergrid.persistence.index.EntityIndexFactory;
-import org.apache.usergrid.persistence.index.SearchEdge;
-import org.apache.usergrid.persistence.index.CandidateResult;
-import org.apache.usergrid.persistence.index.CandidateResults;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.fasterxml.uuid.UUIDComparator;
-import com.google.common.base.Function;
-import com.google.common.collect.Collections2;
-
-
-public class FilteringLoader implements ResultsLoader {
-    // Keeps the newest candidate per entity id, adds stale or invalid candidates to a de-index batch, and returns results in candidate order.
-    private static final Logger logger = LoggerFactory.getLogger( FilteringLoader.class );
-
-    private final EntityCollectionManager entityCollectionManager;
-    private final ResultsVerifier resultsVerifier;
-    private final ApplicationScope applicationScope;
-    private final SearchEdge indexScope;
-    private final EntityIndexBatch indexBatch;
-
-
-    /**
-     * Create an instance of a filter loader
-     * @param entityCollectionManager The entity collection manager handed to the verifier when loading results
-     * @param applicationEntityIndex The index used to create the batch that de-index operations are added to
-     * @param resultsVerifier The verifier to verify the candidate results
-     * @param applicationScope The application scope to perform the load (stored, but not read in this class)
-     * @param indexScope The index scope used in the search
-     */
-    protected FilteringLoader( final  EntityCollectionManager entityCollectionManager, final ApplicationEntityIndex applicationEntityIndex,  final ResultsVerifier resultsVerifier,
-                               final ApplicationScope applicationScope, final SearchEdge indexScope ) {
-
-        this.entityCollectionManager = entityCollectionManager;
-        this.resultsVerifier = resultsVerifier;
-        this.applicationScope = applicationScope;
-        this.indexScope = indexScope;
-
-        indexBatch = applicationEntityIndex.createBatch();
-    }
-
-    // De-duplicates the candidates by entity id, asks the verifier to validate the survivors, and returns them in candidate order.
-    @Override
-    public Results loadResults( final CandidateResults crs ) {
-
-
-        if ( crs.size() == 0 ) {
-            return new Results();
-        }
-
-
-        // For each entity, holds the index it appears in our candidates for keeping ordering correct
-        final Map<Id, Integer> orderIndex = new HashMap<>( crs.size() );
-
-        // Maps the entity ids to our candidates
-        final Map<Id, CandidateResult> maxCandidateMapping = new HashMap<>( crs.size() );
-
-
-        final Iterator<CandidateResult> iter = crs.iterator();
-
-
-        // TODO, in this case we're "optimizing" due to the limitations of collection scope.
-        // Perhaps  we should change the API to just be an application, then an "owner" scope?
-
-        // Go through the candidates and group them by scope for more efficient retrieval.
-        // Also remove duplicates before we even make a network call
-        for ( int i = 0; iter.hasNext(); i++ ) {
-
-            final CandidateResult currentCandidate = iter.next();
-
-            final Id entityId = currentCandidate.getId();
-
-            //check if we've seen this candidate by id
-            final CandidateResult previousMax = maxCandidateMapping.get( entityId );
-
-            //it has not been seen yet, save it
-            if ( previousMax == null ) {
-                maxCandidateMapping.put( entityId, currentCandidate );
-                orderIndex.put( entityId, i );
-                continue;
-            }
-
-            //we have seen it, compare them
-
-            final UUID previousMaxVersion = previousMax.getVersion();
-
-            final UUID currentVersion = currentCandidate.getVersion();
-
-
-            final CandidateResult toRemove;
-            final CandidateResult toKeep;
-
-            //current is newer than previous.  Remove previous and keep current
-            if ( UUIDComparator.staticCompare( currentVersion, previousMaxVersion ) > 0 ) {
-                toRemove = previousMax;
-                toKeep = currentCandidate;
-            }
-            //previously seen value is newer than (or equal to) current.  Remove the current and keep the previously seen value
-            else {
-                toRemove = currentCandidate;
-                toKeep = previousMax;
-            }
-
-            //one of the two candidates is stale at this point; it is logged and added to the de-index batch below
-
-
-            //de-index it
-            logger.warn( "Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}", new Object[] {
-                    entityId.getUuid(), entityId.getType(), toRemove.getVersion(), toKeep.getVersion()
-                } );
-
-            //deindex this document, and remove the previous maxVersion
-            //we have to deindex this from our ownerId, since this is what gave us the reference
-            indexBatch.deindex( indexScope, toRemove );
-
-
-            //TODO, fire the entity repair cleanup task here instead of de-indexing
-
-            //record the surviving candidate; note its order position is overwritten with the current index i, even when the earlier one is kept
-            maxCandidateMapping.put( entityId, toKeep );
-            orderIndex.put( entityId, i );
-        }
-
-
-        //now everything is ordered, and older versions are removed.  Batch fetch versions to verify
-        // existence and correct versions
-
-        final TreeMap<Integer, Id> sortedResults = new TreeMap<>();
-
-
-        final Collection<Id> idsToLoad =
-            Collections2.transform( maxCandidateMapping.values(), new Function<CandidateResult, Id>() {
-                @Nullable
-                @Override
-                public Id apply( @Nullable final CandidateResult input ) {
-                    //NOTE this is never null, we won't need to check
-                    return input.getId();
-                }
-            } );
-
-
-        //now using the scope, load the collection
-
-
-
-        //load the results into the loader for this scope for validation
-        resultsVerifier.loadResults( idsToLoad, entityCollectionManager );
-
-        //now let the loader validate each candidate.  For instance, the "max" in this candidate
-        //could still be a stale result, so it needs to be validated
-        for ( final Id requestedId : idsToLoad ) {
-
-            final CandidateResult cr = maxCandidateMapping.get( requestedId );
-
-            //ask the loader if this is valid, if not discard it and de-index it
-            if ( !resultsVerifier.isValid( cr ) ) {
-                indexBatch.deindex( indexScope, cr );
-                continue;
-            }
-
-            //if we get here we're good, we need to add this to our results
-            final int candidateIndex = orderIndex.get( requestedId );
-
-            sortedResults.put( candidateIndex, requestedId );
-        }
-
-
-        // NOTE DO NOT execute the batch here.
-        // It changes the results and we need consistent paging until we aggregate all results
-        return resultsVerifier.getResults( sortedResults.values() );
-    }
-
-    // Executes the index batch that loadResults added its de-index operations to.
-    @Override
-    public void postProcess() {
-        this.indexBatch.execute();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/IdsVerifier.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/IdsVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/IdsVerifier.java
deleted file mode 100644
index 4a3bfcd..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/IdsVerifier.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
-
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.UUID;
-
-import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-
-public class IdsVerifier extends VersionVerifier {
-    // Verifier whose results carry only UUIDs: getResults builds a Results from the id list via Results.fromIdList.
-    @Override
-    public Results getResults( final Collection<Id> ids ) {
-
-        final List<UUID> returnIds = new ArrayList<>( ids.size() );
-        // only the UUID of each Id is kept; the Id type is not passed on
-        for ( final Id id : ids ) {
-            returnIds.add( id.getUuid() );
-        }
-
-
-        return Results.fromIdList( returnIds );
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsLoader.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsLoader.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsLoader.java
deleted file mode 100644
index c2a3e9a..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsLoader.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
-
-
-import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.index.CandidateResults;
-
-
-/**
- * Interface for loading results from a set of search candidates
- */
-public interface ResultsLoader {
-
-    /**
-     * Using the candidate results, load our results.  Implementations should filter out stale results
-     * @param crs The candidate result set
-     * @return Results.  Null safe, but may be empty
-     */
-    public Results loadResults( final CandidateResults crs);
-
-    /**
-     * Post process the load operation (FilteringLoader uses this to execute its index batch)
-     */
-    public void postProcess();
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsLoaderFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsLoaderFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsLoaderFactory.java
deleted file mode 100644
index 3ccca1b..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsLoaderFactory.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
-
-
-import org.apache.usergrid.persistence.Query;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.index.SearchEdge;
-
-
-/**
- * Factory for creating results loaders
- */
-public interface ResultsLoaderFactory {
-
-    /**
-     * Get the loader for results
-     * @param applicationScope The application scope used to load results
-     * @param indexScope The index scope used in the search
-     * @param resultsLevel The query result level (presumably selects ids vs. full entities - confirm against implementations)
-     */
-    ResultsLoader getLoader( final ApplicationScope applicationScope, final SearchEdge indexScope,
-                             final Query.Level resultsLevel );
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsVerifier.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsVerifier.java
deleted file mode 100644
index fe72ca2..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsVerifier.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
-
-
-import java.util.Collection;
-import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.index.CandidateResult;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-
-public interface ResultsVerifier {
-    // FilteringLoader calls these in order: loadResults once, isValid per candidate, then getResults with the retained ids.
-    /**
-     * Load all the candidate ids for verification
-     * @param ids The ids to load
-     * @param ecm The entity collection manager
-     */
-    public void loadResults(Collection<Id> ids, EntityCollectionManager ecm);
-
-    /**
-     * Return true if the candidate result is a valid result that should be retained. If it should
-     * not, it should also be removed from the list of possible return values in this loader
-     * @param candidateResult The candidate to check
-     */
-    public boolean isValid(CandidateResult candidateResult);
-
-
-    /**
-     * Load the result set with the given ids
-     * @return The results built for the given ids
-     */
-    public Results getResults(Collection<Id> ids);
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/VersionVerifier.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/VersionVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/VersionVerifier.java
deleted file mode 100644
index c49fb28..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/VersionVerifier.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
-
-
-import java.util.Collection;
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.collection.MvccLogEntry;
-import org.apache.usergrid.persistence.collection.VersionSet;
-import org.apache.usergrid.persistence.index.CandidateResult;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.fasterxml.uuid.UUIDComparator;
-
-
-/**
- * A loader that verifies versions are correct in Cassandra and match ElasticSearch
- */
-public abstract class VersionVerifier implements ResultsVerifier {
-
-    private static final Logger logger = LoggerFactory.getLogger( VersionVerifier.class );
-    // latest versions fetched by loadResults; not assigned until loadResults has been called
-    private VersionSet ids;
-
-    // Blocks on the latest-version lookup for the given ids and keeps the last VersionSet it emits.
-    @Override
-    public void loadResults( final Collection<Id> idsToLoad, final EntityCollectionManager ecm ) {
-        ids = ecm.getLatestVersion( idsToLoad ).toBlocking().last();
-    }
-
-    // Returns false when no version is loaded for the candidate's id, or when the loaded version compares greater than the candidate's.
-    @Override
-    public boolean isValid( final CandidateResult candidateResult ) {
-        final Id entityId = candidateResult.getId();
-
-        final MvccLogEntry version = ids.getMaxVersion( entityId );
-
-        //version wasn't found, deindex
-        if ( version == null ) {
-            logger.warn( "Version for Entity {}:{} not found",
-                    entityId.getUuid(), entityId.getUuid() );
-            // NOTE(review): both placeholders above receive getUuid(); the second was probably meant to be getType() - confirm
-            return false;
-        }
-
-        final UUID savedVersion = version.getVersion();
-
-        if ( UUIDComparator.staticCompare( savedVersion, candidateResult.getVersion() ) > 0 ) {
-            logger.debug( "Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}",
-                new Object[] {
-                    entityId.getUuid(),
-                    entityId.getType(),
-                    candidateResult.getVersion(),
-                    savedVersion
-            } );
-
-            return false;
-        }
-
-        return true;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityIdFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityIdFilter.java
deleted file mode 100644
index 6230147..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityIdFilter.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.entity;
-
-
-import org.apache.usergrid.corepersistence.pipeline.read.AbstractPipelineOperation;
-import org.apache.usergrid.corepersistence.pipeline.read.Filter;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
-
-import rx.Observable;
-
-
-/**
- * This command is a stopgap to make migrating 1.0 code easier.  Once full traversal has been implemented, this should
- * be removed
- */
-public class EntityIdFilter extends AbstractPipelineOperation<Id, Id> implements Filter<Id, Id> {
-
-    private final Id entityId;
-
-    // entityId is supplied as an @Assisted constructor argument
-    @Inject
-    public EntityIdFilter( @Assisted final Id entityId ) {this.entityId = entityId;}
-
-
-
-    // Ignores the incoming observable and always emits the single configured entity id.
-    @Override
-    public Observable<Id> call( final Observable<Id> idObservable ) {
-        return Observable.just( entityId );
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollector.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollector.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollector.java
deleted file mode 100644
index dd6b9b8..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollector.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.entity;
-
-
-import java.util.List;
-
-import org.apache.usergrid.corepersistence.pipeline.read.AbstractPipelineOperation;
-import org.apache.usergrid.corepersistence.pipeline.read.Collector;
-import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
-import org.apache.usergrid.persistence.collection.EntitySet;
-import org.apache.usergrid.persistence.collection.MvccEntity;
-import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.inject.Inject;
-
-import rx.Observable;
-
-
-/**
- * Loads entities from a set of Ids.
- *
- * TODO refactor this into a common command that both ES search and graphSearch can use for repair and verification
- */
-public class EntityLoadCollector extends AbstractPipelineOperation<Id, ResultsPage>
-    implements Collector<Id, ResultsPage> {
-
-    private final EntityCollectionManagerFactory entityCollectionManagerFactory;
-
-
-    @Inject
-    public EntityLoadCollector( final EntityCollectionManagerFactory entityCollectionManagerFactory ) {
-        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
-    }
-
-    // Buffers incoming ids into groups of pipelineContext.getLimit(), loads each group, and emits one ResultsPage per loaded EntitySet.
-    @Override
-    public Observable<ResultsPage> call( final Observable<Id> observable ) {
-
-
-        final EntityCollectionManager entityCollectionManager =
-            entityCollectionManagerFactory.createCollectionManager( pipelineContext.getApplicationScope() );
-        // one load() call is issued per buffer of ids
-        final Observable<EntitySet> entitySetObservable = observable.buffer( pipelineContext.getLimit() ).flatMap(
-            bufferedIds -> Observable.just( bufferedIds ).flatMap( ids -> entityCollectionManager.load( ids ) ) );
-
-
-        final Observable<ResultsPage> resultsObservable = entitySetObservable
-
-            .flatMap( entitySet -> {
-
-                //get our entities and filter missing ones, then collect them into a results object
-                final Observable<MvccEntity> mvccEntityObservable = Observable.from( entitySet.getEntities() );
-
-
-                //keep only entities that are present (absent means they weren't found), then unwrap them into a list
-                final Observable<List<Entity>> entitiesPageObservable =
-                    mvccEntityObservable.filter( mvccEntity -> mvccEntity.getEntity().isPresent() )
-                                        .map( mvccEntity -> mvccEntity.getEntity().get() ).toList();
-
-                //map the list of entities into a results page
-                return entitiesPageObservable.map( entities -> new ResultsPage( entities ) );
-            } );
-
-
-        return resultsObservable;
-    }
-
-    /**
-     * NOTE(review): orphaned comment, no method follows it. Original text: Map a new cp entity to an old entity.  May be null if not present
-     */
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java
index e0f69cf..42b352b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java
@@ -20,8 +20,9 @@
 package org.apache.usergrid.corepersistence.pipeline.read.graph;
 
 
-import org.apache.usergrid.corepersistence.pipeline.read.AbstractPipelineOperation;
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
 import org.apache.usergrid.corepersistence.pipeline.read.Filter;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 import org.apache.usergrid.persistence.graph.SearchByEdge;
@@ -39,7 +40,7 @@ import rx.Observable;
 /**
  * Filter should take and Id and a graph edge, and ensure the connection between the two exists
  */
-public abstract class AbstractReadGraphEdgeByIdFilter extends AbstractPipelineOperation<Id, Id> implements
+public abstract class AbstractReadGraphEdgeByIdFilter extends AbstractFilter<Id, Id> implements
     Filter<Id, Id> {
 
     private final GraphManagerFactory graphManagerFactory;
@@ -55,12 +56,13 @@ public abstract class AbstractReadGraphEdgeByIdFilter extends AbstractPipelineOp
 
 
     @Override
-    public Observable<Id> call( final Observable<Id> idObservable ) {
+    public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> filterValueObservable ) {
 
         final GraphManager gm = graphManagerFactory.createEdgeManager( pipelineContext.getApplicationScope() );
 
-        return idObservable.flatMap( id -> {
+        return filterValueObservable.flatMap( filterValue -> {
             final String edgeTypeName = getEdgeName();
+            final Id id = filterValue.getValue();
 
             //create our search
             final SearchByEdge searchByEdge =
@@ -68,7 +70,7 @@ public abstract class AbstractReadGraphEdgeByIdFilter extends AbstractPipelineOp
                     Optional.absent() );
 
             //load the versions of the edge, take the first since that's all we need to validate existence, then emit the target node
-            return gm.loadEdgeVersions( searchByEdge ).take( 1 ).map( edge -> edge.getTargetNode() );
+            return gm.loadEdgeVersions( searchByEdge ).take( 1 ).map( edge -> edge.getTargetNode() ).map( targetId -> new FilterResult<>(targetId, filterValue.getPath()));
         } );
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java
index 4dd34fc..503fcf9 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java
@@ -21,8 +21,9 @@ package org.apache.usergrid.corepersistence.pipeline.read.graph;
 
 
 import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
-import org.apache.usergrid.corepersistence.pipeline.read.AbstractSeekingFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter;
 import org.apache.usergrid.corepersistence.pipeline.read.Filter;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
@@ -38,7 +39,7 @@ import rx.Observable;
 /**
  * Command for reading graph edges
  */
-public abstract class AbstractReadGraphFilter extends AbstractSeekingFilter<Id, Id, Edge> implements Filter<Id, Id> {
+public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id, Edge> implements Filter<Id, Id> {
 
     private final GraphManagerFactory graphManagerFactory;
 
@@ -52,7 +53,8 @@ public abstract class AbstractReadGraphFilter extends AbstractSeekingFilter<Id,
 
 
     @Override
-    public Observable<Id> call( final Observable<Id> observable ) {
+    public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> previousIds ) {
+
 
         //get the graph manager
         final GraphManager graphManager =
@@ -63,10 +65,11 @@ public abstract class AbstractReadGraphFilter extends AbstractSeekingFilter<Id,
 
 
         //return all ids that are emitted from this edge
-        return observable.flatMap( id -> {
+        return previousIds.flatMap( previousFilterValue -> {
 
             //set our our constant state
             final Optional<Edge> startFromCursor = getSeekValue();
+            final Id id = previousFilterValue.getValue();
 
 
             final SimpleSearchByEdgeType search =
@@ -78,9 +81,9 @@ public abstract class AbstractReadGraphFilter extends AbstractSeekingFilter<Id,
              */
             return graphManager.loadEdgesFromSource( search )
                 //set our cursor every edge we traverse
-                .doOnNext( edge -> setCursor( edge ) )
+
                     //map our id from the target edge
-                .map( edge -> edge.getTargetNode() );
+                .map( edge -> createFilterResult( edge.getTargetNode(), edge, previousFilterValue.getPath() ) );
         } );
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityIdFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityIdFilter.java
new file mode 100644
index 0000000..5a0e026
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityIdFilter.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.graph;
+
+
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.Filter;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+import rx.Observable;
+
+
+/**
+ * This command is a stopgap to make migrating 1.0 code easier.  Once full traversal has been implemented, this should
+ * be removed
+ */
+public class EntityIdFilter extends AbstractFilter<Id, Id> implements Filter<Id, Id> {
+
+    /** The id emitted in place of every incoming value. */
+    private final Id entityId;
+
+
+    @Inject
+    public EntityIdFilter( @Assisted final Id entityId ) {this.entityId = entityId;}
+
+
+    /** Replaces each incoming value with the configured id, passing the incoming path through unchanged. */
+    @Override
+    public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> filterValueObservable ) {
+        //ignore what our input was, and simply emit the id specified (diamond avoids a raw FilterResult)
+        return filterValueObservable.map( idFilterResult -> new FilterResult<>( entityId, idFilterResult.getPath() ) );
+    }
+}


[06/11] incubator-usergrid git commit: Fixes resume logic by loading then filtering first id

Posted by to...@apache.org.
Fixes resume logic by loading then filtering first id


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/413f023e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/413f023e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/413f023e

Branch: refs/heads/two-dot-o-dev
Commit: 413f023e243d8e398db6295b05a9c1f6c8c8feed
Parents: 294a7d9
Author: Todd Nine <tn...@apigee.com>
Authored: Mon May 4 09:11:01 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon May 4 09:11:01 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/pipeline/Pipeline.java      |  3 +-
 .../pipeline/read/FilterFactory.java            |  9 +++
 .../pipeline/read/ReadPipelineBuilderImpl.java  | 11 +++-
 .../pipeline/read/collect/EntityFilter.java     | 68 ++++++++++++++++++++
 .../read/collect/IdCursorSerializer.java        | 41 ++++++++++++
 .../persistence/index/usergrid-mappings.json    |  1 -
 6 files changed, 128 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/413f023e/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
index df6a218..26cf346 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
@@ -25,6 +25,7 @@ import java.util.List;
 import org.apache.usergrid.corepersistence.pipeline.cursor.RequestCursor;
 import org.apache.usergrid.corepersistence.pipeline.cursor.ResponseCursor;
 import org.apache.usergrid.corepersistence.pipeline.read.Collector;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 
 import com.google.common.base.Optional;
@@ -77,7 +78,7 @@ public class Pipeline<R> {
     public Observable<R> execute(){
 
 
-        Observable traverseObservable = Observable.just( applicationScope.getApplication() );
+        Observable traverseObservable = Observable.just( new FilterResult<>( applicationScope.getApplication(), Optional.absent() ));
 
         //build our traversal commands
         for ( PipelineOperation pipelineOperation : idPipelineOperationList ) {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/413f023e/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
index c465516..a2f1605 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
@@ -20,6 +20,8 @@
 package org.apache.usergrid.corepersistence.pipeline.read;
 
 
+import org.apache.usergrid.corepersistence.pipeline.read.collect.EntityFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.collect.IdCursorSerializer;
 import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateEntityFilter;
 import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateIdFilter;
 import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.ElasticSearchCollectionFilter;
@@ -131,4 +133,11 @@ public interface FilterFactory {
      * @param entityId The entity id to emit
      */
     EntityIdFilter getEntityIdFilter( final Id entityId );
+
+
+    /**
+     * Create a new instance of our entity filter
+     * @return the new entity filter
+     */
+    EntityFilter entityFilter();
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/413f023e/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java
index ffb9f7d..28446ad 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java
@@ -26,9 +26,9 @@ import java.util.List;
 import org.apache.usergrid.corepersistence.pipeline.Pipeline;
 import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateEntityFilter;
 import org.apache.usergrid.corepersistence.pipeline.read.graph.EntityLoadFilter;
-import org.apache.usergrid.persistence.Entity;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.util.ValidationUtils;
+import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
 
 import com.google.common.base.Optional;
@@ -211,9 +211,14 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder {
 
 
         //add our last filter that will generate entities
-        final Filter<?, Entity> finalFilter = collectorState.getFinalFilter();
+        final Filter<?, Entity> entityLoadFilter = collectorState.getFinalFilter();
 
-        filters.add( finalFilter );
+        filters.add( entityLoadFilter );
+
+        //add the filter that skips the first result on resume
+        final Filter<Entity, Entity>  cursorEntityFilter = filterFactory.entityFilter();
+
+        filters.add( cursorEntityFilter );
 
 
         //execute our collector

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/413f023e/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/EntityFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/EntityFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/EntityFilter.java
new file mode 100644
index 0000000..daf2e7f
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/EntityFilter.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.collect;
+
+
+import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.Filter;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Optional;
+
+import rx.Observable;
+
+
+/**
+ * A filter that is used when we can potentially serialize pages via cursor.  This will filter the first result, only if
+ * it matches the Id that was set
+ */
+public class EntityFilter extends AbstractPathFilter<Entity, Entity, Id> implements Filter<Entity, Entity> {
+
+    /** Drops leading results whose entity id equals the seek value, then re-wraps the remaining entities. */
+    @Override
+    public Observable<FilterResult<Entity>> call( final Observable<FilterResult<Entity>> filterResultObservable ) {
+
+        //filter only the first id, then map into our path for our next pass
+
+        //skipWhile stops skipping at the first result that does not match; later results are never dropped
+        return filterResultObservable.skipWhile( filterResult -> {
+
+            final Optional<Id> startFromCursor = getSeekValue();
+            //when no seek value is present the predicate is false, so nothing is skipped
+            return startFromCursor.isPresent() && startFromCursor.get().equals( filterResult.getValue().getId() );
+        } ).map( filterResult -> {
+
+            //NOTE(review): createFilterResult is inherited; presumably it records entityId as the cursor value - confirm
+            final Entity entity = filterResult.getValue();
+            final Id entityId = entity.getId();
+
+            return createFilterResult( entity, entityId, filterResult.getPath() );
+        } );
+    }
+
+    /** Returns the shared {@link IdCursorSerializer} instance as the serializer for this filter's Id cursor values. */
+    @Override
+    protected CursorSerializer<Id> getCursorSerializer() {
+        return IdCursorSerializer.INSTANCE;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/413f023e/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/IdCursorSerializer.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/IdCursorSerializer.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/IdCursorSerializer.java
new file mode 100644
index 0000000..d96b9f2
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/IdCursorSerializer.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.collect;
+
+
+import org.apache.usergrid.corepersistence.pipeline.cursor.AbstractCursorSerializer;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+/**
+ * cursor serializer for Ids
+ */
+public class IdCursorSerializer extends AbstractCursorSerializer<Id> {
+
+    /** Shared instance; this class declares no instance state of its own, so a single instance is reused. */
+    public static final IdCursorSerializer INSTANCE = new IdCursorSerializer();
+    /** Reports {@code Id} as the type handled by this serializer. */
+    @Override
+    protected Class<Id> getType() {
+        return Id.class;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/413f023e/stack/corepersistence/queryindex/src/main/resources/org/apache/usergrid/persistence/index/usergrid-mappings.json
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/resources/org/apache/usergrid/persistence/index/usergrid-mappings.json b/stack/corepersistence/queryindex/src/main/resources/org/apache/usergrid/persistence/index/usergrid-mappings.json
index bee84a2..c22a4ec 100644
--- a/stack/corepersistence/queryindex/src/main/resources/org/apache/usergrid/persistence/index/usergrid-mappings.json
+++ b/stack/corepersistence/queryindex/src/main/resources/org/apache/usergrid/persistence/index/usergrid-mappings.json
@@ -73,7 +73,6 @@
                     },
                     "string": {
                         "type": "string",
-                        "doc_values": true,
                         "norms": {
                             "enabled": false
                         },


[10/11] incubator-usergrid git commit: Merge branch 'two-dot-o-dev' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into two-dot-o-dev

Posted by to...@apache.org.
Merge branch 'two-dot-o-dev' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into two-dot-o-dev


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/4107ef38
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/4107ef38
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/4107ef38

Branch: refs/heads/two-dot-o-dev
Commit: 4107ef38961dceecaf90c03edfed0d43dc67486b
Parents: 6d4847a d18b8ee
Author: Todd Nine <tn...@apigee.com>
Authored: Mon May 4 11:27:38 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon May 4 11:27:38 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/CpRelationManager.java      |  34 ++---
 .../usergrid/persistence/EntityManagerIT.java   |   4 +-
 .../graph/impl/SimpleSearchByEdge.java          |   5 +-
 .../usergrid/persistence/index/IndexFig.java    |  11 +-
 .../persistence/index/IndexRefreshCommand.java  |   2 +-
 .../index/impl/EsEntityIndexImpl.java           |   2 +-
 .../index/impl/IndexRefreshCommandImpl.java     | 136 +++++++++++--------
 .../persistence/index/impl/EntityIndexTest.java | 123 ++++++++---------
 8 files changed, 173 insertions(+), 144 deletions(-)
----------------------------------------------------------------------



[04/11] incubator-usergrid git commit: Massive refactor. Paths for cursor generation are now part of our I/O results. This allows the collector to take until satisfied, then generate a serializable path.

Posted by to...@apache.org.
Massive refactor.  Paths for cursor generation are now part of our I/O results. This allows the collector to take until satisfied, then generate a serializable path.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/cd983d66
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/cd983d66
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/cd983d66

Branch: refs/heads/two-dot-o-dev
Commit: cd983d66260222985431a775454183c2ed2305ea
Parents: 6d4847a
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Apr 30 17:40:52 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Apr 30 17:40:52 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/CpRelationManager.java      |   5 +-
 .../corepersistence/pipeline/Pipeline.java      |   9 +-
 .../pipeline/PipelineContext.java               |  16 +-
 .../pipeline/PipelineOperation.java             |  39 ++++
 .../pipeline/PipelineResult.java                |  57 -----
 .../pipeline/cursor/ResponseCursor.java         |  81 ++++---
 .../pipeline/read/AbstractFilter.java           |  45 ++++
 .../pipeline/read/AbstractPathFilter.java       | 109 +++++++++
 .../read/AbstractPipelineOperation.java         |  44 ----
 .../pipeline/read/AbstractSeekingFilter.java    | 102 --------
 .../pipeline/read/CandidateResultsFilter.java   |  31 ---
 .../pipeline/read/Collector.java                |  13 +-
 .../pipeline/read/CollectorFactory.java         |  12 +-
 .../corepersistence/pipeline/read/EdgePath.java |  79 +++++++
 .../corepersistence/pipeline/read/Filter.java   |   9 +-
 .../pipeline/read/FilterFactory.java            |  31 ++-
 .../pipeline/read/FilterResult.java             |  56 +++++
 .../pipeline/read/PipelineOperation.java        |  38 ---
 .../pipeline/read/ReadPipelineBuilder.java      |   5 +-
 .../pipeline/read/ReadPipelineBuilderImpl.java  |  75 +++---
 .../pipeline/read/ResultsPage.java              |  26 ++-
 .../read/collect/AbstractCollector.java         |  46 ++++
 .../read/collect/ResultsPageCollector.java      |  80 +++++++
 .../AbstractElasticSearchFilter.java            |  47 ++--
 .../pipeline/read/elasticsearch/Candidate.java  |  55 +++++
 .../elasticsearch/CandidateEntityFilter.java    | 234 +++++++++++++++++++
 .../read/elasticsearch/CandidateIdFilter.java   | 201 ++++++++++++++++
 .../CandidateResultsEntityResultsCollector.java | 217 -----------------
 .../CandidateResultsIdVerifyFilter.java         | 193 ---------------
 .../impl/CollectionRefsVerifier.java            |  44 ----
 .../CollectionResultsLoaderFactoryImpl.java     |  65 ------
 .../impl/ConnectionRefsVerifier.java            |  59 -----
 .../ConnectionResultsLoaderFactoryImpl.java     |  73 ------
 .../impl/ElasticSearchQueryExecutor.java        | 224 ------------------
 .../read/elasticsearch/impl/EntityVerifier.java | 127 ----------
 .../elasticsearch/impl/FilteringLoader.java     | 219 -----------------
 .../read/elasticsearch/impl/IdsVerifier.java    |  46 ----
 .../read/elasticsearch/impl/ResultsLoader.java  |  43 ----
 .../impl/ResultsLoaderFactory.java              |  41 ----
 .../elasticsearch/impl/ResultsVerifier.java     |  52 -----
 .../elasticsearch/impl/VersionVerifier.java     |  85 -------
 .../pipeline/read/entity/EntityIdFilter.java    |  53 -----
 .../read/entity/EntityLoadCollector.java        |  94 --------
 .../graph/AbstractReadGraphEdgeByIdFilter.java  |  12 +-
 .../read/graph/AbstractReadGraphFilter.java     |  15 +-
 .../pipeline/read/graph/EntityIdFilter.java     |  54 +++++
 .../pipeline/read/graph/EntityLoadFilter.java   | 155 ++++++++++++
 .../graph/ReadGraphConnectionByTypeFilter.java  |  20 +-
 .../results/ObservableQueryExecutor.java        |  24 +-
 .../pipeline/cursor/CursorTest.java             |  20 +-
 .../persistence/index/CandidateResults.java     |  11 +-
 .../impl/EsApplicationEntityIndexImpl.java      |   7 +-
 52 files changed, 1402 insertions(+), 2096 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 3119934..2790ee1 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -32,7 +32,6 @@ import org.springframework.util.Assert;
 
 import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
 import org.apache.usergrid.corepersistence.pipeline.PipelineBuilderFactory;
-import org.apache.usergrid.corepersistence.pipeline.PipelineResult;
 import org.apache.usergrid.corepersistence.pipeline.read.ReadPipelineBuilder;
 import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
 import org.apache.usergrid.corepersistence.results.ObservableQueryExecutor;
@@ -648,7 +647,7 @@ public class CpRelationManager implements RelationManager {
         }
 
 
-        final Observable<PipelineResult<ResultsPage>> resultsObservable = readPipelineBuilder.execute();
+        final Observable<ResultsPage> resultsObservable = readPipelineBuilder.execute();
 
         return new ObservableQueryExecutor( resultsObservable ).next();
     }
@@ -917,7 +916,7 @@ public class CpRelationManager implements RelationManager {
         }
 
 
-        final Observable<PipelineResult<ResultsPage>> resultsObservable = readPipelineBuilder.execute();
+        final Observable<ResultsPage> resultsObservable = readPipelineBuilder.execute();
 
         return new ObservableQueryExecutor( resultsObservable ).next();
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
index bc93b6c..df6a218 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
@@ -25,7 +25,6 @@ import java.util.List;
 import org.apache.usergrid.corepersistence.pipeline.cursor.RequestCursor;
 import org.apache.usergrid.corepersistence.pipeline.cursor.ResponseCursor;
 import org.apache.usergrid.corepersistence.pipeline.read.Collector;
-import org.apache.usergrid.corepersistence.pipeline.read.PipelineOperation;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 
 import com.google.common.base.Optional;
@@ -47,7 +46,6 @@ public class Pipeline<R> {
     private final List<PipelineOperation> idPipelineOperationList;
     private final Collector<?, R> collector;
     private final RequestCursor requestCursor;
-    private final ResponseCursor responseCursor;
 
     private final int limit;
 
@@ -69,7 +67,6 @@ public class Pipeline<R> {
         this.limit = limit;
 
         this.requestCursor = new RequestCursor( cursor );
-        this.responseCursor = new ResponseCursor();
     }
 
 
@@ -77,7 +74,7 @@ public class Pipeline<R> {
      * Execute the pipline construction, returning an observable of results
      * @return
      */
-    public Observable<PipelineResult<R>> execute(){
+    public Observable<R> execute(){
 
 
         Observable traverseObservable = Observable.just( applicationScope.getApplication() );
@@ -99,7 +96,7 @@ public class Pipeline<R> {
 
 
         //append the optional cursor into the response for the caller to use
-        return response.map( result -> new PipelineResult<>( result, responseCursor ) );
+        return response;
     }
 
 
@@ -111,7 +108,7 @@ public class Pipeline<R> {
     private void setState( final PipelineOperation pipelineOperation ) {
 
 
-        final PipelineContext context = new PipelineContext( applicationScope, requestCursor, responseCursor,
+        final PipelineContext context = new PipelineContext( applicationScope, requestCursor,
             limit, idCount );
 
         pipelineOperation.setContext( context );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineContext.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineContext.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineContext.java
index 325f876..018abb7 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineContext.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineContext.java
@@ -38,16 +38,13 @@ public class PipelineContext {
     private final int id;
     private final ApplicationScope applicationScope;
     private final RequestCursor requestCursor;
-    private final ResponseCursor responseCursor;
     private final int limit;
 
 
-    public PipelineContext( final ApplicationScope applicationScope, final RequestCursor requestCursor,
-                            final ResponseCursor responseCursor, final int limit, final int id ) {
+    public PipelineContext( final ApplicationScope applicationScope, final RequestCursor requestCursor, final int limit, final int id ) {
 
         this.applicationScope = applicationScope;
         this.requestCursor = requestCursor;
-        this.responseCursor = responseCursor;
         this.limit = limit;
         this.id = id;
     }
@@ -64,7 +61,7 @@ public class PipelineContext {
 
 
     /**
-     * Get our cursor value if present
+     * Get our cursor value if present from our pipeline
      * @param serializer
      */
     public <T extends Serializable> Optional<T> getCursor( final CursorSerializer<T> serializer ) {
@@ -73,15 +70,6 @@ public class PipelineContext {
         return Optional.fromNullable( value );
     }
 
-
-    /**
-     * Set the cursor value into our resposne
-     */
-    public <T extends Serializable> void setCursorValue( final T value, final CursorSerializer<T> serializer ) {
-        responseCursor.setCursor( id, value, serializer );
-    }
-
-
     /**
      * Get the limit for this execution
      * @return

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineOperation.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineOperation.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineOperation.java
new file mode 100644
index 0000000..d2fa16c
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineOperation.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline;
+
+
+import org.apache.usergrid.corepersistence.pipeline.PipelineContext;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+
+import rx.Observable;
+
+
+/**
+ * A single stage of a {@link Pipeline}.  Each operation takes an observable of {@link FilterResult} values wrapping
+ * the input type, performs some operation, and emits values for further processing in the Observable
+ * pipeline.  The element type that is emitted is determined by the implementation.
+ * @param <T> The input type of the filter value
+ * @param <R> The type of element emitted by the transformed observable
+ */
+public interface PipelineOperation<T, R> extends Observable.Transformer<FilterResult<T>, R> {
+    /** Sets the pipeline context for this operation. */
+    void setContext(final PipelineContext pipelineContext);
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineResult.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineResult.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineResult.java
deleted file mode 100644
index fe8604e..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineResult.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline;
-
-
-import org.apache.usergrid.corepersistence.pipeline.cursor.ResponseCursor;
-
-import com.google.common.base.Optional;
-
-
-/**
- * Intermediate observable that will return results, as well as an optional cursor
- * @param <R>
- */
-public class PipelineResult<R> {
-
-
-    private final R result;
-
-    private final ResponseCursor responseCursor;
-
-
-    public PipelineResult( final R result, final ResponseCursor responseCursor ) {
-        this.result = result;
-        this.responseCursor = responseCursor;
-    }
-
-
-    /**
-     * If the user requests our cursor, return the cursor
-     * @return
-     */
-    public Optional<String> getCursor(){
-        return this.responseCursor.encodeAsString();
-    }
-
-    public R getResult(){
-        return result;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java
index f1c8c24..dbd8b88 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java
@@ -20,12 +20,10 @@
 package org.apache.usergrid.corepersistence.pipeline.cursor;
 
 
-import java.io.Serializable;
 import java.util.Base64;
-import java.util.HashMap;
-import java.util.Map;
 
-import com.fasterxml.jackson.core.Base64Variant;
+import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
+
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -41,71 +39,72 @@ public class ResponseCursor {
 
     private static final ObjectMapper MAPPER = CursorSerializerUtil.getMapper();
 
+
     /**
-     * We use a map b/c some indexes might be skipped
+     * The pointer to the first edge path.  Evaluation is lazily performed in the case the caller does not care about
+     * the cursor.
      */
-    private Map<Integer, CursorEntry<?>> cursors = new HashMap<>();
+    private final Optional<EdgePath> edgePath;
 
+    private Optional<String> encodedValue = null;
 
-    /**
-     * Set the possible cursor value into the index. DOES NOT parse the cursor.  This is intentional for performance
-     */
-    public <T extends Serializable> void setCursor( final int id, final T cursor,
-                                                    final CursorSerializer<T> serializer ) {
 
-        final CursorEntry<T> newEntry = new CursorEntry<>( cursor, serializer );
-        cursors.put( id, newEntry );
-    }
+    public ResponseCursor( final Optional<EdgePath> edgePath ) {this.edgePath = edgePath;}
 
 
     /**
-     * now we're done, encode as a string
+     * Lazily encoded deliberately.  If the user doesn't care about a cursor and is using streams, we don't want to take the
+     * time to calculate it
      */
     public Optional<String> encodeAsString() {
-        try {
 
-            if(cursors.isEmpty()){
-                return Optional.absent();
-            }
+        //always return cached if we are called 2x
+        if ( encodedValue != null ) {
+            return encodedValue;
+        }
+
+        if ( !edgePath.isPresent() ) {
+            encodedValue = Optional.absent();
+            return encodedValue;
+        }
+
+
+        try {
 
+            //edge path is present (the absent case short circuits above), so serialize it
 
             final ObjectNode map = MAPPER.createObjectNode();
 
-            for ( Map.Entry<Integer, CursorEntry<?>> entry : cursors.entrySet() ) {
 
-                final CursorEntry cursorEntry = entry.getValue();
+            Optional<EdgePath> current = edgePath;
 
-                final JsonNode serialized = cursorEntry.serializer.toNode( MAPPER, cursorEntry.cursor );
 
-                map.put( entry.getKey().toString(), serialized );
-            }
+            //traverse each edge and add them to our json
+            do {
+
+                final EdgePath edgePath = current.get();
+                final Object cursorValue = edgePath.getCursorValue();
+                final CursorSerializer serializer = edgePath.getSerializer();
+                final int filterId = edgePath.getFilterId();
+
+                final JsonNode serialized = serializer.toNode( MAPPER, cursorValue );
+                map.put( String.valueOf( filterId ), serialized );
 
+                current = current.get().getPrevious();
+            }
+            while ( current.isPresent() );
 
-            final byte[] output = MAPPER.writeValueAsBytes(map);
+            final byte[] output = MAPPER.writeValueAsBytes( map );
 
             //generate a base64 url save string
             final String value = Base64.getUrlEncoder().encodeToString( output );
 
-            return Optional.of( value );
-
+            encodedValue =  Optional.of( value );
         }
         catch ( JsonProcessingException e ) {
             throw new CursorParseException( "Unable to serialize cursor", e );
         }
-    }
 
-
-    /**
-     * Interal pointer to the cursor and it's serialzed value
-     */
-    private static final class CursorEntry<T> {
-        private final T cursor;
-        private final CursorSerializer<T> serializer;
-
-
-        private CursorEntry( final T cursor, final CursorSerializer<T> serializer ) {
-            this.cursor = cursor;
-            this.serializer = serializer;
-        }
+        return encodedValue;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractFilter.java
new file mode 100644
index 0000000..e4d5d44
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractFilter.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read;
+
+
+import org.apache.usergrid.corepersistence.pipeline.PipelineContext;
+import org.apache.usergrid.corepersistence.pipeline.PipelineOperation;
+
+
+/**
+ * Basic functionality for our commands to handle cursor IO
+ * @param <T> the input type
+ * @param <R> The output Type
+ */
+public abstract class AbstractFilter<T, R> implements Filter<T, R> {
+
+
+    protected PipelineContext pipelineContext;
+
+
+    @Override
+    public void setContext( final PipelineContext pipelineContext ) {
+        this.pipelineContext = pipelineContext;
+    }
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPathFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPathFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPathFilter.java
new file mode 100644
index 0000000..c68dc4a
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPathFilter.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read;
+
+
+import java.io.Serializable;
+
+import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
+
+import com.google.common.base.Optional;
+
+
+/**
+ * Abstract class for filters to extend that require a cursor
+ * @param <T> The input type
+ * @param <R> The response type
+ * @param <C> The cursor type
+ */
+public abstract class AbstractPathFilter<T, R, C extends Serializable> extends AbstractFilter<T, R> implements Filter<T, R> {
+
+
+
+    //TODO not a big fan of this, but not sure how to build resume otherwise
+    private CursorSeek<C> cursorSeek;
+
+
+    /**
+     * Return the parsed value of the cursor from the last request, if it exists
+     */
+    protected Optional<C> getSeekValue() {
+
+        if(cursorSeek == null) {
+            final Optional<C> cursor = pipelineContext.getCursor( getCursorSerializer() );
+            cursorSeek = new CursorSeek<>( cursor );
+        }
+
+        return cursorSeek.getSeekValue();
+
+    }
+
+
+    /**
+     * Creates a filter result for the emitted value, with a new edge path that holds our cursor value and links to the parent path
+     */
+    protected FilterResult<R> createFilterResult( final R emit, final C cursorValue, final Optional<EdgePath> parent ){
+
+
+        //create a current path, and append our parent path to it
+        final EdgePath<C> newEdgePath =
+            new EdgePath<>( pipelineContext.getId(), cursorValue, getCursorSerializer(), parent );
+
+        //emit our value with the parent path
+        return new FilterResult<>( emit, Optional.of( newEdgePath ) );
+
+    }
+
+
+    /**
+     * Return the class to be used when parsing the cursor
+     */
+    protected abstract CursorSerializer<C> getCursorSerializer();
+
+
+    /**
+     * An internal class that holds a mutable state.  When resuming, we only ever honor the seek value on the first call.  Afterwards, we will seek from the beginning on newly emitted values.
+     * Calling get will return the first value to seek, or absent if not specified.  Subsequent calls will return absent.  Callers should treat the results as seek values for each operation
+     */
+    protected static class CursorSeek<C> {
+
+        private Optional<C> seek;
+
+        private CursorSeek(final Optional<C> cursorValue){
+            seek = cursorValue;
+        }
+
+
+        /**
+         * Get the seek value to use when searching
+         * @return
+         */
+        public Optional<C> getSeekValue(){
+            final Optional<C> toReturn = seek;
+
+            seek = Optional.absent();
+
+            return toReturn;
+        }
+
+
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPipelineOperation.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPipelineOperation.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPipelineOperation.java
deleted file mode 100644
index 8d7f106..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPipelineOperation.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read;
-
-
-import org.apache.usergrid.corepersistence.pipeline.PipelineContext;
-
-
-/**
- * Basic functionality for our commands to handle cursor IO
- * @param <T> the input type
- * @param <R> The output Type
- */
-public abstract class AbstractPipelineOperation<T, R> implements PipelineOperation<T, R> {
-
-
-    protected PipelineContext pipelineContext;
-
-
-    @Override
-    public void setContext( final PipelineContext pipelineContext ) {
-        this.pipelineContext = pipelineContext;
-    }
-
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractSeekingFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractSeekingFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractSeekingFilter.java
deleted file mode 100644
index c23a1b7..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractSeekingFilter.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read;
-
-
-import java.io.Serializable;
-
-import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
-
-import com.google.common.base.Optional;
-
-
-/**
- * Abstract class for filters to extend that require a cursor
- * @param <T> The input type
- * @param <R> The response type
- * @param <C> The cursor type
- */
-public abstract class AbstractSeekingFilter<T, R, C extends Serializable> extends AbstractPipelineOperation<T, R> implements Filter<T, R> {
-
-
-
-    //TODO not a big fan of this, but not sure how to build resume otherwise
-    private CursorSeek<C> cursorSeek;
-
-
-    /**
-     * Return the parsed value of the cursor from the last request, if it exists
-     */
-    protected Optional<C> getSeekValue() {
-
-        if(cursorSeek == null) {
-            final Optional<C> cursor = pipelineContext.getCursor( getCursorSerializer() );
-            cursorSeek = new CursorSeek<>( cursor );
-        }
-
-        return cursorSeek.getSeekValue();
-
-    }
-
-
-    /**
-     * Sets the cursor into our pipeline context
-     * @param newValue
-     */
-    protected void setCursor(final C newValue){
-        pipelineContext.setCursorValue( newValue, getCursorSerializer() );
-    }
-
-
-    /**
-     * Return the class to be used when parsing the cursor
-     */
-    protected abstract CursorSerializer<C> getCursorSerializer();
-
-
-    /**
-     * An internal class that holds a mutable state.  When resuming, we only ever honor the seek value on the first call.  Afterwards, we will seek from the beginning on newly emitted values.
-     * Calling get will return the first value to seek, or absent if not specified.  Subsequent calls will return absent.  Callers should treat the results as seek values for each operation
-     */
-    protected static class CursorSeek<C> {
-
-        private Optional<C> seek;
-
-        private CursorSeek(final Optional<C> cursorValue){
-            seek = cursorValue;
-        }
-
-
-        /**
-         * Get the seek value to use when searching
-         * @return
-         */
-        public Optional<C> getSeekValue(){
-            final Optional<C> toReturn = seek;
-
-            seek = Optional.absent();
-
-            return toReturn;
-        }
-
-
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CandidateResultsFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CandidateResultsFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CandidateResultsFilter.java
deleted file mode 100644
index 4e6d06e..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CandidateResultsFilter.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read;
-
-
-import org.apache.usergrid.persistence.index.CandidateResults;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-
-/**
- * Traverses edges in the graph.  Either by query or graph traversal.  Take an observable of ids, and emits
- * an observable of ids
- */
-public interface CandidateResultsFilter extends PipelineOperation<Id, CandidateResults> {}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Collector.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Collector.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Collector.java
index 69d929c..e28ce44 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Collector.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Collector.java
@@ -20,11 +20,18 @@
 package org.apache.usergrid.corepersistence.pipeline.read;
 
 
+import org.apache.usergrid.corepersistence.pipeline.PipelineOperation;
+
+
 /**
- * A command that is used to reduce our stream of results into a final output
- * @param <T>
+ * A command that is used to reduce our stream of results into a stream of final batch outputs.  When used,
+ * no further transformation or encoding should occur.  Otherwise EdgePath data will be lost, and serialization cannot occur
+ * across requests
+ *
+ * @param <T>  The input type
+ * @param <R> The output type
  */
-public interface Collector<T, R> extends PipelineOperation<T, R> {
+public interface Collector<T, R> extends PipelineOperation<T,R> {
 
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CollectorFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CollectorFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CollectorFactory.java
index 6893b34..dd200b5 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CollectorFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CollectorFactory.java
@@ -20,8 +20,7 @@
 package org.apache.usergrid.corepersistence.pipeline.read;
 
 
-import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateResultsEntityResultsCollector;
-import org.apache.usergrid.corepersistence.pipeline.read.entity.EntityLoadCollector;
+import org.apache.usergrid.corepersistence.pipeline.read.collect.ResultsPageCollector;
 
 
 /**
@@ -29,16 +28,11 @@ import org.apache.usergrid.corepersistence.pipeline.read.entity.EntityLoadCollec
  */
 public interface CollectorFactory {
 
-    /**
-     * Generate a new instance of the command with the specified parameters
-     */
-    EntityLoadCollector entityLoadCollector();
 
     /**
-     * Get the collector for collection candidate results to entities
+     * Get the results page collector
      * @return
      */
-    CandidateResultsEntityResultsCollector candidateResultsEntityResultsCollector();
-
+   ResultsPageCollector getResultsPageCollector();
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/EdgePath.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/EdgePath.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/EdgePath.java
new file mode 100644
index 0000000..c560fad
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/EdgePath.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read;
+
+
+import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
+
+import com.google.common.base.Optional;
+
+
+/**
+ * A path from our input element to our emitted element.  A list of EdgePaths comprises a path through the graph.  The chain of edge paths will result
+ * in a cursor when aggregated.  If a graph traversal is the following:
+ *
+ * applicationId(1) - "users" -> userId(2) - "devices" -> deviceId(3).  There would be 2 EdgePaths:
+ *
+ *  EdgePath("users"->userId(2)) <- parent - EdgePath("devices" -> deviceId(3))
+ */
+public class EdgePath<C> {
+
+
+    private final int filterId;
+    private final C cursorValue;
+    private final CursorSerializer<C> serializer;
+    private final Optional<EdgePath> previous;
+
+
+    /**
+     *
+     * @param filterId The id of the filter that generated this path
+     * @param cursorValue The value to resume seeking on the path
+     * @param serializer The serializer to serialize the value
+     * @param parent The parent graph path edge to reach this path
+     */
+    public EdgePath( final int filterId, final C cursorValue, final CursorSerializer<C> serializer,
+                     final Optional<EdgePath> parent ) {
+        this.filterId = filterId;
+        this.cursorValue = cursorValue;
+        this.serializer = serializer;
+        this.previous = parent;
+    }
+
+
+    public C getCursorValue() {
+        return cursorValue;
+    }
+
+
+    public int getFilterId() {
+        return filterId;
+    }
+
+
+    public Optional<EdgePath> getPrevious() {
+        return previous;
+    }
+
+
+    public CursorSerializer<C> getSerializer() {
+        return serializer;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java
index ace62db..054a85a 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java
@@ -20,11 +20,12 @@
 package org.apache.usergrid.corepersistence.pipeline.read;
 
 
-import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.corepersistence.pipeline.PipelineOperation;
 
 
 /**
- * Traverses edges in the graph.  Either by query or graph traversal.  Take an observable of ids, and emits
- * an observable of ids
+ * Traverses edges in the graph.  Either by query or graph traversal.  Take an observable of FilterResult, and emits
+ * an observable of FilterResults.  Filters should never emit groups or objects that represent collections.  Items should
+ * always be emitted 1 at a time.  It is the responsibility of the collector to aggregate results.
  */
-public interface Filter<T, R> extends PipelineOperation<T, R> {}
+public interface Filter<T, R> extends PipelineOperation<T, FilterResult<R>> {}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
index 078d981..c465516 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
@@ -20,10 +20,12 @@
 package org.apache.usergrid.corepersistence.pipeline.read;
 
 
-import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateResultsIdVerifyFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateEntityFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateIdFilter;
 import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.ElasticSearchCollectionFilter;
 import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.ElasticSearchConnectionFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.entity.EntityIdFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.graph.EntityIdFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.graph.EntityLoadFilter;
 import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphCollectionByIdFilter;
 import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphCollectionFilter;
 import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphConnectionByIdFilter;
@@ -43,6 +45,7 @@ public interface FilterFactory {
 
     /**
      * Generate a new instance of the command with the specified parameters
+     *
      * @param collectionName The collection name to use when reading the graph
      */
     ReadGraphCollectionFilter readGraphCollectionFilter( final String collectionName );
@@ -57,12 +60,14 @@ public interface FilterFactory {
 
     /**
      * Generate a new instance of the command with the specified parameters
+     *
      * @param connectionName The connection name to use when traversing the graph
      */
     ReadGraphConnectionFilter readGraphConnectionFilter( final String connectionName );
 
     /**
      * Generate a new instance of the command with the specified parameters
+     *
      * @param connectionName The connection name to use when traversing the graph
      * @param entityType The entity type to use when traversing the graph
      */
@@ -72,13 +77,15 @@ public interface FilterFactory {
 
     /**
      * Read a connection directly between two identifiers
+     *
      * @param connectionName The connection name to use when traversing the graph
-     * @param targetId  The target Id to use when traversing the graph
+     * @param targetId The target Id to use when traversing the graph
      */
     ReadGraphConnectionByIdFilter readGraphConnectionByIdFilter( final String connectionName, final Id targetId );
 
     /**
      * Generate a new instance of the command with the specified parameters
+     *
      * @param query The query to use when querying the entities in the collection
      * @param collectionName The collection name to use when querying
      */
@@ -90,6 +97,7 @@ public interface FilterFactory {
 
     /**
      * Generate a new instance of the command with the specified parameters
+     *
      * @param query The query to use when querying the entities in the connection
      * @param connectionName The type of connection to query
      * @param connectedEntityType The type of entity in the connection.  Leave absent to query all entity types
@@ -102,13 +110,24 @@ public interface FilterFactory {
 
 
     /**
-     * Get a candidate ids verifier for collection results.  Should be inserted into pipelines where a query filter is an intermediate step,
-     * not a final filter before collectors
+     * Generate a new instance of the command with the specified parameters
+     */
+    EntityLoadFilter entityLoadFilter();
+
+    /**
+     * Get the filter that loads entities from candidate results
      */
-    CandidateResultsIdVerifyFilter candidateResultsIdVerifyFilter();
+    CandidateEntityFilter candidateEntityFilter();
+
+    /**
+     * Get a candidate ids verifier for collection results.  Should be inserted into pipelines where a query filter is
+     * an intermediate step, not a final filter before collectors
+     */
+    CandidateIdFilter candidateResultsIdVerifyFilter();
 
     /**
      * Get an entity id filter.  Used as a 1.0->2.0 bridge since we're not doing full traversals
+     *
      * @param entityId The entity id to emit
      */
     EntityIdFilter getEntityIdFilter( final Id entityId );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterResult.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterResult.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterResult.java
new file mode 100644
index 0000000..3c41a2b
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterResult.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read;
+
+
+import com.google.common.base.Optional;
+
+
+/**
+ * A bean that is passed between filters with immutable cursor state
+ * @param <T>
+ */
+public class FilterResult<T> {
+    private final T value;
+    private final Optional<EdgePath> path;
+
+
+    /**
+     * Create a new immutable filter result
+     * @param value The value the filter emits
+     * @param path The path to this value, if created
+     */
+    public FilterResult( final T value, final Optional<EdgePath> path ) {
+        this.value = value;
+        this.path = path;
+    }
+
+
+    public T getValue() {
+        return value;
+    }
+
+
+    public Optional<EdgePath> getPath() {
+        return path;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/PipelineOperation.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/PipelineOperation.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/PipelineOperation.java
deleted file mode 100644
index 28bba36..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/PipelineOperation.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read;
-
-
-import org.apache.usergrid.corepersistence.pipeline.PipelineContext;
-
-import rx.Observable;
-
-
-/**
- * Interface for filtering commands.  All filters must take an observable of Id's as an input.  Output is then determined by subclasses.
-  * This takes an input of Id, performs some operation, and emits values for further processing in the Observable
-  * pipeline
- * @param <T> The input type
- * @param <R>
- */
-public interface PipelineOperation< T, R> extends Observable.Transformer<T, R> {
-
-    void setContext(final PipelineContext pipelineContext);
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java
index 25ab03e..d0e87b3 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java
@@ -20,9 +20,6 @@
 package org.apache.usergrid.corepersistence.pipeline.read;
 
 
-import org.apache.usergrid.corepersistence.pipeline.PipelineResult;
-import org.apache.usergrid.persistence.Entity;
-import org.apache.usergrid.persistence.Results;
 import org.apache.usergrid.persistence.model.entity.Id;
 
 import com.google.common.base.Optional;
@@ -103,5 +100,5 @@ public interface ReadPipelineBuilder {
      * Load our entity results when our previous filter calls graph
      * @return
      */
-    Observable<PipelineResult<ResultsPage>> execute();
+    Observable<ResultsPage> execute();
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java
index 4ecfb47..ffb9f7d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java
@@ -24,9 +24,9 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.usergrid.corepersistence.pipeline.Pipeline;
-import org.apache.usergrid.corepersistence.pipeline.PipelineResult;
-import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateResultsEntityResultsCollector;
-import org.apache.usergrid.corepersistence.pipeline.read.entity.EntityLoadCollector;
+import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateEntityFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.graph.EntityLoadFilter;
+import org.apache.usergrid.persistence.Entity;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.util.ValidationUtils;
 import org.apache.usergrid.persistence.model.entity.Id;
@@ -52,6 +52,8 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder {
 
     private final ApplicationScope applicationScope;
 
+    private final CollectorFactory collectorFactory;
+
 
     /**
      * Our pointer to our collect filter. Set or cleared with each operation that's performed so the correct results are
@@ -70,6 +72,7 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder {
         this.filterFactory = filterFactory;
 
         this.applicationScope = applicationScope;
+        this.collectorFactory = collectorFactory;
 
         //init our cursor to empty
         this.cursor = Optional.absent();
@@ -78,7 +81,7 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder {
         this.limit = DEFAULT_LIMIT;
 
 
-        this.collectorState = new CollectorState( collectorFactory );
+        this.collectorState = new CollectorState( );
 
         this.filters = new ArrayList<>();
     }
@@ -120,7 +123,7 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder {
 
         filters.add( filterFactory.readGraphCollectionByIdFilter( collectionName, entityId ) );
 
-        this.collectorState.setEntityLoaderCollector();
+        this.collectorState.setIdEntityLoaderFilter();
 
         return this;
     }
@@ -132,7 +135,7 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder {
 
         filters.add( filterFactory.readGraphCollectionFilter( collectionName ) );
 
-        this.collectorState.setEntityLoaderCollector();
+        this.collectorState.setIdEntityLoaderFilter();
 
         return this;
     }
@@ -147,7 +150,7 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder {
 
         filters.add( filterFactory.elasticSearchCollectionFilter( query, collectionName, entityType ) );
 
-        this.collectorState.setCandidateResultsEntityResultsCollector();
+        this.collectorState.setCandidateEntityFilter();
 
         return this;
     }
@@ -159,7 +162,7 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder {
         ValidationUtils.verifyIdentity( entityId );
 
         filters.add( filterFactory.readGraphConnectionByIdFilter( connectionName, entityId ) );
-        collectorState.setEntityLoaderCollector();
+        collectorState.setIdEntityLoaderFilter();
 
         return this;
     }
@@ -169,7 +172,7 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder {
     public ReadPipelineBuilder getConnection( final String connectionName ) {
         Preconditions.checkNotNull( connectionName, "connectionName must not be null" );
         filters.add( filterFactory.readGraphConnectionFilter( connectionName ) );
-        collectorState.setEntityLoaderCollector();
+        collectorState.setIdEntityLoaderFilter();
 
         return this;
     }
@@ -182,7 +185,7 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder {
 
         filters.add( filterFactory.readGraphConnectionByTypeFilter( connectionName, entityType ) );
 
-        collectorState.setEntityLoaderCollector();
+        collectorState.setIdEntityLoaderFilter();
         return this;
     }
 
@@ -196,17 +199,25 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder {
         Preconditions.checkNotNull( query, "query must not be null" );
 
         filters.add( filterFactory.elasticSearchConnectionFilter( query, connectionName, entityType ) );
-        collectorState.setCandidateResultsEntityResultsCollector();
+        collectorState.setCandidateEntityFilter();
         return this;
     }
 
 
     @Override
-    public Observable<PipelineResult<ResultsPage>> execute() {
+    public Observable<ResultsPage> execute() {
 
         ValidationUtils.validateApplicationScope( applicationScope );
 
-        final Collector<?, ResultsPage> collector = collectorState.getCollector();
+
+        //add our last filter that will generate entities
+        final Filter<?, Entity> finalFilter = collectorState.getFinalFilter();
+
+        filters.add( finalFilter );
+
+
+        //execute our collector
+        final Collector<?, ResultsPage> collector = collectorFactory.getResultsPageCollector();
 
         Preconditions.checkNotNull( collector,
             "You have not specified an operation that creates a collection filter.  This is required for loading "
@@ -229,46 +240,52 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder {
      * A mutable state for our collectors.  Rather than create a new instance each time, we create a singleton
      * collector
      */
-    private static final class CollectorState {
-        private final CollectorFactory collectorFactory;
+    private final class CollectorState {
+
 
-        private EntityLoadCollector entityLoadCollector;
+        private EntityLoadFilter entityLoadCollector;
 
-        private CandidateResultsEntityResultsCollector candidateResultsEntityResultsCollector;
+        private CandidateEntityFilter candidateEntityFilter;
 
+        private Filter entityLoadFilter;
 
-        private Collector<?, ResultsPage> collector = null;
 
 
-        private CollectorState( final CollectorFactory collectorFactory ) {this.collectorFactory = collectorFactory;}
+        private CollectorState( ){}
 
 
-        public void setEntityLoaderCollector() {
+        /**
+         * Set our final filter to be a load entity by Id filter
+         */
+        public void setIdEntityLoaderFilter() {
             if ( entityLoadCollector == null ) {
-                entityLoadCollector = collectorFactory.entityLoadCollector();
+                entityLoadCollector = filterFactory.entityLoadFilter();
             }
 
 
-            collector = entityLoadCollector;
+            entityLoadFilter = entityLoadCollector;
         }
 
 
-        public void setCandidateResultsEntityResultsCollector() {
-            if ( candidateResultsEntityResultsCollector == null ) {
-                candidateResultsEntityResultsCollector = collectorFactory.candidateResultsEntityResultsCollector();
+        /**
+         * Set our final filter to be a load entity by candidate filter
+         */
+        public void setCandidateEntityFilter() {
+            if ( candidateEntityFilter == null ) {
+                candidateEntityFilter = filterFactory.candidateEntityFilter();
             }
 
-            collector = candidateResultsEntityResultsCollector;
+            entityLoadFilter = candidateEntityFilter;
         }
 
 
         public void clear() {
-            collector = null;
+            entityLoadFilter = null;
         }
 
 
-        public Collector<?, ResultsPage> getCollector() {
-            return collector;
+        public Filter<?, Entity> getFinalFilter() {
+            return entityLoadFilter;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ResultsPage.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ResultsPage.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ResultsPage.java
index 198ac67..1810d65 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ResultsPage.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ResultsPage.java
@@ -22,18 +22,28 @@ package org.apache.usergrid.corepersistence.pipeline.read;
 
 import java.util.List;
 
+import org.apache.usergrid.corepersistence.pipeline.cursor.ResponseCursor;
 import org.apache.usergrid.persistence.model.entity.Entity;
 
 
 /**
- * An encapsulation of entities as a group of responses.  Ordered by the requesting filters.  Each set should be considered a "page" of results.
+ * An encapsulation of entities as a group of responses.  Ordered by the requesting filters.  Each set should be
+ * considered a "page" of results.  A hold over from 1.0.  We shouldn't need this when we fully move away from the EM/RM
  */
 public class ResultsPage {
 
     private final List<Entity> entityList;
 
+    private final int limit;
 
-    public ResultsPage( final List<Entity> entityList ) {this.entityList = entityList;}
+    private final ResponseCursor responseCursor;
+
+
+    public ResultsPage( final List<Entity> entityList, final ResponseCursor responseCursor, final int limit ) {
+        this.entityList = entityList;
+        this.responseCursor = responseCursor;
+        this.limit = limit;
+    }
 
 
     public List<Entity> getEntityList() {
@@ -43,9 +53,15 @@ public class ResultsPage {
 
     /**
      * Return true if the results page is empty
-     * @return
      */
-    public boolean isEmpty(){
-        return entityList == null || entityList.isEmpty();
+    public boolean hasMoreResults() {
+        return entityList != null && entityList.size() == limit;
+    }
+
+
+
+
+    public ResponseCursor getResponseCursor() {
+        return responseCursor;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/AbstractCollector.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/AbstractCollector.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/AbstractCollector.java
new file mode 100644
index 0000000..1c5175d
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/AbstractCollector.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.collect;
+
+
+import org.apache.usergrid.corepersistence.pipeline.PipelineContext;
+import org.apache.usergrid.corepersistence.pipeline.read.Collector;
+import org.apache.usergrid.corepersistence.pipeline.read.Filter;
+
+
+/**
+ * Basic functionality for our collectors: holds the pipeline context supplied via setContext
+ * @param <T> the input type
+ * @param <R> The output Type
+ */
+public abstract class AbstractCollector<T, R> implements Collector<T, R> {
+
+
+    protected PipelineContext pipelineContext;
+
+
+    @Override
+    public void setContext( final PipelineContext pipelineContext ) {
+        this.pipelineContext = pipelineContext;
+    }
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ResultsPageCollector.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ResultsPageCollector.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ResultsPageCollector.java
new file mode 100644
index 0000000..84654aa
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ResultsPageCollector.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.collect;
+
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.usergrid.corepersistence.pipeline.cursor.ResponseCursor;
+import org.apache.usergrid.corepersistence.pipeline.read.Collector;
+import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
+import org.apache.usergrid.persistence.model.entity.Entity;
+
+import com.google.common.base.Optional;
+
+import rx.Observable;
+
+
+/**
+ * Takes entities and collects them into results.  This mostly exists for 1.0 compatibility.  Eventually this will
+ * become the only collector in our pipeline and be used when rendering results on GET, PUT and POST.
+ */
+public class ResultsPageCollector extends AbstractCollector<Entity, ResultsPage>
+    implements Collector<Entity, ResultsPage> {
+
+
+    @Override
+    public Observable<ResultsPage> call( final Observable<FilterResult<Entity>> filterResultObservable ) {
+
+        final int limit = pipelineContext.getLimit();
+
+        return filterResultObservable.buffer( limit ).flatMap( buffer -> Observable.from( buffer ).collect(
+            () -> new ResultsPageWithCursorCollector( limit ), ( collector, entity ) -> {
+                collector.add( entity );
+            } ) ).map( resultsPageCollector -> new ResultsPage( resultsPageCollector.results,
+            new ResponseCursor( resultsPageCollector.lastPath ), pipelineContext.getLimit() ) );
+    }
+
+
+    /**
+     * A collector that will aggregate our results together
+     */
+    private static class ResultsPageWithCursorCollector {
+
+
+        private final List<Entity> results;
+
+        private Optional<EdgePath> lastPath;
+
+
+        private ResultsPageWithCursorCollector( final int limit ) {
+            this.results = new ArrayList<>( limit );
+        }
+
+
+        public void add( final FilterResult<Entity> result ) {
+            this.results.add( result.getValue() );
+            this.lastPath = result.getPath();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractElasticSearchFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractElasticSearchFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractElasticSearchFilter.java
index eac8a65..004a696 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractElasticSearchFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractElasticSearchFilter.java
@@ -24,11 +24,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
-import org.apache.usergrid.corepersistence.pipeline.read.AbstractSeekingFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.CandidateResultsFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.Filter;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.core.metrics.ObservableTimer;
 import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
+import org.apache.usergrid.persistence.index.CandidateResult;
 import org.apache.usergrid.persistence.index.CandidateResults;
 import org.apache.usergrid.persistence.index.EntityIndexFactory;
 import org.apache.usergrid.persistence.index.SearchEdge;
@@ -44,8 +46,8 @@ import rx.Observable;
 /**
  * Command for reading graph edges
  */
-public abstract class AbstractElasticSearchFilter extends AbstractSeekingFilter<Id, CandidateResults, Integer>
-    implements CandidateResultsFilter {
+public abstract class AbstractElasticSearchFilter extends AbstractPathFilter<Id, Candidate, Integer>
+    implements Filter<Id, Candidate> {
 
     private static final Logger log = LoggerFactory.getLogger( AbstractElasticSearchFilter.class );
 
@@ -66,7 +68,7 @@ public abstract class AbstractElasticSearchFilter extends AbstractSeekingFilter<
 
 
     @Override
-    public Observable<CandidateResults> call( final Observable<Id> observable ) {
+    public Observable<FilterResult<Candidate>> call( final Observable<FilterResult<Id>> observable ) {
 
         //get the graph manager
         final ApplicationEntityIndex applicationEntityIndex =
@@ -80,12 +82,12 @@ public abstract class AbstractElasticSearchFilter extends AbstractSeekingFilter<
 
 
         //return all ids that are emitted from this edge
-        return observable.flatMap( id -> {
+        return observable.flatMap( idFilterResult -> {
 
-            final SearchEdge searchEdge = getSearchEdge( id );
+            final SearchEdge searchEdge = getSearchEdge( idFilterResult.getValue() );
 
 
-            final Observable<CandidateResults> candidates = Observable.create( subscriber -> {
+            final Observable<FilterResult<Candidate>> candidates = Observable.create( subscriber -> {
 
                 //our offset to our start value.  This will be set the first time we emit
                 //after we receive new ids, we want to reset this to 0
@@ -98,19 +100,14 @@ public abstract class AbstractElasticSearchFilter extends AbstractSeekingFilter<
 
                 subscriber.onStart();
 
-                //emit while we have values from ES
-                while ( true ) {
+                //emit while we have values from ES and someone is subscribed
+                while ( !subscriber.isUnsubscribed() ) {
 
 
                     try {
                         final CandidateResults candidateResults =
                             applicationEntityIndex.search( searchEdge, searchTypes, query, limit, currentOffSet );
 
-                        currentOffSet += candidateResults.size();
-
-                        //set the cursor for the next value
-                        setCursor( currentOffSet );
-
                         /**
                          * No candidates, we're done
                          */
@@ -119,7 +116,25 @@ public abstract class AbstractElasticSearchFilter extends AbstractSeekingFilter<
                             return;
                         }
 
-                        subscriber.onNext( candidateResults );
+
+                        for( CandidateResult candidateResult: candidateResults){
+
+                            //our subscriber unsubscribed, break out
+                            if(subscriber.isUnsubscribed()){
+                              return;
+                            }
+
+                            final Candidate candidate = new Candidate( candidateResult, searchEdge );
+
+                            final FilterResult<Candidate>
+                                result = createFilterResult( candidate, currentOffSet, idFilterResult.getPath() );
+
+                            subscriber.onNext( result );
+
+                            currentOffSet++;
+                        }
+
+
                     }
                     catch ( Throwable t ) {
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/Candidate.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/Candidate.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/Candidate.java
new file mode 100644
index 0000000..ab9d5d9
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/Candidate.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch;
+
+
+import org.apache.usergrid.persistence.index.CandidateResult;
+import org.apache.usergrid.persistence.index.SearchEdge;
+
+
+/**
+ * Create a candidate. This holds the original candidate, as well as the search scope it was found in
+ */
+public class Candidate {
+
+    private final CandidateResult candidateResult;
+    private final SearchEdge searchEdge;
+
+
+    /**
+     * Create a new Candidate for further processing
+     * @param candidateResult  The candidate result
+     * @param searchEdge The search edge this was searched on
+     */
+    public Candidate( final CandidateResult candidateResult, final SearchEdge searchEdge ) {
+        this.candidateResult = candidateResult;
+        this.searchEdge = searchEdge;
+    }
+
+
+    public CandidateResult getCandidateResult() {
+        return candidateResult;
+    }
+
+
+    public SearchEdge getSearchEdge() {
+        return searchEdge;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateEntityFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateEntityFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateEntityFilter.java
new file mode 100644
index 0000000..d30917c
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateEntityFilter.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
+import org.apache.usergrid.corepersistence.pipeline.read.Filter;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.collection.EntitySet;
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
+import org.apache.usergrid.persistence.index.CandidateResult;
+import org.apache.usergrid.persistence.index.EntityIndexBatch;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.index.SearchEdge;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.fasterxml.uuid.UUIDComparator;
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+
+import rx.Observable;
+
+
+/**
+ * Loads entities from incoming CandidateResult emissions, then streams them on.
+ * Performs internal buffering for efficiency.  Note that all entities may not be emitted if our load crosses page boundaries.  It is up to the
+ * collector to determine when to stop streaming entities.
+ */
+public class CandidateEntityFilter extends AbstractFilter<Candidate, Entity>
+    implements Filter<Candidate, Entity> {
+
+    private final EntityCollectionManagerFactory entityCollectionManagerFactory;
+    private final EntityIndexFactory entityIndexFactory;
+
+
+    @Inject
+    public CandidateEntityFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory,
+                                  final EntityIndexFactory entityIndexFactory ) {
+        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+        this.entityIndexFactory = entityIndexFactory;
+    }
+
+
+    @Override
+       public Observable<FilterResult<Entity>> call(
+           final Observable<FilterResult<Candidate>> candidateResultsObservable ) {
+
+
+        /**
+         * A bit kludgy from old 1.0 -> 2.0 apis.  Refactor this as we clean up our lower levels and create new results
+         * objects
+         */
+
+        final ApplicationScope applicationScope = pipelineContext.getApplicationScope();
+
+        final EntityCollectionManager entityCollectionManager =
+            entityCollectionManagerFactory.createCollectionManager( applicationScope );
+
+
+        final ApplicationEntityIndex applicationIndex =
+            entityIndexFactory.createApplicationEntityIndex( applicationScope );
+
+        //buffer them to get a page size we can make 1 network hop
+        final Observable<FilterResult<Entity>> searchIdSetObservable = candidateResultsObservable.buffer( pipelineContext.getLimit() )
+
+            //load them
+            .flatMap( candidateResults -> {
+                    //flatten to a list of ids to load
+                    final Observable<List<Id>> candidateIds =
+                        Observable.from( candidateResults ).map( filterResultCandidate -> filterResultCandidate.getValue().getCandidateResult().getId() ).toList();
+
+                    //load the ids
+                    final Observable<EntitySet> entitySetObservable =
+                        candidateIds.flatMap( ids -> entityCollectionManager.load( ids ) );
+
+                    //now we have a collection, validate our candidate set is correct.
+
+                    return entitySetObservable.map(
+                        entitySet -> new EntityVerifier( applicationIndex.createBatch(), entitySet,
+                            candidateResults ) ).doOnNext( entityCollector -> entityCollector.merge() )
+                                              .flatMap(
+                                                  entityCollector -> Observable.from( entityCollector.getResults() ) );
+                } );
+
+        //if we filter all our results, we want to continue to try the next page
+        return searchIdSetObservable;
+    }
+
+
+
+
+    /**
+     * Our collector to collect entities.  Not quite a true collector, but works within our operational flow as this state is mutable and difficult to represent functionally
+     */
+    private static final class EntityVerifier {
+
+        private static final Logger logger = LoggerFactory.getLogger( EntityVerifier.class );
+        private List<FilterResult<Entity>> results = new ArrayList<>();
+
+        private final EntityIndexBatch batch;
+        private final List<FilterResult<Candidate>> candidateResults;
+        private final EntitySet entitySet;
+
+
+        public EntityVerifier( final EntityIndexBatch batch, final EntitySet entitySet,
+                               final List<FilterResult<Candidate>> candidateResults ) {
+            this.batch = batch;
+            this.entitySet = entitySet;
+            this.candidateResults = candidateResults;
+            this.results = new ArrayList<>( entitySet.size() );
+        }
+
+
+        /**
+         * Merge our candidates and our entity set into results
+         */
+        public void merge() {
+
+            for ( final FilterResult<Candidate> candidateResult : candidateResults ) {
+                validate( candidateResult );
+            }
+
+            batch.execute();
+        }
+
+
+        public List<FilterResult<Entity>> getResults() {
+            return results;
+        }
+
+
+        public EntityIndexBatch getBatch() {
+            return batch;
+        }
+
+
+        /**
+         * Validates one search candidate against the loaded entity set.  Adds the entity to the results when
+         * the versions match, otherwise drops the candidate and, when it is stale, queues it for deindexing.
+         *
+         * @param filterResult The candidate to validate, along with the path it was found on
+         */
+        private void validate( final FilterResult<Candidate> filterResult ) {
+
+            final Candidate candidate = filterResult.getValue();
+            final CandidateResult candidateResult = candidate.getCandidateResult();
+            final SearchEdge searchEdge = candidate.getSearchEdge();
+            final Id candidateId = candidateResult.getId();
+            final UUID candidateVersion = candidateResult.getVersion();
+
+            final MvccEntity entity = entitySet.getEntity( candidateId );
+
+            //doesn't exist warn and drop
+            if ( entity == null ) {
+                logger.warn(
+                    "Searched and received candidate with entityId {} and version {}, yet was not found in cassandra."
+                        + "  Ignoring since this could be a region sync issue",
+                    candidateId, candidateVersion );
+
+                //TODO trigger an audit after a fail count where we explicitly try to repair from other regions
+
+                return;
+            }
+
+
+            final UUID entityVersion = entity.getVersion();
+            final Id entityId = entity.getId();
+
+            //entity is newer than ES version, could be an update or the entity is marked as deleted
+            if ( UUIDComparator.staticCompare( entityVersion, candidateVersion ) > 0 || !entity.getEntity().isPresent()) {
+
+                //the stale index entry is the candidate version that the search returned, so that is the
+                //version to remove.  Deindexing the entity's current version would not target that entry
+                logger.warn( "Deindexing stale entity on edge {} for entityId {} and version {}",
+                    new Object[] { searchEdge, entityId, candidateVersion } );
+                batch.deindex( searchEdge, entityId, candidateVersion );
+                return;
+            }
+
+            //ES is newer than cass, it means we haven't repaired the record in Cass, we don't want to
+            //remove the ES record, since the read in cass should cause a read repair, just ignore
+            if ( UUIDComparator.staticCompare( candidateVersion, entityVersion ) > 0 ) {
+
+                logger.warn(
+                    "Found a newer version in ES over cassandra for edge {} for entityId {} and version {}.  Repair "
+                        + "should be run", new Object[] { searchEdge, entityId, entityVersion } );
+
+                //TODO trigger an audit after a fail count where we explicitly try to repair from other regions
+
+                return;
+            }
+
+            //they're the same add it
+
+            final Entity returnEntity = entity.getEntity().get();
+
+            final Optional<EdgePath> parent = filterResult.getPath();
+
+            final FilterResult<Entity> toReturn = new FilterResult<>( returnEntity, parent );
+
+            results.add( toReturn );
+        }
+    }
+}


[02/11] incubator-usergrid git commit: Massive refactor. Paths for cursor generation are now part of our I/O results. This allows the collector to take until satisfied, then generate a serializable path.

Posted by to...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityLoadFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityLoadFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityLoadFilter.java
new file mode 100644
index 0000000..d598a2e
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityLoadFilter.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.graph;
+
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
+import org.apache.usergrid.corepersistence.pipeline.read.Filter;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.collection.EntitySet;
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+
+import rx.Observable;
+
+
+/**
+ * Loads entities from a set of Ids.
+ *
+ * TODO refactor this into a common command that both ES search and graphSearch can use for repair and verification
+ */
+public class EntityLoadFilter extends AbstractFilter<Id, Entity> implements Filter<Id, Entity> {
+
+    private final EntityCollectionManagerFactory entityCollectionManagerFactory;
+
+
+    @Inject
+    public EntityLoadFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory ) {
+        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+    }
+
+
+    @Override
+    public Observable<FilterResult<Entity>> call( final Observable<FilterResult<Id>> filterResultObservable ) {
+
+
+        final EntityCollectionManager entityCollectionManager =
+            entityCollectionManagerFactory.createCollectionManager( pipelineContext.getApplicationScope() );
+
+        //it's more efficient to make 1 network hop to get everything, then drop our results if required
+        final Observable<FilterResult<Entity>> entityObservable =
+            filterResultObservable.buffer( pipelineContext.getLimit() ).flatMap( bufferedIds -> {
+
+                    final Observable<EntitySet> entitySetObservable =
+                        Observable.from( bufferedIds ).map( filterResultId -> filterResultId.getValue() ).toList()
+                                  .flatMap( ids -> entityCollectionManager.load( ids ) );
+
+
+                    //now we have a collection, validate our candidate set is correct.
+
+                    return entitySetObservable.map( entitySet -> new EntityVerifier( entitySet, bufferedIds ) )
+                                              .doOnNext( entityCollector -> entityCollector.merge() ).flatMap(
+                            entityCollector -> Observable.from( entityCollector.getResults() ) );
+                } );
+
+        return entityObservable;
+    }
+
+
+    /**
+     * Our collector to collect entities.  Not quite a true collector, but works within our operational flow as this
+     * state is mutable and difficult to represent functionally
+     */
+    private static final class EntityVerifier {
+
+        private static final Logger logger = LoggerFactory.getLogger( EntityVerifier.class );
+        private List<FilterResult<Entity>> results = new ArrayList<>();
+
+        private final List<FilterResult<Id>> candidateResults;
+        private final EntitySet entitySet;
+
+
+        public EntityVerifier( final EntitySet entitySet, final List<FilterResult<Id>> candidateResults ) {
+            this.entitySet = entitySet;
+            this.candidateResults = candidateResults;
+            this.results = new ArrayList<>( entitySet.size() );
+        }
+
+
+        /**
+         * Merge our candidates and our entity set into results
+         */
+        public void merge() {
+
+            for ( final FilterResult<Id> candidateResult : candidateResults ) {
+                validate( candidateResult );
+            }
+        }
+
+
+        public List<FilterResult<Entity>> getResults() {
+            return results;
+        }
+
+
+        private void validate( final FilterResult<Id> filterResult ) {
+
+            final Id candidateId = filterResult.getValue();
+
+
+            final MvccEntity entity = entitySet.getEntity( candidateId );
+
+
+            //doesn't exist warn and drop
+            if ( entity == null || !entity.getEntity().isPresent() ) {
+                logger.warn( "Read graph edge and received candidate with entityId {}, yet was not found in cassandra."
+                        + "  Ignoring since this could be a region sync issue", candidateId );
+
+
+                //TODO trigger an audit after a fail count where we explicitly try to repair from other regions
+
+                return;
+            }
+
+            //it exists, add it
+
+            final Entity returnEntity = entity.getEntity().get();
+
+            final Optional<EdgePath> parent = filterResult.getPath();
+
+            final FilterResult<Entity> toReturn = new FilterResult<>( returnEntity, parent );
+
+            results.add( toReturn );
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java
index 12306fd..7371579 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java
@@ -21,8 +21,9 @@ package org.apache.usergrid.corepersistence.pipeline.read.graph;
 
 
 import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
-import org.apache.usergrid.corepersistence.pipeline.read.AbstractSeekingFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter;
 import org.apache.usergrid.corepersistence.pipeline.read.Filter;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
@@ -42,7 +43,7 @@ import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getEdgeType
 /**
  * Command for reading graph edges on a connection
  */
-public class ReadGraphConnectionByTypeFilter extends AbstractSeekingFilter<Id, Id, Edge> implements Filter<Id, Id> {
+public class ReadGraphConnectionByTypeFilter extends AbstractPathFilter<Id, Id, Edge> implements Filter<Id, Id> {
 
     private final GraphManagerFactory graphManagerFactory;
     private final String connectionName;
@@ -61,8 +62,9 @@ public class ReadGraphConnectionByTypeFilter extends AbstractSeekingFilter<Id, I
     }
 
 
+
     @Override
-    public Observable<Id> call( final Observable<Id> observable ) {
+    public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> filterResultObservable ) {
 
         //get the graph manager
         final GraphManager graphManager = graphManagerFactory.createEdgeManager( pipelineContext.getApplicationScope() );
@@ -73,20 +75,18 @@ public class ReadGraphConnectionByTypeFilter extends AbstractSeekingFilter<Id, I
 
 
         //return all ids that are emitted from this edge
-        return observable.flatMap( id -> {
+        return filterResultObservable.flatMap( idFilterResult -> {
 
               //set our our constant state
             final Optional<Edge> startFromCursor = getSeekValue();
+            final Id id = idFilterResult.getValue();
 
             final SimpleSearchByIdType search =
                 new SimpleSearchByIdType( id, edgeName, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
                     entityType, startFromCursor );
 
-            /**
-             * TODO, pass a message with pointers to our cursor values to be generated later
-             */
-            return graphManager.loadEdgesFromSourceByType( search ).doOnNext( edge -> setCursor( edge ) ).map(
-                edge -> edge.getTargetNode() );
+            return graphManager.loadEdgesFromSourceByType( search ).map(
+                edge -> createFilterResult( edge.getTargetNode(), edge, idFilterResult.getPath() ));
         } );
     }
 
@@ -95,4 +95,6 @@ public class ReadGraphConnectionByTypeFilter extends AbstractSeekingFilter<Id, I
     protected CursorSerializer<Edge> getCursorSerializer() {
         return EdgeCursorSerializer.INSTANCE;
     }
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
index f3c2a9c..0260d1d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
@@ -26,12 +26,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 
-import org.apache.usergrid.corepersistence.pipeline.PipelineResult;
 import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
 import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
 import org.apache.usergrid.persistence.EntityFactory;
 import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.collection.MvccEntity;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
 
@@ -51,13 +49,17 @@ public class ObservableQueryExecutor implements QueryExecutor {
     public Iterator<Results> iterator;
 
 
-    public ObservableQueryExecutor( final Observable<PipelineResult<ResultsPage>> resultsObservable ) {
+    public ObservableQueryExecutor( final Observable<ResultsPage> resultsObservable) {
        //map to our old results objects, return a default empty if required
         this.resultsObservable = resultsObservable.map( resultsPage -> createResults( resultsPage ) ).defaultIfEmpty( new Results() );
     }
 
 
-
+    /**
+     *
+     * @param cpEntity
+     * @return
+     */
     private org.apache.usergrid.persistence.Entity mapEntity( final Entity cpEntity ) {
 
 
@@ -72,9 +74,8 @@ public class ObservableQueryExecutor implements QueryExecutor {
         return entity;
     }
 
-    private Results createResults( final PipelineResult<ResultsPage> pipelineResults ){
+    private Results createResults( final ResultsPage resultsPage ){
 
-        final ResultsPage resultsPage = pipelineResults.getResult();
         final List<Entity> entityList = resultsPage.getEntityList();
         final List<org.apache.usergrid.persistence.Entity> resultsEntities = new ArrayList<>( entityList.size() );
 
@@ -85,10 +86,15 @@ public class ObservableQueryExecutor implements QueryExecutor {
 
         final Results results = Results.fromEntities( resultsEntities );
 
-        if(pipelineResults.getCursor().isPresent()) {
-            results.setCursor( pipelineResults.getCursor().get() );
-        }
 
+        //add the cursor if our limit is the same
+        if(resultsPage.hasMoreResults()) {
+            final Optional<String> cursor = resultsPage.getResponseCursor().encodeAsString();
+
+            if ( cursor.isPresent() ) {
+                results.setCursor( cursor.get() );
+            }
+        }
         return results;
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java
index 6e15d54..fd65ebf 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java
@@ -24,11 +24,11 @@ package org.apache.usergrid.corepersistence.pipeline.cursor;
 
 import org.junit.Test;
 
+import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
 import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.ElasticsearchCursorSerializer;
 import org.apache.usergrid.corepersistence.pipeline.read.graph.EdgeCursorSerializer;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
-import org.apache.usergrid.persistence.graph.impl.SimpleMarkedEdge;
 
 import com.google.common.base.Optional;
 
@@ -41,7 +41,10 @@ public class CursorTest {
     @Test
     public void testCursors(){
 
-        ResponseCursor responseCursor = new ResponseCursor();
+
+
+
+
 
 
         //test encoding edge
@@ -58,13 +61,18 @@ public class CursorTest {
         final Integer query2 = 20;
 
 
-        responseCursor.setCursor( 0, edge1, EdgeCursorSerializer.INSTANCE );
 
-        responseCursor.setCursor( 1, query1, ElasticsearchCursorSerializer.INSTANCE );
+        final EdgePath<Integer> filter3Path = new EdgePath<>( 3, query2, ElasticsearchCursorSerializer.INSTANCE, Optional.absent() );
+
+        final EdgePath<Edge> filter2Path = new EdgePath<Edge>(2, edge2, EdgeCursorSerializer.INSTANCE, Optional.of( filter3Path ));
+
+        final EdgePath<Integer> filter1Path = new EdgePath<>( 1, query1, ElasticsearchCursorSerializer.INSTANCE, Optional.of(filter2Path) );
+
+        final EdgePath<Edge> filter0Path = new EdgePath<>( 0, edge1, EdgeCursorSerializer.INSTANCE, Optional.of( filter1Path ) );
+
 
-        responseCursor.setCursor(2, edge2, EdgeCursorSerializer.INSTANCE);
 
-        responseCursor.setCursor(3, query2, ElasticsearchCursorSerializer.INSTANCE);
+        ResponseCursor responseCursor = new ResponseCursor( Optional.of(filter0Path) );
 
         final Optional<String> cursor = responseCursor.encodeAsString();
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/CandidateResults.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/CandidateResults.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/CandidateResults.java
index 1e63df1..a157e47 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/CandidateResults.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/CandidateResults.java
@@ -42,14 +42,10 @@ public class CandidateResults implements Iterable<CandidateResult> {
 
     private final List<CandidateResult> candidates;
     private final Collection<SelectFieldMapping> getFieldMappings;
-    private final SearchEdge searchEdge;
 
-
-    public CandidateResults( List<CandidateResult> candidates, final Collection<SelectFieldMapping> getFieldMappings,
-                             final SearchEdge searchEdge ) {
+    public CandidateResults( List<CandidateResult> candidates, final Collection<SelectFieldMapping> getFieldMappings) {
         this.candidates = candidates;
         this.getFieldMappings = getFieldMappings;
-        this.searchEdge = searchEdge;
         offset = Optional.absent();
     }
 
@@ -91,11 +87,6 @@ public class CandidateResults implements Iterable<CandidateResult> {
     }
 
 
-    public SearchEdge getSearchEdge() {
-        return searchEdge;
-    }
-
-
     /**
      * Get the candidates
      * @return

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
index 2d4696a..5b67060 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
@@ -169,7 +169,7 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex {
         }
         failureMonitor.success();
 
-        return parseResults(searchResponse, parsedQuery, searchEdge, limit, offset);
+        return parseResults(searchResponse, parsedQuery, limit, offset);
     }
 
 
@@ -227,7 +227,7 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex {
     /**
      * Parse the results and return the canddiate results
      */
-    private CandidateResults parseResults( final SearchResponse searchResponse, final ParsedQuery query, final SearchEdge searchEdge,
+    private CandidateResults parseResults( final SearchResponse searchResponse, final ParsedQuery query,
                                            final int limit, final int from ) {
 
         final SearchHits searchHits = searchResponse.getHits();
@@ -244,8 +244,7 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex {
             candidates.add( candidateResult );
         }
 
-        final CandidateResults candidateResults = new CandidateResults( candidates, query.getSelectFieldMappings(),
-            searchEdge );
+        final CandidateResults candidateResults = new CandidateResults( candidates, query.getSelectFieldMappings());
 
         // >= seems odd.  However if we get an overflow, we need to account for it.
         if (  hits.length >= limit ) {


[09/11] incubator-usergrid git commit: Updates observable short circuit and fixes NPE on empty results

Posted by to...@apache.org.
Updates observable short circuit and fixes NPE on empty results


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/5bb77985
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/5bb77985
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/5bb77985

Branch: refs/heads/two-dot-o-dev
Commit: 5bb77985ba386b72d66a5584057a5d98b3f3e111
Parents: a673c81
Author: Todd Nine <tn...@apigee.com>
Authored: Mon May 4 11:27:25 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon May 4 11:27:25 2015 -0600

----------------------------------------------------------------------
 .../elasticsearch/AbstractElasticSearchFilter.java  | 16 ++++++++--------
 .../org/apache/usergrid/persistence/Results.java    |  2 +-
 2 files changed, 9 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5bb77985/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractElasticSearchFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractElasticSearchFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractElasticSearchFilter.java
index 004a696..f403e21 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractElasticSearchFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractElasticSearchFilter.java
@@ -108,20 +108,13 @@ public abstract class AbstractElasticSearchFilter extends AbstractPathFilter<Id,
                         final CandidateResults candidateResults =
                             applicationEntityIndex.search( searchEdge, searchTypes, query, limit, currentOffSet );
 
-                        /**
-                         * No candidates, we're done
-                         */
-                        if ( candidateResults.size() == 0 ) {
-                            subscriber.onCompleted();
-                            return;
-                        }
 
 
                         for( CandidateResult candidateResult: candidateResults){
 
                             //our subscriber unsubscribed, break out
                             if(subscriber.isUnsubscribed()){
-                              return;
+                                return;
                             }
 
                             final Candidate candidate = new Candidate( candidateResult, searchEdge );
@@ -134,6 +127,13 @@ public abstract class AbstractElasticSearchFilter extends AbstractPathFilter<Id,
                             currentOffSet++;
                         }
 
+                        /**
+                         * Fewer candidates than the limit were returned, so there are no more pages; we're done
+                         */
+                        if (candidateResults.size() < limit) {
+                            subscriber.onCompleted();
+                            return;
+                        }
 
                     }
                     catch ( Throwable t ) {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5bb77985/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java b/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java
index e572070..fa221f5 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java
@@ -1293,7 +1293,7 @@ public class Results implements Iterable<Entity> {
     /** uses cursor to get next batch of Results (returns null if no cursor) */
     public Results getNextPageResults() throws Exception {
         if ( queryExecutor == null || !queryExecutor.hasNext() ) {
-            return null;
+            return new Results();
         }
 
 


[05/11] incubator-usergrid git commit: Merge branch 'two-dot-o-dev' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into USERGRID-587

Posted by to...@apache.org.
Merge branch 'two-dot-o-dev' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into USERGRID-587


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/294a7d90
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/294a7d90
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/294a7d90

Branch: refs/heads/two-dot-o-dev
Commit: 294a7d900784283ff7c038585728418d799b7010
Parents: cd983d6 faafa37
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Apr 30 17:41:05 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Apr 30 17:41:05 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/CpRelationManager.java      |  34 ++---
 .../usergrid/persistence/EntityManagerIT.java   |   4 +-
 .../graph/impl/SimpleSearchByEdge.java          |   5 +-
 .../usergrid/persistence/index/IndexFig.java    |  11 +-
 .../persistence/index/IndexRefreshCommand.java  |   2 +-
 .../index/impl/EsEntityIndexImpl.java           |   2 +-
 .../index/impl/IndexRefreshCommandImpl.java     | 136 +++++++++++--------
 .../persistence/index/impl/EntityIndexTest.java |   2 +-
 8 files changed, 112 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/294a7d90/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------