You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/05/26 23:17:06 UTC

[02/11] incubator-usergrid git commit: Refactors operations into easier build pattern. Pipeline still need some work.

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
new file mode 100644
index 0000000..d0b6af9
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.search;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.collection.EntitySet;
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
+import org.apache.usergrid.persistence.index.CandidateResult;
+import org.apache.usergrid.persistence.index.EntityIndexBatch;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.index.SearchEdge;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.fasterxml.uuid.UUIDComparator;
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+
+import rx.Observable;
+
+
+/**
+ * Loads entities from an incoming CandidateResult emissions into entities, then streams them on
+ * performs internal buffering for efficiency.  Note that all entities may not be emitted if our load crosses page boundaries.  It is up to the
+ * collector to determine when to stop streaming entities.
+ */
+public class CandidateEntityFilter extends AbstractFilter<FilterResult<Candidate>, FilterResult<Entity>> {
+
+    private final EntityCollectionManagerFactory entityCollectionManagerFactory;
+    private final EntityIndexFactory entityIndexFactory;
+
+
+    @Inject
+    public CandidateEntityFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory,
+                                  final EntityIndexFactory entityIndexFactory ) {
+        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+        this.entityIndexFactory = entityIndexFactory;
+    }
+
+
+    @Override
+       public Observable<FilterResult<Entity>> call(
+           final Observable<FilterResult<Candidate>> candidateResultsObservable ) {
+
+
+        /**
+         * A bit kludgy from old 1.0 -> 2.0 apis.  Refactor this as we clean up our lower levels and create new results
+         * objects
+         */
+
+        final ApplicationScope applicationScope = pipelineContext.getApplicationScope();
+
+        final EntityCollectionManager entityCollectionManager =
+            entityCollectionManagerFactory.createCollectionManager( applicationScope );
+
+
+        final ApplicationEntityIndex applicationIndex =
+            entityIndexFactory.createApplicationEntityIndex( applicationScope );
+
+        //buffer them to get a page size we can make 1 network hop
+        final Observable<FilterResult<Entity>> searchIdSetObservable = candidateResultsObservable.buffer( pipelineContext.getLimit() )
+
+            //load them
+            .flatMap( candidateResults -> {
+                    //flatten toa list of ids to load
+                    final Observable<List<Id>> candidateIds =
+                        Observable.from( candidateResults ).map( filterResultCandidate -> filterResultCandidate.getValue().getCandidateResult().getId() ).toList();
+
+                    //load the ids
+                    final Observable<EntitySet> entitySetObservable =
+                        candidateIds.flatMap( ids -> entityCollectionManager.load( ids ) );
+
+                    //now we have a collection, validate our canidate set is correct.
+
+                    return entitySetObservable.map(
+                        entitySet -> new EntityVerifier( applicationIndex.createBatch(), entitySet,
+                            candidateResults ) ).doOnNext( entityCollector -> entityCollector.merge() )
+                                              .flatMap(
+                                                  entityCollector -> Observable.from( entityCollector.getResults() ) );
+                } );
+
+        //if we filter all our results, we want to continue to try the next page
+        return searchIdSetObservable;
+    }
+
+
+
+
+    /**
+     * Our collector to collect entities.  Not quite a true collector, but works within our operational flow as this state is mutable and difficult to represent functionally
+     */
+    private static final class EntityVerifier {
+
+        private static final Logger logger = LoggerFactory.getLogger( EntityVerifier.class );
+        private List<FilterResult<Entity>> results = new ArrayList<>();
+
+        private final EntityIndexBatch batch;
+        private final List<FilterResult<Candidate>> candidateResults;
+        private final EntitySet entitySet;
+
+
+        public EntityVerifier( final EntityIndexBatch batch, final EntitySet entitySet,
+                               final List<FilterResult<Candidate>> candidateResults ) {
+            this.batch = batch;
+            this.entitySet = entitySet;
+            this.candidateResults = candidateResults;
+            this.results = new ArrayList<>( entitySet.size() );
+        }
+
+
+        /**
+         * Merge our candidates and our entity set into results
+         */
+        public void merge() {
+
+            for ( final FilterResult<Candidate> candidateResult : candidateResults ) {
+                validate( candidateResult );
+            }
+
+            batch.execute();
+        }
+
+
+        public List<FilterResult<Entity>> getResults() {
+            return results;
+        }
+
+
+        public EntityIndexBatch getBatch() {
+            return batch;
+        }
+
+
+        private void validate( final FilterResult<Candidate> filterResult ) {
+
+            final Candidate candidate = filterResult.getValue();
+            final CandidateResult candidateResult = candidate.getCandidateResult();
+            final SearchEdge searchEdge = candidate.getSearchEdge();
+            final Id candidateId = candidateResult.getId();
+            final UUID candidateVersion = candidateResult.getVersion();
+
+
+            final MvccEntity entity = entitySet.getEntity( candidateId );
+
+
+            //doesn't exist warn and drop
+            if ( entity == null ) {
+                logger.warn(
+                    "Searched and received candidate with entityId {} and version {}, yet was not found in cassandra."
+                        + "  Ignoring since this could be a region sync issue",
+                    candidateId, candidateVersion );
+
+
+                //TODO trigger an audit after a fail count where we explicitly try to repair from other regions
+
+                return;
+
+            }
+
+
+            final UUID entityVersion = entity.getVersion();
+            final Id entityId = entity.getId();
+
+
+
+
+
+            //entity is newer than ES version, could be an update or the entity is marked as deleted
+            if ( UUIDComparator.staticCompare( entityVersion, candidateVersion ) > 0 || !entity.getEntity().isPresent()) {
+
+                logger.warn( "Deindexing stale entity on edge {} for entityId {} and version {}",
+                    new Object[] { searchEdge, entityId, entityVersion } );
+                batch.deindex( searchEdge, entityId, candidateVersion );
+                return;
+            }
+
+            //ES is newer than cass, it means we haven't repaired the record in Cass, we don't want to
+            //remove the ES record, since the read in cass should cause a read repair, just ignore
+            if ( UUIDComparator.staticCompare( candidateVersion, entityVersion ) > 0 ) {
+
+                logger.warn(
+                    "Found a newer version in ES over cassandra for edge {} for entityId {} and version {}.  Repair "
+                        + "should be run", new Object[] { searchEdge, entityId, entityVersion } );
+
+                  //TODO trigger an audit after a fail count where we explicitly try to repair from other regions
+
+                return;
+            }
+
+            //they're the same add it
+
+            final Entity returnEntity = entity.getEntity().get();
+
+            final Optional<EdgePath> parent = filterResult.getPath();
+
+            final FilterResult<Entity> toReturn = new FilterResult<>( returnEntity, parent );
+
+            results.add( toReturn );
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateIdFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateIdFilter.java
new file mode 100644
index 0000000..1ef358a
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateIdFilter.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.search;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.collection.MvccLogEntry;
+import org.apache.usergrid.persistence.collection.VersionSet;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
+import org.apache.usergrid.persistence.index.CandidateResult;
+import org.apache.usergrid.persistence.index.EntityIndexBatch;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.index.SearchEdge;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.fasterxml.uuid.UUIDComparator;
+import com.google.inject.Inject;
+
+import rx.Observable;
+
+
+/**
+ * Responsible for verifying candidate result versions, then emitting the Ids of these versions Input is a batch of
+ * candidate results, output is a stream of validated Ids
+ */
+public class CandidateIdFilter extends AbstractFilter<FilterResult<Candidate>, FilterResult<Id>> {
+
+    private final EntityCollectionManagerFactory entityCollectionManagerFactory;
+    private final EntityIndexFactory entityIndexFactory;
+
+
+    @Inject
+    public CandidateIdFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory,
+                              final EntityIndexFactory entityIndexFactory ) {
+        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+        this.entityIndexFactory = entityIndexFactory;
+    }
+
+
+    @Override
+    public Observable<FilterResult<Id>> call( final Observable<FilterResult<Candidate>> filterResultObservable ) {
+
+
+        /**
+         * A bit kludgy from old 1.0 -> 2.0 apis.  Refactor this as we clean up our lower levels and create new results
+         * objects
+         */
+
+        final ApplicationScope applicationScope = pipelineContext.getApplicationScope();
+
+        final EntityCollectionManager entityCollectionManager =
+            entityCollectionManagerFactory.createCollectionManager( applicationScope );
+
+
+        final ApplicationEntityIndex applicationIndex =
+            entityIndexFactory.createApplicationEntityIndex( applicationScope );
+
+        final Observable<FilterResult<Id>> searchIdSetObservable =
+            filterResultObservable.buffer( pipelineContext.getLimit() ).flatMap( candidateResults -> {
+                    //flatten toa list of ids to load
+                    final Observable<List<Id>> candidateIds = Observable.from( candidateResults ).map(
+                        candidate -> candidate.getValue().getCandidateResult().getId() ).toList();
+
+                    //load the ids
+                    final Observable<VersionSet> versionSetObservable =
+                        candidateIds.flatMap( ids -> entityCollectionManager.getLatestVersion( ids ) );
+
+                    //now we have a collection, validate our canidate set is correct.
+
+                    return versionSetObservable.map(
+                        entitySet -> new EntityCollector( applicationIndex.createBatch(), entitySet,
+                            candidateResults ) ).doOnNext( entityCollector -> entityCollector.merge() ).flatMap(
+                        entityCollector -> Observable.from( entityCollector.collectResults() ) );
+                } );
+
+        return searchIdSetObservable;
+    }
+
+
+    /**
+     * Map a new cp entity to an old entity.  May be null if not present
+     */
+    private static final class EntityCollector {
+
+        private static final Logger logger = LoggerFactory.getLogger( EntityCollector.class );
+        private List<FilterResult<Id>> results = new ArrayList<>();
+
+        private final EntityIndexBatch batch;
+        private final List<FilterResult<Candidate>> candidateResults;
+        private final VersionSet versionSet;
+
+
+        public EntityCollector( final EntityIndexBatch batch, final VersionSet versionSet,
+                                final List<FilterResult<Candidate>> candidateResults ) {
+            this.batch = batch;
+            this.versionSet = versionSet;
+            this.candidateResults = candidateResults;
+            this.results = new ArrayList<>( versionSet.size() );
+        }
+
+
+        /**
+         * Merge our candidates and our entity set into results
+         */
+        public void merge() {
+
+            for ( final FilterResult<Candidate> candidateResult : candidateResults ) {
+                validate( candidateResult );
+            }
+
+            batch.execute();
+        }
+
+
+        public List<FilterResult<Id>> collectResults() {
+            return results;
+        }
+
+
+        /**
+         * Validate each candidate results vs the data loaded from cass
+         */
+        private void validate( final FilterResult<Candidate> filterCandidate ) {
+
+            final CandidateResult candidateResult = filterCandidate.getValue().getCandidateResult();
+
+            final SearchEdge searchEdge = filterCandidate.getValue().getSearchEdge();
+
+            final MvccLogEntry logEntry = versionSet.getMaxVersion( candidateResult.getId() );
+
+            final UUID candidateVersion = candidateResult.getVersion();
+
+            final UUID entityVersion = logEntry.getVersion();
+
+            final Id entityId = logEntry.getEntityId();
+
+            //entity is newer than ES version
+            if ( UUIDComparator.staticCompare( entityVersion, candidateVersion ) > 0 ) {
+
+                logger.warn( "Deindexing stale entity on edge {} for entityId {} and version {}",
+                    new Object[] { searchEdge, entityId, entityVersion } );
+                batch.deindex( searchEdge, entityId, entityVersion );
+                return;
+            }
+
+            //ES is newer than cass, it means we haven't repaired the record in Cass, we don't want to
+            //remove the ES record, since the read in cass should cause a read repair, just ignore
+            if ( UUIDComparator.staticCompare( candidateVersion, entityVersion ) > 0 ) {
+
+                logger.warn(
+                    "Found a newer version in ES over cassandra for edge {} for entityId {} and version {}.  Repair "
+                        + "should be run", new Object[] { searchEdge, entityId, entityVersion } );
+            }
+
+            //they're the same add it
+
+            final FilterResult<Id> result = new FilterResult<>( entityId, filterCandidate.getPath() );
+
+            results.add( result );
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/ElasticsearchCursorSerializer.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/ElasticsearchCursorSerializer.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/ElasticsearchCursorSerializer.java
new file mode 100644
index 0000000..7cf5a78
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/ElasticsearchCursorSerializer.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.search;
+
+
+import org.apache.usergrid.corepersistence.pipeline.cursor.AbstractCursorSerializer;
+
+
+/**
+ * ElasticSearch cursor serializer
+ */
+public class ElasticsearchCursorSerializer extends AbstractCursorSerializer<Integer> {
+
+
+    public static final ElasticsearchCursorSerializer INSTANCE = new ElasticsearchCursorSerializer();
+
+    @Override
+    protected Class<Integer> getType() {
+        return Integer.class;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/Elasticsearchdiagram.jpg
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/Elasticsearchdiagram.jpg b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/Elasticsearchdiagram.jpg
new file mode 100644
index 0000000..08970e3
Binary files /dev/null and b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/Elasticsearchdiagram.jpg differ

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/SearchCollectionFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/SearchCollectionFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/SearchCollectionFilter.java
new file mode 100644
index 0000000..a6edd56
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/SearchCollectionFilter.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.search;
+
+
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.index.SearchEdge;
+import org.apache.usergrid.persistence.index.SearchTypes;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createCollectionSearchEdge;
+
+
+public class SearchCollectionFilter extends AbstractElasticSearchFilter {
+
+    private final String collectionName;
+    private final String entityType;
+
+    /**
+     * Create a new instance of our command
+     *
+     * @param entityIndexFactory The entity index factory used to search
+     * @param  metricsFactory The metrics factory for metrics
+     * @param collectionName The name of the collection
+     * @param entityType The entity type
+     */
+    @Inject
+    public SearchCollectionFilter( final EntityIndexFactory entityIndexFactory, final MetricsFactory metricsFactory,
+                                   @Assisted( "query" ) final String query,
+                                   @Assisted( "collectionName" ) final String collectionName,
+                                   @Assisted( "entityType" ) final String entityType ) {
+        super( entityIndexFactory, metricsFactory, query );
+        this.collectionName = collectionName;
+        this.entityType = entityType;
+    }
+
+
+
+    @Override
+    protected SearchTypes getSearchTypes() {
+        final SearchTypes types = SearchTypes.fromTypes( entityType );
+
+        return types;
+    }
+
+
+    @Override
+    protected SearchEdge getSearchEdge( final Id incomingId ) {
+        final SearchEdge searchEdge = createCollectionSearchEdge( incomingId, collectionName );
+
+        return searchEdge;
+    }
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/SearchConnectionFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/SearchConnectionFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/SearchConnectionFilter.java
new file mode 100644
index 0000000..82d7377
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/SearchConnectionFilter.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.search;
+
+
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.index.SearchEdge;
+import org.apache.usergrid.persistence.index.SearchTypes;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createConnectionSearchEdge;
+
+
+public class SearchConnectionFilter extends AbstractElasticSearchFilter {
+
+
+    private final String connectionName;
+    private final Optional<String> connectedEntityType;
+
+
+    /**
+     * Create a new instance of our command
+     */
+    @Inject
+    public SearchConnectionFilter( final EntityIndexFactory entityIndexFactory, final MetricsFactory metricsFactory,
+                                   @Assisted( "query" ) final String query,
+                                   @Assisted( "connectionName" ) final String connectionName,
+                                   @Assisted( "connectedEntityType" ) final Optional<String> connectedEntityType ) {
+        super( entityIndexFactory, metricsFactory, query );
+
+        this.connectionName = connectionName;
+        this.connectedEntityType = connectedEntityType;
+    }
+
+
+    @Override
+    protected SearchTypes getSearchTypes() {
+        final SearchTypes searchTypes = SearchTypes.fromNullableTypes( connectedEntityType.orNull() );
+
+        return searchTypes;
+    }
+
+
+    @Override
+    protected SearchEdge getSearchEdge( final Id id ) {
+        final SearchEdge searchEdge = createConnectionSearchEdge( id, connectionName );
+
+        return searchEdge;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphEdgeByIdFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphEdgeByIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphEdgeByIdFilter.java
new file mode 100644
index 0000000..5b3a42e
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphEdgeByIdFilter.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.traverse;
+
+
+import org.apache.usergrid.corepersistence.pipeline.PipelineOperation;
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.graph.SearchByEdge;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+import rx.Observable;
+
+
+/**
+ * Filter should take and Id and a graph edge, and ensure the connection between the two exists
+ */
+public abstract class AbstractReadGraphEdgeByIdFilter extends AbstractFilter<FilterResult<Id>, FilterResult<Id>> implements
+    PipelineOperation<FilterResult<Id>, FilterResult<Id>> {
+
+    private final GraphManagerFactory graphManagerFactory;
+    private final Id targetId;
+
+
+    @Inject
+    public AbstractReadGraphEdgeByIdFilter( final GraphManagerFactory graphManagerFactory, @Assisted final Id
+        targetId ) {
+        this.graphManagerFactory = graphManagerFactory;
+        this.targetId = targetId;
+    }
+
+
+    @Override
+    public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> filterValueObservable ) {
+
+        final GraphManager gm = graphManagerFactory.createEdgeManager( pipelineContext.getApplicationScope() );
+
+        return filterValueObservable.flatMap( filterValue -> {
+            final String edgeTypeName = getEdgeName();
+            final Id id = filterValue.getValue();
+
+            //create our search
+            final SearchByEdge searchByEdge =
+                new SimpleSearchByEdge( id, edgeTypeName, targetId, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
+                    Optional.absent() );
+
+            //load the versions of the edge, take the first since that's all we need to validate existence, then emit the target node
+            return gm.loadEdgeVersions( searchByEdge ).take( 1 ).map( edge -> edge.getTargetNode() ).map( targetId -> new FilterResult<>(targetId, filterValue.getPath()));
+        } );
+    }
+
+
+    /**
+     * Get the name of the edge to be used in the seek
+     */
+    protected abstract String getEdgeName();
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
new file mode 100644
index 0000000..f477092
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.traverse;
+
+
+import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Optional;
+
+import rx.Observable;
+
+
+/**
+ * Command for reading graph edges
+ */
+public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id, Edge> {
+
+    private final GraphManagerFactory graphManagerFactory;
+
+
+    /**
+     * Create a new instance of our command
+     */
+    public AbstractReadGraphFilter( final GraphManagerFactory graphManagerFactory ) {
+        this.graphManagerFactory = graphManagerFactory;
+    }
+
+
+    @Override
+    public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> previousIds ) {
+
+
+        //get the graph manager
+        final GraphManager graphManager =
+            graphManagerFactory.createEdgeManager( pipelineContext.getApplicationScope() );
+
+
+        final String edgeName = getEdgeTypeName();
+        final EdgeState edgeCursorState = new EdgeState();
+
+
+        //return all ids that are emitted from this edge
+        return previousIds.flatMap( previousFilterValue -> {
+
+            //set our our constant state
+            final Optional<Edge> startFromCursor = getSeekValue();
+            final Id id = previousFilterValue.getValue();
+
+
+            final SimpleSearchByEdgeType search =
+                new SimpleSearchByEdgeType( id, edgeName, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
+                    startFromCursor );
+
+            /**
+             * TODO, pass a message with pointers to our cursor values to be generated later
+             */
+            return graphManager.loadEdgesFromSource( search )
+                //set the edge state for cursors
+                .doOnNext( edge -> edgeCursorState.update( edge ) )
+
+                    //map our id from the target edge  and set our cursor every edge we traverse
+                .map( edge -> createFilterResult( edge.getTargetNode(), edgeCursorState.getCursorEdge(),
+                    previousFilterValue.getPath() ) );
+        } );
+    }
+
+
+    @Override
+    protected FilterResult<Id> createFilterResult( final Id emit, final Edge cursorValue,
+                                                   final Optional<EdgePath> parent ) {
+
+        //if it's our first pass, there's no cursor to generate
+        if(cursorValue == null){
+            return new FilterResult<>( emit, parent );
+        }
+
+        return super.createFilterResult( emit, cursorValue, parent );
+    }
+
+
+    @Override
+    protected CursorSerializer<Edge> getCursorSerializer() {
+        return EdgeCursorSerializer.INSTANCE;
+    }
+
+
+    /**
+     * Get the edge type name we should use when traversing
+     */
+    protected abstract String getEdgeTypeName();
+
+
+    /**
+     * Wrapper class. Because edges seek > the last returned, we need to keep our n-1 value. This will be our cursor We
+     * always try to seek to the same position as we ended.  Since we don't deal with a persistent read result, if we
+     * seek to a value = to our last, we may skip data.
+     */
+    private final class EdgeState {
+
+        private Edge cursorEdge = null;
+        private Edge currentEdge = null;
+
+
+        /**
+         * Update the pointers
+         */
+        private void update( final Edge newEdge ) {
+            cursorEdge = currentEdge;
+            currentEdge = newEdge;
+        }
+
+
+        /**
+         * Get the edge to use in cursors for resume
+         */
+        private Edge getCursorEdge() {
+            return cursorEdge;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EdgeCursorSerializer.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EdgeCursorSerializer.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EdgeCursorSerializer.java
new file mode 100644
index 0000000..8d9bf6f
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EdgeCursorSerializer.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.traverse;
+
+
+import org.apache.usergrid.corepersistence.pipeline.cursor.AbstractCursorSerializer;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
+
+
+/**
+ * Edge cursor serializer
+ */
+public class EdgeCursorSerializer extends AbstractCursorSerializer<Edge> {
+
+
+    public static final EdgeCursorSerializer INSTANCE = new EdgeCursorSerializer();
+
+    @Override
+    protected Class<SimpleEdge> getType() {
+        return SimpleEdge.class;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EntityIdFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EntityIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EntityIdFilter.java
new file mode 100644
index 0000000..003038a
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EntityIdFilter.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.traverse;
+
+
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+import rx.Observable;
+
+
+/**
+ * This command is a stopgap to make migrating 1.0 code easier.  Once full traversal has been implemented, this should
+ * be removed
+ */
+public class EntityIdFilter extends AbstractFilter<FilterResult<Id>, FilterResult<Id>>{
+
+    private final Id entityId;
+
+
+    @Inject
+    public EntityIdFilter( @Assisted final Id entityId ) {this.entityId = entityId;}
+
+
+
+    @Override
+    public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> filterValueObservable ) {
+        //ignore what our input was, and simply emit the id specified
+       return filterValueObservable.map( idFilterResult ->  new FilterResult( entityId, idFilterResult.getPath() ));
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EntityLoadVerifyFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EntityLoadVerifyFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EntityLoadVerifyFilter.java
new file mode 100644
index 0000000..41507e9
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EntityLoadVerifyFilter.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.traverse;
+
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.collection.EntitySet;
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+
+import rx.Observable;
+
+
+/**
+ * Loads entities from a set of Ids. and verify they are valid
+ *
+ * TODO refactor this into a common command that both ES search and graphSearch can use for repair and verification
+ */
+public class EntityLoadVerifyFilter extends AbstractFilter<FilterResult<Id>, FilterResult<Entity>>{
+
+    private final EntityCollectionManagerFactory entityCollectionManagerFactory;
+
+
+    @Inject
+    public EntityLoadVerifyFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory ) {
+        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+    }
+
+
+    @Override
+    public Observable<FilterResult<Entity>> call( final Observable<FilterResult<Id>> filterResultObservable ) {
+
+
+        final EntityCollectionManager entityCollectionManager =
+            entityCollectionManagerFactory.createCollectionManager( pipelineContext.getApplicationScope() );
+
+        //it's more efficient to make 1 network hop to get everything, then drop our results if required
+        final Observable<FilterResult<Entity>> entityObservable =
+            filterResultObservable.buffer( pipelineContext.getLimit() ).flatMap( bufferedIds -> {
+
+                    final Observable<EntitySet> entitySetObservable =
+                        Observable.from( bufferedIds ).map( filterResultId -> filterResultId.getValue() ).toList()
+                                  .flatMap( ids -> entityCollectionManager.load( ids ) );
+
+
+                    //now we have a collection, validate our canidate set is correct.
+
+                    return entitySetObservable.map( entitySet -> new EntityVerifier( entitySet, bufferedIds ) )
+                                              .doOnNext( entityCollector -> entityCollector.merge() ).flatMap(
+                            entityCollector -> Observable.from( entityCollector.getResults() ) );
+                } );
+
+        return entityObservable;
+    }
+
+
+    /**
+     * Our collector to collect entities.  Not quite a true collector, but works within our operational flow as this
+     * state is mutable and difficult to represent functionally
+     */
+    private static final class EntityVerifier {
+
+        private static final Logger logger = LoggerFactory.getLogger( EntityVerifier.class );
+        private List<FilterResult<Entity>> results = new ArrayList<>();
+
+        private final List<FilterResult<Id>> candidateResults;
+        private final EntitySet entitySet;
+
+
+        public EntityVerifier( final EntitySet entitySet, final List<FilterResult<Id>> candidateResults ) {
+            this.entitySet = entitySet;
+            this.candidateResults = candidateResults;
+            this.results = new ArrayList<>( entitySet.size() );
+        }
+
+
+        /**
+         * Merge our candidates and our entity set into results
+         */
+        public void merge() {
+
+            for ( final FilterResult<Id> candidateResult : candidateResults ) {
+                validate( candidateResult );
+            }
+        }
+
+
+        public List<FilterResult<Entity>> getResults() {
+            return results;
+        }
+
+
+        private void validate( final FilterResult<Id> filterResult ) {
+
+            final Id candidateId = filterResult.getValue();
+
+
+            final MvccEntity entity = entitySet.getEntity( candidateId );
+
+
+            //doesn't exist warn and drop
+            if ( entity == null || !entity.getEntity().isPresent() ) {
+                logger.warn( "Read graph edge and received candidate with entityId {}, yet was not found in cassandra."
+                        + "  Ignoring since this could be a region sync issue", candidateId );
+
+
+                //TODO trigger an audit after a fail count where we explicitly try to repair from other regions
+
+                return;
+            }
+
+            //it exists, add it
+
+            final Entity returnEntity = entity.getEntity().get();
+
+            final Optional<EdgePath> parent = filterResult.getPath();
+
+            final FilterResult<Entity> toReturn = new FilterResult<>( returnEntity, parent );
+
+            results.add( toReturn );
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/GraphDiagram.jpg
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/GraphDiagram.jpg b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/GraphDiagram.jpg
new file mode 100644
index 0000000..c0308bd
Binary files /dev/null and b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/GraphDiagram.jpg differ

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionByIdFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionByIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionByIdFilter.java
new file mode 100644
index 0000000..71d606c
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionByIdFilter.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.traverse;
+
+
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+
+/**
+ * Read an edge in the graph to verify it's existence by id
+ */
+public class ReadGraphCollectionByIdFilter extends AbstractReadGraphEdgeByIdFilter{
+
+    private final String collectionName;
+
+    @Inject
+    public ReadGraphCollectionByIdFilter( final GraphManagerFactory graphManagerFactory, @Assisted final String collectionName, @Assisted final Id targetId ) {
+        super( graphManagerFactory, targetId );
+        this.collectionName = collectionName;
+    }
+
+
+    @Override
+    protected String getEdgeName() {
+        return CpNamingUtils.getEdgeTypeFromCollectionName( collectionName );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
new file mode 100644
index 0000000..dc39f5c
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.traverse;
+
+
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getEdgeTypeFromCollectionName;
+
+
+/**
+ * Command for reading graph edges on a collection
+ */
+public class ReadGraphCollectionFilter extends AbstractReadGraphFilter {
+
+    private final String collectionName;
+
+
+    /**
+     * Create a new instance of our command
+     */
+    @Inject
+    public ReadGraphCollectionFilter( final GraphManagerFactory graphManagerFactory, @Assisted final String collectionName ) {
+        super( graphManagerFactory );
+        this.collectionName = collectionName;
+    }
+
+
+    @Override
+    protected String getEdgeTypeName() {
+        return getEdgeTypeFromCollectionName( collectionName );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionByIdFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionByIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionByIdFilter.java
new file mode 100644
index 0000000..4f71202
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionByIdFilter.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.traverse;
+
+
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+
+/**
+ * Read an edge in the graph to verify it's existence by id
+ */
+public class ReadGraphConnectionByIdFilter extends AbstractReadGraphEdgeByIdFilter{
+
+    private final String connectionName;
+
+    @Inject
+    public ReadGraphConnectionByIdFilter( final GraphManagerFactory graphManagerFactory,
+                                          @Assisted final String connectionName, @Assisted final Id targetId ) {
+        super( graphManagerFactory, targetId );
+        this.connectionName = connectionName;
+    }
+
+
+    @Override
+    protected String getEdgeName() {
+        return CpNamingUtils.getEdgeTypeFromConnectionType( connectionName );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionByTypeFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionByTypeFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionByTypeFilter.java
new file mode 100644
index 0000000..61ba4ad
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionByTypeFilter.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.traverse;
+
+
+import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByIdType;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+import rx.Observable;
+
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getEdgeTypeFromConnectionType;
+
+
+/**
+ * Command for reading graph edges on a connection
+ */
+public class ReadGraphConnectionByTypeFilter extends AbstractPathFilter<Id, Id, Edge>{
+
+    private final GraphManagerFactory graphManagerFactory;
+    private final String connectionName;
+    private final String entityType;
+
+
+    /**
+     * Create a new instance of our command
+     */
+    @Inject
+    public ReadGraphConnectionByTypeFilter( final GraphManagerFactory graphManagerFactory,
+                                            @Assisted("connectionName") final String connectionName, @Assisted("entityType") final String entityType ) {
+        this.graphManagerFactory = graphManagerFactory;
+        this.connectionName = connectionName;
+        this.entityType = entityType;
+    }
+
+
+
+    @Override
+    public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> filterResultObservable ) {
+
+        //get the graph manager
+        final GraphManager graphManager = graphManagerFactory.createEdgeManager( pipelineContext.getApplicationScope() );
+
+
+
+        final String edgeName = getEdgeTypeFromConnectionType( connectionName );
+
+
+        //return all ids that are emitted from this edge
+        return filterResultObservable.flatMap( idFilterResult -> {
+
+              //set our our constant state
+            final Optional<Edge> startFromCursor = getSeekValue();
+            final Id id = idFilterResult.getValue();
+
+            final SimpleSearchByIdType search =
+                new SimpleSearchByIdType( id, edgeName, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
+                    entityType, startFromCursor );
+
+            return graphManager.loadEdgesFromSourceByType( search ).map(
+                edge -> createFilterResult( edge.getTargetNode(), edge, idFilterResult.getPath() ));
+        } );
+    }
+
+
+    @Override
+    protected CursorSerializer<Edge> getCursorSerializer() {
+        return EdgeCursorSerializer.INSTANCE;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
new file mode 100644
index 0000000..11ec5f8
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.traverse;
+
+
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getEdgeTypeFromConnectionType;
+
+
+/**
+ * Command for reading graph edges on a connection
+ */
+public class ReadGraphConnectionFilter extends AbstractReadGraphFilter {
+
+    private final String connectionName;
+
+
+    /**
+     * Create a new instance of our command
+     */
+    @Inject
+    public ReadGraphConnectionFilter( final GraphManagerFactory graphManagerFactory, @Assisted final String connectionName ) {
+        super( graphManagerFactory );
+        this.connectionName = connectionName;
+    }
+
+
+    @Override
+    protected String getEdgeTypeName() {
+        return getEdgeTypeFromConnectionType( connectionName );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
index 0260d1d..c779bb7 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
@@ -49,7 +49,7 @@ public class ObservableQueryExecutor implements QueryExecutor {
     public Iterator<Results> iterator;
 
 
-    public ObservableQueryExecutor( final Observable<ResultsPage> resultsObservable) {
+    public ObservableQueryExecutor( final Observable<ResultsPage<Entity>> resultsObservable) {
        //map to our old results objects, return a default empty if required
         this.resultsObservable = resultsObservable.map( resultsPage -> createResults( resultsPage ) ).defaultIfEmpty( new Results() );
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java
index fd65ebf..7128dcf 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java
@@ -25,8 +25,8 @@ package org.apache.usergrid.corepersistence.pipeline.cursor;
 import org.junit.Test;
 
 import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
-import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.ElasticsearchCursorSerializer;
-import org.apache.usergrid.corepersistence.pipeline.read.graph.EdgeCursorSerializer;
+import org.apache.usergrid.corepersistence.pipeline.read.search.ElasticsearchCursorSerializer;
+import org.apache.usergrid.corepersistence.pipeline.read.traverse.EdgeCursorSerializer;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.impl.SimpleEdge;