You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/05/26 23:17:09 UTC

[05/11] incubator-usergrid git commit: Updates pipeline and fixes connectionref querying

Updates pipeline and fixes connectionref querying


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/aa9153ac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/aa9153ac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/aa9153ac

Branch: refs/heads/two-dot-o-dev
Commit: aa9153ac84a2a7e68dd0c9144ab867c6f75c68a0
Parents: 6d54dff
Author: Todd Nine <tn...@apigee.com>
Authored: Thu May 21 17:10:21 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu May 21 17:10:21 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/CpRelationManager.java      |  21 ++--
 .../pipeline/FilterPipeline.java                | 107 ------------------
 .../corepersistence/pipeline/Pipeline.java      | 110 +++++++++++++++++++
 .../pipeline/PipelineModule.java                |   9 +-
 .../pipeline/builder/CandidateBuilder.java      |  12 +-
 .../pipeline/builder/ConnectionBuilder.java     |  37 -------
 .../pipeline/builder/ConnectionRefBuilder.java  |   6 +-
 .../pipeline/builder/EntityBuilder.java         |  13 ++-
 .../pipeline/builder/IdBuilder.java             |  25 ++---
 .../pipeline/builder/PipelineBuilder.java       |   6 +-
 .../results/ConnectionRefQueryExecutor.java     |  60 ++++++++++
 .../results/EntityQueryExecutor.java            |  84 ++++++++++++++
 .../results/ObservableQueryExecutor.java        |  52 +++------
 .../corepersistence/results/QueryExecutor.java  |   1 +
 14 files changed, 320 insertions(+), 223 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa9153ac/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 1c34929..be143ce 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -31,12 +31,12 @@ import org.slf4j.LoggerFactory;
 import org.springframework.util.Assert;
 
 import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
-import org.apache.usergrid.corepersistence.pipeline.FilterPipeline;
 import org.apache.usergrid.corepersistence.pipeline.builder.EntityBuilder;
 import org.apache.usergrid.corepersistence.pipeline.builder.IdBuilder;
 import org.apache.usergrid.corepersistence.pipeline.builder.PipelineBuilderFactory;
-import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
 import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
+import org.apache.usergrid.corepersistence.results.ConnectionRefQueryExecutor;
+import org.apache.usergrid.corepersistence.results.EntityQueryExecutor;
 import org.apache.usergrid.corepersistence.results.ObservableQueryExecutor;
 import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
@@ -640,11 +640,11 @@ public class CpRelationManager implements RelationManager {
         }
         else {
             final String entityType = collection.getType();
-            results = pipelineBuilder.searchCollection( collectionName, entityType, query.getQl().get() ).loadEntities();
+            results = pipelineBuilder.searchCollection( collectionName, query.getQl().get() ,  entityType).loadEntities();
         }
 
 
-        return new ObservableQueryExecutor( results.build() ).next();
+        return new EntityQueryExecutor( results.build() ).next();
     }
 
 
@@ -936,21 +936,24 @@ public class CpRelationManager implements RelationManager {
 
 
         if(query.getResultsLevel() == Level.REFS){
-            final Observable<ResultsPage<ConnectionRef>> results;
 
+            final IdBuilder traversedIds;
             if(query.isGraphSearch()){
 
-               results = pipelineBuilder.traverseConnection( connection, entityType   ).loadConnectionRefs( cpHeadEntity.getId(), connection ).build();
+               traversedIds = pipelineBuilder.traverseConnection( connection, entityType );
 
 
             }
             else
             {
-                results = pipelineBuilder.searchConnection( connection, query.getQl().get(),entityType) .loadIds().loadConnectionRefs( cpHeadEntity.getId(), connection ).build();
+                traversedIds = pipelineBuilder.searchConnection( connection, query.getQl().get(), entityType ).loadIds();
 
             }
 
-            throw new UnsupportedOperationException( "Implement me" );
+            final Observable<ResultsPage<ConnectionRef>> results = traversedIds.loadConnectionRefs(
+                cpHeadEntity.getId(), connection ).build();
+
+            return new ConnectionRefQueryExecutor( results ).next();
 
         }
 
@@ -978,7 +981,7 @@ public class CpRelationManager implements RelationManager {
 
 
 
-        return new ObservableQueryExecutor( results ).next();
+        return new EntityQueryExecutor( results ).next();
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa9153ac/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/FilterPipeline.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/FilterPipeline.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/FilterPipeline.java
deleted file mode 100644
index 089f47d..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/FilterPipeline.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline;
-
-
-import org.apache.usergrid.corepersistence.pipeline.cursor.RequestCursor;
-import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.util.ValidationUtils;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-
-import rx.Observable;
-
-
-/**
- * Pipeline for applying our UG domain specific filters.
- *
- * Modeled after an observable, with typing to allow input of specific filters
- *
- * @param InputType the input type in the current pipeline state
- */
-public class FilterPipeline<InputType> {
-
-
-    private int idCount = 0;
-
-    private final ApplicationScope applicationScope;
-
-
-    private final RequestCursor requestCursor;
-    private int limit;
-
-    //Generics hell, intentionally without a generic, we check at the filter level
-    private Observable currentObservable;
-
-
-    /**
-     * Create our filter pipeline
-     */
-    public FilterPipeline( final ApplicationScope applicationScope, final Optional<String> cursor, final int limit ) {
-
-
-        ValidationUtils.validateApplicationScope( applicationScope );
-        Preconditions.checkNotNull( cursor, "cursor optional is required" );
-        Preconditions.checkArgument( limit > 0, "limit must be > 0" );
-
-
-        this.applicationScope = applicationScope;
-
-        //init our cursor to empty
-        this.requestCursor = new RequestCursor( cursor );
-
-        //set the default limit
-        this.limit = limit;
-
-        //set our observable to start at the application
-        final FilterResult<Id> filter = new FilterResult<>( applicationScope.getApplication(), Optional.absent() );
-        this.currentObservable = Observable.just( filter );
-    }
-
-
-    public <OutputType> FilterPipeline<OutputType> withFilter(
-        final PipelineOperation<? super InputType, ? extends OutputType> filter ) {
-
-
-
-        final PipelineContext context = new PipelineContext( applicationScope, requestCursor, limit, idCount );
-
-        filter.setContext( context );
-
-        //done for clarity
-        idCount++;
-
-        return ( FilterPipeline<OutputType> ) this;
-    }
-
-
-
-    /**
-     * Return the observable of the filter pipeline
-     */
-    public Observable<InputType> execute() {
-        return currentObservable;
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa9153ac/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
new file mode 100644
index 0000000..dc95178
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline;
+
+
+import org.apache.usergrid.corepersistence.pipeline.cursor.RequestCursor;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.util.ValidationUtils;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+
+import rx.Observable;
+
+
+/**
+ * Pipeline for applying our UG domain specific filters.
+ *
+ * Modeled after an observable, with typing to allow input of specific filters
+ *
+ * @param <InputType> the input type in the current pipeline state
+ */
+public class Pipeline<InputType> {
+
+
+    private int idCount = 0;
+
+    private final ApplicationScope applicationScope;
+
+
+    private final RequestCursor requestCursor;
+    private int limit;
+
+    //Generics hell, intentionally without a generic, we check at the filter level
+    private Observable currentObservable;
+
+
+    /**
+     * Create our pipeline, with its observable starting at the application id
+     */
+    public Pipeline( final ApplicationScope applicationScope, final Optional<String> cursor, final int limit ) {
+
+
+        ValidationUtils.validateApplicationScope( applicationScope );
+        Preconditions.checkNotNull( cursor, "cursor optional is required" );
+        Preconditions.checkArgument( limit > 0, "limit must be > 0" );
+
+
+        this.applicationScope = applicationScope;
+
+        //init our cursor to empty
+        this.requestCursor = new RequestCursor( cursor );
+
+        //set the default limit
+        this.limit = limit;
+
+        //set our observable to start at the application
+        final FilterResult<Id> filter = new FilterResult<>( applicationScope.getApplication(), Optional.absent() );
+        this.currentObservable = Observable.just( filter );
+    }
+
+
+    public <OutputType> Pipeline<OutputType> withFilter(
+        final PipelineOperation<? super InputType, ? extends OutputType> filter ) {
+
+
+
+        final PipelineContext context = new PipelineContext( applicationScope, requestCursor, limit, idCount );
+
+        filter.setContext( context );
+
+        //update the observable
+        this.currentObservable = currentObservable.compose( filter );
+
+        //done for clarity
+        idCount++;
+
+        return ( Pipeline<OutputType> ) this;
+    }
+
+
+
+    /**
+     * Return the observable of the filter pipeline
+     */
+    public Observable<InputType> execute() {
+        return currentObservable;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa9153ac/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineModule.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineModule.java
index 8ec8704..93df066 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineModule.java
@@ -20,6 +20,7 @@
 package org.apache.usergrid.corepersistence.pipeline;
 
 
+import org.apache.usergrid.corepersistence.pipeline.builder.PipelineBuilderFactory;
 import org.apache.usergrid.corepersistence.pipeline.read.FilterFactory;
 
 import com.google.inject.AbstractModule;
@@ -33,15 +34,11 @@ public class PipelineModule extends AbstractModule {
 
     @Override
     protected void configure() {
-        //Use Guice to create the builder since we don't really need to do anything
-        //other than DI when creating the filters
-//       bind( FilterFactory.class ).to( ReadFilterFactoryImpl.class );
-
-
-//        install( new Factory)
 
             //Use Guice to create the builder since we don't really need to do anything
         //other than DI when creating the filters
        install( new FactoryModuleBuilder().build( FilterFactory.class ) );
+
+        install( new FactoryModuleBuilder().build( PipelineBuilderFactory.class ) );
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa9153ac/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/CandidateBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/CandidateBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/CandidateBuilder.java
index 5cb2eab..9354127 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/CandidateBuilder.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/CandidateBuilder.java
@@ -21,7 +21,7 @@ package org.apache.usergrid.corepersistence.pipeline.builder;
 
 
 import org.apache.usergrid.corepersistence.pipeline.read.FilterFactory;
-import org.apache.usergrid.corepersistence.pipeline.FilterPipeline;
+import org.apache.usergrid.corepersistence.pipeline.Pipeline;
 import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
 import org.apache.usergrid.corepersistence.pipeline.read.search.Candidate;
 import org.apache.usergrid.persistence.model.entity.Entity;
@@ -31,13 +31,13 @@ import org.apache.usergrid.persistence.model.entity.Id;
 public class CandidateBuilder {
 
 
-    private final FilterPipeline<FilterResult<Candidate>> filterPipeline;
+    private final Pipeline<FilterResult<Candidate>> pipeline;
     private final FilterFactory filterFactory;
 
 
-    public CandidateBuilder( final FilterPipeline<FilterResult<Candidate>> filterPipeline,
+    public CandidateBuilder( final Pipeline<FilterResult<Candidate>> pipeline,
                              final FilterFactory filterFactory ) {
-        this.filterPipeline = filterPipeline;
+        this.pipeline = pipeline;
         this.filterFactory = filterFactory;
     }
 
@@ -48,7 +48,7 @@ public class CandidateBuilder {
      */
     public IdBuilder loadIds(){
 
-        final FilterPipeline<FilterResult<Id>> newFilter = filterPipeline.withFilter( filterFactory.candidateResultsIdVerifyFilter() );
+        final Pipeline<FilterResult<Id>> newFilter = pipeline.withFilter( filterFactory.candidateResultsIdVerifyFilter() );
 
         return new IdBuilder( newFilter, filterFactory );
     }
@@ -60,7 +60,7 @@ public class CandidateBuilder {
      */
     public EntityBuilder loadEntities(){
 
-        final FilterPipeline<FilterResult<Entity>> newFilter = filterPipeline.withFilter( filterFactory.candidateEntityFilter() );
+        final Pipeline<FilterResult<Entity>> newFilter = pipeline.withFilter( filterFactory.candidateEntityFilter() );
 
         return new EntityBuilder(newFilter  );
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa9153ac/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/ConnectionBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/ConnectionBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/ConnectionBuilder.java
deleted file mode 100644
index b4ea94e..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/ConnectionBuilder.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.builder;
-
-
-import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
-import org.apache.usergrid.persistence.ConnectionRef;
-
-import rx.Observable;
-
-
-public class ConnectionBuilder {
-
-
-
-    public Observable<ResultsPage<ConnectionRef>> build(){
-        return null;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa9153ac/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/ConnectionRefBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/ConnectionRefBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/ConnectionRefBuilder.java
index 6c0ebc8..362f2c6 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/ConnectionRefBuilder.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/ConnectionRefBuilder.java
@@ -20,7 +20,7 @@
 package org.apache.usergrid.corepersistence.pipeline.builder;
 
 
-import org.apache.usergrid.corepersistence.pipeline.FilterPipeline;
+import org.apache.usergrid.corepersistence.pipeline.Pipeline;
 import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
 import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
 import org.apache.usergrid.corepersistence.pipeline.read.collect.ResultsPageCollector;
@@ -36,9 +36,9 @@ import rx.Observable;
 public class ConnectionRefBuilder {
 
 
-    private final FilterPipeline<FilterResult<ConnectionRef>> connectionRefFilter;
+    private final Pipeline<FilterResult<ConnectionRef>> connectionRefFilter;
 
-    public ConnectionRefBuilder( final FilterPipeline<FilterResult<ConnectionRef>> connectionRefFilter ) {
+    public ConnectionRefBuilder( final Pipeline<FilterResult<ConnectionRef>> connectionRefFilter ) {
        this.connectionRefFilter = connectionRefFilter;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa9153ac/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/EntityBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/EntityBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/EntityBuilder.java
index 07b4586..b120c56 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/EntityBuilder.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/EntityBuilder.java
@@ -20,9 +20,11 @@
 package org.apache.usergrid.corepersistence.pipeline.builder;
 
 
-import org.apache.usergrid.corepersistence.pipeline.FilterPipeline;
+import org.apache.usergrid.corepersistence.pipeline.Pipeline;
 import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
 import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
+import org.apache.usergrid.corepersistence.pipeline.read.collect.EntityResumeFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.collect.ResultsPageCollector;
 import org.apache.usergrid.persistence.model.entity.Entity;
 
 import rx.Observable;
@@ -33,11 +35,11 @@ import rx.Observable;
  */
 public class EntityBuilder {
 
-    private final FilterPipeline<FilterResult<Entity>> filterPipeline;
+    private final Pipeline<FilterResult<Entity>> pipeline;
 
 
-    public EntityBuilder( final FilterPipeline<FilterResult<Entity>> filterPipeline ) {
-        this.filterPipeline = filterPipeline;
+    public EntityBuilder( final Pipeline<FilterResult<Entity>> pipeline ) {
+        this.pipeline = pipeline;
     }
 
 
@@ -46,6 +48,7 @@ public class EntityBuilder {
      * @return
      */
     public Observable<ResultsPage<Entity>> build(){
-        return null;
+        //we must add our resume filter so we drop our previous page first element if it's present
+        return pipeline.withFilter( new EntityResumeFilter() ).withFilter( new ResultsPageCollector<>() ).execute();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa9153ac/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java
index 12a89ba..4291ea9 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java
@@ -22,12 +22,11 @@ package org.apache.usergrid.corepersistence.pipeline.builder;
 
 import org.apache.usergrid.corepersistence.pipeline.PipelineOperation;
 import org.apache.usergrid.corepersistence.pipeline.read.FilterFactory;
-import org.apache.usergrid.corepersistence.pipeline.FilterPipeline;
+import org.apache.usergrid.corepersistence.pipeline.Pipeline;
 import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
 import org.apache.usergrid.corepersistence.pipeline.read.collect.ConnectionRefFilter;
 import org.apache.usergrid.corepersistence.pipeline.read.collect.ConnectionRefResumeFilter;
 import org.apache.usergrid.corepersistence.pipeline.read.search.Candidate;
-import org.apache.usergrid.corepersistence.pipeline.read.traverse.ReadGraphConnectionByTypeFilter;
 import org.apache.usergrid.persistence.ConnectionRef;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
@@ -42,11 +41,11 @@ public class IdBuilder {
 
 
     private final FilterFactory filterFactory;
-    private final FilterPipeline<FilterResult<Id>> filterPipeline;
+    private final Pipeline<FilterResult<Id>> pipeline;
 
 
-    public IdBuilder( final FilterPipeline<FilterResult<Id>> filterPipeline, final FilterFactory filterFactory ) {
-        this.filterPipeline = filterPipeline;
+    public IdBuilder( final Pipeline<FilterResult<Id>> pipeline, final FilterFactory filterFactory ) {
+        this.pipeline = pipeline;
         this.filterFactory = filterFactory;
     }
 
@@ -56,8 +55,8 @@ public class IdBuilder {
      * @return
      */
     public EntityBuilder loadEntities() {
-        final FilterPipeline<FilterResult<Entity>> pipeline =
-            filterPipeline.withFilter( filterFactory.entityLoadFilter() );
+        final Pipeline<FilterResult<Entity>> pipeline =
+            this.pipeline.withFilter( filterFactory.entityLoadFilter() );
 
         return new EntityBuilder( pipeline );
     }
@@ -69,8 +68,8 @@ public class IdBuilder {
      * @return
      */
     public IdBuilder traverseCollection( final String collectionName ) {
-        final FilterPipeline<FilterResult<Id>> newFilter =
-            filterPipeline.withFilter( filterFactory.readGraphCollectionFilter( collectionName ) );
+        final Pipeline<FilterResult<Id>> newFilter =
+            pipeline.withFilter( filterFactory.readGraphCollectionFilter( collectionName ) );
 
         return new IdBuilder( newFilter, filterFactory );
     }
@@ -93,7 +92,7 @@ public class IdBuilder {
         }
 
 
-        return new IdBuilder( filterPipeline.withFilter(filter ), filterFactory );
+        return new IdBuilder( pipeline.withFilter(filter ), filterFactory );
     }
 
 
@@ -106,7 +105,7 @@ public class IdBuilder {
      */
     public CandidateBuilder searchCollection( final String collectionName, final String ql, final String entityType  ) {
 
-        final FilterPipeline<FilterResult<Candidate>> newFilter = filterPipeline.withFilter( filterFactory.searchCollectionFilter(
+        final Pipeline<FilterResult<Candidate>> newFilter = pipeline.withFilter( filterFactory.searchCollectionFilter(
             ql, collectionName, entityType ) );
 
         return new CandidateBuilder( newFilter, filterFactory );
@@ -123,7 +122,7 @@ public class IdBuilder {
     public CandidateBuilder searchConnection( final String connectionName, final String ql ,  final Optional<String> entityType) {
 
 
-        final FilterPipeline<FilterResult<Candidate>> newFilter = filterPipeline.withFilter( filterFactory.searchConnectionFilter(
+        final Pipeline<FilterResult<Candidate>> newFilter = pipeline.withFilter( filterFactory.searchConnectionFilter(
             ql, connectionName, entityType ) );
 
         return new CandidateBuilder( newFilter, filterFactory );
@@ -139,7 +138,7 @@ public class IdBuilder {
     @Deprecated
     public ConnectionRefBuilder loadConnectionRefs(final Id sourceId, final String connectionType){
 
-        final FilterPipeline<FilterResult<ConnectionRef>> connectionRefFilter = filterPipeline.withFilter( new ConnectionRefFilter(sourceId, connectionType  ) ).withFilter(
+        final Pipeline<FilterResult<ConnectionRef>> connectionRefFilter = pipeline.withFilter( new ConnectionRefFilter(sourceId, connectionType  ) ).withFilter(
             new ConnectionRefResumeFilter() );
         return new ConnectionRefBuilder(connectionRefFilter);
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa9153ac/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/PipelineBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/PipelineBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/PipelineBuilder.java
index 488e9c1..f1a44ea 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/PipelineBuilder.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/PipelineBuilder.java
@@ -21,7 +21,7 @@ package org.apache.usergrid.corepersistence.pipeline.builder;
 
 
 import org.apache.usergrid.corepersistence.pipeline.read.FilterFactory;
-import org.apache.usergrid.corepersistence.pipeline.FilterPipeline;
+import org.apache.usergrid.corepersistence.pipeline.Pipeline;
 import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.model.entity.Id;
@@ -91,9 +91,9 @@ public class PipelineBuilder {
      */
     @Deprecated
     public IdBuilder fromId(final Id entityId){
-        FilterPipeline<FilterResult<Id>>  filterPipeline =  new FilterPipeline( applicationScope, this.cursor,limit ).withFilter(  filterFactory.getEntityIdFilter( entityId ) );
+        Pipeline<FilterResult<Id>> pipeline =  new Pipeline( applicationScope, this.cursor,limit ).withFilter(  filterFactory.getEntityIdFilter( entityId ) );
 
-        return new IdBuilder( filterPipeline, filterFactory );
+        return new IdBuilder( pipeline, filterFactory );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa9153ac/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionRefQueryExecutor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionRefQueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionRefQueryExecutor.java
new file mode 100644
index 0000000..798c9c7
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionRefQueryExecutor.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.results;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
+import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
+import org.apache.usergrid.persistence.ConnectionRef;
+import org.apache.usergrid.persistence.EntityFactory;
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import rx.Observable;
+
+
+/**
+ * Turns each page of ConnectionRef results into a legacy Results object via Results.fromConnections
+ */
+@Deprecated//Required for 1.0 compatibility
+public class ConnectionRefQueryExecutor extends ObservableQueryExecutor<ConnectionRef> {
+
+
+    public ConnectionRefQueryExecutor( final Observable<ResultsPage<ConnectionRef>> resultsObservable ) {
+        super( resultsObservable );
+    }
+
+
+    @Override
+    protected Results createResults( final ResultsPage resultsPage ) {
+        final List<ConnectionRef> connectionRefs = resultsPage.getEntityList();
+
+        final Results results = Results.fromConnections(connectionRefs);
+
+        return results;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa9153ac/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/EntityQueryExecutor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/EntityQueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/EntityQueryExecutor.java
new file mode 100644
index 0000000..bc9001e
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/EntityQueryExecutor.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.results;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
+import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
+import org.apache.usergrid.persistence.EntityFactory;
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import rx.Observable;
+
+
+/**
+ * Converts each page of model entities into legacy org.apache.usergrid.persistence.Entity objects wrapped in a Results
+ */
+@Deprecated//Required for 1.0 compatibility
+public class EntityQueryExecutor extends ObservableQueryExecutor<Entity> {
+
+
+    public EntityQueryExecutor( final Observable<ResultsPage<Entity>> resultsObservable ) {
+        super( resultsObservable );
+    }
+
+
+    @Override
+    protected Results createResults( final ResultsPage resultsPage ) {
+
+        final List<Entity> entityList = resultsPage.getEntityList();
+        final List<org.apache.usergrid.persistence.Entity> resultsEntities = new ArrayList<>( entityList.size() );
+
+
+        for ( final Entity entity : entityList ) {
+            resultsEntities.add( mapEntity( entity ) );
+        }
+
+        final Results results = Results.fromEntities( resultsEntities );
+
+        return results;
+    }
+
+
+    /**
+     * Builds a legacy entity from the given model entity, reusing its uuid and type.
+     * @param cpEntity the model entity to convert
+     * @return a new legacy entity populated with the properties from CpEntityMapUtils.toMap( cpEntity )
+     */
+    private org.apache.usergrid.persistence.Entity mapEntity( final Entity cpEntity ) {
+
+
+        final Id entityId = cpEntity.getId();
+
+        org.apache.usergrid.persistence.Entity entity =
+            EntityFactory.newEntity( entityId.getUuid(), entityId.getType() );
+
+        Map<String, Object> entityMap = CpEntityMapUtils.toMap( cpEntity );
+        entity.addProperties( entityMap );
+
+        return entity;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa9153ac/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
index c779bb7..ff44416 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
@@ -20,9 +20,7 @@
 package org.apache.usergrid.corepersistence.results;
 
 
-import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 
@@ -42,53 +40,42 @@ import rx.Observable;
  * Our proxy to allow us to subscribe to observable results, then return them as an iterator.  A bridge for 2.0 -> 1.0
  * code.  This should not be used on any new code, and will eventually be deleted
  */
-public class ObservableQueryExecutor implements QueryExecutor {
+@Deprecated//Required for 1.0 compatibility
+public abstract class ObservableQueryExecutor<T> implements QueryExecutor {
 
     private final Observable<Results> resultsObservable;
 
     public Iterator<Results> iterator;
 
 
-    public ObservableQueryExecutor( final Observable<ResultsPage<Entity>> resultsObservable) {
-       //map to our old results objects, return a default empty if required
-        this.resultsObservable = resultsObservable.map( resultsPage -> createResults( resultsPage ) ).defaultIfEmpty( new Results() );
+    public ObservableQueryExecutor( final Observable<ResultsPage<T>> resultsObservable ) {
+        //map to our old results objects, return a default empty if required
+        this.resultsObservable = resultsObservable.map( resultsPage -> createResultsInternal( resultsPage ) )
+                                                  .defaultIfEmpty( new Results() );
     }
 
 
     /**
-     *
-     * @param cpEntity
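+     * Transform a page of results into a legacy Results object
+     * @param resultsPage the page of results to transform
+     * @return the legacy Results built from this page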
      */
-    private org.apache.usergrid.persistence.Entity mapEntity( final Entity cpEntity ) {
+    protected abstract Results createResults( final ResultsPage resultsPage );
 
 
-        final Id entityId = cpEntity.getId();
 
-        org.apache.usergrid.persistence.Entity entity =
-            EntityFactory.newEntity( entityId.getUuid(), entityId.getType() );
-
-        Map<String, Object> entityMap = CpEntityMapUtils.toMap( cpEntity );
-        entity.addProperties( entityMap );
-
-        return entity;
-    }
-
-    private Results createResults( final ResultsPage resultsPage ){
-
-        final List<Entity> entityList = resultsPage.getEntityList();
-        final List<org.apache.usergrid.persistence.Entity> resultsEntities = new ArrayList<>( entityList.size() );
-
-
-        for(final Entity entity: entityList){
-            resultsEntities.add( mapEntity( entity ) );
-        }
+    /**
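+     * Legacy bridge that transforms a results page into a Results object by delegating to createResults
+     * @param resultsPage the page of results to transform
+     * @return the Results produced by createResults for this page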
+     */
+    private Results createResultsInternal( final ResultsPage resultsPage ) {
 
-        final Results results = Results.fromEntities( resultsEntities );
 
+        final Results results = createResults( resultsPage );
 
         //add the cursor if our limit is the same
-        if(resultsPage.hasMoreResults()) {
+        if ( resultsPage.hasMoreResults() ) {
             final Optional<String> cursor = resultsPage.getResponseCursor().encodeAsString();
 
             if ( cursor.isPresent() ) {
@@ -96,11 +83,10 @@ public class ObservableQueryExecutor implements QueryExecutor {
             }
         }
         return results;
-
-
     }
 
 
+
     @Override
     public Iterator<Results> iterator() {
         return this;
@@ -130,6 +116,4 @@ public class ObservableQueryExecutor implements QueryExecutor {
 
         return next;
     }
-
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa9153ac/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/QueryExecutor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/QueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/QueryExecutor.java
index 3afb77f..6bdf162 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/QueryExecutor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/QueryExecutor.java
@@ -31,6 +31,7 @@ import org.apache.usergrid.persistence.Results;
  *
  * QueryExecutor.next() should always return a non-null Results object
  */
+@Deprecated//Required for 1.0 compatibility
 public interface QueryExecutor extends Iterable<Results>, Iterator<Results> {