You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by gr...@apache.org on 2015/05/04 19:32:51 UTC

[01/12] incubator-usergrid git commit: Updated mapping to fix missing doc_values and disable norms since we use external sorting

Repository: incubator-usergrid
Updated Branches:
  refs/heads/USERGRID-609 c6bbfba98 -> 7b215250a


Updated mapping to fix missing doc_values and disable norms since we use external sorting


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/6d4847a8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/6d4847a8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/6d4847a8

Branch: refs/heads/USERGRID-609
Commit: 6d4847a817aefa65075970a54af85faf159f9176
Parents: fd6305c
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Apr 30 10:16:10 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Apr 30 10:16:10 2015 -0600

----------------------------------------------------------------------
 .../org/apache/usergrid/persistence/index/usergrid-mappings.json | 4 ++++
 1 file changed, 4 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d4847a8/stack/corepersistence/queryindex/src/main/resources/org/apache/usergrid/persistence/index/usergrid-mappings.json
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/resources/org/apache/usergrid/persistence/index/usergrid-mappings.json b/stack/corepersistence/queryindex/src/main/resources/org/apache/usergrid/persistence/index/usergrid-mappings.json
index f12b66b..bee84a2 100644
--- a/stack/corepersistence/queryindex/src/main/resources/org/apache/usergrid/persistence/index/usergrid-mappings.json
+++ b/stack/corepersistence/queryindex/src/main/resources/org/apache/usergrid/persistence/index/usergrid-mappings.json
@@ -73,6 +73,10 @@
                     },
                     "string": {
                         "type": "string",
+                        "doc_values": true,
+                        "norms": {
+                            "enabled": false
+                        },
                         "fields": {
                             "exact": {
                                 "type": "string",


[05/12] incubator-usergrid git commit: Merge branch 'two-dot-o-dev' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into USERGRID-587

Posted by gr...@apache.org.
Merge branch 'two-dot-o-dev' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into USERGRID-587


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/294a7d90
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/294a7d90
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/294a7d90

Branch: refs/heads/USERGRID-609
Commit: 294a7d900784283ff7c038585728418d799b7010
Parents: cd983d6 faafa37
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Apr 30 17:41:05 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Apr 30 17:41:05 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/CpRelationManager.java      |  34 ++---
 .../usergrid/persistence/EntityManagerIT.java   |   4 +-
 .../graph/impl/SimpleSearchByEdge.java          |   5 +-
 .../usergrid/persistence/index/IndexFig.java    |  11 +-
 .../persistence/index/IndexRefreshCommand.java  |   2 +-
 .../index/impl/EsEntityIndexImpl.java           |   2 +-
 .../index/impl/IndexRefreshCommandImpl.java     | 136 +++++++++++--------
 .../persistence/index/impl/EntityIndexTest.java |   2 +-
 8 files changed, 112 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/294a7d90/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------


[09/12] incubator-usergrid git commit: Updates observable short circuit and fixes NPE on empty results

Posted by gr...@apache.org.
Updates observable short circuit and fixes NPE on empty results


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/5bb77985
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/5bb77985
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/5bb77985

Branch: refs/heads/USERGRID-609
Commit: 5bb77985ba386b72d66a5584057a5d98b3f3e111
Parents: a673c81
Author: Todd Nine <tn...@apigee.com>
Authored: Mon May 4 11:27:25 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon May 4 11:27:25 2015 -0600

----------------------------------------------------------------------
 .../elasticsearch/AbstractElasticSearchFilter.java  | 16 ++++++++--------
 .../org/apache/usergrid/persistence/Results.java    |  2 +-
 2 files changed, 9 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5bb77985/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractElasticSearchFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractElasticSearchFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractElasticSearchFilter.java
index 004a696..f403e21 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractElasticSearchFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractElasticSearchFilter.java
@@ -108,20 +108,13 @@ public abstract class AbstractElasticSearchFilter extends AbstractPathFilter<Id,
                         final CandidateResults candidateResults =
                             applicationEntityIndex.search( searchEdge, searchTypes, query, limit, currentOffSet );
 
-                        /**
-                         * No candidates, we're done
-                         */
-                        if ( candidateResults.size() == 0 ) {
-                            subscriber.onCompleted();
-                            return;
-                        }
 
 
                         for( CandidateResult candidateResult: candidateResults){
 
                             //our subscriber unsubscribed, break out
                             if(subscriber.isUnsubscribed()){
-                              return;
+                                return;
                             }
 
                             final Candidate candidate = new Candidate( candidateResult, searchEdge );
@@ -134,6 +127,13 @@ public abstract class AbstractElasticSearchFilter extends AbstractPathFilter<Id,
                             currentOffSet++;
                         }
 
+                        /**
+                         * Fewer candidates than the limit were returned, we're done
+                         */
+                        if (candidateResults.size() < limit) {
+                            subscriber.onCompleted();
+                            return;
+                        }
 
                     }
                     catch ( Throwable t ) {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5bb77985/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java b/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java
index e572070..fa221f5 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java
@@ -1293,7 +1293,7 @@ public class Results implements Iterable<Entity> {
     /** uses cursor to get next batch of Results (returns null if no cursor) */
     public Results getNextPageResults() throws Exception {
         if ( queryExecutor == null || !queryExecutor.hasNext() ) {
-            return null;
+            return new Results();
         }
 
 


[03/12] incubator-usergrid git commit: Massive refactor. Paths for cursor generation are now part of our I/O results. This allows the collector to take until satisfied, then generate a serializable path.

Posted by gr...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateIdFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateIdFilter.java
new file mode 100644
index 0000000..56e1c1c
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateIdFilter.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
+import org.apache.usergrid.corepersistence.pipeline.PipelineOperation;
+import org.apache.usergrid.corepersistence.pipeline.read.Filter;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.collection.MvccLogEntry;
+import org.apache.usergrid.persistence.collection.VersionSet;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
+import org.apache.usergrid.persistence.index.CandidateResult;
+import org.apache.usergrid.persistence.index.EntityIndexBatch;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.index.SearchEdge;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.fasterxml.uuid.UUIDComparator;
+import com.google.inject.Inject;
+
+import rx.Observable;
+
+
+/**
+ * Responsible for verifying candidate result versions, then emitting the Ids of these versions
+ * Input is a batch of candidate results, output is a stream of validated Ids
+ */
+public class CandidateIdFilter extends AbstractFilter<Candidate, Id>
+    implements Filter<Candidate, Id> {
+
+    private final EntityCollectionManagerFactory entityCollectionManagerFactory;
+    private final EntityIndexFactory entityIndexFactory;
+
+
+    @Inject
+    public CandidateIdFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory,
+                              final EntityIndexFactory entityIndexFactory ) {
+        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+        this.entityIndexFactory = entityIndexFactory;
+    }
+
+
+
+    @Override
+      public Observable<FilterResult<Id>> call( final Observable<FilterResult<Candidate>> filterResultObservable ) {
+
+
+        /**
+         * A bit kludgy from old 1.0 -> 2.0 apis.  Refactor this as we clean up our lower levels and create new results
+         * objects
+         */
+
+        final ApplicationScope applicationScope = pipelineContext.getApplicationScope();
+
+        final EntityCollectionManager entityCollectionManager =
+            entityCollectionManagerFactory.createCollectionManager( applicationScope );
+
+
+        final ApplicationEntityIndex applicationIndex =
+            entityIndexFactory.createApplicationEntityIndex( applicationScope );
+
+        final Observable<FilterResult<Id>> searchIdSetObservable = filterResultObservable.buffer( pipelineContext.getLimit() ).flatMap(
+            candidateResults -> {
+                //flatten to a list of ids to load
+                final Observable<List<Id>> candidateIds =
+                    Observable.from( candidateResults ).map( candidate -> candidate.getValue().getCandidateResult().getId() )
+                              .toList();
+
+                //load the ids
+                final Observable<VersionSet> versionSetObservable =
+                    candidateIds.flatMap( ids -> entityCollectionManager.getLatestVersion( ids ) );
+
+                //now we have a collection, validate our candidate set is correct.
+
+                return versionSetObservable.map(
+                    entitySet -> new EntityCollector( applicationIndex.createBatch(), entitySet, candidateResults ) )
+                                           .doOnNext( entityCollector -> entityCollector.merge() ).flatMap(
+                        entityCollector -> Observable.from( entityCollector.collectResults() ) );
+        } );
+
+        return searchIdSetObservable;
+    }
+
+
+
+
+
+    /**
+     * Validates candidates against the loaded version set, deindexes stale ones and collects the resulting Ids
+     */
+    private static final class EntityCollector {
+
+        private static final Logger logger = LoggerFactory.getLogger( EntityCollector.class );
+        private List<FilterResult<Id>> results = new ArrayList<>();
+
+        private final EntityIndexBatch batch;
+        private final List<FilterResult<Candidate>> candidateResults;
+        private final VersionSet versionSet;
+
+
+        public EntityCollector( final EntityIndexBatch batch, final VersionSet versionSet,
+                                final List<FilterResult<Candidate>> candidateResults ) {
+            this.batch = batch;
+            this.versionSet = versionSet;
+            this.candidateResults = candidateResults;
+            this.results = new ArrayList<>( versionSet.size() );
+        }
+
+
+        /**
+         * Merge our candidates and our entity set into results
+         */
+        public void merge() {
+
+            for ( final FilterResult<Candidate> candidateResult : candidateResults ) {
+                validate( candidateResult );
+            }
+
+            batch.execute();
+        }
+
+
+        public List<FilterResult<Id>> collectResults() {
+            return results;
+        }
+
+
+        /**
+         * Validate each candidate results vs the data loaded from cass
+         * @param filterCandidate
+         */
+        private void validate( final FilterResult<Candidate> filterCandidate ) {
+
+            final CandidateResult candidateResult = filterCandidate.getValue().getCandidateResult();
+
+            final SearchEdge searchEdge = filterCandidate.getValue().getSearchEdge();
+
+            final MvccLogEntry logEntry = versionSet.getMaxVersion( candidateResult.getId() );
+
+            final UUID candidateVersion = candidateResult.getVersion();
+
+            final UUID entityVersion = logEntry.getVersion();
+
+            final Id entityId = logEntry.getEntityId();
+
+            //entity is newer than ES version
+            if ( UUIDComparator.staticCompare( entityVersion, candidateVersion ) > 0 ) {
+
+                logger.warn( "Deindexing stale entity on edge {} for entityId {} and version {}",
+                    new Object[] { searchEdge, entityId, entityVersion } );
+                batch.deindex( searchEdge, entityId, entityVersion );
+                return;
+            }
+
+            //ES is newer than cass, it means we haven't repaired the record in Cass, we don't want to
+            //remove the ES record, since the read in cass should cause a read repair, just ignore
+            if ( UUIDComparator.staticCompare( candidateVersion, entityVersion ) > 0 ) {
+
+                logger.warn(
+                    "Found a newer version in ES over cassandra for edge {} for entityId {} and version {}.  Repair "
+                        + "should be run", new Object[] { searchEdge, entityId, entityVersion } );
+            }
+
+            //they're the same add it
+
+            final FilterResult<Id> result = new FilterResult<>( entityId, filterCandidate.getPath()  );
+
+            results.add( result );
+        }
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateResultsEntityResultsCollector.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateResultsEntityResultsCollector.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateResultsEntityResultsCollector.java
deleted file mode 100644
index 465ff22..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateResultsEntityResultsCollector.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch;
-
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.corepersistence.pipeline.read.AbstractPipelineOperation;
-import org.apache.usergrid.corepersistence.pipeline.read.Collector;
-import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
-import org.apache.usergrid.persistence.collection.EntitySet;
-import org.apache.usergrid.persistence.collection.MvccEntity;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
-import org.apache.usergrid.persistence.index.CandidateResult;
-import org.apache.usergrid.persistence.index.CandidateResults;
-import org.apache.usergrid.persistence.index.EntityIndexBatch;
-import org.apache.usergrid.persistence.index.EntityIndexFactory;
-import org.apache.usergrid.persistence.index.SearchEdge;
-import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.fasterxml.uuid.UUIDComparator;
-import com.google.inject.Inject;
-
-import rx.Observable;
-
-
-/**
- * Loads entities from an incoming CandidateResults object and return them as results
- */
-public class CandidateResultsEntityResultsCollector extends AbstractPipelineOperation<CandidateResults, ResultsPage>
-    implements Collector<CandidateResults, ResultsPage> {
-
-    private final EntityCollectionManagerFactory entityCollectionManagerFactory;
-    private final EntityIndexFactory entityIndexFactory;
-
-
-    @Inject
-    public CandidateResultsEntityResultsCollector( final EntityCollectionManagerFactory entityCollectionManagerFactory,
-                                                   final EntityIndexFactory entityIndexFactory ) {
-        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
-        this.entityIndexFactory = entityIndexFactory;
-    }
-
-
-    @Override
-    public Observable<ResultsPage> call( final Observable<CandidateResults> candidateResultsObservable ) {
-
-
-        /**
-         * A bit kludgy from old 1.0 -> 2.0 apis.  Refactor this as we clean up our lower levels and create new results
-         * objects
-         */
-
-        final ApplicationScope applicationScope = pipelineContext.getApplicationScope();
-
-        final EntityCollectionManager entityCollectionManager =
-            entityCollectionManagerFactory.createCollectionManager( applicationScope );
-
-
-        final ApplicationEntityIndex applicationIndex =
-            entityIndexFactory.createApplicationEntityIndex( applicationScope );
-
-        final Observable<ResultsPage> searchIdSetObservable = candidateResultsObservable.flatMap( candidateResults -> {
-            //flatten toa list of ids to load
-            final Observable<List<Id>> candidateIds =
-                Observable.from( candidateResults ).map( candidate -> candidate.getId() ).toList();
-
-            //load the ids
-            final Observable<EntitySet> entitySetObservable =
-                candidateIds.flatMap( ids -> entityCollectionManager.load( ids ) );
-
-            //now we have a collection, validate our canidate set is correct.
-
-            return entitySetObservable
-                .map( entitySet -> new EntityCollector( applicationIndex.createBatch(), entitySet, candidateResults ) )
-                .doOnNext( entityCollector -> entityCollector.merge() )
-                .map( entityCollector -> entityCollector.getResults() );
-        } );
-
-        //if we filter all our results, we want to continue to try the next page
-        return searchIdSetObservable;
-    }
-
-
-    /**
-     * Our collector to collect entities.  Not quite a true collector, but works within our operational flow as this state is mutable and difficult to represent functionally
-     */
-    private static final class EntityCollector {
-
-        private static final Logger logger = LoggerFactory.getLogger( EntityCollector.class );
-        private List<Entity> results = new ArrayList<>();
-
-        private final EntityIndexBatch batch;
-        private final CandidateResults candidateResults;
-        private final EntitySet entitySet;
-
-
-        public EntityCollector( final EntityIndexBatch batch, final EntitySet entitySet,
-                                final CandidateResults candidateResults ) {
-            this.batch = batch;
-            this.entitySet = entitySet;
-            this.candidateResults = candidateResults;
-            this.results = new ArrayList<>( entitySet.size() );
-        }
-
-
-        /**
-         * Merge our candidates and our entity set into results
-         */
-        public void merge() {
-
-            for ( final CandidateResult candidateResult : candidateResults ) {
-                validate( candidateResult );
-            }
-
-            batch.execute();
-        }
-
-
-        public ResultsPage getResults() {
-            return new ResultsPage( results );
-        }
-
-
-        public EntityIndexBatch getBatch() {
-            return batch;
-        }
-
-
-        private void validate( final CandidateResult candidateResult ) {
-
-            final Id candidateId = candidateResult.getId();
-            final UUID candidateVersion = candidateResult.getVersion();
-
-
-            final MvccEntity entity = entitySet.getEntity( candidateId );
-
-
-            //doesn't exist warn and drop
-            if ( entity == null ) {
-                logger.warn(
-                    "Searched and received candidate with entityId {} and version {}, yet was not found in cassandra."
-                        + "  Ignoring since this could be a region sync issue",
-                    candidateId, candidateVersion );
-
-
-                //TODO trigger an audit after a fail count where we explicitly try to repair from other regions
-
-                return;
-
-            }
-
-
-            final UUID entityVersion = entity.getVersion();
-
-
-            //entity is newer than ES version, could be an update or the entity is marked as deleted
-            if ( UUIDComparator.staticCompare( entityVersion, candidateVersion ) > 0) {
-
-                final Id entityId = entity.getId();
-                final SearchEdge searchEdge = candidateResults.getSearchEdge();
-
-                logger.warn( "Deindexing stale entity on edge {} for entityId {} and version {}",
-                    new Object[] { searchEdge, entityId, entityVersion } );
-                batch.deindex( searchEdge, entityId, entityVersion );
-                return;
-            }
-
-            //ES is newer than cass, it means we haven't repaired the record in Cass, we don't want to
-            //remove the ES record, since the read in cass should cause a read repair, just ignore
-            if ( UUIDComparator.staticCompare( candidateVersion, entityVersion ) > 0 ) {
-
-                final Id entityId = entity.getId();
-                final SearchEdge searchEdge = candidateResults.getSearchEdge();
-
-                logger.warn(
-                    "Found a newer version in ES over cassandra for edge {} for entityId {} and version {}.  Repair "
-                        + "should be run", new Object[] { searchEdge, entityId, entityVersion } );
-
-                  //TODO trigger an audit after a fail count where we explicitly try to repair from other regions
-
-                return;
-            }
-
-            //they're the same add it
-
-
-            results.add( entity.getEntity().get() );
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateResultsIdVerifyFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateResultsIdVerifyFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateResultsIdVerifyFilter.java
deleted file mode 100644
index bb9ab76..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateResultsIdVerifyFilter.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch;
-
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.corepersistence.pipeline.read.AbstractPipelineOperation;
-import org.apache.usergrid.corepersistence.pipeline.read.PipelineOperation;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
-import org.apache.usergrid.persistence.collection.MvccLogEntry;
-import org.apache.usergrid.persistence.collection.VersionSet;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
-import org.apache.usergrid.persistence.index.CandidateResult;
-import org.apache.usergrid.persistence.index.CandidateResults;
-import org.apache.usergrid.persistence.index.EntityIndexBatch;
-import org.apache.usergrid.persistence.index.EntityIndexFactory;
-import org.apache.usergrid.persistence.index.SearchEdge;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.fasterxml.uuid.UUIDComparator;
-import com.google.inject.Inject;
-
-import rx.Observable;
-
-
-/**
- * Responsible for verifying candidate result versions, then emitting the Ids of these versions
- * Input is a batch of candidate results, output is a stream of validated Ids
- */
-public class CandidateResultsIdVerifyFilter extends AbstractPipelineOperation<CandidateResults, Id>
-    implements PipelineOperation<CandidateResults, Id> {
-
-    private final EntityCollectionManagerFactory entityCollectionManagerFactory;
-    private final EntityIndexFactory entityIndexFactory;
-
-
-    @Inject
-    public CandidateResultsIdVerifyFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory,
-                                           final EntityIndexFactory entityIndexFactory ) {
-        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
-        this.entityIndexFactory = entityIndexFactory;
-    }
-
-
-
-    @Override
-    public Observable<Id> call( final Observable<CandidateResults> observable ) {
-
-
-        /**
-         * A bit kludgy from old 1.0 -> 2.0 apis.  Refactor this as we clean up our lower levels and create new results
-         * objects
-         */
-
-        final ApplicationScope applicationScope = pipelineContext.getApplicationScope();
-
-        final EntityCollectionManager entityCollectionManager =
-            entityCollectionManagerFactory.createCollectionManager( applicationScope );
-
-
-        final ApplicationEntityIndex applicationIndex =
-            entityIndexFactory.createApplicationEntityIndex( applicationScope );
-
-        final Observable<Id> searchIdSetObservable = observable.flatMap( candidateResults -> {
-            //flatten toa list of ids to load
-            final Observable<List<Id>> candidateIds =
-                Observable.from( candidateResults ).map( candidate -> candidate.getId() ).toList();
-
-            //load the ids
-            final Observable<VersionSet> versionSetObservable =
-                candidateIds.flatMap( ids -> entityCollectionManager.getLatestVersion( ids ) );
-
-            //now we have a collection, validate our canidate set is correct.
-
-            return versionSetObservable
-                .map( entitySet -> new EntityCollector( applicationIndex.createBatch(), entitySet, candidateResults ) )
-                .doOnNext( entityCollector -> entityCollector.merge() )
-                .flatMap( entityCollector -> Observable.from(  entityCollector.collectResults() ) );
-        } );
-
-        return searchIdSetObservable;
-    }
-
-
-    /**
-     * Map a new cp entity to an old entity.  May be null if not present
-     */
-    private static final class EntityCollector {
-
-        private static final Logger logger = LoggerFactory.getLogger( EntityCollector.class );
-        private List<Id> results = new ArrayList<>();
-
-        private final EntityIndexBatch batch;
-        private final CandidateResults candidateResults;
-        private final VersionSet versionSet;
-
-
-        public EntityCollector( final EntityIndexBatch batch, final VersionSet versionSet,
-                                final CandidateResults candidateResults ) {
-            this.batch = batch;
-            this.versionSet = versionSet;
-            this.candidateResults = candidateResults;
-            this.results = new ArrayList<>( versionSet.size() );
-        }
-
-
-        /**
-         * Merge our candidates and our entity set into results
-         */
-        public void merge() {
-
-            for ( final CandidateResult candidateResult : candidateResults ) {
-                validate( candidateResult );
-            }
-
-            batch.execute();
-        }
-
-
-        public List<Id> collectResults() {
-            return results;
-        }
-
-
-        /**
-         * Validate each candidate results vs the data loaded from cass
-         * @param candidateResult
-         */
-        private void validate( final CandidateResult candidateResult ) {
-
-            final MvccLogEntry logEntry = versionSet.getMaxVersion( candidateResult.getId() );
-
-            final UUID candidateVersion = candidateResult.getVersion();
-
-            final UUID entityVersion = logEntry.getVersion();
-
-            final Id entityId = logEntry.getEntityId();
-
-            //entity is newer than ES version
-            if ( UUIDComparator.staticCompare( entityVersion, candidateVersion ) > 0 ) {
-
-                final SearchEdge searchEdge = candidateResults.getSearchEdge();
-
-                logger.warn( "Deindexing stale entity on edge {} for entityId {} and version {}",
-                    new Object[] { searchEdge, entityId, entityVersion } );
-                batch.deindex( searchEdge, entityId, entityVersion );
-                return;
-            }
-
-            //ES is newer than cass, it means we haven't repaired the record in Cass, we don't want to
-            //remove the ES record, since the read in cass should cause a read repair, just ignore
-            if ( UUIDComparator.staticCompare( candidateVersion, entityVersion ) > 0 ) {
-
-                final SearchEdge searchEdge = candidateResults.getSearchEdge();
-
-                logger.warn(
-                    "Found a newer version in ES over cassandra for edge {} for entityId {} and version {}.  Repair "
-                        + "should be run", new Object[] { searchEdge, entityId, entityVersion } );
-            }
-
-            //they're the same add it
-
-            results.add( entityId );
-        }
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/CollectionRefsVerifier.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/CollectionRefsVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/CollectionRefsVerifier.java
deleted file mode 100644
index cc96633..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/CollectionRefsVerifier.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
-
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.usergrid.persistence.EntityRef;
-import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.SimpleEntityRef;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-
-public class CollectionRefsVerifier extends VersionVerifier {
-
-
-
-    @Override
-    public Results getResults( final Collection<Id> ids ) {
-        List<EntityRef> refs = new ArrayList<EntityRef>(ids.size());
-        for ( Id id : ids ) {
-            refs.add( new SimpleEntityRef( id.getType(), id.getUuid() ) );
-        }
-        return Results.fromRefList( refs );
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/CollectionResultsLoaderFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/CollectionResultsLoaderFactoryImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/CollectionResultsLoaderFactoryImpl.java
deleted file mode 100644
index 94c91d9..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/CollectionResultsLoaderFactoryImpl.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
-
-
-import org.apache.usergrid.persistence.Query;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
-import org.apache.usergrid.persistence.index.SearchEdge;
-
-
-/**
- * Factory for creating results
- */
-public class CollectionResultsLoaderFactoryImpl implements ResultsLoaderFactory {
-
-    private final EntityCollectionManager entityCollectionManager;
-    private final ApplicationEntityIndex applicationEntityIndex;
-
-
-    public CollectionResultsLoaderFactoryImpl( final EntityCollectionManager entityCollectionManager,
-        final ApplicationEntityIndex applicationEntityIndex ) {
-        this.entityCollectionManager = entityCollectionManager;
-        this.applicationEntityIndex = applicationEntityIndex;
-    }
-
-
-    @Override
-    public ResultsLoader getLoader( final ApplicationScope applicationScope, final SearchEdge scope,
-                                    final Query.Level resultsLevel ) {
-
-        ResultsVerifier verifier;
-
-        if ( resultsLevel == Query.Level.REFS ) {
-            verifier = new CollectionRefsVerifier();
-        }
-        else if ( resultsLevel == Query.Level.IDS ) {
-            verifier = new IdsVerifier();
-        }
-        else {
-            verifier = new EntityVerifier( Query.MAX_LIMIT );
-        }
-
-        return new FilteringLoader( entityCollectionManager, applicationEntityIndex, verifier, applicationScope,
-            scope );
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ConnectionRefsVerifier.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ConnectionRefsVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ConnectionRefsVerifier.java
deleted file mode 100644
index 6b7bdde..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ConnectionRefsVerifier.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
-
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.usergrid.persistence.ConnectionRef;
-import org.apache.usergrid.persistence.EntityRef;
-import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.cassandra.ConnectionRefImpl;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import static org.apache.usergrid.persistence.SimpleEntityRef.ref;
-
-
-/**
- * Verifier for creating connections
- */
-public class ConnectionRefsVerifier extends VersionVerifier {
-
-
-    private final EntityRef ownerId;
-    private final String connectionType;
-
-
-    public ConnectionRefsVerifier( final EntityRef ownerId, final String connectionType ) {
-        this.ownerId = ownerId;
-        this.connectionType = connectionType;
-    }
-
-    @Override
-    public Results getResults( final Collection<Id> ids ) {
-        List<ConnectionRef> refs = new ArrayList<>();
-        for ( Id id : ids ) {
-            refs.add( new ConnectionRefImpl( ownerId, connectionType, ref(id.getType(), id.getUuid())  ));
-        }
-
-        return Results.fromConnections( refs );
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ConnectionResultsLoaderFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ConnectionResultsLoaderFactoryImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ConnectionResultsLoaderFactoryImpl.java
deleted file mode 100644
index 55b95a9..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ConnectionResultsLoaderFactoryImpl.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
-
-
-import org.apache.usergrid.persistence.EntityRef;
-import org.apache.usergrid.persistence.Query;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
-import org.apache.usergrid.persistence.index.EntityIndexFactory;
-import org.apache.usergrid.persistence.index.SearchEdge;
-
-
-/**
- * Factory for creating results
- */
-public class ConnectionResultsLoaderFactoryImpl implements ResultsLoaderFactory {
-
-    private final EntityCollectionManager entityCollectionManager;
-    private final ApplicationEntityIndex applicationEntityIndex;
-    private final EntityRef ownerId;
-    private final String connectionType;
-
-
-    public ConnectionResultsLoaderFactoryImpl( final EntityCollectionManager entityCollectionManager,
-                                               final ApplicationEntityIndex applicationEntityIndex, final EntityRef ownerId,
-                                               final String connectionType ) {
-        this.entityCollectionManager = entityCollectionManager;
-        this.applicationEntityIndex = applicationEntityIndex;
-        this.ownerId = ownerId;
-        this.connectionType = connectionType;
-    }
-
-
-    @Override
-    public ResultsLoader getLoader( final ApplicationScope applicationScope, final SearchEdge scope,
-                                    final Query.Level resultsLevel ) {
-
-        ResultsVerifier verifier;
-
-        if ( resultsLevel == Query.Level.REFS ) {
-            verifier = new ConnectionRefsVerifier( ownerId, connectionType );
-        }
-        else if ( resultsLevel == Query.Level.IDS ) {
-            verifier = new ConnectionRefsVerifier( ownerId, connectionType );
-            ;
-        }
-        else {
-            verifier = new EntityVerifier( Query.MAX_LIMIT );
-        }
-
-        return new FilteringLoader( entityCollectionManager, applicationEntityIndex, verifier, applicationScope, scope );
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ElasticSearchQueryExecutor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ElasticSearchQueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ElasticSearchQueryExecutor.java
deleted file mode 100644
index 6e170f8..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ElasticSearchQueryExecutor.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
-
-
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-import org.apache.usergrid.persistence.index.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.Query;
-import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-
-import com.google.common.base.Optional;
-
-
-public class ElasticSearchQueryExecutor implements Iterable<Results>, Iterator<Results> {
-
-    private static final Logger logger = LoggerFactory.getLogger( ElasticSearchQueryExecutor.class );
-
-    private final ResultsLoaderFactory resultsLoaderFactory;
-
-    private final ApplicationScope applicationScope;
-
-    private final ApplicationEntityIndex entityIndex;
-
-    private final SearchEdge indexScope;
-
-    private final SearchTypes types;
-
-    private final String query;
-
-    private final Optional<Integer> setOffsetFromCursor;
-
-    private final int limit;
-
-    private int offset;
-
-
-    private Results currentResults;
-
-    private boolean moreToLoad = true;
-
-
-
-
-    public ElasticSearchQueryExecutor( final ResultsLoaderFactory resultsLoaderFactory, final ApplicationEntityIndex entityIndex,
-                                       final ApplicationScope applicationScope, final SearchEdge indexScope,
-                                       final SearchTypes types, final String query, final Optional<Integer> setOffsetFromCursor, final int limit ) {
-        this.resultsLoaderFactory = resultsLoaderFactory;
-        this.applicationScope = applicationScope;
-        this.entityIndex = entityIndex;
-        this.indexScope = indexScope;
-        this.types = types;
-        this.setOffsetFromCursor = setOffsetFromCursor;
-
-        //we must deep copy the query passed.  Otherwise we will modify it's state with cursors.  Won't fix, not relevant
-        //once we start subscribing to streams.
-        this.query = query;
-        this.limit = limit;
-    }
-
-
-    @Override
-    public Iterator<Results> iterator() {
-        return this;
-    }
-
-
-    private void loadNextPage() {
-        // Because of possible stale entities, which are filtered out by buildResults(),
-        // we loop until the we've got enough results to satisfy the query limit.
-
-        final int maxQueries = 10; // max re-queries to satisfy query limit
-
-
-        Results results = null;
-        int queryCount = 0;
-
-
-        CandidateResults crs = null;
-
-        int newLimit = limit;
-
-        while ( queryCount++ < maxQueries ) {
-
-            crs = entityIndex.search( indexScope, types, query, newLimit , offset);
-
-
-            logger.debug( "Calling build results with crs {}", crs );
-            results = buildResults( indexScope, crs );
-
-            /**
-             * In an edge case where we delete stale entities, we could potentially get less results than expected.
-             * This will only occur once during the repair phase.
-             * We need to ensure that we short circuit before we overflow the requested limit during a repair.
-             */
-            if ( crs.isEmpty() || !crs.hasOffset() || results.size() > 0 ) { // no results, no cursor, can't get more
-                break;
-            }
-
-
-            //we didn't load anything, but there was a cursor, this means a read repair occured.  We have to short
-            //circuit to avoid over returning the result set
-
-
-            // need to query for more
-            // ask for just what we need to satisfy, don't want to exceed limit
-            newLimit =  newLimit - results.size();
-
-            logger.warn( "Satisfy query limit {}, new limit {} query count {}", new Object[] {
-                limit, newLimit, queryCount
-            } );
-        }
-
-        //now set our cursor if we have one for the next iteration
-        if ( results.hasCursor() ) {
-            moreToLoad = true;
-        }
-
-        else {
-            moreToLoad = false;
-        }
-
-//
-//        //set our select subjects into our query if provided
-//        if(crs != null){
-//            query.setSelectSubjects( crs.getGetFieldMappings() );
-//        }
-//
-
-        //set our current results and the flag
-        this.currentResults = results;
-    }
-
-
-
-    /**
-     * Build results from a set of candidates, and discard those that represent stale indexes.
-     *
-     * @param indexScope The index scope to execute the search on
-     * @param crs Candidates to be considered for results
-     */
-    private Results buildResults( final SearchEdge indexScope, final CandidateResults crs ) {
-
-        logger.debug( "buildResults()  from {} candidates", crs.size() );
-
-        //get an instance of our results loader
-        final ResultsLoader resultsLoader =
-            this.resultsLoaderFactory.getLoader( applicationScope, indexScope, Query.Level.ALL_PROPERTIES );
-
-        //load the results
-        final Results results = resultsLoader.loadResults(crs);
-
-        //signal for post processing
-        resultsLoader.postProcess();
-
-        //set offset into query
-
-        logger.debug( "Returning results size {}", results.size() );
-
-        return results;
-    }
-
-
-    @Override
-    public boolean hasNext() {
-
-        //we've tried to load and it's empty and we have more to load, load the next page
-        if ( currentResults == null ) {
-            //there's nothing left to load, nothing to do
-            if ( !moreToLoad ) {
-                return false;
-            }
-
-            //load the page
-
-            loadNextPage();
-        }
-
-
-        //see if our current results are not null
-        return currentResults != null;
-    }
-
-
-    @Override
-    public Results next() {
-        if ( !hasNext() ) {
-            throw new NoSuchElementException( "No more results present" );
-        }
-
-        final Results toReturn = currentResults;
-
-        currentResults = null;
-
-        return toReturn;
-    }
-
-    @Override
-    public void remove() {
-        throw new RuntimeException("Remove not implemented!!");
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/EntityVerifier.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/EntityVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/EntityVerifier.java
deleted file mode 100644
index d73c731..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/EntityVerifier.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  *  contributor license agreements.  The ASF licenses this file to You
- *  * under the Apache License, Version 2.0 (the "License"); you may not
- *  * use this file except in compliance with the License.
- *  * You may obtain a copy of the License at
- *  *
- *  *     http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.  For additional information regarding
- *  * copyright in this work, please see the NOTICE file in the top level
- *  * directory of this distribution.
- *
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
-
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
-import org.apache.usergrid.persistence.Entity;
-import org.apache.usergrid.persistence.EntityFactory;
-import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.collection.EntitySet;
-import org.apache.usergrid.persistence.collection.MvccEntity;
-import org.apache.usergrid.persistence.index.CandidateResult;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.fasterxml.uuid.UUIDComparator;
-import com.google.common.base.Optional;
-
-
-/**
- * A loader that verifies versions are correct in cassandra and match elasticsearch
- */
-public class EntityVerifier implements ResultsVerifier {
-
-    private static final Logger logger = LoggerFactory.getLogger( EntityVerifier.class );
-
-    private EntitySet ids;
-
-    private Map<Id, org.apache.usergrid.persistence.model.entity.Entity> entityMapping;
-
-
-    public EntityVerifier( final int maxSize ) {
-        this.entityMapping = new HashMap<>( maxSize );
-    }
-
-
-    @Override
-    public void loadResults( final Collection<Id> idsToLoad, final EntityCollectionManager ecm ) {
-        ids = ecm.load( idsToLoad ).toBlocking().last();
-        logger.debug("loadResults() asked for {} ids and got {}", idsToLoad.size(), ids.size());
-    }
-
-
-    @Override
-    public boolean isValid( final CandidateResult candidateResult ) {
-        final Id entityId = candidateResult.getId();
-
-        final MvccEntity savedEntity = ids.getEntity( entityId );
-
-        //version wasn't found deindex
-        if ( savedEntity == null ) {
-            logger.warn( "Version for Entity {}:{} not found", entityId.getType(), entityId.getUuid() );
-            return false;
-        }
-
-        final UUID candidateVersion = candidateResult.getVersion();
-        final UUID savedVersion = savedEntity.getVersion();
-
-        if ( UUIDComparator.staticCompare( savedVersion, candidateVersion ) > 0 ) {
-            logger.warn( "Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}", new Object[] {
-                    entityId.getUuid(), entityId.getType(), candidateVersion, savedEntity
-            } );
-
-            return false;
-        }
-
-
-        final Optional<org.apache.usergrid.persistence.model.entity.Entity> entity = savedEntity.getEntity();
-
-        if ( !entity.isPresent() ) {
-            logger.warn( "Entity uuid:{} version v:{} is deleted but indexed, this is a bug ",
-                    entityId.getUuid(), savedEntity.getEntity() );
-            return false;
-        }
-
-        entityMapping.put( entityId, entity.get() );
-
-        return true;
-    }
-
-
-    @Override
-    public Results getResults( final Collection<Id> ids ) {
-
-        final List<Entity> ugEntities = new ArrayList<>( ids.size() );
-
-        for ( final Id id : ids ) {
-            final org.apache.usergrid.persistence.model.entity.Entity cpEntity = entityMapping.get( id );
-
-            Entity entity = EntityFactory.newEntity( id.getUuid(), id.getType() );
-
-            Map<String, Object> entityMap = CpEntityMapUtils.toMap( cpEntity );
-            entity.addProperties( entityMap );
-            ugEntities.add( entity );
-        }
-
-        return Results.fromEntities( ugEntities );
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/FilteringLoader.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/FilteringLoader.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/FilteringLoader.java
deleted file mode 100644
index ade64a2..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/FilteringLoader.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
-
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.UUID;
-
-import javax.annotation.Nullable;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.corepersistence.ManagerCache;
-import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
-import org.apache.usergrid.persistence.index.EntityIndexBatch;
-import org.apache.usergrid.persistence.index.EntityIndexFactory;
-import org.apache.usergrid.persistence.index.SearchEdge;
-import org.apache.usergrid.persistence.index.CandidateResult;
-import org.apache.usergrid.persistence.index.CandidateResults;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.fasterxml.uuid.UUIDComparator;
-import com.google.common.base.Function;
-import com.google.common.collect.Collections2;
-
-
-public class FilteringLoader implements ResultsLoader {
-
-    private static final Logger logger = LoggerFactory.getLogger( FilteringLoader.class );
-
-    private final EntityCollectionManager entityCollectionManager;
-    private final ResultsVerifier resultsVerifier;
-    private final ApplicationScope applicationScope;
-    private final SearchEdge indexScope;
-    private final EntityIndexBatch indexBatch;
-
-
-    /**
-     * Create an instance of a filter loader
-     *
-     * @param entityCollectionManager The entityCollectionManagerFactory
-     * @param resultsVerifier The verifier to verify the candidate results
-     * @param applicationScope The application scope to perform the load
-     * @param indexScope The index scope used in the search
-     */
-    protected FilteringLoader( final  EntityCollectionManager entityCollectionManager, final ApplicationEntityIndex applicationEntityIndex,  final ResultsVerifier resultsVerifier,
-                               final ApplicationScope applicationScope, final SearchEdge indexScope ) {
-
-        this.entityCollectionManager = entityCollectionManager;
-        this.resultsVerifier = resultsVerifier;
-        this.applicationScope = applicationScope;
-        this.indexScope = indexScope;
-
-        indexBatch = applicationEntityIndex.createBatch();
-    }
-
-
-    @Override
-    public Results loadResults( final CandidateResults crs ) {
-
-
-        if ( crs.size() == 0 ) {
-            return new Results();
-        }
-
-
-        // For each entity, holds the index it appears in our candidates for keeping ordering correct
-        final Map<Id, Integer> orderIndex = new HashMap<>( crs.size() );
-
-        // Maps the entity ids to our candidates
-        final Map<Id, CandidateResult> maxCandidateMapping = new HashMap<>( crs.size() );
-
-
-        final Iterator<CandidateResult> iter = crs.iterator();
-
-
-        // TODO, in this case we're "optimizing" due to the limitations of collection scope.
-        // Perhaps  we should change the API to just be an application, then an "owner" scope?
-
-        // Go through the candidates and group them by scope for more efficient retrieval.
-        // Also remove duplicates before we even make a network call
-        for ( int i = 0; iter.hasNext(); i++ ) {
-
-            final CandidateResult currentCandidate = iter.next();
-
-            final Id entityId = currentCandidate.getId();
-
-            //check if we've seen this candidate by id
-            final CandidateResult previousMax = maxCandidateMapping.get( entityId );
-
-            //its not been seen, save it
-            if ( previousMax == null ) {
-                maxCandidateMapping.put( entityId, currentCandidate );
-                orderIndex.put( entityId, i );
-                continue;
-            }
-
-            //we have seen it, compare them
-
-            final UUID previousMaxVersion = previousMax.getVersion();
-
-            final UUID currentVersion = currentCandidate.getVersion();
-
-
-            final CandidateResult toRemove;
-            final CandidateResult toKeep;
-
-            //current is newer than previous.  Remove previous and keep current
-            if ( UUIDComparator.staticCompare( currentVersion, previousMaxVersion ) > 0 ) {
-                toRemove = previousMax;
-                toKeep = currentCandidate;
-            }
-            //previously seen value is newer than current.  Remove the current and keep the previously seen value
-            else {
-                toRemove = currentCandidate;
-                toKeep = previousMax;
-            }
-
-            //this is a newer version, we know we already have a stale entity, add it to be cleaned up
-
-
-            //de-index it
-            logger.warn( "Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}", new Object[] {
-                    entityId.getUuid(), entityId.getType(), toRemove.getVersion(), toKeep.getVersion()
-                } );
-
-            //deindex this document, and remove the previous maxVersion
-            //we have to deindex this from our ownerId, since this is what gave us the reference
-            indexBatch.deindex( indexScope, toRemove );
-
-
-            //TODO, fire the entity repair cleanup task here instead of de-indexing
-
-            //replace the value with a more current version
-            maxCandidateMapping.put( entityId, toKeep );
-            orderIndex.put( entityId, i );
-        }
-
-
-        //now everything is ordered, and older versions are removed.  Batch fetch versions to verify
-        // existence and correct versions
-
-        final TreeMap<Integer, Id> sortedResults = new TreeMap<>();
-
-
-        final Collection<Id> idsToLoad =
-            Collections2.transform( maxCandidateMapping.values(), new Function<CandidateResult, Id>() {
-                @Nullable
-                @Override
-                public Id apply( @Nullable final CandidateResult input ) {
-                    //NOTE this is never null, we won't need to check
-                    return input.getId();
-                }
-            } );
-
-
-        //now using the scope, load the collection
-
-
-
-        //load the results into the loader for this scope for validation
-        resultsVerifier.loadResults( idsToLoad, entityCollectionManager );
-
-        //now let the loader validate each candidate.  For instance, the "max" in this candidate
-        //could still be a stale result, so it needs validated
-        for ( final Id requestedId : idsToLoad ) {
-
-            final CandidateResult cr = maxCandidateMapping.get( requestedId );
-
-            //ask the loader if this is valid, if not discard it and de-index it
-            if ( !resultsVerifier.isValid( cr ) ) {
-                indexBatch.deindex( indexScope, cr );
-                continue;
-            }
-
-            //if we get here we're good, we need to add this to our results
-            final int candidateIndex = orderIndex.get( requestedId );
-
-            sortedResults.put( candidateIndex, requestedId );
-        }
-
-
-        // NOTE DO NOT execute the batch here.
-        // It changes the results and we need consistent paging until we aggregate all results
-        return resultsVerifier.getResults( sortedResults.values() );
-    }
-
-
-    @Override
-    public void postProcess() {
-        this.indexBatch.execute();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/IdsVerifier.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/IdsVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/IdsVerifier.java
deleted file mode 100644
index 4a3bfcd..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/IdsVerifier.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
-
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.UUID;
-
-import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-
-public class IdsVerifier extends VersionVerifier {
-
-    @Override
-    public Results getResults( final Collection<Id> ids ) {
-
-        final List<UUID> returnIds = new ArrayList<>( ids.size() );
-
-        for ( final Id id : ids ) {
-            returnIds.add( id.getUuid() );
-        }
-
-
-        return Results.fromIdList( returnIds );
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsLoader.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsLoader.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsLoader.java
deleted file mode 100644
index c2a3e9a..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsLoader.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
-
-
-import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.index.CandidateResults;
-
-
-/**
- * Interface for loading results
- */
-public interface ResultsLoader {
-
-    /**
-     * Using the candidate results, load our results.  Should filter stale results
-     * @param  crs The candidate result set
-     * @return Results.  Null safe, but may be empty
-     */
-    public Results loadResults( final CandidateResults crs);
-
-    /**
-     * Post process the load operation
-     */
-    public void postProcess();
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsLoaderFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsLoaderFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsLoaderFactory.java
deleted file mode 100644
index 3ccca1b..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsLoaderFactory.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
-
-
-import org.apache.usergrid.persistence.Query;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.index.SearchEdge;
-
-
-/**
- * Factory for creating results
- */
-public interface ResultsLoaderFactory {
-
-    /**
-     * Get the loader for results
-     * @param applicationScope The application scope used to load results
-     * @param indexScope The index scope used in the search
-     * @param
-     */
-    ResultsLoader getLoader( final ApplicationScope applicationScope, final SearchEdge indexScope,
-                             final Query.Level resultsLevel );
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsVerifier.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsVerifier.java
deleted file mode 100644
index fe72ca2..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsVerifier.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
-
-
-import java.util.Collection;
-import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.index.CandidateResult;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-
-public interface ResultsVerifier {
-
-    /**
-     * Load all the candidate ides for verification
-     * @param ids The Id's to load
-     * @param ecm The entity collection manager
-     */
-    public void loadResults(Collection<Id> ids, EntityCollectionManager ecm);
-
-    /**
-     * Return true if the candidate result is a valid result that should be retained. If it should
-     * not it should also be removed from the list of possible return values in this loader
-     * @param candidateResult
-     */
-    public boolean isValid(CandidateResult candidateResult);
-
-
-    /**
-     * Load the result set with the given ids
-     * @return
-     */
-    public Results getResults(Collection<Id> ids);
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/VersionVerifier.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/VersionVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/VersionVerifier.java
deleted file mode 100644
index c49fb28..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/VersionVerifier.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
-
-
-import java.util.Collection;
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.collection.MvccLogEntry;
-import org.apache.usergrid.persistence.collection.VersionSet;
-import org.apache.usergrid.persistence.index.CandidateResult;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.fasterxml.uuid.UUIDComparator;
-
-
-/**
- * A loader that verifies versions are correct in Cassandra and match ElasticSearch
- */
-public abstract class VersionVerifier implements ResultsVerifier {
-
-    private static final Logger logger = LoggerFactory.getLogger( VersionVerifier.class );
-
-    private VersionSet ids;
-
-
-    @Override
-    public void loadResults( final Collection<Id> idsToLoad, final EntityCollectionManager ecm ) {
-        ids = ecm.getLatestVersion( idsToLoad ).toBlocking().last();
-    }
-
-
-    @Override
-    public boolean isValid( final CandidateResult candidateResult ) {
-        final Id entityId = candidateResult.getId();
-
-        final MvccLogEntry version = ids.getMaxVersion( entityId );
-
-        //version wasn't found ,deindex
-        if ( version == null ) {
-            logger.warn( "Version for Entity {}:{} not found",
-                    entityId.getUuid(), entityId.getUuid() );
-
-            return false;
-        }
-
-        final UUID savedVersion = version.getVersion();
-
-        if ( UUIDComparator.staticCompare( savedVersion, candidateResult.getVersion() ) > 0 ) {
-            logger.debug( "Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}",
-                new Object[] {
-                    entityId.getUuid(),
-                    entityId.getType(),
-                    candidateResult.getVersion(),
-                    savedVersion
-            } );
-
-            return false;
-        }
-
-        return true;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityIdFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityIdFilter.java
deleted file mode 100644
index 6230147..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityIdFilter.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.entity;
-
-
-import org.apache.usergrid.corepersistence.pipeline.read.AbstractPipelineOperation;
-import org.apache.usergrid.corepersistence.pipeline.read.Filter;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
-
-import rx.Observable;
-
-
-/**
- * This command is a stopgap to make migrating 1.0 code easier.  Once full traversal has been implemented, this should
- * be removed
- */
-public class EntityIdFilter extends AbstractPipelineOperation<Id, Id> implements Filter<Id, Id> {
-
-    private final Id entityId;
-
-
-    @Inject
-    public EntityIdFilter( @Assisted final Id entityId ) {this.entityId = entityId;}
-
-
-
-
-    @Override
-    public Observable<Id> call( final Observable<Id> idObservable ) {
-        return Observable.just( entityId );
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollector.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollector.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollector.java
deleted file mode 100644
index dd6b9b8..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollector.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.entity;
-
-
-import java.util.List;
-
-import org.apache.usergrid.corepersistence.pipeline.read.AbstractPipelineOperation;
-import org.apache.usergrid.corepersistence.pipeline.read.Collector;
-import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
-import org.apache.usergrid.persistence.collection.EntitySet;
-import org.apache.usergrid.persistence.collection.MvccEntity;
-import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.inject.Inject;
-
-import rx.Observable;
-
-
-/**
- * Loads entities from a set of Ids.
- *
- * TODO refactor this into a common command that both ES search and graphSearch can use for repair and verification
- */
-public class EntityLoadCollector extends AbstractPipelineOperation<Id, ResultsPage>
-    implements Collector<Id, ResultsPage> {
-
-    private final EntityCollectionManagerFactory entityCollectionManagerFactory;
-
-
-    @Inject
-    public EntityLoadCollector( final EntityCollectionManagerFactory entityCollectionManagerFactory ) {
-        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
-    }
-
-
-    @Override
-    public Observable<ResultsPage> call( final Observable<Id> observable ) {
-
-
-        final EntityCollectionManager entityCollectionManager =
-            entityCollectionManagerFactory.createCollectionManager( pipelineContext.getApplicationScope() );
-
-        final Observable<EntitySet> entitySetObservable = observable.buffer( pipelineContext.getLimit() ).flatMap(
-            bufferedIds -> Observable.just( bufferedIds ).flatMap( ids -> entityCollectionManager.load( ids ) ) );
-
-
-        final Observable<ResultsPage> resultsObservable = entitySetObservable
-
-            .flatMap( entitySet -> {
-
-                //get our entites and filter missing ones, then collect them into a results object
-                final Observable<MvccEntity> mvccEntityObservable = Observable.from( entitySet.getEntities() );
-
-
-                //convert them to our old entity model, then filter abscent, meaning they weren't found
-                final Observable<List<Entity>> entitiesPageObservable =
-                    mvccEntityObservable.filter( mvccEntity -> mvccEntity.getEntity().isPresent() )
-                                        .map( mvccEntity -> mvccEntity.getEntity().get() ).toList();
-
-                //convert them to a list, then map them into results
-                return entitiesPageObservable.map( entities -> new ResultsPage( entities ) );
-            } );
-
-
-        return resultsObservable;
-    }
-
-    /**
-     * Map a new cp entity to an old entity.  May be null if not present
-     */
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java
index e0f69cf..42b352b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java
@@ -20,8 +20,9 @@
 package org.apache.usergrid.corepersistence.pipeline.read.graph;
 
 
-import org.apache.usergrid.corepersistence.pipeline.read.AbstractPipelineOperation;
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
 import org.apache.usergrid.corepersistence.pipeline.read.Filter;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 import org.apache.usergrid.persistence.graph.SearchByEdge;
@@ -39,7 +40,7 @@ import rx.Observable;
 /**
  * Filter should take and Id and a graph edge, and ensure the connection between the two exists
  */
-public abstract class AbstractReadGraphEdgeByIdFilter extends AbstractPipelineOperation<Id, Id> implements
+public abstract class AbstractReadGraphEdgeByIdFilter extends AbstractFilter<Id, Id> implements
     Filter<Id, Id> {
 
     private final GraphManagerFactory graphManagerFactory;
@@ -55,12 +56,13 @@ public abstract class AbstractReadGraphEdgeByIdFilter extends AbstractPipelineOp
 
 
     @Override
-    public Observable<Id> call( final Observable<Id> idObservable ) {
+    public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> filterValueObservable ) {
 
         final GraphManager gm = graphManagerFactory.createEdgeManager( pipelineContext.getApplicationScope() );
 
-        return idObservable.flatMap( id -> {
+        return filterValueObservable.flatMap( filterValue -> {
             final String edgeTypeName = getEdgeName();
+            final Id id = filterValue.getValue();
 
             //create our search
             final SearchByEdge searchByEdge =
@@ -68,7 +70,7 @@ public abstract class AbstractReadGraphEdgeByIdFilter extends AbstractPipelineOp
                     Optional.absent() );
 
             //load the versions of the edge, take the first since that's all we need to validate existence, then emit the target node
-            return gm.loadEdgeVersions( searchByEdge ).take( 1 ).map( edge -> edge.getTargetNode() );
+            return gm.loadEdgeVersions( searchByEdge ).take( 1 ).map( edge -> edge.getTargetNode() ).map( targetId -> new FilterResult<>(targetId, filterValue.getPath()));
         } );
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java
index 4dd34fc..503fcf9 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java
@@ -21,8 +21,9 @@ package org.apache.usergrid.corepersistence.pipeline.read.graph;
 
 
 import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
-import org.apache.usergrid.corepersistence.pipeline.read.AbstractSeekingFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter;
 import org.apache.usergrid.corepersistence.pipeline.read.Filter;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
@@ -38,7 +39,7 @@ import rx.Observable;
 /**
  * Command for reading graph edges
  */
-public abstract class AbstractReadGraphFilter extends AbstractSeekingFilter<Id, Id, Edge> implements Filter<Id, Id> {
+public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id, Edge> implements Filter<Id, Id> {
 
     private final GraphManagerFactory graphManagerFactory;
 
@@ -52,7 +53,8 @@ public abstract class AbstractReadGraphFilter extends AbstractSeekingFilter<Id,
 
 
     @Override
-    public Observable<Id> call( final Observable<Id> observable ) {
+    public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> previousIds ) {
+
 
         //get the graph manager
         final GraphManager graphManager =
@@ -63,10 +65,11 @@ public abstract class AbstractReadGraphFilter extends AbstractSeekingFilter<Id,
 
 
         //return all ids that are emitted from this edge
-        return observable.flatMap( id -> {
+        return previousIds.flatMap( previousFilterValue -> {
 
             //set our our constant state
             final Optional<Edge> startFromCursor = getSeekValue();
+            final Id id = previousFilterValue.getValue();
 
 
             final SimpleSearchByEdgeType search =
@@ -78,9 +81,9 @@ public abstract class AbstractReadGraphFilter extends AbstractSeekingFilter<Id,
              */
             return graphManager.loadEdgesFromSource( search )
                 //set our cursor every edge we traverse
-                .doOnNext( edge -> setCursor( edge ) )
+
                     //map our id from the target edge
-                .map( edge -> edge.getTargetNode() );
+                .map( edge -> createFilterResult( edge.getTargetNode(), edge, previousFilterValue.getPath() ) );
         } );
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityIdFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityIdFilter.java
new file mode 100644
index 0000000..5a0e026
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityIdFilter.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.graph;
+
+
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.Filter;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+import rx.Observable;
+
+
+/**
+ * This command is a stopgap to make migrating 1.0 code easier.  Once full traversal has been implemented, this should
+ * be removed
+ */
+public class EntityIdFilter extends AbstractFilter<Id, Id> implements Filter<Id, Id> {
+
+    private final Id entityId;
+
+
+    @Inject
+    public EntityIdFilter( @Assisted final Id entityId ) {this.entityId = entityId;}
+
+
+
+    @Override
+    public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> filterValueObservable ) {
+        //ignore what our input was, and simply emit the id specified
+       return filterValueObservable.map( idFilterResult ->  new FilterResult( entityId, idFilterResult.getPath() ));
+
+    }
+}


[10/12] incubator-usergrid git commit: Merge branch 'two-dot-o-dev' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into two-dot-o-dev

Posted by gr...@apache.org.
Merge branch 'two-dot-o-dev' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into two-dot-o-dev


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/4107ef38
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/4107ef38
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/4107ef38

Branch: refs/heads/USERGRID-609
Commit: 4107ef38961dceecaf90c03edfed0d43dc67486b
Parents: 6d4847a d18b8ee
Author: Todd Nine <tn...@apigee.com>
Authored: Mon May 4 11:27:38 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon May 4 11:27:38 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/CpRelationManager.java      |  34 ++---
 .../usergrid/persistence/EntityManagerIT.java   |   4 +-
 .../graph/impl/SimpleSearchByEdge.java          |   5 +-
 .../usergrid/persistence/index/IndexFig.java    |  11 +-
 .../persistence/index/IndexRefreshCommand.java  |   2 +-
 .../index/impl/EsEntityIndexImpl.java           |   2 +-
 .../index/impl/IndexRefreshCommandImpl.java     | 136 +++++++++++--------
 .../persistence/index/impl/EntityIndexTest.java | 123 ++++++++---------
 8 files changed, 173 insertions(+), 144 deletions(-)
----------------------------------------------------------------------



[12/12] incubator-usergrid git commit: Merge branch 'two-dot-o-dev' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into USERGRID-609

Posted by gr...@apache.org.
Merge branch 'two-dot-o-dev' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into USERGRID-609

# By Todd Nine
# Via Todd Nine
* 'two-dot-o-dev' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid:
  Updates observable short circuit and fixes NPE on empty results
  Fixes graph cursor resume state.
  Fixes resume logic by loading then filtering first id
  Massive refactor.  Paths for cursor generation are now part of our I/O results. This allows the collector to take until satisfied, then generate a serializable path.
  Updated mapping to fix missing doc_values and disable norms  since we use external sorting


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/7b215250
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/7b215250
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/7b215250

Branch: refs/heads/USERGRID-609
Commit: 7b215250afce3303af6a26ecf4ec8918840e37e0
Parents: c6bbfba 9b939d1
Author: GERey <gr...@apigee.com>
Authored: Mon May 4 10:32:43 2015 -0700
Committer: GERey <gr...@apigee.com>
Committed: Mon May 4 10:32:43 2015 -0700

----------------------------------------------------------------------
 .../corepersistence/CpRelationManager.java      |   5 +-
 .../corepersistence/pipeline/Pipeline.java      |  12 +-
 .../pipeline/PipelineContext.java               |  16 +-
 .../pipeline/PipelineOperation.java             |  39 ++++
 .../pipeline/PipelineResult.java                |  57 -----
 .../pipeline/cursor/ResponseCursor.java         |  81 ++++---
 .../pipeline/read/AbstractFilter.java           |  45 ++++
 .../pipeline/read/AbstractPathFilter.java       | 109 +++++++++
 .../read/AbstractPipelineOperation.java         |  44 ----
 .../pipeline/read/AbstractSeekingFilter.java    | 102 --------
 .../pipeline/read/CandidateResultsFilter.java   |  31 ---
 .../pipeline/read/Collector.java                |  13 +-
 .../pipeline/read/CollectorFactory.java         |  12 +-
 .../corepersistence/pipeline/read/EdgePath.java |  79 +++++++
 .../corepersistence/pipeline/read/Filter.java   |   9 +-
 .../pipeline/read/FilterFactory.java            |  40 +++-
 .../pipeline/read/FilterResult.java             |  56 +++++
 .../pipeline/read/PipelineOperation.java        |  38 ---
 .../pipeline/read/ReadPipelineBuilder.java      |   5 +-
 .../pipeline/read/ReadPipelineBuilderImpl.java  |  80 ++++---
 .../pipeline/read/ResultsPage.java              |  26 ++-
 .../read/collect/AbstractCollector.java         |  46 ++++
 .../pipeline/read/collect/EntityFilter.java     |  68 ++++++
 .../read/collect/IdCursorSerializer.java        |  41 ++++
 .../read/collect/ResultsPageCollector.java      |  80 +++++++
 .../AbstractElasticSearchFilter.java            |  45 ++--
 .../pipeline/read/elasticsearch/Candidate.java  |  55 +++++
 .../elasticsearch/CandidateEntityFilter.java    | 234 +++++++++++++++++++
 .../read/elasticsearch/CandidateIdFilter.java   | 201 ++++++++++++++++
 .../CandidateResultsEntityResultsCollector.java | 217 -----------------
 .../CandidateResultsIdVerifyFilter.java         | 193 ---------------
 .../impl/CollectionRefsVerifier.java            |  44 ----
 .../CollectionResultsLoaderFactoryImpl.java     |  65 ------
 .../impl/ConnectionRefsVerifier.java            |  59 -----
 .../ConnectionResultsLoaderFactoryImpl.java     |  73 ------
 .../impl/ElasticSearchQueryExecutor.java        | 224 ------------------
 .../read/elasticsearch/impl/EntityVerifier.java | 127 ----------
 .../elasticsearch/impl/FilteringLoader.java     | 219 -----------------
 .../read/elasticsearch/impl/IdsVerifier.java    |  46 ----
 .../read/elasticsearch/impl/ResultsLoader.java  |  43 ----
 .../impl/ResultsLoaderFactory.java              |  41 ----
 .../elasticsearch/impl/ResultsVerifier.java     |  52 -----
 .../elasticsearch/impl/VersionVerifier.java     |  85 -------
 .../pipeline/read/entity/EntityIdFilter.java    |  53 -----
 .../read/entity/EntityLoadCollector.java        |  94 --------
 .../graph/AbstractReadGraphEdgeByIdFilter.java  |  12 +-
 .../read/graph/AbstractReadGraphFilter.java     |  65 +++++-
 .../pipeline/read/graph/EntityIdFilter.java     |  54 +++++
 .../pipeline/read/graph/EntityLoadFilter.java   | 155 ++++++++++++
 .../graph/ReadGraphConnectionByTypeFilter.java  |  20 +-
 .../results/ObservableQueryExecutor.java        |  24 +-
 .../apache/usergrid/persistence/Results.java    |   2 +-
 .../pipeline/cursor/CursorTest.java             |  20 +-
 .../persistence/index/CandidateResults.java     |  11 +-
 .../impl/EsApplicationEntityIndexImpl.java      |   7 +-
 .../persistence/index/usergrid-mappings.json    |   3 +
 56 files changed, 1578 insertions(+), 2099 deletions(-)
----------------------------------------------------------------------



[04/12] incubator-usergrid git commit: Massive refactor. Paths for cursor generation are now part of our I/O results. This allows the collector to take until satisfied, then generate a serializable path.

Posted by gr...@apache.org.
Massive refactor.  Paths for cursor generation are now part of our I/O results. This allows the collector to take until satisfied, then generate a serializable path.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/cd983d66
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/cd983d66
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/cd983d66

Branch: refs/heads/USERGRID-609
Commit: cd983d66260222985431a775454183c2ed2305ea
Parents: 6d4847a
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Apr 30 17:40:52 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Apr 30 17:40:52 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/CpRelationManager.java      |   5 +-
 .../corepersistence/pipeline/Pipeline.java      |   9 +-
 .../pipeline/PipelineContext.java               |  16 +-
 .../pipeline/PipelineOperation.java             |  39 ++++
 .../pipeline/PipelineResult.java                |  57 -----
 .../pipeline/cursor/ResponseCursor.java         |  81 ++++---
 .../pipeline/read/AbstractFilter.java           |  45 ++++
 .../pipeline/read/AbstractPathFilter.java       | 109 +++++++++
 .../read/AbstractPipelineOperation.java         |  44 ----
 .../pipeline/read/AbstractSeekingFilter.java    | 102 --------
 .../pipeline/read/CandidateResultsFilter.java   |  31 ---
 .../pipeline/read/Collector.java                |  13 +-
 .../pipeline/read/CollectorFactory.java         |  12 +-
 .../corepersistence/pipeline/read/EdgePath.java |  79 +++++++
 .../corepersistence/pipeline/read/Filter.java   |   9 +-
 .../pipeline/read/FilterFactory.java            |  31 ++-
 .../pipeline/read/FilterResult.java             |  56 +++++
 .../pipeline/read/PipelineOperation.java        |  38 ---
 .../pipeline/read/ReadPipelineBuilder.java      |   5 +-
 .../pipeline/read/ReadPipelineBuilderImpl.java  |  75 +++---
 .../pipeline/read/ResultsPage.java              |  26 ++-
 .../read/collect/AbstractCollector.java         |  46 ++++
 .../read/collect/ResultsPageCollector.java      |  80 +++++++
 .../AbstractElasticSearchFilter.java            |  47 ++--
 .../pipeline/read/elasticsearch/Candidate.java  |  55 +++++
 .../elasticsearch/CandidateEntityFilter.java    | 234 +++++++++++++++++++
 .../read/elasticsearch/CandidateIdFilter.java   | 201 ++++++++++++++++
 .../CandidateResultsEntityResultsCollector.java | 217 -----------------
 .../CandidateResultsIdVerifyFilter.java         | 193 ---------------
 .../impl/CollectionRefsVerifier.java            |  44 ----
 .../CollectionResultsLoaderFactoryImpl.java     |  65 ------
 .../impl/ConnectionRefsVerifier.java            |  59 -----
 .../ConnectionResultsLoaderFactoryImpl.java     |  73 ------
 .../impl/ElasticSearchQueryExecutor.java        | 224 ------------------
 .../read/elasticsearch/impl/EntityVerifier.java | 127 ----------
 .../elasticsearch/impl/FilteringLoader.java     | 219 -----------------
 .../read/elasticsearch/impl/IdsVerifier.java    |  46 ----
 .../read/elasticsearch/impl/ResultsLoader.java  |  43 ----
 .../impl/ResultsLoaderFactory.java              |  41 ----
 .../elasticsearch/impl/ResultsVerifier.java     |  52 -----
 .../elasticsearch/impl/VersionVerifier.java     |  85 -------
 .../pipeline/read/entity/EntityIdFilter.java    |  53 -----
 .../read/entity/EntityLoadCollector.java        |  94 --------
 .../graph/AbstractReadGraphEdgeByIdFilter.java  |  12 +-
 .../read/graph/AbstractReadGraphFilter.java     |  15 +-
 .../pipeline/read/graph/EntityIdFilter.java     |  54 +++++
 .../pipeline/read/graph/EntityLoadFilter.java   | 155 ++++++++++++
 .../graph/ReadGraphConnectionByTypeFilter.java  |  20 +-
 .../results/ObservableQueryExecutor.java        |  24 +-
 .../pipeline/cursor/CursorTest.java             |  20 +-
 .../persistence/index/CandidateResults.java     |  11 +-
 .../impl/EsApplicationEntityIndexImpl.java      |   7 +-
 52 files changed, 1402 insertions(+), 2096 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 3119934..2790ee1 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -32,7 +32,6 @@ import org.springframework.util.Assert;
 
 import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
 import org.apache.usergrid.corepersistence.pipeline.PipelineBuilderFactory;
-import org.apache.usergrid.corepersistence.pipeline.PipelineResult;
 import org.apache.usergrid.corepersistence.pipeline.read.ReadPipelineBuilder;
 import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
 import org.apache.usergrid.corepersistence.results.ObservableQueryExecutor;
@@ -648,7 +647,7 @@ public class CpRelationManager implements RelationManager {
         }
 
 
-        final Observable<PipelineResult<ResultsPage>> resultsObservable = readPipelineBuilder.execute();
+        final Observable<ResultsPage> resultsObservable = readPipelineBuilder.execute();
 
         return new ObservableQueryExecutor( resultsObservable ).next();
     }
@@ -917,7 +916,7 @@ public class CpRelationManager implements RelationManager {
         }
 
 
-        final Observable<PipelineResult<ResultsPage>> resultsObservable = readPipelineBuilder.execute();
+        final Observable<ResultsPage> resultsObservable = readPipelineBuilder.execute();
 
         return new ObservableQueryExecutor( resultsObservable ).next();
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
index bc93b6c..df6a218 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
@@ -25,7 +25,6 @@ import java.util.List;
 import org.apache.usergrid.corepersistence.pipeline.cursor.RequestCursor;
 import org.apache.usergrid.corepersistence.pipeline.cursor.ResponseCursor;
 import org.apache.usergrid.corepersistence.pipeline.read.Collector;
-import org.apache.usergrid.corepersistence.pipeline.read.PipelineOperation;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 
 import com.google.common.base.Optional;
@@ -47,7 +46,6 @@ public class Pipeline<R> {
     private final List<PipelineOperation> idPipelineOperationList;
     private final Collector<?, R> collector;
     private final RequestCursor requestCursor;
-    private final ResponseCursor responseCursor;
 
     private final int limit;
 
@@ -69,7 +67,6 @@ public class Pipeline<R> {
         this.limit = limit;
 
         this.requestCursor = new RequestCursor( cursor );
-        this.responseCursor = new ResponseCursor();
     }
 
 
@@ -77,7 +74,7 @@ public class Pipeline<R> {
      * Execute the pipline construction, returning an observable of results
      * @return
      */
-    public Observable<PipelineResult<R>> execute(){
+    public Observable<R> execute(){
 
 
         Observable traverseObservable = Observable.just( applicationScope.getApplication() );
@@ -99,7 +96,7 @@ public class Pipeline<R> {
 
 
         //append the optional cursor into the response for the caller to use
-        return response.map( result -> new PipelineResult<>( result, responseCursor ) );
+        return response;
     }
 
 
@@ -111,7 +108,7 @@ public class Pipeline<R> {
     private void setState( final PipelineOperation pipelineOperation ) {
 
 
-        final PipelineContext context = new PipelineContext( applicationScope, requestCursor, responseCursor,
+        final PipelineContext context = new PipelineContext( applicationScope, requestCursor,
             limit, idCount );
 
         pipelineOperation.setContext( context );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineContext.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineContext.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineContext.java
index 325f876..018abb7 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineContext.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineContext.java
@@ -38,16 +38,13 @@ public class PipelineContext {
     private final int id;
     private final ApplicationScope applicationScope;
     private final RequestCursor requestCursor;
-    private final ResponseCursor responseCursor;
     private final int limit;
 
 
-    public PipelineContext( final ApplicationScope applicationScope, final RequestCursor requestCursor,
-                            final ResponseCursor responseCursor, final int limit, final int id ) {
+    public PipelineContext( final ApplicationScope applicationScope, final RequestCursor requestCursor, final int limit, final int id ) {
 
         this.applicationScope = applicationScope;
         this.requestCursor = requestCursor;
-        this.responseCursor = responseCursor;
         this.limit = limit;
         this.id = id;
     }
@@ -64,7 +61,7 @@ public class PipelineContext {
 
 
     /**
-     * Get our cursor value if present
+     * Get our cursor value if present from our pipeline
      * @param serializer
      */
     public <T extends Serializable> Optional<T> getCursor( final CursorSerializer<T> serializer ) {
@@ -73,15 +70,6 @@ public class PipelineContext {
         return Optional.fromNullable( value );
     }
 
-
-    /**
-     * Set the cursor value into our resposne
-     */
-    public <T extends Serializable> void setCursorValue( final T value, final CursorSerializer<T> serializer ) {
-        responseCursor.setCursor( id, value, serializer );
-    }
-
-
     /**
      * Get the limit for this execution
      * @return

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineOperation.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineOperation.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineOperation.java
new file mode 100644
index 0000000..d2fa16c
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineOperation.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline;
+
+
+import org.apache.usergrid.corepersistence.pipeline.PipelineContext;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+
+import rx.Observable;
+
+
+/**
+ * Interface for filtering commands.  All filters must take an observable of Ids as an input.  Output is then determined by subclasses.
+  * This takes an input of Id, performs some operation, and emits values for further processing in the Observable
+  * pipeline
+ * @param <T> The input type of the filter value
+ * @param <R> The output type of the filter value
+ */
+public interface PipelineOperation<T, R> extends Observable.Transformer<FilterResult<T>, R> {
+
+    void setContext(final PipelineContext pipelineContext);
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineResult.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineResult.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineResult.java
deleted file mode 100644
index fe8604e..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineResult.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline;
-
-
-import org.apache.usergrid.corepersistence.pipeline.cursor.ResponseCursor;
-
-import com.google.common.base.Optional;
-
-
-/**
- * Intermediate observable that will return results, as well as an optional cursor
- * @param <R>
- */
-public class PipelineResult<R> {
-
-
-    private final R result;
-
-    private final ResponseCursor responseCursor;
-
-
-    public PipelineResult( final R result, final ResponseCursor responseCursor ) {
-        this.result = result;
-        this.responseCursor = responseCursor;
-    }
-
-
-    /**
-     * If the user requests our cursor, return the cursor
-     * @return
-     */
-    public Optional<String> getCursor(){
-        return this.responseCursor.encodeAsString();
-    }
-
-    public R getResult(){
-        return result;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java
index f1c8c24..dbd8b88 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java
@@ -20,12 +20,10 @@
 package org.apache.usergrid.corepersistence.pipeline.cursor;
 
 
-import java.io.Serializable;
 import java.util.Base64;
-import java.util.HashMap;
-import java.util.Map;
 
-import com.fasterxml.jackson.core.Base64Variant;
+import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
+
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -41,71 +39,72 @@ public class ResponseCursor {
 
     private static final ObjectMapper MAPPER = CursorSerializerUtil.getMapper();
 
+
     /**
-     * We use a map b/c some indexes might be skipped
+     * The pointer to the first edge path.  Evaluation is lazily performed in the case the caller does not care about
+     * the cursor.
      */
-    private Map<Integer, CursorEntry<?>> cursors = new HashMap<>();
+    private final Optional<EdgePath> edgePath;
 
+    private Optional<String> encodedValue = null;
 
-    /**
-     * Set the possible cursor value into the index. DOES NOT parse the cursor.  This is intentional for performance
-     */
-    public <T extends Serializable> void setCursor( final int id, final T cursor,
-                                                    final CursorSerializer<T> serializer ) {
 
-        final CursorEntry<T> newEntry = new CursorEntry<>( cursor, serializer );
-        cursors.put( id, newEntry );
-    }
+    public ResponseCursor( final Optional<EdgePath> edgePath ) {this.edgePath = edgePath;}
 
 
     /**
-     * now we're done, encode as a string
+     * Lazily encoded deliberately.  If the user doesn't care about a cursor and is using streams, we don't want to take the
+     * time to calculate it
      */
     public Optional<String> encodeAsString() {
-        try {
 
-            if(cursors.isEmpty()){
-                return Optional.absent();
-            }
+        //always return cached if we are called 2x
+        if ( encodedValue != null ) {
+            return encodedValue;
+        }
+
+        if ( !edgePath.isPresent() ) {
+            encodedValue = Optional.absent();
+            return encodedValue;
+        }
+
+
+        try {
 
+            //no edge path, short circuit
 
             final ObjectNode map = MAPPER.createObjectNode();
 
-            for ( Map.Entry<Integer, CursorEntry<?>> entry : cursors.entrySet() ) {
 
-                final CursorEntry cursorEntry = entry.getValue();
+            Optional<EdgePath> current = edgePath;
 
-                final JsonNode serialized = cursorEntry.serializer.toNode( MAPPER, cursorEntry.cursor );
 
-                map.put( entry.getKey().toString(), serialized );
-            }
+            //traverse each edge and add them to our json
+            do {
+
+                final EdgePath edgePath = current.get();
+                final Object cursorValue = edgePath.getCursorValue();
+                final CursorSerializer serializer = edgePath.getSerializer();
+                final int filterId = edgePath.getFilterId();
+
+                final JsonNode serialized = serializer.toNode( MAPPER, cursorValue );
+                map.put( String.valueOf( filterId ), serialized );
 
+                current = current.get().getPrevious();
+            }
+            while ( current.isPresent() );
 
-            final byte[] output = MAPPER.writeValueAsBytes(map);
+            final byte[] output = MAPPER.writeValueAsBytes( map );
 
             //generate a base64 url save string
             final String value = Base64.getUrlEncoder().encodeToString( output );
 
-            return Optional.of( value );
-
+            encodedValue =  Optional.of( value );
         }
         catch ( JsonProcessingException e ) {
             throw new CursorParseException( "Unable to serialize cursor", e );
         }
-    }
 
-
-    /**
-     * Interal pointer to the cursor and it's serialzed value
-     */
-    private static final class CursorEntry<T> {
-        private final T cursor;
-        private final CursorSerializer<T> serializer;
-
-
-        private CursorEntry( final T cursor, final CursorSerializer<T> serializer ) {
-            this.cursor = cursor;
-            this.serializer = serializer;
-        }
+        return encodedValue;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractFilter.java
new file mode 100644
index 0000000..e4d5d44
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractFilter.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read;
+
+
+import org.apache.usergrid.corepersistence.pipeline.PipelineContext;
+import org.apache.usergrid.corepersistence.pipeline.PipelineOperation;
+
+
+/**
+ * Basic functionality for our commands to handle cursor IO
+ * @param <T> the input type
+ * @param <R> The output Type
+ */
+public abstract class AbstractFilter<T, R> implements Filter<T, R> {
+
+
+    protected PipelineContext pipelineContext;
+
+
+    @Override
+    public void setContext( final PipelineContext pipelineContext ) {
+        this.pipelineContext = pipelineContext;
+    }
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPathFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPathFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPathFilter.java
new file mode 100644
index 0000000..c68dc4a
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPathFilter.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read;
+
+
+import java.io.Serializable;
+
+import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
+
+import com.google.common.base.Optional;
+
+
+/**
+ * Abstract class for filters to extend that require a cursor
+ * @param <T> The input type
+ * @param <R> The response type
+ * @param <C> The cursor type
+ */
+public abstract class AbstractPathFilter<T, R, C extends Serializable> extends AbstractFilter<T, R> implements Filter<T, R> {
+
+
+
+    //TODO not a big fan of this, but not sure how to build resume otherwise
+    private CursorSeek<C> cursorSeek;
+
+
+    /**
+     * Return the parsed value of the cursor from the last request, if it exists
+     */
+    protected Optional<C> getSeekValue() {
+
+        if(cursorSeek == null) {
+            final Optional<C> cursor = pipelineContext.getCursor( getCursorSerializer() );
+            cursorSeek = new CursorSeek<>( cursor );
+        }
+
+        return cursorSeek.getSeekValue();
+
+    }
+
+
+    /**
+     * Wraps the emitted value in a FilterResult whose new EdgePath records this filter's id and cursor value and links to the parent path
+     */
+    protected FilterResult<R> createFilterResult( final R emit, final C cursorValue, final Optional<EdgePath> parent ){
+
+
+        //create a current path, and append our parent path to it
+        final EdgePath<C> newEdgePath =
+            new EdgePath<>( pipelineContext.getId(), cursorValue, getCursorSerializer(), parent );
+
+        //emit our value with the parent path
+        return new FilterResult<>( emit, Optional.of( newEdgePath ) );
+
+    }
+
+
+    /**
+     * Return the class to be used when parsing the cursor
+     */
+    protected abstract CursorSerializer<C> getCursorSerializer();
+
+
+    /**
+     * An internal class that holds a mutable state.  When resuming, we only ever honor the seek value on the first call.  Afterwards, we will seek from the beginning on newly emitted values.
+     * Calling get will return the first value to seek, or absent if not specified.  Subsequent calls will return absent.  Callers should treat the results as seek values for each operation
+     */
+    protected static class CursorSeek<C> {
+
+        private Optional<C> seek;
+
+        private CursorSeek(final Optional<C> cursorValue){
+            seek = cursorValue;
+        }
+
+
+        /**
+         * Get the seek value to use when searching
+         * @return the seek value on the first call, absent on every subsequent call
+         */
+        public Optional<C> getSeekValue(){
+            final Optional<C> toReturn = seek;
+
+            seek = Optional.absent();
+
+            return toReturn;
+        }
+
+
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPipelineOperation.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPipelineOperation.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPipelineOperation.java
deleted file mode 100644
index 8d7f106..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPipelineOperation.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read;
-
-
-import org.apache.usergrid.corepersistence.pipeline.PipelineContext;
-
-
-/**
- * Basic functionality for our commands to handle cursor IO
- * @param <T> the input type
- * @param <R> The output Type
- */
-public abstract class AbstractPipelineOperation<T, R> implements PipelineOperation<T, R> {
-
-
-    protected PipelineContext pipelineContext;
-
-
-    @Override
-    public void setContext( final PipelineContext pipelineContext ) {
-        this.pipelineContext = pipelineContext;
-    }
-
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractSeekingFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractSeekingFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractSeekingFilter.java
deleted file mode 100644
index c23a1b7..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractSeekingFilter.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read;
-
-
-import java.io.Serializable;
-
-import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
-
-import com.google.common.base.Optional;
-
-
-/**
- * Abstract class for filters to extend that require a cursor
- * @param <T> The input type
- * @param <R> The response type
- * @param <C> The cursor type
- */
-public abstract class AbstractSeekingFilter<T, R, C extends Serializable> extends AbstractPipelineOperation<T, R> implements Filter<T, R> {
-
-
-
-    //TODO not a big fan of this, but not sure how to build resume otherwise
-    private CursorSeek<C> cursorSeek;
-
-
-    /**
-     * Return the parsed value of the cursor from the last request, if it exists
-     */
-    protected Optional<C> getSeekValue() {
-
-        if(cursorSeek == null) {
-            final Optional<C> cursor = pipelineContext.getCursor( getCursorSerializer() );
-            cursorSeek = new CursorSeek<>( cursor );
-        }
-
-        return cursorSeek.getSeekValue();
-
-    }
-
-
-    /**
-     * Sets the cursor into our pipeline context
-     * @param newValue
-     */
-    protected void setCursor(final C newValue){
-        pipelineContext.setCursorValue( newValue, getCursorSerializer() );
-    }
-
-
-    /**
-     * Return the class to be used when parsing the cursor
-     */
-    protected abstract CursorSerializer<C> getCursorSerializer();
-
-
-    /**
-     * An internal class that holds a mutable state.  When resuming, we only ever honor the seek value on the first call.  Afterwards, we will seek from the beginning on newly emitted values.
-     * Calling get will return the first value to seek, or absent if not specified.  Subsequent calls will return absent.  Callers should treat the results as seek values for each operation
-     */
-    protected static class CursorSeek<C> {
-
-        private Optional<C> seek;
-
-        private CursorSeek(final Optional<C> cursorValue){
-            seek = cursorValue;
-        }
-
-
-        /**
-         * Get the seek value to use when searching
-         * @return
-         */
-        public Optional<C> getSeekValue(){
-            final Optional<C> toReturn = seek;
-
-            seek = Optional.absent();
-
-            return toReturn;
-        }
-
-
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CandidateResultsFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CandidateResultsFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CandidateResultsFilter.java
deleted file mode 100644
index 4e6d06e..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CandidateResultsFilter.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read;
-
-
-import org.apache.usergrid.persistence.index.CandidateResults;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-
-/**
- * Traverses edges in the graph.  Either by query or graph traversal.  Take an observable of ids, and emits
- * an observable of ids
- */
-public interface CandidateResultsFilter extends PipelineOperation<Id, CandidateResults> {}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Collector.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Collector.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Collector.java
index 69d929c..e28ce44 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Collector.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Collector.java
@@ -20,11 +20,18 @@
 package org.apache.usergrid.corepersistence.pipeline.read;
 
 
+import org.apache.usergrid.corepersistence.pipeline.PipelineOperation;
+
+
 /**
- * A command that is used to reduce our stream of results into a final output
- * @param <T>
+ * A command that is used to reduce our stream of results into a stream of final batch outputs.  Once a collector is used,
+ * no further transformation or encoding should occur.  Otherwise EdgePath data will be lost, and serialization cannot occur
+ * across requests
+ *
+ * @param <T>  The input type
+ * @param <R> The output type
  */
-public interface Collector<T, R> extends PipelineOperation<T, R> {
+public interface Collector<T, R> extends PipelineOperation<T,R> {
 
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CollectorFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CollectorFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CollectorFactory.java
index 6893b34..dd200b5 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CollectorFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CollectorFactory.java
@@ -20,8 +20,7 @@
 package org.apache.usergrid.corepersistence.pipeline.read;
 
 
-import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateResultsEntityResultsCollector;
-import org.apache.usergrid.corepersistence.pipeline.read.entity.EntityLoadCollector;
+import org.apache.usergrid.corepersistence.pipeline.read.collect.ResultsPageCollector;
 
 
 /**
@@ -29,16 +28,11 @@ import org.apache.usergrid.corepersistence.pipeline.read.entity.EntityLoadCollec
  */
 public interface CollectorFactory {
 
-    /**
-     * Generate a new instance of the command with the specified parameters
-     */
-    EntityLoadCollector entityLoadCollector();
 
     /**
-     * Get the collector for collection candidate results to entities
+     * Get the results page collector
      * @return
      */
-    CandidateResultsEntityResultsCollector candidateResultsEntityResultsCollector();
-
+   ResultsPageCollector getResultsPageCollector();
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/EdgePath.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/EdgePath.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/EdgePath.java
new file mode 100644
index 0000000..c560fad
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/EdgePath.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read;
+
+
+import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
+
+import com.google.common.base.Optional;
+
+
+/**
+ * A path from our input element to our emitted element.  A chain of EdgePaths comprises a path through the graph.  The chain of edge paths will result
+ * in a cursor when aggregated.  For example, given the following graph traversal
+ *
+ * applicationId(1) - "users" -> userId(2) - "devices" -> deviceId(3), there would be 2 EdgePaths:
+ *
+ *  EdgePath("users"->userId(2)) <- parent - EdgePath("devices" -> deviceId(3))
+ */
+public class EdgePath<C> {
+
+
+    private final int filterId;
+    private final C cursorValue;
+    private final CursorSerializer<C> serializer;
+    private final Optional<EdgePath> previous;
+
+
+    /**
+     *
+     * @param filterId The id of the filter that generated this path
+     * @param cursorValue The value to resume seeking on the path
+     * @param serializer The serializer to serialize the value
+     * @param parent The parent graph path edge to reach this path
+     */
+    public EdgePath( final int filterId, final C cursorValue, final CursorSerializer<C> serializer,
+                     final Optional<EdgePath> parent ) {
+        this.filterId = filterId;
+        this.cursorValue = cursorValue;
+        this.serializer = serializer;
+        this.previous = parent;
+    }
+
+
+    public C getCursorValue() {
+        return cursorValue;
+    }
+
+
+    public int getFilterId() {
+        return filterId;
+    }
+
+
+    public Optional<EdgePath> getPrevious() {
+        return previous;
+    }
+
+
+    public CursorSerializer<C> getSerializer() {
+        return serializer;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java
index ace62db..054a85a 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java
@@ -20,11 +20,12 @@
 package org.apache.usergrid.corepersistence.pipeline.read;
 
 
-import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.corepersistence.pipeline.PipelineOperation;
 
 
 /**
- * Traverses edges in the graph.  Either by query or graph traversal.  Take an observable of ids, and emits
- * an observable of ids
+ * Traverses edges in the graph, either by query or graph traversal.  Takes an observable of FilterResult, and emits
+ * an observable of FilterResults.  Filters should never emit groups or objects that represent collections.  Items should
+ * always be emitted 1 at a time.  It is the responsibility of the collector to aggregate results.
  */
-public interface Filter<T, R> extends PipelineOperation<T, R> {}
+public interface Filter<T, R> extends PipelineOperation<T, FilterResult<R>> {}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
index 078d981..c465516 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
@@ -20,10 +20,12 @@
 package org.apache.usergrid.corepersistence.pipeline.read;
 
 
-import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateResultsIdVerifyFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateEntityFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateIdFilter;
 import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.ElasticSearchCollectionFilter;
 import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.ElasticSearchConnectionFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.entity.EntityIdFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.graph.EntityIdFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.graph.EntityLoadFilter;
 import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphCollectionByIdFilter;
 import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphCollectionFilter;
 import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphConnectionByIdFilter;
@@ -43,6 +45,7 @@ public interface FilterFactory {
 
     /**
      * Generate a new instance of the command with the specified parameters
+     *
      * @param collectionName The collection name to use when reading the graph
      */
     ReadGraphCollectionFilter readGraphCollectionFilter( final String collectionName );
@@ -57,12 +60,14 @@ public interface FilterFactory {
 
     /**
      * Generate a new instance of the command with the specified parameters
+     *
      * @param connectionName The connection name to use when traversing the graph
      */
     ReadGraphConnectionFilter readGraphConnectionFilter( final String connectionName );
 
     /**
      * Generate a new instance of the command with the specified parameters
+     *
      * @param connectionName The connection name to use when traversing the graph
      * @param entityType The entity type to use when traversing the graph
      */
@@ -72,13 +77,15 @@ public interface FilterFactory {
 
     /**
      * Read a connection directly between two identifiers
+     *
      * @param connectionName The connection name to use when traversing the graph
-     * @param targetId  The target Id to use when traversing the graph
+     * @param targetId The target Id to use when traversing the graph
      */
     ReadGraphConnectionByIdFilter readGraphConnectionByIdFilter( final String connectionName, final Id targetId );
 
     /**
      * Generate a new instance of the command with the specified parameters
+     *
      * @param query The query to use when querying the entities in the collection
      * @param collectionName The collection name to use when querying
      */
@@ -90,6 +97,7 @@ public interface FilterFactory {
 
     /**
      * Generate a new instance of the command with the specified parameters
+     *
      * @param query The query to use when querying the entities in the connection
      * @param connectionName The type of connection to query
      * @param connectedEntityType The type of entity in the connection.  Leave absent to query all entity types
@@ -102,13 +110,24 @@ public interface FilterFactory {
 
 
     /**
-     * Get a candidate ids verifier for collection results.  Should be inserted into pipelines where a query filter is an intermediate step,
-     * not a final filter before collectors
+     * Generate a new instance of the command with the specified parameters
+     */
+    EntityLoadFilter entityLoadFilter();
+
+    /**
+     * Get the filter that turns candidate results into entities
      */
-    CandidateResultsIdVerifyFilter candidateResultsIdVerifyFilter();
+    CandidateEntityFilter candidateEntityFilter();
+
+    /**
+     * Get a candidate ids verifier for collection results.  Should be inserted into pipelines where a query filter is
+     * an intermediate step, not a final filter before collectors
+     */
+    CandidateIdFilter candidateResultsIdVerifyFilter();
 
     /**
      * Get an entity id filter.  Used as a 1.0->2.0 bridge since we're not doing full traversals
+     *
      * @param entityId The entity id to emit
      */
     EntityIdFilter getEntityIdFilter( final Id entityId );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterResult.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterResult.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterResult.java
new file mode 100644
index 0000000..3c41a2b
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterResult.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read;
+
+
+import com.google.common.base.Optional;
+
+
+/**
+ * A bean that is passed between filters with immutable cursor state
+ * @param <T> The type of the value the filter emits
+ */
+public class FilterResult<T> {
+    private final T value;
+    private final Optional<EdgePath> path;
+
+
+    /**
+     * Create a new immutable filter result
+     * @param value The value the filter emits
+     * @param path The path to this value, if created
+     */
+    public FilterResult( final T value, final Optional<EdgePath> path ) {
+        this.value = value;
+        this.path = path;
+    }
+
+
+    public T getValue() {
+        return value;
+    }
+
+
+    public Optional<EdgePath> getPath() {
+        return path;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/PipelineOperation.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/PipelineOperation.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/PipelineOperation.java
deleted file mode 100644
index 28bba36..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/PipelineOperation.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read;
-
-
-import org.apache.usergrid.corepersistence.pipeline.PipelineContext;
-
-import rx.Observable;
-
-
-/**
- * Interface for filtering commands.  All filters must take an observable of Id's as an input.  Output is then determined by subclasses.
-  * This takes an input of Id, performs some operation, and emits values for further processing in the Observable
-  * pipeline
- * @param <T> The input type
- * @param <R>
- */
-public interface PipelineOperation< T, R> extends Observable.Transformer<T, R> {
-
-    void setContext(final PipelineContext pipelineContext);
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java
index 25ab03e..d0e87b3 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java
@@ -20,9 +20,6 @@
 package org.apache.usergrid.corepersistence.pipeline.read;
 
 
-import org.apache.usergrid.corepersistence.pipeline.PipelineResult;
-import org.apache.usergrid.persistence.Entity;
-import org.apache.usergrid.persistence.Results;
 import org.apache.usergrid.persistence.model.entity.Id;
 
 import com.google.common.base.Optional;
@@ -103,5 +100,5 @@ public interface ReadPipelineBuilder {
      * Load our entity results when our previous filter calls graph
      * @return
      */
-    Observable<PipelineResult<ResultsPage>> execute();
+    Observable<ResultsPage> execute();
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java
index 4ecfb47..ffb9f7d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java
@@ -24,9 +24,9 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.usergrid.corepersistence.pipeline.Pipeline;
-import org.apache.usergrid.corepersistence.pipeline.PipelineResult;
-import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateResultsEntityResultsCollector;
-import org.apache.usergrid.corepersistence.pipeline.read.entity.EntityLoadCollector;
+import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateEntityFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.graph.EntityLoadFilter;
+import org.apache.usergrid.persistence.Entity;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.util.ValidationUtils;
 import org.apache.usergrid.persistence.model.entity.Id;
@@ -52,6 +52,8 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder {
 
     private final ApplicationScope applicationScope;
 
+    private final CollectorFactory collectorFactory;
+
 
     /**
      * Our pointer to our collect filter. Set or cleared with each operation that's performed so the correct results are
@@ -70,6 +72,7 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder {
         this.filterFactory = filterFactory;
 
         this.applicationScope = applicationScope;
+        this.collectorFactory = collectorFactory;
 
         //init our cursor to empty
         this.cursor = Optional.absent();
@@ -78,7 +81,7 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder {
         this.limit = DEFAULT_LIMIT;
 
 
-        this.collectorState = new CollectorState( collectorFactory );
+        this.collectorState = new CollectorState( );
 
         this.filters = new ArrayList<>();
     }
@@ -120,7 +123,7 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder {
 
         filters.add( filterFactory.readGraphCollectionByIdFilter( collectionName, entityId ) );
 
-        this.collectorState.setEntityLoaderCollector();
+        this.collectorState.setIdEntityLoaderFilter();
 
         return this;
     }
@@ -132,7 +135,7 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder {
 
         filters.add( filterFactory.readGraphCollectionFilter( collectionName ) );
 
-        this.collectorState.setEntityLoaderCollector();
+        this.collectorState.setIdEntityLoaderFilter();
 
         return this;
     }
@@ -147,7 +150,7 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder {
 
         filters.add( filterFactory.elasticSearchCollectionFilter( query, collectionName, entityType ) );
 
-        this.collectorState.setCandidateResultsEntityResultsCollector();
+        this.collectorState.setCandidateEntityFilter();
 
         return this;
     }
@@ -159,7 +162,7 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder {
         ValidationUtils.verifyIdentity( entityId );
 
         filters.add( filterFactory.readGraphConnectionByIdFilter( connectionName, entityId ) );
-        collectorState.setEntityLoaderCollector();
+        collectorState.setIdEntityLoaderFilter();
 
         return this;
     }
@@ -169,7 +172,7 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder {
     public ReadPipelineBuilder getConnection( final String connectionName ) {
         Preconditions.checkNotNull( connectionName, "connectionName must not be null" );
         filters.add( filterFactory.readGraphConnectionFilter( connectionName ) );
-        collectorState.setEntityLoaderCollector();
+        collectorState.setIdEntityLoaderFilter();
 
         return this;
     }
@@ -182,7 +185,7 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder {
 
         filters.add( filterFactory.readGraphConnectionByTypeFilter( connectionName, entityType ) );
 
-        collectorState.setEntityLoaderCollector();
+        collectorState.setIdEntityLoaderFilter();
         return this;
     }
 
@@ -196,17 +199,25 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder {
         Preconditions.checkNotNull( query, "query must not be null" );
 
         filters.add( filterFactory.elasticSearchConnectionFilter( query, connectionName, entityType ) );
-        collectorState.setCandidateResultsEntityResultsCollector();
+        collectorState.setCandidateEntityFilter();
         return this;
     }
 
 
     @Override
-    public Observable<PipelineResult<ResultsPage>> execute() {
+    public Observable<ResultsPage> execute() {
 
         ValidationUtils.validateApplicationScope( applicationScope );
 
-        final Collector<?, ResultsPage> collector = collectorState.getCollector();
+
+        //add our last filter that will generate entities
+        final Filter<?, Entity> finalFilter = collectorState.getFinalFilter();
+
+        filters.add( finalFilter );
+
+
+        //execute our collector
+        final Collector<?, ResultsPage> collector = collectorFactory.getResultsPageCollector();
 
         Preconditions.checkNotNull( collector,
             "You have not specified an operation that creates a collection filter.  This is required for loading "
@@ -229,46 +240,52 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder {
      * A mutable state for our collectors.  Rather than create a new instance each time, we create a singleton
      * collector
      */
-    private static final class CollectorState {
-        private final CollectorFactory collectorFactory;
+    private final class CollectorState {
+
 
-        private EntityLoadCollector entityLoadCollector;
+        private EntityLoadFilter entityLoadCollector;
 
-        private CandidateResultsEntityResultsCollector candidateResultsEntityResultsCollector;
+        private CandidateEntityFilter candidateEntityFilter;
 
+        private Filter entityLoadFilter;
 
-        private Collector<?, ResultsPage> collector = null;
 
 
-        private CollectorState( final CollectorFactory collectorFactory ) {this.collectorFactory = collectorFactory;}
+        private CollectorState( ){}
 
 
-        public void setEntityLoaderCollector() {
+        /**
+         * Set our final filter to be a load entity by Id filter
+         */
+        public void setIdEntityLoaderFilter() {
             if ( entityLoadCollector == null ) {
-                entityLoadCollector = collectorFactory.entityLoadCollector();
+                entityLoadCollector = filterFactory.entityLoadFilter();
             }
 
 
-            collector = entityLoadCollector;
+            entityLoadFilter = entityLoadCollector;
         }
 
 
-        public void setCandidateResultsEntityResultsCollector() {
-            if ( candidateResultsEntityResultsCollector == null ) {
-                candidateResultsEntityResultsCollector = collectorFactory.candidateResultsEntityResultsCollector();
+        /**
+         * Set our final filter to be a load entity by candidate filter
+         */
+        public void setCandidateEntityFilter() {
+            if ( candidateEntityFilter == null ) {
+                candidateEntityFilter = filterFactory.candidateEntityFilter();
             }
 
-            collector = candidateResultsEntityResultsCollector;
+            entityLoadFilter = candidateEntityFilter;
         }
 
 
         public void clear() {
-            collector = null;
+            entityLoadFilter = null;
         }
 
 
-        public Collector<?, ResultsPage> getCollector() {
-            return collector;
+        public Filter<?, Entity> getFinalFilter() {
+            return entityLoadFilter;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ResultsPage.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ResultsPage.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ResultsPage.java
index 198ac67..1810d65 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ResultsPage.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ResultsPage.java
@@ -22,18 +22,28 @@ package org.apache.usergrid.corepersistence.pipeline.read;
 
 import java.util.List;
 
+import org.apache.usergrid.corepersistence.pipeline.cursor.ResponseCursor;
 import org.apache.usergrid.persistence.model.entity.Entity;
 
 
 /**
- * An encapsulation of entities as a group of responses.  Ordered by the requesting filters.  Each set should be considered a "page" of results.
+ * An encapsulation of entities as a group of responses.  Ordered by the requesting filters.  Each set should be
+ * considered a "page" of results.  A hold over from 1.0.  We shouldn't need this when we fully move away from the EM/RM
  */
 public class ResultsPage {
 
     private final List<Entity> entityList;
 
+    private final int limit;
 
-    public ResultsPage( final List<Entity> entityList ) {this.entityList = entityList;}
+    private final ResponseCursor responseCursor;
+
+
+    public ResultsPage( final List<Entity> entityList, final ResponseCursor responseCursor, final int limit ) {
+        this.entityList = entityList;
+        this.responseCursor = responseCursor;
+        this.limit = limit;
+    }
 
 
     public List<Entity> getEntityList() {
@@ -43,9 +53,15 @@ public class ResultsPage {
 
     /**
      * Return true if the results page is empty
-     * @return
      */
-    public boolean isEmpty(){
-        return entityList == null || entityList.isEmpty();
+    public boolean hasMoreResults() {
+        return entityList != null && entityList.size() == limit;
+    }
+
+
+
+
+    public ResponseCursor getResponseCursor() {
+        return responseCursor;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/AbstractCollector.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/AbstractCollector.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/AbstractCollector.java
new file mode 100644
index 0000000..1c5175d
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/AbstractCollector.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.collect;
+
+
+import org.apache.usergrid.corepersistence.pipeline.PipelineContext;
+import org.apache.usergrid.corepersistence.pipeline.read.Collector;
+import org.apache.usergrid.corepersistence.pipeline.read.Filter;
+
+
+/**
+ * Basic functionality for our commands to handle cursor IO
+ * @param <T> the input type
+ * @param <R> The output Type
+ */
+public abstract class AbstractCollector<T, R> implements Collector<T, R> {
+
+
+    protected PipelineContext pipelineContext;
+
+
+    @Override
+    public void setContext( final PipelineContext pipelineContext ) {
+        this.pipelineContext = pipelineContext;
+    }
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ResultsPageCollector.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ResultsPageCollector.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ResultsPageCollector.java
new file mode 100644
index 0000000..84654aa
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ResultsPageCollector.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.collect;
+
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.usergrid.corepersistence.pipeline.cursor.ResponseCursor;
+import org.apache.usergrid.corepersistence.pipeline.read.Collector;
+import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
+import org.apache.usergrid.persistence.model.entity.Entity;
+
+import com.google.common.base.Optional;
+
+import rx.Observable;
+
+
+/**
+ * Takes entities and collects them into results.  This mostly exists for 1.0 compatibility.  Eventually this will
+ * become the only collector in our pipeline and be used when rendering results, both on GET, PUT and POST.
+ */
+public class ResultsPageCollector extends AbstractCollector<Entity, ResultsPage>
+    implements Collector<Entity, ResultsPage> {
+
+
+    @Override
+    public Observable<ResultsPage> call( final Observable<FilterResult<Entity>> filterResultObservable ) {
+
+        final int limit = pipelineContext.getLimit();
+
+        return filterResultObservable.buffer( limit ).flatMap( buffer -> Observable.from( buffer ).collect(
+            () -> new ResultsPageWithCursorCollector( limit ), ( collector, entity ) -> {
+                collector.add( entity );
+            } ) ).map( resultsPageCollector -> new ResultsPage( resultsPageCollector.results,
+            new ResponseCursor( resultsPageCollector.lastPath ), pipelineContext.getLimit() ) );
+    }
+
+
+    /**
+     * A collector that will aggregate our results together
+     */
+    private static class ResultsPageWithCursorCollector {
+
+
+        private final List<Entity> results;
+
+        private Optional<EdgePath> lastPath;
+
+
+        private ResultsPageWithCursorCollector( final int limit ) {
+            this.results = new ArrayList<>( limit );
+        }
+
+
+        public void add( final FilterResult<Entity> result ) {
+            this.results.add( result.getValue() );
+            this.lastPath = result.getPath();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractElasticSearchFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractElasticSearchFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractElasticSearchFilter.java
index eac8a65..004a696 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractElasticSearchFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractElasticSearchFilter.java
@@ -24,11 +24,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
-import org.apache.usergrid.corepersistence.pipeline.read.AbstractSeekingFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.CandidateResultsFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.Filter;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.core.metrics.ObservableTimer;
 import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
+import org.apache.usergrid.persistence.index.CandidateResult;
 import org.apache.usergrid.persistence.index.CandidateResults;
 import org.apache.usergrid.persistence.index.EntityIndexFactory;
 import org.apache.usergrid.persistence.index.SearchEdge;
@@ -44,8 +46,8 @@ import rx.Observable;
 /**
  * Command for reading graph edges
  */
-public abstract class AbstractElasticSearchFilter extends AbstractSeekingFilter<Id, CandidateResults, Integer>
-    implements CandidateResultsFilter {
+public abstract class AbstractElasticSearchFilter extends AbstractPathFilter<Id, Candidate, Integer>
+    implements Filter<Id, Candidate> {
 
     private static final Logger log = LoggerFactory.getLogger( AbstractElasticSearchFilter.class );
 
@@ -66,7 +68,7 @@ public abstract class AbstractElasticSearchFilter extends AbstractSeekingFilter<
 
 
     @Override
-    public Observable<CandidateResults> call( final Observable<Id> observable ) {
+    public Observable<FilterResult<Candidate>> call( final Observable<FilterResult<Id>> observable ) {
 
         //get the graph manager
         final ApplicationEntityIndex applicationEntityIndex =
@@ -80,12 +82,12 @@ public abstract class AbstractElasticSearchFilter extends AbstractSeekingFilter<
 
 
         //return all ids that are emitted from this edge
-        return observable.flatMap( id -> {
+        return observable.flatMap( idFilterResult -> {
 
-            final SearchEdge searchEdge = getSearchEdge( id );
+            final SearchEdge searchEdge = getSearchEdge( idFilterResult.getValue() );
 
 
-            final Observable<CandidateResults> candidates = Observable.create( subscriber -> {
+            final Observable<FilterResult<Candidate>> candidates = Observable.create( subscriber -> {
 
                 //our offset to our start value.  This will be set the first time we emit
                 //after we receive new ids, we want to reset this to 0
@@ -98,19 +100,14 @@ public abstract class AbstractElasticSearchFilter extends AbstractSeekingFilter<
 
                 subscriber.onStart();
 
-                //emit while we have values from ES
-                while ( true ) {
+                //emit while we have values from ES and someone is subscribed
+                while ( !subscriber.isUnsubscribed() ) {
 
 
                     try {
                         final CandidateResults candidateResults =
                             applicationEntityIndex.search( searchEdge, searchTypes, query, limit, currentOffSet );
 
-                        currentOffSet += candidateResults.size();
-
-                        //set the cursor for the next value
-                        setCursor( currentOffSet );
-
                         /**
                          * No candidates, we're done
                          */
@@ -119,7 +116,25 @@ public abstract class AbstractElasticSearchFilter extends AbstractSeekingFilter<
                             return;
                         }
 
-                        subscriber.onNext( candidateResults );
+
+                        for( CandidateResult candidateResult: candidateResults){
+
+                            //our subscriber unsubscribed, break out
+                            if(subscriber.isUnsubscribed()){
+                              return;
+                            }
+
+                            final Candidate candidate = new Candidate( candidateResult, searchEdge );
+
+                            final FilterResult<Candidate>
+                                result = createFilterResult( candidate, currentOffSet, idFilterResult.getPath() );
+
+                            subscriber.onNext( result );
+
+                            currentOffSet++;
+                        }
+
+
                     }
                     catch ( Throwable t ) {
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/Candidate.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/Candidate.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/Candidate.java
new file mode 100644
index 0000000..ab9d5d9
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/Candidate.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch;
+
+
+import org.apache.usergrid.persistence.index.CandidateResult;
+import org.apache.usergrid.persistence.index.SearchEdge;
+
+
+/**
+ * Create a candidate. This holds the original candidate, as well as the search scope it was found in
+ */
+public class Candidate {
+
+    private final CandidateResult candidateResult;
+    private final SearchEdge searchEdge;
+
+
+    /**
+     * Create a new Candidate for further processing
+     * @param candidateResult  The candidate result
+     * @param searchEdge The search edge this was searched on
+     */
+    public Candidate( final CandidateResult candidateResult, final SearchEdge searchEdge ) {
+        this.candidateResult = candidateResult;
+        this.searchEdge = searchEdge;
+    }
+
+
+    public CandidateResult getCandidateResult() {
+        return candidateResult;
+    }
+
+
+    public SearchEdge getSearchEdge() {
+        return searchEdge;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateEntityFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateEntityFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateEntityFilter.java
new file mode 100644
index 0000000..d30917c
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateEntityFilter.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
+import org.apache.usergrid.corepersistence.pipeline.read.Filter;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.collection.EntitySet;
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
+import org.apache.usergrid.persistence.index.CandidateResult;
+import org.apache.usergrid.persistence.index.EntityIndexBatch;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.index.SearchEdge;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.fasterxml.uuid.UUIDComparator;
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+
+import rx.Observable;
+
+
+/**
+ * Loads entities from incoming Candidate emissions, then streams them on.  Performs internal buffering for
+ * efficiency.  Note that all entities may not be emitted if our load crosses page boundaries.  It is up to the
+ * collector to determine when to stop streaming entities.
+ */
+public class CandidateEntityFilter extends AbstractFilter<Candidate, Entity>
+    implements Filter<Candidate, Entity> {
+
+    private final EntityCollectionManagerFactory entityCollectionManagerFactory;
+    private final EntityIndexFactory entityIndexFactory;
+
+
+    @Inject
+    public CandidateEntityFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory,
+                                  final EntityIndexFactory entityIndexFactory ) {
+        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+        this.entityIndexFactory = entityIndexFactory;
+    }
+
+
+    @Override
+       public Observable<FilterResult<Entity>> call(
+           final Observable<FilterResult<Candidate>> candidateResultsObservable ) {
+
+
+        /**
+         * A bit kludgy from old 1.0 -> 2.0 apis.  Refactor this as we clean up our lower levels and create new results
+         * objects
+         */
+
+        final ApplicationScope applicationScope = pipelineContext.getApplicationScope();
+
+        final EntityCollectionManager entityCollectionManager =
+            entityCollectionManagerFactory.createCollectionManager( applicationScope );
+
+
+        final ApplicationEntityIndex applicationIndex =
+            entityIndexFactory.createApplicationEntityIndex( applicationScope );
+
+        //buffer them to get a page size we can make 1 network hop
+        final Observable<FilterResult<Entity>> searchIdSetObservable = candidateResultsObservable.buffer( pipelineContext.getLimit() )
+
+            //load them
+            .flatMap( candidateResults -> {
+                    //flatten to a list of ids to load
+                    final Observable<List<Id>> candidateIds =
+                        Observable.from( candidateResults ).map( filterResultCandidate -> filterResultCandidate.getValue().getCandidateResult().getId() ).toList();
+
+                    //load the ids
+                    final Observable<EntitySet> entitySetObservable =
+                        candidateIds.flatMap( ids -> entityCollectionManager.load( ids ) );
+
+                    //now we have a collection, validate our candidate set is correct.
+
+                    return entitySetObservable.map(
+                        entitySet -> new EntityVerifier( applicationIndex.createBatch(), entitySet,
+                            candidateResults ) ).doOnNext( entityCollector -> entityCollector.merge() )
+                                              .flatMap(
+                                                  entityCollector -> Observable.from( entityCollector.getResults() ) );
+                } );
+
+        //if we filter all our results, we want to continue to try the next page
+        return searchIdSetObservable;
+    }
+
+
+
+
+    /**
+     * Our collector to collect entities.  Not quite a true collector, but works within our operational flow as this state is mutable and difficult to represent functionally
+     */
+    private static final class EntityVerifier {
+
+        private static final Logger logger = LoggerFactory.getLogger( EntityVerifier.class );
+        private List<FilterResult<Entity>> results = new ArrayList<>();
+
+        private final EntityIndexBatch batch;
+        private final List<FilterResult<Candidate>> candidateResults;
+        private final EntitySet entitySet;
+
+
+        public EntityVerifier( final EntityIndexBatch batch, final EntitySet entitySet,
+                               final List<FilterResult<Candidate>> candidateResults ) {
+            this.batch = batch;
+            this.entitySet = entitySet;
+            this.candidateResults = candidateResults;
+            this.results = new ArrayList<>( entitySet.size() );
+        }
+
+
+        /**
+         * Merge our candidates and our entity set into results
+         */
+        public void merge() {
+
+            for ( final FilterResult<Candidate> candidateResult : candidateResults ) {
+                validate( candidateResult );
+            }
+
+            batch.execute();
+        }
+
+
+        public List<FilterResult<Entity>> getResults() {
+            return results;
+        }
+
+
+        public EntityIndexBatch getBatch() {
+            return batch;
+        }
+
+
+        /**
+         * Compares one search candidate with the entity loaded for its id and decides what to do with it:
+         * <ul>
+         * <li>no entity was loaded for the id: warn and drop the candidate</li>
+         * <li>the loaded entity is newer than the candidate, or has no entity body: add a deindex of the
+         * candidate's version to the batch and drop the candidate</li>
+         * <li>the candidate is newer than the loaded entity: warn and drop it, leaving the index untouched</li>
+         * <li>the versions match: add the entity, with the candidate's path, to the results</li>
+         * </ul>
+         *
+         * @param filterResult the candidate to verify, together with its path
+         */
+        private void validate( final FilterResult<Candidate> filterResult ) {
+
+            final Candidate candidate = filterResult.getValue();
+            final CandidateResult candidateResult = candidate.getCandidateResult();
+            final SearchEdge searchEdge = candidate.getSearchEdge();
+            final Id candidateId = candidateResult.getId();
+            final UUID candidateVersion = candidateResult.getVersion();
+
+
+            final MvccEntity entity = entitySet.getEntity( candidateId );
+
+
+            //doesn't exist, warn and drop
+            if ( entity == null ) {
+                logger.warn(
+                    "Searched and received candidate with entityId {} and version {}, yet was not found in cassandra."
+                        + "  Ignoring since this could be a region sync issue",
+                    candidateId, candidateVersion );
+
+
+                //TODO trigger an audit after a fail count where we explicitly try to repair from other regions
+
+                return;
+
+            }
+
+
+            final UUID entityVersion = entity.getVersion();
+            final Id entityId = entity.getId();
+
+
+            //entity is newer than ES version, could be an update or the entity is marked as deleted
+            if ( UUIDComparator.staticCompare( entityVersion, candidateVersion ) > 0 || !entity.getEntity().isPresent()) {
+
+                //the document that produced this candidate is the stale one, so it is the candidate's version
+                //that must be removed from the index, not the current entity version
+                logger.warn( "Deindexing stale entity on edge {} for entityId {} and version {}",
+                    new Object[] { searchEdge, entityId, candidateVersion } );
+                batch.deindex( searchEdge, entityId, candidateVersion );
+                return;
+            }
+
+            //ES is newer than cass, it means we haven't repaired the record in Cass, we don't want to
+            //remove the ES record, since the read in cass should cause a read repair, just ignore
+            if ( UUIDComparator.staticCompare( candidateVersion, entityVersion ) > 0 ) {
+
+                logger.warn(
+                    "Found a newer version in ES over cassandra for edge {} for entityId {} and version {}.  Repair "
+                        + "should be run", new Object[] { searchEdge, entityId, entityVersion } );
+
+                //TODO trigger an audit after a fail count where we explicitly try to repair from other regions
+
+                return;
+            }
+
+            //they're the same version and the entity body is present, add it
+
+            final Entity returnEntity = entity.getEntity().get();
+
+            final Optional<EdgePath> parent = filterResult.getPath();
+
+            final FilterResult<Entity> toReturn = new FilterResult<>( returnEntity, parent );
+
+            results.add( toReturn );
+        }
+    }
+}


[11/12] incubator-usergrid git commit: Merge branch 'USERGRID-587' into two-dot-o-dev

Posted by gr...@apache.org.
Merge branch 'USERGRID-587' into two-dot-o-dev


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/9b939d15
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/9b939d15
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/9b939d15

Branch: refs/heads/USERGRID-609
Commit: 9b939d15ba398c1aa759f17d647bd85ee232ad9e
Parents: 4107ef3 5bb7798
Author: Todd Nine <tn...@apigee.com>
Authored: Mon May 4 11:27:46 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon May 4 11:27:46 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/CpRelationManager.java      |   5 +-
 .../corepersistence/pipeline/Pipeline.java      |  12 +-
 .../pipeline/PipelineContext.java               |  16 +-
 .../pipeline/PipelineOperation.java             |  39 ++++
 .../pipeline/PipelineResult.java                |  57 -----
 .../pipeline/cursor/ResponseCursor.java         |  81 ++++---
 .../pipeline/read/AbstractFilter.java           |  45 ++++
 .../pipeline/read/AbstractPathFilter.java       | 109 +++++++++
 .../read/AbstractPipelineOperation.java         |  44 ----
 .../pipeline/read/AbstractSeekingFilter.java    | 102 --------
 .../pipeline/read/CandidateResultsFilter.java   |  31 ---
 .../pipeline/read/Collector.java                |  13 +-
 .../pipeline/read/CollectorFactory.java         |  12 +-
 .../corepersistence/pipeline/read/EdgePath.java |  79 +++++++
 .../corepersistence/pipeline/read/Filter.java   |   9 +-
 .../pipeline/read/FilterFactory.java            |  40 +++-
 .../pipeline/read/FilterResult.java             |  56 +++++
 .../pipeline/read/PipelineOperation.java        |  38 ---
 .../pipeline/read/ReadPipelineBuilder.java      |   5 +-
 .../pipeline/read/ReadPipelineBuilderImpl.java  |  80 ++++---
 .../pipeline/read/ResultsPage.java              |  26 ++-
 .../read/collect/AbstractCollector.java         |  46 ++++
 .../pipeline/read/collect/EntityFilter.java     |  68 ++++++
 .../read/collect/IdCursorSerializer.java        |  41 ++++
 .../read/collect/ResultsPageCollector.java      |  80 +++++++
 .../AbstractElasticSearchFilter.java            |  45 ++--
 .../pipeline/read/elasticsearch/Candidate.java  |  55 +++++
 .../elasticsearch/CandidateEntityFilter.java    | 234 +++++++++++++++++++
 .../read/elasticsearch/CandidateIdFilter.java   | 201 ++++++++++++++++
 .../CandidateResultsEntityResultsCollector.java | 217 -----------------
 .../CandidateResultsIdVerifyFilter.java         | 193 ---------------
 .../impl/CollectionRefsVerifier.java            |  44 ----
 .../CollectionResultsLoaderFactoryImpl.java     |  65 ------
 .../impl/ConnectionRefsVerifier.java            |  59 -----
 .../ConnectionResultsLoaderFactoryImpl.java     |  73 ------
 .../impl/ElasticSearchQueryExecutor.java        | 224 ------------------
 .../read/elasticsearch/impl/EntityVerifier.java | 127 ----------
 .../elasticsearch/impl/FilteringLoader.java     | 219 -----------------
 .../read/elasticsearch/impl/IdsVerifier.java    |  46 ----
 .../read/elasticsearch/impl/ResultsLoader.java  |  43 ----
 .../impl/ResultsLoaderFactory.java              |  41 ----
 .../elasticsearch/impl/ResultsVerifier.java     |  52 -----
 .../elasticsearch/impl/VersionVerifier.java     |  85 -------
 .../pipeline/read/entity/EntityIdFilter.java    |  53 -----
 .../read/entity/EntityLoadCollector.java        |  94 --------
 .../graph/AbstractReadGraphEdgeByIdFilter.java  |  12 +-
 .../read/graph/AbstractReadGraphFilter.java     |  65 +++++-
 .../pipeline/read/graph/EntityIdFilter.java     |  54 +++++
 .../pipeline/read/graph/EntityLoadFilter.java   | 155 ++++++++++++
 .../graph/ReadGraphConnectionByTypeFilter.java  |  20 +-
 .../results/ObservableQueryExecutor.java        |  24 +-
 .../apache/usergrid/persistence/Results.java    |   2 +-
 .../pipeline/cursor/CursorTest.java             |  20 +-
 .../persistence/index/CandidateResults.java     |  11 +-
 .../impl/EsApplicationEntityIndexImpl.java      |   7 +-
 .../persistence/index/usergrid-mappings.json    |   1 -
 56 files changed, 1575 insertions(+), 2100 deletions(-)
----------------------------------------------------------------------



[08/12] incubator-usergrid git commit: Merge branch 'two-dot-o-dev' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into USERGRID-587

Posted by gr...@apache.org.
Merge branch 'two-dot-o-dev' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into USERGRID-587


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/a673c81b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/a673c81b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/a673c81b

Branch: refs/heads/USERGRID-609
Commit: a673c81bf7c51ce48f2b8d052fcba990931e1977
Parents: 118711a d18b8ee
Author: Todd Nine <tn...@apigee.com>
Authored: Mon May 4 09:59:09 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon May 4 09:59:09 2015 -0600

----------------------------------------------------------------------
 .../persistence/index/impl/EntityIndexTest.java | 121 ++++++++++---------
 1 file changed, 61 insertions(+), 60 deletions(-)
----------------------------------------------------------------------



[06/12] incubator-usergrid git commit: Fixes resume logic by loading then filtering first id

Posted by gr...@apache.org.
Fixes resume logic by loading then filtering first id


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/413f023e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/413f023e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/413f023e

Branch: refs/heads/USERGRID-609
Commit: 413f023e243d8e398db6295b05a9c1f6c8c8feed
Parents: 294a7d9
Author: Todd Nine <tn...@apigee.com>
Authored: Mon May 4 09:11:01 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon May 4 09:11:01 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/pipeline/Pipeline.java      |  3 +-
 .../pipeline/read/FilterFactory.java            |  9 +++
 .../pipeline/read/ReadPipelineBuilderImpl.java  | 11 +++-
 .../pipeline/read/collect/EntityFilter.java     | 68 ++++++++++++++++++++
 .../read/collect/IdCursorSerializer.java        | 41 ++++++++++++
 .../persistence/index/usergrid-mappings.json    |  1 -
 6 files changed, 128 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/413f023e/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
index df6a218..26cf346 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
@@ -25,6 +25,7 @@ import java.util.List;
 import org.apache.usergrid.corepersistence.pipeline.cursor.RequestCursor;
 import org.apache.usergrid.corepersistence.pipeline.cursor.ResponseCursor;
 import org.apache.usergrid.corepersistence.pipeline.read.Collector;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 
 import com.google.common.base.Optional;
@@ -77,7 +78,7 @@ public class Pipeline<R> {
     public Observable<R> execute(){
 
 
-        Observable traverseObservable = Observable.just( applicationScope.getApplication() );
+        Observable traverseObservable = Observable.just( new FilterResult<>( applicationScope.getApplication(), Optional.absent() ));
 
         //build our traversal commands
         for ( PipelineOperation pipelineOperation : idPipelineOperationList ) {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/413f023e/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
index c465516..a2f1605 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
@@ -20,6 +20,8 @@
 package org.apache.usergrid.corepersistence.pipeline.read;
 
 
+import org.apache.usergrid.corepersistence.pipeline.read.collect.EntityFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.collect.IdCursorSerializer;
 import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateEntityFilter;
 import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateIdFilter;
 import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.ElasticSearchCollectionFilter;
@@ -131,4 +133,11 @@ public interface FilterFactory {
      * @param entityId The entity id to emit
      */
     EntityIdFilter getEntityIdFilter( final Id entityId );
+
+
+    /**
+     * Create a new instance of our entity filter, which skips the first result on resume
+     * @return a new EntityFilter instance
+     */
+    EntityFilter entityFilter();
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/413f023e/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java
index ffb9f7d..28446ad 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java
@@ -26,9 +26,9 @@ import java.util.List;
 import org.apache.usergrid.corepersistence.pipeline.Pipeline;
 import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateEntityFilter;
 import org.apache.usergrid.corepersistence.pipeline.read.graph.EntityLoadFilter;
-import org.apache.usergrid.persistence.Entity;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.util.ValidationUtils;
+import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
 
 import com.google.common.base.Optional;
@@ -211,9 +211,14 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder {
 
 
         //add our last filter that will generate entities
-        final Filter<?, Entity> finalFilter = collectorState.getFinalFilter();
+        final Filter<?, Entity> entityLoadFilter = collectorState.getFinalFilter();
 
-        filters.add( finalFilter );
+        filters.add( entityLoadFilter );
+
+        //add the filter that skips the first result on resume
+        final Filter<Entity, Entity>  cursorEntityFilter = filterFactory.entityFilter();
+
+        filters.add( cursorEntityFilter );
 
 
         //execute our collector

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/413f023e/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/EntityFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/EntityFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/EntityFilter.java
new file mode 100644
index 0000000..daf2e7f
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/EntityFilter.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.collect;
+
+
+import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.Filter;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Optional;
+
+import rx.Observable;
+
+
+/**
+ * A filter that is used when we can potentially serialize pages via cursor.  This will filter the first result, only if
+ * it matches the Id that was set
+ */
+public class EntityFilter extends AbstractPathFilter<Entity, Entity, Id> implements Filter<Entity, Entity> {
+
+
+    /**
+     * Skips leading results whose entity id equals the seek value (when a seek value is present), then passes
+     * every remaining entity through createFilterResult with its own id and the incoming path.
+     *
+     * @param filterResultObservable the entities to filter
+     * @return the remaining entities, re-wrapped as filter results
+     */
+    @Override
+    public Observable<FilterResult<Entity>> call( final Observable<FilterResult<Entity>> filterResultObservable ) {
+
+        final Observable<FilterResult<Entity>> afterResumePoint =
+            filterResultObservable.skipWhile( incoming -> matchesSeekValue( incoming ) );
+
+        return afterResumePoint.map( incoming -> rewrapWithEntityId( incoming ) );
+    }
+
+
+    /**
+     * True only when a seek value is present and equals the id of the entity in the given result.
+     */
+    private boolean matchesSeekValue( final FilterResult<Entity> incoming ) {
+        final Optional<Id> seekId = getSeekValue();
+
+        if ( !seekId.isPresent() ) {
+            return false;
+        }
+
+        return seekId.get().equals( incoming.getValue().getId() );
+    }
+
+
+    /**
+     * Builds the outgoing result for an entity from the entity, its id, and the path of the incoming result.
+     */
+    private FilterResult<Entity> rewrapWithEntityId( final FilterResult<Entity> incoming ) {
+        final Entity loaded = incoming.getValue();
+
+        return createFilterResult( loaded, loaded.getId(), incoming.getPath() );
+    }
+
+
+    @Override
+    protected CursorSerializer<Id> getCursorSerializer() {
+        return IdCursorSerializer.INSTANCE;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/413f023e/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/IdCursorSerializer.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/IdCursorSerializer.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/IdCursorSerializer.java
new file mode 100644
index 0000000..d96b9f2
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/IdCursorSerializer.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.collect;
+
+
+import org.apache.usergrid.corepersistence.pipeline.cursor.AbstractCursorSerializer;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+/**
+ * cursor serializer for Ids
+ */
+public class IdCursorSerializer extends AbstractCursorSerializer<Id> {
+
+    /** Shared instance of this serializer. */
+    public static final IdCursorSerializer INSTANCE = new IdCursorSerializer();
+
+
+    /**
+     * @return the {@code Id} class, the type parameter of this serializer
+     */
+    @Override
+    protected Class<Id> getType() {
+        final Class<Id> idType = Id.class;
+
+        return idType;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/413f023e/stack/corepersistence/queryindex/src/main/resources/org/apache/usergrid/persistence/index/usergrid-mappings.json
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/resources/org/apache/usergrid/persistence/index/usergrid-mappings.json b/stack/corepersistence/queryindex/src/main/resources/org/apache/usergrid/persistence/index/usergrid-mappings.json
index bee84a2..c22a4ec 100644
--- a/stack/corepersistence/queryindex/src/main/resources/org/apache/usergrid/persistence/index/usergrid-mappings.json
+++ b/stack/corepersistence/queryindex/src/main/resources/org/apache/usergrid/persistence/index/usergrid-mappings.json
@@ -73,7 +73,6 @@
                     },
                     "string": {
                         "type": "string",
-                        "doc_values": true,
                         "norms": {
                             "enabled": false
                         },


[02/12] incubator-usergrid git commit: Massive refactor. Paths for cursor generation are now part of our I/O results. This allows the collector to take until satisfied, then generate a serializable path.

Posted by gr...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityLoadFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityLoadFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityLoadFilter.java
new file mode 100644
index 0000000..d598a2e
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityLoadFilter.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.graph;
+
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
+import org.apache.usergrid.corepersistence.pipeline.read.Filter;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.collection.EntitySet;
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+
+import rx.Observable;
+
+
+/**
+ * Loads entities from a set of Ids.
+ *
+ * Incoming ids are buffered by the pipeline limit, each buffer is loaded with one call to the entity collection
+ * manager, and ids for which no entity could be loaded are dropped with a warning.
+ *
+ * TODO refactor this into a common command that both ES search and graphSearch can use for repair and verification
+ */
+public class EntityLoadFilter extends AbstractFilter<Id, Entity> implements Filter<Id, Entity> {
+
+    private final EntityCollectionManagerFactory entityCollectionManagerFactory;
+
+
+    @Inject
+    public EntityLoadFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory ) {
+        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+    }
+
+
+    /**
+     * Transforms a stream of ids into a stream of loaded entities, preserving each id's path.
+     *
+     * @param filterResultObservable the ids to load
+     * @return the entities that could be loaded for those ids
+     */
+    @Override
+    public Observable<FilterResult<Entity>> call( final Observable<FilterResult<Id>> filterResultObservable ) {
+
+
+        final EntityCollectionManager entityCollectionManager =
+            entityCollectionManagerFactory.createCollectionManager( pipelineContext.getApplicationScope() );
+
+        //it's more efficient to make 1 network hop to get everything, then drop our results if required
+        final Observable<FilterResult<Entity>> entityObservable =
+            filterResultObservable.buffer( pipelineContext.getLimit() ).flatMap( bufferedIds -> {
+
+                    final Observable<EntitySet> entitySetObservable =
+                        Observable.from( bufferedIds ).map( filterResultId -> filterResultId.getValue() ).toList()
+                                  .flatMap( ids -> entityCollectionManager.load( ids ) );
+
+
+                    //now we have a collection, validate our candidate set is correct.
+
+                    return entitySetObservable.map( entitySet -> new EntityVerifier( entitySet, bufferedIds ) )
+                                              .doOnNext( entityCollector -> entityCollector.merge() ).flatMap(
+                            entityCollector -> Observable.from( entityCollector.getResults() ) );
+                } );
+
+        return entityObservable;
+    }
+
+
+    /**
+     * Our collector to collect entities.  Not quite a true collector, but works within our operational flow as this
+     * state is mutable and difficult to represent functionally
+     */
+    private static final class EntityVerifier {
+
+        private static final Logger logger = LoggerFactory.getLogger( EntityVerifier.class );
+
+        /** Entities that passed validation, in the order their ids were visited; sized in the constructor. */
+        private final List<FilterResult<Entity>> results;
+
+        private final List<FilterResult<Id>> candidateResults;
+        private final EntitySet entitySet;
+
+
+        /**
+         * @param entitySet the entities loaded for the candidate ids; its size is used to pre-size the results
+         * @param candidateResults the ids to verify against the entity set
+         */
+        public EntityVerifier( final EntitySet entitySet, final List<FilterResult<Id>> candidateResults ) {
+            this.entitySet = entitySet;
+            this.candidateResults = candidateResults;
+            //single allocation: the list is created once here, sized from the loaded entity set
+            this.results = new ArrayList<>( entitySet.size() );
+        }
+
+
+        /**
+         * Merge our candidates and our entity set into results
+         */
+        public void merge() {
+
+            for ( final FilterResult<Id> candidateResult : candidateResults ) {
+                validate( candidateResult );
+            }
+        }
+
+
+        /**
+         * @return the list of verified entities held by this verifier (the internal list itself, not a copy)
+         */
+        public List<FilterResult<Entity>> getResults() {
+            return results;
+        }
+
+
+        /**
+         * Adds the entity for the given id to the results when it was loaded and has an entity body; otherwise
+         * logs a warning and drops the id.
+         *
+         * @param filterResult the id to verify, together with its path
+         */
+        private void validate( final FilterResult<Id> filterResult ) {
+
+            final Id candidateId = filterResult.getValue();
+
+
+            final MvccEntity entity = entitySet.getEntity( candidateId );
+
+
+            //doesn't exist, warn and drop
+            if ( entity == null || !entity.getEntity().isPresent() ) {
+                logger.warn( "Read graph edge and received candidate with entityId {}, yet was not found in cassandra."
+                        + "  Ignoring since this could be a region sync issue", candidateId );
+
+
+                //TODO trigger an audit after a fail count where we explicitly try to repair from other regions
+
+                return;
+            }
+
+            //it exists, add it
+
+            final Entity returnEntity = entity.getEntity().get();
+
+            final Optional<EdgePath> parent = filterResult.getPath();
+
+            final FilterResult<Entity> toReturn = new FilterResult<>( returnEntity, parent );
+
+            results.add( toReturn );
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java
index 12306fd..7371579 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java
@@ -21,8 +21,9 @@ package org.apache.usergrid.corepersistence.pipeline.read.graph;
 
 
 import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
-import org.apache.usergrid.corepersistence.pipeline.read.AbstractSeekingFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter;
 import org.apache.usergrid.corepersistence.pipeline.read.Filter;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
@@ -42,7 +43,7 @@ import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getEdgeType
 /**
  * Command for reading graph edges on a connection
  */
-public class ReadGraphConnectionByTypeFilter extends AbstractSeekingFilter<Id, Id, Edge> implements Filter<Id, Id> {
+public class ReadGraphConnectionByTypeFilter extends AbstractPathFilter<Id, Id, Edge> implements Filter<Id, Id> {
 
     private final GraphManagerFactory graphManagerFactory;
     private final String connectionName;
@@ -61,8 +62,9 @@ public class ReadGraphConnectionByTypeFilter extends AbstractSeekingFilter<Id, I
     }
 
 
+
     @Override
-    public Observable<Id> call( final Observable<Id> observable ) {
+    public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> filterResultObservable ) {
 
         //get the graph manager
         final GraphManager graphManager = graphManagerFactory.createEdgeManager( pipelineContext.getApplicationScope() );
@@ -73,20 +75,18 @@ public class ReadGraphConnectionByTypeFilter extends AbstractSeekingFilter<Id, I
 
 
         //return all ids that are emitted from this edge
-        return observable.flatMap( id -> {
+        return filterResultObservable.flatMap( idFilterResult -> {
 
               //set our our constant state
             final Optional<Edge> startFromCursor = getSeekValue();
+            final Id id = idFilterResult.getValue();
 
             final SimpleSearchByIdType search =
                 new SimpleSearchByIdType( id, edgeName, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
                     entityType, startFromCursor );
 
-            /**
-             * TODO, pass a message with pointers to our cursor values to be generated later
-             */
-            return graphManager.loadEdgesFromSourceByType( search ).doOnNext( edge -> setCursor( edge ) ).map(
-                edge -> edge.getTargetNode() );
+            return graphManager.loadEdgesFromSourceByType( search ).map(
+                edge -> createFilterResult( edge.getTargetNode(), edge, idFilterResult.getPath() ));
         } );
     }
 
@@ -95,4 +95,6 @@ public class ReadGraphConnectionByTypeFilter extends AbstractSeekingFilter<Id, I
     protected CursorSerializer<Edge> getCursorSerializer() {
         return EdgeCursorSerializer.INSTANCE;
     }
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
index f3c2a9c..0260d1d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
@@ -26,12 +26,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 
-import org.apache.usergrid.corepersistence.pipeline.PipelineResult;
 import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
 import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
 import org.apache.usergrid.persistence.EntityFactory;
 import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.collection.MvccEntity;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
 
@@ -51,13 +49,17 @@ public class ObservableQueryExecutor implements QueryExecutor {
     public Iterator<Results> iterator;
 
 
-    public ObservableQueryExecutor( final Observable<PipelineResult<ResultsPage>> resultsObservable ) {
+    public ObservableQueryExecutor( final Observable<ResultsPage> resultsObservable) {
        //map to our old results objects, return a default empty if required
         this.resultsObservable = resultsObservable.map( resultsPage -> createResults( resultsPage ) ).defaultIfEmpty( new Results() );
     }
 
 
-
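+    /**
+     * Convert the given entity into an org.apache.usergrid.persistence.Entity
+     * @param cpEntity the entity to convert
+     * @return the converted entity
+     */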
     private org.apache.usergrid.persistence.Entity mapEntity( final Entity cpEntity ) {
 
 
@@ -72,9 +74,8 @@ public class ObservableQueryExecutor implements QueryExecutor {
         return entity;
     }
 
-    private Results createResults( final PipelineResult<ResultsPage> pipelineResults ){
+    private Results createResults( final ResultsPage resultsPage ){
 
-        final ResultsPage resultsPage = pipelineResults.getResult();
         final List<Entity> entityList = resultsPage.getEntityList();
         final List<org.apache.usergrid.persistence.Entity> resultsEntities = new ArrayList<>( entityList.size() );
 
@@ -85,10 +86,15 @@ public class ObservableQueryExecutor implements QueryExecutor {
 
         final Results results = Results.fromEntities( resultsEntities );
 
-        if(pipelineResults.getCursor().isPresent()) {
-            results.setCursor( pipelineResults.getCursor().get() );
-        }
 
+        //add the cursor only if the page reports that more results are available
+        if(resultsPage.hasMoreResults()) {
+            final Optional<String> cursor = resultsPage.getResponseCursor().encodeAsString();
+
+            if ( cursor.isPresent() ) {
+                results.setCursor( cursor.get() );
+            }
+        }
         return results;
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java
index 6e15d54..fd65ebf 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java
@@ -24,11 +24,11 @@ package org.apache.usergrid.corepersistence.pipeline.cursor;
 
 import org.junit.Test;
 
+import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
 import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.ElasticsearchCursorSerializer;
 import org.apache.usergrid.corepersistence.pipeline.read.graph.EdgeCursorSerializer;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
-import org.apache.usergrid.persistence.graph.impl.SimpleMarkedEdge;
 
 import com.google.common.base.Optional;
 
@@ -41,7 +41,10 @@ public class CursorTest {
     @Test
     public void testCursors(){
 
-        ResponseCursor responseCursor = new ResponseCursor();
+
+
+
+
 
 
         //test encoding edge
@@ -58,13 +61,18 @@ public class CursorTest {
         final Integer query2 = 20;
 
 
-        responseCursor.setCursor( 0, edge1, EdgeCursorSerializer.INSTANCE );
 
-        responseCursor.setCursor( 1, query1, ElasticsearchCursorSerializer.INSTANCE );
+        final EdgePath<Integer> filter3Path = new EdgePath<>( 3, query2, ElasticsearchCursorSerializer.INSTANCE, Optional.absent() );
+
+        final EdgePath<Edge> filter2Path = new EdgePath<Edge>(2, edge2, EdgeCursorSerializer.INSTANCE, Optional.of( filter3Path ));
+
+        final EdgePath<Integer> filter1Path = new EdgePath<>( 1, query1, ElasticsearchCursorSerializer.INSTANCE, Optional.of(filter2Path) );
+
+        final EdgePath<Edge> filter0Path = new EdgePath<>( 0, edge1, EdgeCursorSerializer.INSTANCE, Optional.of( filter1Path ) );
+
 
-        responseCursor.setCursor(2, edge2, EdgeCursorSerializer.INSTANCE);
 
-        responseCursor.setCursor(3, query2, ElasticsearchCursorSerializer.INSTANCE);
+        ResponseCursor responseCursor = new ResponseCursor( Optional.of(filter0Path) );
 
         final Optional<String> cursor = responseCursor.encodeAsString();
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/CandidateResults.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/CandidateResults.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/CandidateResults.java
index 1e63df1..a157e47 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/CandidateResults.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/CandidateResults.java
@@ -42,14 +42,10 @@ public class CandidateResults implements Iterable<CandidateResult> {
 
     private final List<CandidateResult> candidates;
     private final Collection<SelectFieldMapping> getFieldMappings;
-    private final SearchEdge searchEdge;
 
-
-    public CandidateResults( List<CandidateResult> candidates, final Collection<SelectFieldMapping> getFieldMappings,
-                             final SearchEdge searchEdge ) {
+    public CandidateResults( List<CandidateResult> candidates, final Collection<SelectFieldMapping> getFieldMappings) {
         this.candidates = candidates;
         this.getFieldMappings = getFieldMappings;
-        this.searchEdge = searchEdge;
         offset = Optional.absent();
     }
 
@@ -91,11 +87,6 @@ public class CandidateResults implements Iterable<CandidateResult> {
     }
 
 
-    public SearchEdge getSearchEdge() {
-        return searchEdge;
-    }
-
-
     /**
      * Get the candidates
      * @return

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
index 2d4696a..5b67060 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
@@ -169,7 +169,7 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex {
         }
         failureMonitor.success();
 
-        return parseResults(searchResponse, parsedQuery, searchEdge, limit, offset);
+        return parseResults(searchResponse, parsedQuery, limit, offset);
     }
 
 
@@ -227,7 +227,7 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex {
     /**
      * Parse the results and return the canddiate results
      */
-    private CandidateResults parseResults( final SearchResponse searchResponse, final ParsedQuery query, final SearchEdge searchEdge,
+    private CandidateResults parseResults( final SearchResponse searchResponse, final ParsedQuery query,
                                            final int limit, final int from ) {
 
         final SearchHits searchHits = searchResponse.getHits();
@@ -244,8 +244,7 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex {
             candidates.add( candidateResult );
         }
 
-        final CandidateResults candidateResults = new CandidateResults( candidates, query.getSelectFieldMappings(),
-            searchEdge );
+        final CandidateResults candidateResults = new CandidateResults( candidates, query.getSelectFieldMappings());
 
         // >= seems odd.  However if we get an overflow, we need to account for it.
         if (  hits.length >= limit ) {


[07/12] incubator-usergrid git commit: Fixes graph cursor resume state.

Posted by gr...@apache.org.
Fixes graph cursor resume state.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/118711ae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/118711ae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/118711ae

Branch: refs/heads/USERGRID-609
Commit: 118711aee83875f72a0b058b784218b211748d4e
Parents: 413f023
Author: Todd Nine <tn...@apigee.com>
Authored: Mon May 4 09:59:00 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon May 4 09:59:00 2015 -0600

----------------------------------------------------------------------
 .../read/graph/AbstractReadGraphFilter.java     | 52 ++++++++++++++++++--
 1 file changed, 49 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/118711ae/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java
index 503fcf9..303bc5b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java
@@ -22,6 +22,7 @@ package org.apache.usergrid.corepersistence.pipeline.read.graph;
 
 import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
 import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
 import org.apache.usergrid.corepersistence.pipeline.read.Filter;
 import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
 import org.apache.usergrid.persistence.graph.Edge;
@@ -62,6 +63,7 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
 
 
         final String edgeName = getEdgeTypeName();
+        final EdgeState edgeCursorState = new EdgeState();
 
 
         //return all ids that are emitted from this edge
@@ -80,15 +82,30 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
              * TODO, pass a message with pointers to our cursor values to be generated later
              */
             return graphManager.loadEdgesFromSource( search )
-                //set our cursor every edge we traverse
+                //set the edge state for cursors
+                .doOnNext( edge -> edgeCursorState.update( edge ) )
 
-                    //map our id from the target edge
-                .map( edge -> createFilterResult( edge.getTargetNode(), edge, previousFilterValue.getPath() ) );
+                    //map our id from the target edge and set our cursor on every edge we traverse
+                .map( edge -> createFilterResult( edge.getTargetNode(), edgeCursorState.getCursorEdge(),
+                    previousFilterValue.getPath() ) );
         } );
     }
 
 
     @Override
+    protected FilterResult<Id> createFilterResult( final Id emit, final Edge cursorValue,
+                                                   final Optional<EdgePath> parent ) {
+
+        //if it's our first pass, there's no cursor to generate
+        if(cursorValue == null){
+            return new FilterResult<>( emit, parent );
+        }
+
+        return super.createFilterResult( emit, cursorValue, parent );
+    }
+
+
+    @Override
     protected CursorSerializer<Edge> getCursorSerializer() {
         return EdgeCursorSerializer.INSTANCE;
     }
@@ -98,4 +115,33 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
      * Get the edge type name we should use when traversing
      */
     protected abstract String getEdgeTypeName();
+
+
+    /**
+     * Wrapper class. Because edges seek > the last returned edge, we need to keep our n-1 value. This will be our
+     * cursor. We always try to seek to the same position at which we ended. Since we don't deal with a persistent
+     * read result, if we seek to a value equal to our last, we may skip data.
+     */
+    private final class EdgeState {
+
+        private Edge cursorEdge = null;
+        private Edge currentEdge = null;
+
+
+        /**
+         * Update the pointers
+         */
+        private void update( final Edge newEdge ) {
+            cursorEdge = currentEdge;
+            currentEdge = newEdge;
+        }
+
+
+        /**
+         * Get the edge to use in cursors for resume
+         */
+        private Edge getCursorEdge() {
+            return cursorEdge;
+        }
+    }
 }