You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/05/21 03:24:59 UTC
[3/3] incubator-usergrid git commit: Refactors operations into an easier
build pattern. Pipeline still needs some work.
Refactors operations into an easier build pattern. Pipeline still needs some work.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/6d54dffc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/6d54dffc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/6d54dffc
Branch: refs/heads/USERGRID-641
Commit: 6d54dffc4e9178b85349ec591275c9005ad121ed
Parents: 3a1784f
Author: Todd Nine <tn...@apigee.com>
Authored: Wed May 20 15:00:38 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed May 20 19:24:50 2015 -0600
----------------------------------------------------------------------
.../corepersistence/CpEntityManager.java | 16 +-
.../corepersistence/CpEntityManagerFactory.java | 12 +-
.../corepersistence/CpRelationManager.java | 93 +++++---
.../pipeline/FilterPipeline.java | 107 +++++++++
.../pipeline/PipelineModule.java | 2 -
.../pipeline/PipelineOperation.java | 2 +-
.../pipeline/builder/CandidateBuilder.java | 67 ++++++
.../pipeline/builder/ConnectionBuilder.java | 37 +++
.../pipeline/builder/ConnectionRefBuilder.java | 53 +++++
.../pipeline/builder/EntityBuilder.java | 51 ++++
.../pipeline/builder/IdBuilder.java | 147 ++++++++++++
.../pipeline/builder/PipelineBuilder.java | 100 ++++++++
.../builder/PipelineBuilderFactory.java | 35 +++
.../pipeline/read/AbstractFilter.java | 2 +-
.../pipeline/read/AbstractPathFilter.java | 2 +-
.../pipeline/read/Collector.java | 38 ---
.../pipeline/read/CollectorFactory.java | 38 ---
.../corepersistence/pipeline/read/Filter.java | 38 ---
.../pipeline/read/FilterFactory.java | 69 ++++--
.../pipeline/read/FilterPipeline.java | 132 -----------
.../pipeline/read/ReadFilterFactoryImpl.java | 136 -----------
.../pipeline/read/ResultsPage.java | 10 +-
.../read/collect/AbstractCollector.java | 46 ----
.../read/collect/ConnectionRefFilter.java | 68 ++++++
.../read/collect/ConnectionRefResumeFilter.java | 86 +++++++
.../read/collect/EntityResumeFilter.java | 3 +-
.../read/collect/ResultsPageCollector.java | 35 ++-
.../AbstractElasticSearchFilter.java | 171 --------------
.../pipeline/read/elasticsearch/Candidate.java | 55 -----
.../elasticsearch/CandidateEntityFilter.java | 234 -------------------
.../read/elasticsearch/CandidateIdFilter.java | 191 ---------------
.../ElasticSearchCollectionFilter.java | 77 ------
.../ElasticSearchConnectionFilter.java | 73 ------
.../ElasticsearchCursorSerializer.java | 42 ----
.../read/elasticsearch/Elasticsearchdiagram.jpg | Bin 316655 -> 0 bytes
.../graph/AbstractReadGraphEdgeByIdFilter.java | 82 -------
.../read/graph/AbstractReadGraphFilter.java | 147 ------------
.../read/graph/EdgeCursorSerializer.java | 42 ----
.../pipeline/read/graph/EntityIdFilter.java | 54 -----
.../pipeline/read/graph/EntityLoadFilter.java | 155 ------------
.../pipeline/read/graph/GraphDiagram.jpg | Bin 347711 -> 0 bytes
.../graph/ReadGraphCollectionByIdFilter.java | 49 ----
.../read/graph/ReadGraphCollectionFilter.java | 53 -----
.../graph/ReadGraphConnectionByIdFilter.java | 50 ----
.../graph/ReadGraphConnectionByTypeFilter.java | 100 --------
.../read/graph/ReadGraphConnectionFilter.java | 53 -----
.../search/AbstractElasticSearchFilter.java | 169 ++++++++++++++
.../pipeline/read/search/Candidate.java | 55 +++++
.../read/search/CandidateEntityFilter.java | 232 ++++++++++++++++++
.../pipeline/read/search/CandidateIdFilter.java | 190 +++++++++++++++
.../search/ElasticsearchCursorSerializer.java | 40 ++++
.../read/search/Elasticsearchdiagram.jpg | Bin 0 -> 316655 bytes
.../read/search/SearchCollectionFilter.java | 77 ++++++
.../read/search/SearchConnectionFilter.java | 72 ++++++
.../AbstractReadGraphEdgeByIdFilter.java | 82 +++++++
.../read/traverse/AbstractReadGraphFilter.java | 146 ++++++++++++
.../read/traverse/EdgeCursorSerializer.java | 42 ++++
.../pipeline/read/traverse/EntityIdFilter.java | 53 +++++
.../read/traverse/EntityLoadVerifyFilter.java | 154 ++++++++++++
.../pipeline/read/traverse/GraphDiagram.jpg | Bin 0 -> 347711 bytes
.../traverse/ReadGraphCollectionByIdFilter.java | 49 ++++
.../traverse/ReadGraphCollectionFilter.java | 53 +++++
.../traverse/ReadGraphConnectionByIdFilter.java | 50 ++++
.../ReadGraphConnectionByTypeFilter.java | 99 ++++++++
.../traverse/ReadGraphConnectionFilter.java | 53 +++++
.../results/ObservableQueryExecutor.java | 2 +-
.../pipeline/cursor/CursorTest.java | 4 +-
67 files changed, 2515 insertions(+), 2160 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 7a56631..be52547 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -38,8 +38,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;
import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
-import org.apache.usergrid.corepersistence.pipeline.read.CollectorFactory;
-import org.apache.usergrid.corepersistence.pipeline.read.FilterFactory;
+import org.apache.usergrid.corepersistence.pipeline.builder.PipelineBuilderFactory;
import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
import org.apache.usergrid.corepersistence.util.CpNamingUtils;
import org.apache.usergrid.persistence.AggregateCounter;
@@ -180,8 +179,7 @@ public class CpEntityManager implements EntityManager {
private final AsyncEventService indexService;
- private final FilterFactory filterFactory;
- private final CollectorFactory collectorFactory;
+ private final PipelineBuilderFactory filterFactory;
private boolean skipAggregateCounters;
private MetricsFactory metricsFactory;
@@ -223,7 +221,7 @@ public class CpEntityManager implements EntityManager {
*/
public CpEntityManager( final CassandraService cass, final CounterUtils counterUtils, final AsyncEventService indexService, final ManagerCache managerCache,
final MetricsFactory metricsFactory, final EntityManagerFig entityManagerFig,
- final FilterFactory filterFactory, final CollectorFactory collectorFactory, final UUID applicationId ) {
+ final PipelineBuilderFactory pipelineBuilderFactory, final UUID applicationId ) {
this.entityManagerFig = entityManagerFig;
@@ -232,10 +230,8 @@ public class CpEntityManager implements EntityManager {
Preconditions.checkNotNull( managerCache, "managerCache must not be null" );
Preconditions.checkNotNull( applicationId, "applicationId must not be null" );
Preconditions.checkNotNull( indexService, "indexService must not be null" );
- Preconditions.checkNotNull( filterFactory, "filterFactory must not be null" );
- Preconditions.checkNotNull( collectorFactory, "collectorFactory must not be null" );
- this.filterFactory = filterFactory;
- this.collectorFactory = collectorFactory;
+ Preconditions.checkNotNull( pipelineBuilderFactory, "filterFactory must not be null" );
+ this.filterFactory = pipelineBuilderFactory;
this.managerCache = managerCache;
@@ -750,7 +746,7 @@ public class CpEntityManager implements EntityManager {
Preconditions.checkNotNull( entityRef, "entityRef cannot be null" );
CpRelationManager relationManager =
- new CpRelationManager( metricsFactory, managerCache, filterFactory, collectorFactory, indexService, this, entityManagerFig, applicationId, entityRef );
+ new CpRelationManager( managerCache, filterFactory, indexService, this, entityManagerFig, applicationId, entityRef );
return relationManager;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index 5055538..baa1148 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -35,8 +35,7 @@ import org.apache.commons.lang.StringUtils;
import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
import org.apache.usergrid.corepersistence.index.ReIndexService;
-import org.apache.usergrid.corepersistence.pipeline.read.CollectorFactory;
-import org.apache.usergrid.corepersistence.pipeline.read.FilterFactory;
+import org.apache.usergrid.corepersistence.pipeline.builder.PipelineBuilderFactory;
import org.apache.usergrid.corepersistence.util.CpNamingUtils;
import org.apache.usergrid.exception.ConflictException;
import org.apache.usergrid.persistence.AbstractEntity;
@@ -126,8 +125,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
private final EntityIndex entityIndex;
private final MetricsFactory metricsFactory;
private final AsyncEventService indexService;
- private final FilterFactory filterFactory;
- private final CollectorFactory collectorFactory;
+ private final PipelineBuilderFactory pipelineBuilderFactory;
public CpEntityManagerFactory( final CassandraService cassandraService, final CounterUtils counterUtils,
final Injector injector ) {
@@ -141,8 +139,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
this.managerCache = injector.getInstance( ManagerCache.class );
this.metricsFactory = injector.getInstance( MetricsFactory.class );
this.indexService = injector.getInstance( AsyncEventService.class );
- this.filterFactory = injector.getInstance( FilterFactory.class );
- this.collectorFactory = injector.getInstance( CollectorFactory.class );
+ this.pipelineBuilderFactory = injector.getInstance( PipelineBuilderFactory.class );
this.applicationIdCache = injector.getInstance(ApplicationIdCacheFactory.class).getInstance(
getManagementEntityManager() );
@@ -203,7 +200,8 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
private EntityManager _getEntityManager( UUID applicationId ) {
EntityManager em = new CpEntityManager(cassandraService, counterUtils, indexService, managerCache, metricsFactory, entityManagerFig,
- filterFactory, collectorFactory, applicationId );
+
+ pipelineBuilderFactory, applicationId );
return em;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 6201fe8..1c34929 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -31,9 +31,11 @@ import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;
import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
-import org.apache.usergrid.corepersistence.pipeline.read.CollectorFactory;
-import org.apache.usergrid.corepersistence.pipeline.read.FilterFactory;
-import org.apache.usergrid.corepersistence.pipeline.read.FilterPipeline;
+import org.apache.usergrid.corepersistence.pipeline.FilterPipeline;
+import org.apache.usergrid.corepersistence.pipeline.builder.EntityBuilder;
+import org.apache.usergrid.corepersistence.pipeline.builder.IdBuilder;
+import org.apache.usergrid.corepersistence.pipeline.builder.PipelineBuilderFactory;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
import org.apache.usergrid.corepersistence.results.ObservableQueryExecutor;
import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
@@ -52,7 +54,6 @@ import org.apache.usergrid.persistence.Schema;
import org.apache.usergrid.persistence.SimpleEntityRef;
import org.apache.usergrid.persistence.SimpleRoleRef;
import org.apache.usergrid.persistence.cassandra.ConnectionRefImpl;
-import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.entities.Group;
import org.apache.usergrid.persistence.entities.User;
@@ -121,13 +122,12 @@ public class CpRelationManager implements RelationManager {
private final AsyncEventService indexService;
- private final FilterFactory filterFactory;
- private final CollectorFactory collectorFactory;
+ private final PipelineBuilderFactory pipelineBuilderFactory;
- public CpRelationManager( final MetricsFactory metricsFactory, final ManagerCache managerCache,
- final FilterFactory filterFactory, final CollectorFactory collectorFactory, final AsyncEventService indexService,
+ public CpRelationManager( final ManagerCache managerCache,
+ final PipelineBuilderFactory pipelineBuilderFactory, final AsyncEventService indexService,
final EntityManager em, final EntityManagerFig entityManagerFig, final UUID applicationId,
final EntityRef headEntity ) {
@@ -147,8 +147,7 @@ public class CpRelationManager implements RelationManager {
this.managerCache = managerCache;
this.applicationScope = CpNamingUtils.getApplicationScope( applicationId );
- this.filterFactory = filterFactory;
- this.collectorFactory = collectorFactory;
+ this.pipelineBuilderFactory = pipelineBuilderFactory;
if ( logger.isDebugEnabled() ) {
logger.debug( "Loading head entity {}:{} from app {}", new Object[] {
@@ -629,29 +628,23 @@ public class CpRelationManager implements RelationManager {
query = adjustQuery( query );
- final FilterPipeline<Id> filterPipeline = new FilterPipeline( applicationScope, query.getCursor(), query.getLimit() ).withFilter( filterFactory.getEntityIdFilter( cpHeadEntity.getId() ) );
+ final IdBuilder pipelineBuilder =
+ pipelineBuilderFactory.create( applicationScope ).withCursor( query.getCursor() )
+ .withLimit( query.getLimit() ).fromId( cpHeadEntity.getId() );
- final FilterPipeline<org.apache.usergrid.persistence.model.entity.Entity> entityFilterPipeline;
+ final EntityBuilder results;
if ( query.isGraphSearch() ) {
- entityFilterPipeline = filterPipeline.withFilter( filterFactory.readGraphCollectionFilter( collectionName ) )
- .withFilter( filterFactory.entityLoadFilter() );
+ results = pipelineBuilder.traverseCollection( collectionName ).loadEntities();
}
else {
final String entityType = collection.getType();
-
- entityFilterPipeline = filterPipeline.withFilter(
- filterFactory.elasticSearchCollectionFilter( query.getQl().get(), collectionName, entityType ) )
- .withFilter( filterFactory.candidateEntityFilter() );
+ results = pipelineBuilder.searchCollection( collectionName, entityType, query.getQl().get() ).loadEntities();
}
- final Observable<ResultsPage> resultsObservable =
- entityFilterPipeline.withFilter( filterFactory.entityResumeFilter() )
- .withCollector( collectorFactory.getResultsPageCollector() ).execute();
-
- return new ObservableQueryExecutor( resultsObservable ).next();
+ return new ObservableQueryExecutor( results.build() ).next();
}
@@ -923,7 +916,7 @@ public class CpRelationManager implements RelationManager {
query = adjustQuery( query );
- final String entityType = query.getEntityType();
+ final Optional<String> entityType = Optional.fromNullable( query.getEntityType() ) ;
//set startid -- graph | es query filter -- load entities filter (verifies exists) --> results page collector
// -> 1.0 results
@@ -935,31 +928,57 @@ public class CpRelationManager implements RelationManager {
// collector
- final FilterPipeline<Id> filterPipeline =
- new FilterPipeline( applicationScope, query.getCursor(), query.getLimit() )
- .withFilter( filterFactory.getEntityIdFilter( cpHeadEntity.getId() ) );
+ final IdBuilder
+ pipelineBuilder = pipelineBuilderFactory.create( applicationScope ).withCursor( query.getCursor() ).withLimit( query.getLimit() ).fromId(
+ cpHeadEntity.getId() );
+
+
+
+
+ if(query.getResultsLevel() == Level.REFS){
+ final Observable<ResultsPage<ConnectionRef>> results;
+
+ if(query.isGraphSearch()){
+
+ results = pipelineBuilder.traverseConnection( connection, entityType ).loadConnectionRefs( cpHeadEntity.getId(), connection ).build();
+
+
+ }
+ else
+ {
+ results = pipelineBuilder.searchConnection( connection, query.getQl().get(),entityType) .loadIds().loadConnectionRefs( cpHeadEntity.getId(), connection ).build();
+
+ }
+
+ throw new UnsupportedOperationException( "Implement me" );
+
+ }
+
+
+
+ if(query.getResultsLevel() == Level.IDS){
+
+ throw new UnsupportedOperationException( "Not yet implemented" );
+ }
+
+
+ //we want to load all entities
+ final Observable<ResultsPage<org.apache.usergrid.persistence.model.entity.Entity>> results;
- final FilterPipeline<org.apache.usergrid.persistence.model.entity.Entity> entityFilterPipeline;
if ( query.isGraphSearch() ) {
- entityFilterPipeline = filterPipeline.withFilter( filterFactory.readGraphConnectionFilter( connection ) )
- .withFilter( filterFactory.entityLoadFilter() );
+ results = pipelineBuilder.traverseConnection( connection, entityType ).loadEntities().build();
}
else {
- entityFilterPipeline = filterPipeline.withFilter( filterFactory
- .elasticSearchConnectionFilter( query.getQl().get(), connection, Optional.fromNullable( entityType ) ) )
- .withFilter( filterFactory.candidateEntityFilter() );
+ results = pipelineBuilder.searchConnection( connection, query.getQl().get() , entityType).loadEntities().build();
}
- final Observable<ResultsPage> resultsObservable =
- entityFilterPipeline.withFilter( filterFactory.entityResumeFilter() )
- .withCollector( collectorFactory.getResultsPageCollector() ).execute();
- return new ObservableQueryExecutor( resultsObservable ).next();
+ return new ObservableQueryExecutor( results ).next();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/FilterPipeline.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/FilterPipeline.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/FilterPipeline.java
new file mode 100644
index 0000000..089f47d
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/FilterPipeline.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline;
+
+
+import org.apache.usergrid.corepersistence.pipeline.cursor.RequestCursor;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.util.ValidationUtils;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+
+import rx.Observable;
+
+
+/**
+ * Pipeline for applying our UG domain specific filters.
+ *
+ * Modeled after an observable, with typing to allow input of specific filters
+ *
+ * @param <InputType> the input type in the current pipeline state
+ */
+public class FilterPipeline<InputType> {
+
+
+ private int idCount = 0;
+
+ private final ApplicationScope applicationScope;
+
+
+ private final RequestCursor requestCursor;
+ private int limit;
+
+ //Generics hell, intentionally without a generic, we check at the filter level
+ private Observable currentObservable;
+
+
+ /**
+ * Create our filter pipeline
+ */
+ public FilterPipeline( final ApplicationScope applicationScope, final Optional<String> cursor, final int limit ) {
+
+
+ ValidationUtils.validateApplicationScope( applicationScope );
+ Preconditions.checkNotNull( cursor, "cursor optional is required" );
+ Preconditions.checkArgument( limit > 0, "limit must be > 0" );
+
+
+ this.applicationScope = applicationScope;
+
+ //init our cursor to empty
+ this.requestCursor = new RequestCursor( cursor );
+
+ //set the default limit
+ this.limit = limit;
+
+ //set our observable to start at the application
+ final FilterResult<Id> filter = new FilterResult<>( applicationScope.getApplication(), Optional.absent() );
+ this.currentObservable = Observable.just( filter );
+ }
+
+
+ public <OutputType> FilterPipeline<OutputType> withFilter(
+ final PipelineOperation<? super InputType, ? extends OutputType> filter ) {
+
+
+
+ final PipelineContext context = new PipelineContext( applicationScope, requestCursor, limit, idCount );
+
+ filter.setContext( context );
+
+ //done for clarity
+ idCount++;
+
+ return ( FilterPipeline<OutputType> ) this;
+ }
+
+
+
+ /**
+ * Return the observable of the filter pipeline
+ */
+ public Observable<InputType> execute() {
+ return currentObservable;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineModule.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineModule.java
index ef696bd..8ec8704 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineModule.java
@@ -20,7 +20,6 @@
package org.apache.usergrid.corepersistence.pipeline;
-import org.apache.usergrid.corepersistence.pipeline.read.CollectorFactory;
import org.apache.usergrid.corepersistence.pipeline.read.FilterFactory;
import com.google.inject.AbstractModule;
@@ -44,6 +43,5 @@ public class PipelineModule extends AbstractModule {
//Use Guice to create the builder since we don't really need to do anything
//other than DI when creating the filters
install( new FactoryModuleBuilder().build( FilterFactory.class ) );
- install( new FactoryModuleBuilder().build( CollectorFactory.class ));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineOperation.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineOperation.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineOperation.java
index d2fa16c..3dda22e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineOperation.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineOperation.java
@@ -33,7 +33,7 @@ import rx.Observable;
* @param <T> The input type of the filter value
* @param <R> The output type of the filter value
*/
-public interface PipelineOperation<T, R> extends Observable.Transformer<FilterResult<T>, R> {
+public interface PipelineOperation<T, R> extends Observable.Transformer<T, R> {
void setContext(final PipelineContext pipelineContext);
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/CandidateBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/CandidateBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/CandidateBuilder.java
new file mode 100644
index 0000000..5cb2eab
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/CandidateBuilder.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.builder;
+
+
+import org.apache.usergrid.corepersistence.pipeline.read.FilterFactory;
+import org.apache.usergrid.corepersistence.pipeline.FilterPipeline;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.corepersistence.pipeline.read.search.Candidate;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+public class CandidateBuilder {
+
+
+ private final FilterPipeline<FilterResult<Candidate>> filterPipeline;
+ private final FilterFactory filterFactory;
+
+
+ public CandidateBuilder( final FilterPipeline<FilterResult<Candidate>> filterPipeline,
+ final FilterFactory filterFactory ) {
+ this.filterPipeline = filterPipeline;
+ this.filterFactory = filterFactory;
+ }
+
+
+ /**
+ * Validates all candidates for the versions by id and sets them
+ * @return
+ */
+ public IdBuilder loadIds(){
+
+ final FilterPipeline<FilterResult<Id>> newFilter = filterPipeline.withFilter( filterFactory.candidateResultsIdVerifyFilter() );
+
+ return new IdBuilder( newFilter, filterFactory );
+ }
+
+
+ /**
+ * Load all the candidates as entities and return them
+ * @return
+ */
+ public EntityBuilder loadEntities(){
+
+ final FilterPipeline<FilterResult<Entity>> newFilter = filterPipeline.withFilter( filterFactory.candidateEntityFilter() );
+
+ return new EntityBuilder(newFilter );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/ConnectionBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/ConnectionBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/ConnectionBuilder.java
new file mode 100644
index 0000000..b4ea94e
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/ConnectionBuilder.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.builder;
+
+
+import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
+import org.apache.usergrid.persistence.ConnectionRef;
+
+import rx.Observable;
+
+
+public class ConnectionBuilder {
+
+
+
+ public Observable<ResultsPage<ConnectionRef>> build(){
+ return null;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/ConnectionRefBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/ConnectionRefBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/ConnectionRefBuilder.java
new file mode 100644
index 0000000..6c0ebc8
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/ConnectionRefBuilder.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.builder;
+
+
+import org.apache.usergrid.corepersistence.pipeline.FilterPipeline;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
+import org.apache.usergrid.corepersistence.pipeline.read.collect.ResultsPageCollector;
+import org.apache.usergrid.persistence.ConnectionRef;
+
+import rx.Observable;
+
+
+/**
+ * A 1.0 compatibility state. Should be removed as services are refactored
+ */
+@Deprecated
+public class ConnectionRefBuilder {
+
+
+ private final FilterPipeline<FilterResult<ConnectionRef>> connectionRefFilter;
+
+ public ConnectionRefBuilder( final FilterPipeline<FilterResult<ConnectionRef>> connectionRefFilter ) {
+ this.connectionRefFilter = connectionRefFilter;
+ }
+
+
+ /**
+ * Build our connection refs observable
+ * @return
+ */
+ public Observable<ResultsPage<ConnectionRef>> build(){
+ return connectionRefFilter.withFilter( new ResultsPageCollector<>() ).execute();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/EntityBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/EntityBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/EntityBuilder.java
new file mode 100644
index 0000000..07b4586
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/EntityBuilder.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.builder;
+
+
+import org.apache.usergrid.corepersistence.pipeline.FilterPipeline;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
+import org.apache.usergrid.persistence.model.entity.Entity;
+
+import rx.Observable;
+
+
+/**
+ * Builder to build our entity state. Terminal stage of a pipeline that emits entities
+ */
+public class EntityBuilder {
+
+ private final FilterPipeline<FilterResult<Entity>> filterPipeline;
+
+
+ public EntityBuilder( final FilterPipeline<FilterResult<Entity>> filterPipeline ) {
+ this.filterPipeline = filterPipeline;
+ }
+
+
+ /**
+ * Build our results of entities by collecting the pipeline into pages and executing it, as ConnectionRefBuilder.build() does
+ * @return The observable of entity pages, never null. NOTE(review): no resume filter is added on this path; confirm paging
+ */
+ public Observable<ResultsPage<Entity>> build(){
+ return filterPipeline.withFilter( new org.apache.usergrid.corepersistence.pipeline.read.collect.ResultsPageCollector<>() ).execute();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java
new file mode 100644
index 0000000..12a89ba
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.builder;
+
+
+import org.apache.usergrid.corepersistence.pipeline.PipelineOperation;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterFactory;
+import org.apache.usergrid.corepersistence.pipeline.FilterPipeline;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.corepersistence.pipeline.read.collect.ConnectionRefFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.collect.ConnectionRefResumeFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.search.Candidate;
+import org.apache.usergrid.corepersistence.pipeline.read.traverse.ReadGraphConnectionByTypeFilter;
+import org.apache.usergrid.persistence.ConnectionRef;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Optional;
+
+
+/**
+ * A builder to transition from emitting Ids in the pipeline into other operations
+ */
+public class IdBuilder {
+
+
+ private final FilterFactory filterFactory;
+ private final FilterPipeline<FilterResult<Id>> filterPipeline;
+
+
+ public IdBuilder( final FilterPipeline<FilterResult<Id>> filterPipeline, final FilterFactory filterFactory ) {
+ this.filterPipeline = filterPipeline;
+ this.filterFactory = filterFactory;
+ }
+
+
+ /**
+ * Load all the ids we encounter when traversing the graph as entities
+ * @return An EntityBuilder over the pipeline with the entity load filter appended
+ */
+ public EntityBuilder loadEntities() {
+ final FilterPipeline<FilterResult<Entity>> pipeline =
+ filterPipeline.withFilter( filterFactory.entityLoadFilter() );
+
+ return new EntityBuilder( pipeline );
+ }
+
+
+ /**
+ * Traverse all the collection edges from our input Id
+ * @param collectionName The name of the collection passed to the graph collection filter
+ * @return A new IdBuilder over the pipeline with the collection filter appended
+ */
+ public IdBuilder traverseCollection( final String collectionName ) {
+ final FilterPipeline<FilterResult<Id>> newFilter =
+ filterPipeline.withFilter( filterFactory.readGraphCollectionFilter( collectionName ) );
+
+ return new IdBuilder( newFilter, filterFactory );
+ }
+
+
+ /**
+ * Traverse all connection edges from our input Id
+ * @param connectionName The name of the connection
+ * @param entityType The optional type of the entity. When present the by-type connection filter is used
+ * @return A new IdBuilder over the pipeline with the selected connection filter appended
+ */
+ public IdBuilder traverseConnection( final String connectionName, final Optional<String> entityType ) {
+
+ final PipelineOperation<FilterResult<Id>, FilterResult<Id>> filter;
+
+ if(entityType.isPresent()){
+ filter = filterFactory.readGraphConnectionByTypeFilter( connectionName, entityType.get() );
+ }else{
+ filter = filterFactory.readGraphConnectionFilter( connectionName );
+ }
+
+
+ return new IdBuilder( filterPipeline.withFilter(filter ), filterFactory );
+ }
+
+
+ /**
+ * Search the named collection from our input Id with the specified criteria
+ * @param collectionName The name of the collection
+ * @param ql The user's query to execute
+ * @param entityType The type of the entity
+ * @return Candidate results
+ */
+ public CandidateBuilder searchCollection( final String collectionName, final String ql, final String entityType ) {
+
+ final FilterPipeline<FilterResult<Candidate>> newFilter = filterPipeline.withFilter( filterFactory.searchCollectionFilter(
+ ql, collectionName, entityType ) );
+
+ return new CandidateBuilder( newFilter, filterFactory );
+ }
+
+
+ /**
+ * Search the named connection from our input Id with the specified criteria
+ * @param connectionName The connection name to search
+ * @param ql The query to execute
+ * @param entityType The optional type of entity. If this is absent, all entity types in the connection will be searched
+ * @return Candidate results
+ */
+ public CandidateBuilder searchConnection( final String connectionName, final String ql , final Optional<String> entityType) {
+
+
+ final FilterPipeline<FilterResult<Candidate>> newFilter = filterPipeline.withFilter( filterFactory.searchConnectionFilter(
+ ql, connectionName, entityType ) );
+
+ return new CandidateBuilder( newFilter, filterFactory );
+ }
+
+
+ /**
+ * Create connection refs from our ids. This is a legacy operation
+ * @param sourceId The source id passed to the ConnectionRefFilter
+ * @param connectionType The connection type passed to the ConnectionRefFilter
+ * @return A ConnectionRefBuilder over the pipeline with the connection ref filter and its resume filter appended
+ */
+ @Deprecated
+ public ConnectionRefBuilder loadConnectionRefs(final Id sourceId, final String connectionType){
+
+ final FilterPipeline<FilterResult<ConnectionRef>> connectionRefFilter = filterPipeline.withFilter( new ConnectionRefFilter(sourceId, connectionType ) ).withFilter(
+ new ConnectionRefResumeFilter() );
+ return new ConnectionRefBuilder(connectionRefFilter);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/PipelineBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/PipelineBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/PipelineBuilder.java
new file mode 100644
index 0000000..488e9c1
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/PipelineBuilder.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.builder;
+
+
+import org.apache.usergrid.corepersistence.pipeline.read.FilterFactory;
+import org.apache.usergrid.corepersistence.pipeline.FilterPipeline;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+
+/**
+ * This is our root builder to build filter pipelines. All operations should start with an instance of this class, and compose
+ * graph operations by traversing various builders to create our filter pipeline
+ */
+public class PipelineBuilder {
+
+
+
+ private final ApplicationScope applicationScope;
+ private Optional<String> cursor = Optional.absent();
+ private int limit = 10;
+ private final FilterFactory filterFactory;
+
+
+ /**
+ * Create our root builder for the given (assisted) application scope. The cursor starts absent and the limit at 10
+ * @param filterFactory The factory used to create the filters added to the pipeline
+ */
+ @Inject
+ public PipelineBuilder( final FilterFactory filterFactory, @Assisted final ApplicationScope applicationScope ) {
+ this.filterFactory = filterFactory;
+ this.applicationScope = applicationScope;
+ }
+
+
+
+
+ /**
+ * Set the cursor to use in our filter pipeline
+ * @param cursor The optional cursor. The Optional itself must not be null
+ * @return This builder
+ */
+ public PipelineBuilder withCursor(final Optional<String> cursor){
+ Preconditions.checkNotNull(cursor, "cursor must not be null");
+ this.cursor = cursor;
+ return this;
+ }
+
+
+ /**
+ * Set our limit
+ * @param limit The limit handed to the FilterPipeline created in fromId
+ * @return This builder
+ */
+ public PipelineBuilder withLimit(final int limit){
+ this.limit = limit;
+ return this;
+ }
+
+
+ /**
+ * Set our start point in our graph traversal to the specified entity id. A 1.0 compatibility API. Eventually this should be replaced with
+ * a call that will allow us to start traversing at the application node to any other node in the graph
+ * NOTE(review): the FilterPipeline below is constructed without type arguments (raw type), so the assignment is unchecked
+ * @param entityId The id passed to the entity id filter that starts the pipeline
+ * @return An IdBuilder over a new pipeline with the entity id filter appended
+ */
+ @Deprecated
+ public IdBuilder fromId(final Id entityId){
+ FilterPipeline<FilterResult<Id>> filterPipeline = new FilterPipeline( applicationScope, this.cursor,limit ).withFilter( filterFactory.getEntityIdFilter( entityId ) );
+
+ return new IdBuilder( filterPipeline, filterFactory );
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/PipelineBuilderFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/PipelineBuilderFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/PipelineBuilderFactory.java
new file mode 100644
index 0000000..6cb515b
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/PipelineBuilderFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.builder;
+
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+
+
+public interface PipelineBuilderFactory {
+
+
+ /**
+ * Create our pipeline builder to allow us to build our pipeline
+ * @param applicationScope The application scope handed to the new builder
+ * @return The new pipeline builder
+ */
+ PipelineBuilder create( final ApplicationScope applicationScope );
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractFilter.java
index e4d5d44..64cf67f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractFilter.java
@@ -29,7 +29,7 @@ import org.apache.usergrid.corepersistence.pipeline.PipelineOperation;
* @param <T> the input type
* @param <R> The output Type
*/
-public abstract class AbstractFilter<T, R> implements Filter<T, R> {
+public abstract class AbstractFilter<T, R> implements PipelineOperation<T, R> {
protected PipelineContext pipelineContext;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPathFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPathFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPathFilter.java
index c68dc4a..6dc4561 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPathFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPathFilter.java
@@ -33,7 +33,7 @@ import com.google.common.base.Optional;
* @param <R> The response type
* @param <C> The cursor type
*/
-public abstract class AbstractPathFilter<T, R, C extends Serializable> extends AbstractFilter<T, R> implements Filter<T, R> {
+public abstract class AbstractPathFilter<T, R, C extends Serializable> extends AbstractFilter<FilterResult<T>, FilterResult<R>> {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Collector.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Collector.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Collector.java
deleted file mode 100644
index e28ce44..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Collector.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read;
-
-
-import org.apache.usergrid.corepersistence.pipeline.PipelineOperation;
-
-
-/**
- * A command that is used to reduce our stream of results into a stream of final batch outputs. When used
- * no further transformation or encoding should occur. Otherwise EdgePath data will be lost, and serialization cannot occur
- * across requests
- *
- * @param <T> The input type
- * @param <R> The output type
- */
-public interface Collector<T, R> extends PipelineOperation<T,R> {
-
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CollectorFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CollectorFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CollectorFactory.java
deleted file mode 100644
index dd200b5..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CollectorFactory.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read;
-
-
-import org.apache.usergrid.corepersistence.pipeline.read.collect.ResultsPageCollector;
-
-
-/**
- * A factory for generating collectors
- */
-public interface CollectorFactory {
-
-
- /**
- * Get the results page collector
- * @return
- */
- ResultsPageCollector getResultsPageCollector();
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java
deleted file mode 100644
index ee01602..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read;
-
-
-import org.apache.usergrid.corepersistence.pipeline.PipelineOperation;
-
-
-/**
- * Traverses edges in the graph. Either by query or graph traversal. Take an observable of FilterResult, and emits
- * an observable of FilterResults. Filters should never emit groups or objects that represent collections. Items should
- * always be emitted 1 at a time. It is the responsibility of the collector to aggregate results.
- */
-public interface Filter<T, R> extends PipelineOperation<T, FilterResult<R>> {
-
- /**
- * Get the builder for the next phase
- * @return
- */
-// B getNextBuilder();
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
index d297c2a..ca5695c 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
@@ -20,18 +20,20 @@
package org.apache.usergrid.corepersistence.pipeline.read;
+import org.apache.usergrid.corepersistence.pipeline.read.collect.ConnectionRefFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.collect.ConnectionRefResumeFilter;
import org.apache.usergrid.corepersistence.pipeline.read.collect.EntityResumeFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateEntityFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateIdFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.ElasticSearchCollectionFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.ElasticSearchConnectionFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.graph.EntityIdFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.graph.EntityLoadFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphCollectionByIdFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphCollectionFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphConnectionByIdFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphConnectionByTypeFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphConnectionFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.search.CandidateEntityFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.search.CandidateIdFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.search.SearchCollectionFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.search.SearchConnectionFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.traverse.EntityIdFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.traverse.EntityLoadVerifyFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.traverse.ReadGraphCollectionByIdFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.traverse.ReadGraphCollectionFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.traverse.ReadGraphConnectionByIdFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.traverse.ReadGraphConnectionByTypeFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.traverse.ReadGraphConnectionFilter;
import org.apache.usergrid.persistence.model.entity.Id;
import com.google.common.base.Optional;
@@ -90,10 +92,9 @@ public interface FilterFactory {
* @param query The query to use when querying the entities in the collection
* @param collectionName The collection name to use when querying
*/
- ElasticSearchCollectionFilter elasticSearchCollectionFilter( @Assisted( "query" ) final String query,
- @Assisted( "collectionName" )
- final String collectionName,
- @Assisted( "entityType" ) final String entityType );
+ SearchCollectionFilter searchCollectionFilter( @Assisted( "query" ) final String query,
+ @Assisted( "collectionName" ) final String collectionName,
+ @Assisted( "entityType" ) final String entityType );
/**
@@ -103,17 +104,16 @@ public interface FilterFactory {
* @param connectionName The type of connection to query
* @param connectedEntityType The type of entity in the connection. Leave absent to query all entity types
*/
- ElasticSearchConnectionFilter elasticSearchConnectionFilter( @Assisted( "query" ) final String query,
- @Assisted( "connectionName" )
- final String connectionName,
- @Assisted( "connectedEntityType" )
- final Optional<String> connectedEntityType );
+ SearchConnectionFilter searchConnectionFilter( @Assisted( "query" ) final String query,
+ @Assisted( "connectionName" ) final String connectionName,
+ @Assisted( "connectedEntityType" )
+ final Optional<String> connectedEntityType );
/**
* Generate a new instance of the command with the specified parameters
*/
- EntityLoadFilter entityLoadFilter();
+ EntityLoadVerifyFilter entityLoadFilter();
/**
* Get the collector for collection candidate results to entities
@@ -127,16 +127,37 @@ public interface FilterFactory {
CandidateIdFilter candidateResultsIdVerifyFilter();
/**
- * Get an entity id filter. Used as a 1.0->2.0 bridge since we're not doing full traversals
- *
* @param entityId The entity id to emit
+ *
+ * @deprecated A 1.0 api
+ *
+ * Get an entity id filter. Used as a 1.0->2.0 bridge since we're not doing full traversals
*/
+ @Deprecated
EntityIdFilter getEntityIdFilter( final Id entityId );
/**
* Create a new instance of our entity filter
- * @return
*/
EntityResumeFilter entityResumeFilter();
+
+ /**
+ * @deprecated A 1.0 api. Creates a filter for resuming connection references
+ */
+ @Deprecated
+ ConnectionRefResumeFilter connectionRefResumeFilter();
+
+ /**
+ *
+ * Creates connection refs for 1.0 compatibility
+ *
+ * @param sourceId The source id
+ * @param connectionType The connection type
+ *
+ * @deprecated A 1.0 api. Creates a filter for transforming incoming ids into connection refs
+ */
+ @Deprecated
+ ConnectionRefFilter connectionRefFilter( @Assisted( "sourceId" ) final Id sourceId,
+ @Assisted( "connectionType" ) final String connectionType );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterPipeline.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterPipeline.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterPipeline.java
deleted file mode 100644
index f8bbdd8..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterPipeline.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read;
-
-
-import org.apache.usergrid.corepersistence.pipeline.PipelineContext;
-import org.apache.usergrid.corepersistence.pipeline.PipelineOperation;
-import org.apache.usergrid.corepersistence.pipeline.cursor.RequestCursor;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.util.ValidationUtils;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-
-import rx.Observable;
-
-
-/**
- * Pipeline for applying our UG domain specific filters.
- *
- * Modeled after an observable, with typing to allow input of specific filters
- *
- * @param InputType the input type in the current pipeline state
- */
-public class FilterPipeline<InputType> {
-
-
- private int idCount = 0;
-
- private final ApplicationScope applicationScope;
-
-
- private final RequestCursor requestCursor;
- private int limit;
-
- //Generics hell, intentionally without a generic, we check at the filter level
- private Observable currentObservable;
-
-
- /**
- * Create our filter pipeline
- */
- public FilterPipeline( final ApplicationScope applicationScope, final Optional<String> cursor, final int limit ) {
-
-
- ValidationUtils.validateApplicationScope( applicationScope );
- Preconditions.checkNotNull( cursor, "cursor optional is required" );
- Preconditions.checkArgument( limit > 0, "limit must be > 0" );
-
-
- this.applicationScope = applicationScope;
-
- //init our cursor to empty
- this.requestCursor = new RequestCursor( cursor );
-
- //set the default limit
- this.limit = limit;
-
- //set our observable to start at the application
- final FilterResult<Id> filter = new FilterResult<>( applicationScope.getApplication(), Optional.absent() );
- this.currentObservable = Observable.just( filter );
- }
-
-
- public <OutputType> FilterPipeline<OutputType> withFilter(
- final Filter<? super InputType, ? extends OutputType> filter ) {
-
-
- setUp( filter );
-
- return ( FilterPipeline<OutputType> ) this;
- }
-
-
- public <OutputType> FilterPipeline<OutputType> withCollector(
- final Collector<? super InputType, ? extends OutputType> collector ) {
-
-
- setUp( collector );
-
- return ( FilterPipeline<OutputType> ) this;
- }
-
-
- private <OutputType> void setUp(
- final PipelineOperation<? super InputType, ? extends OutputType> pipelineOperation ) {
- setState( pipelineOperation );
-
- currentObservable = currentObservable.compose( pipelineOperation );
- }
-
-
- /**
- * Return the observable of the filter pipeline
- */
- public Observable<InputType> execute() {
- return currentObservable;
- }
-
-
- /**
- * Set the id of the state
- */
- private void setState( final PipelineOperation pipelineOperation ) {
-
-
- final PipelineContext context = new PipelineContext( applicationScope, requestCursor, limit, idCount );
-
- pipelineOperation.setContext( context );
-
- //done for clarity
- idCount++;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadFilterFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadFilterFactoryImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadFilterFactoryImpl.java
deleted file mode 100644
index 0f73fb9..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadFilterFactoryImpl.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read;
-
-
-import com.google.inject.Singleton;
-
-
-@Singleton
-public class ReadFilterFactoryImpl { //implements ReadFilterFactory {
-
-//
-// private final GraphManagerFactory graphManagerFactory;
-// private final EntityIndexFactory entityIndexFactory;
-// private final EntityCollectionManagerFactory entityCollectionManagerFactory;
-//
-//
-// @Inject
-// public ReadFilterFactoryImpl( final GraphManagerFactory graphManagerFactory,
-// final EntityIndexFactory entityIndexFactory,
-// final EntityCollectionManagerFactory entityCollectionManagerFactory ) {
-//
-//
-// this.graphManagerFactory = graphManagerFactory;
-// this.entityIndexFactory = entityIndexFactory;
-// this.entityCollectionManagerFactory = entityCollectionManagerFactory;
-// }
-//
-//
-// @Override
-// public ReadGraphCollectionFilter readGraphCollectionCommand( final String collectionName ) {
-// return new ReadGraphCollectionFilter( graphManagerFactory, collectionName );
-// }
-//
-//
-// @Override
-// public ReadGraphCollectionByIdFilter readGraphCollectionByIdFilter( final String collectionName,
-// final Id targetId ) {
-// return new ReadGraphCollectionByIdFilter( graphManagerFactory, collectionName, targetId );
-// }
-//
-//
-// @Override
-// public ReadGraphConnectionFilter readGraphConnectionCommand( final String connectionName ) {
-// return new ReadGraphConnectionFilter( graphManagerFactory, connectionName );
-// }
-//
-//
-// @Override
-// public ReadGraphConnectionByTypeFilter readGraphConnectionCommand( final String connectionName,
-// final String entityType ) {
-// return new ReadGraphConnectionByTypeFilter( graphManagerFactory, connectionName, entityType );
-// }
-//
-//
-// @Override
-// public ReadGraphConnectionByIdFilter readGraphConnectionByIdFilter( final String connectionName,
-// final Id targetId ) {
-// return new ReadGraphConnectionByIdFilter( graphManagerFactory, connectionName, targetId );
-// }
-//
-//
-// @Override
-// public EntityLoadCollector entityLoadCollector() {
-// return new EntityLoadCollector( entityCollectionManagerFactory );
-// }
-//
-//
-// /**
-// * TODO refactor these impls to use RX internally, as well as remove the query object
-// */
-// @Override
-// public QueryCollectionElasticSearchCollectorFilter queryCollectionElasticSearchCollector(
-// final String collectionName, final String query ) {
-//
-// final Query queryObject = Query.fromQL( query );
-//
-// final QueryCollectionElasticSearchCollectorFilter filter =
-// new QueryCollectionElasticSearchCollectorFilter( entityCollectionManagerFactory, entityIndexFactory,
-// collectionName, queryObject );
-//
-// return filter;
-// }
-//
-//
-// @Override
-// public QueryConnectionElasticSearchCollectorFilter queryConnectionElasticSearchCollector(
-// final String connectionName, final String query ) {
-//
-// final Query queryObject = Query.fromQL( query );
-//
-// final QueryConnectionElasticSearchCollectorFilter filter =
-// new QueryConnectionElasticSearchCollectorFilter( entityCollectionManagerFactory, entityIndexFactory,
-// connectionName, queryObject );
-//
-// return filter;
-// }
-//
-//
-// @Override
-// public QueryConnectionElasticSearchCollectorFilter queryConnectionElasticSearchCollector(
-// final String connectionName, final String connectionEntityType, final String query ) {
-//
-// final Query queryObject = Query.fromQL( query );
-// queryObject.setConnectionType( connectionEntityType );
-//
-// final QueryConnectionElasticSearchCollectorFilter filter =
-// new QueryConnectionElasticSearchCollectorFilter( entityCollectionManagerFactory, entityIndexFactory,
-// connectionName, queryObject );
-//
-// return filter;
-// }
-//
-//
-// @Override
-// public EntityIdFilter getEntityIdFilter( final Id entityId ) {
-// return new EntityIdFilter( entityId );
-// }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ResultsPage.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ResultsPage.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ResultsPage.java
index 1810d65..6b3a086 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ResultsPage.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ResultsPage.java
@@ -29,24 +29,26 @@ import org.apache.usergrid.persistence.model.entity.Entity;
/**
* An encapsulation of entities as a group of responses. Ordered by the requesting filters. Each set should be
* considered a "page" of results. A hold over from 1.0. We shouldn't need this when we fully move away from the EM/RM
+ *
+ * @param <T> the type of element held in this results page
*/
-public class ResultsPage {
+public class ResultsPage<T> {
- private final List<Entity> entityList;
+ private final List<T> entityList;
private final int limit;
private final ResponseCursor responseCursor;
- public ResultsPage( final List<Entity> entityList, final ResponseCursor responseCursor, final int limit ) {
+ public ResultsPage( final List<T> entityList, final ResponseCursor responseCursor, final int limit ) {
this.entityList = entityList;
this.responseCursor = responseCursor;
this.limit = limit;
}
- public List<Entity> getEntityList() {
+ public List<T> getEntityList() {
return entityList;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/AbstractCollector.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/AbstractCollector.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/AbstractCollector.java
deleted file mode 100644
index 1c5175d..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/AbstractCollector.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.collect;
-
-
-import org.apache.usergrid.corepersistence.pipeline.PipelineContext;
-import org.apache.usergrid.corepersistence.pipeline.read.Collector;
-import org.apache.usergrid.corepersistence.pipeline.read.Filter;
-
-
-/**
- * Basic functionality for our commands to handle cursor IO
- * @param <T> the input type
- * @param <R> The output Type
- */
-public abstract class AbstractCollector<T, R> implements Collector<T, R> {
-
-
- protected PipelineContext pipelineContext;
-
-
- @Override
- public void setContext( final PipelineContext pipelineContext ) {
- this.pipelineContext = pipelineContext;
- }
-
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ConnectionRefFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ConnectionRefFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ConnectionRefFilter.java
new file mode 100644
index 0000000..392e33a
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ConnectionRefFilter.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.collect;
+
+
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.persistence.ConnectionRef;
+import org.apache.usergrid.persistence.cassandra.ConnectionRefImpl;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+import rx.Observable;
+
+
+/**
+ * This class exists only for 1.0 compatibility; remove it once services no longer need connection refs.
+ */
+public class ConnectionRefFilter extends AbstractFilter<FilterResult<Id>, FilterResult<ConnectionRef>> {
+
+
+ private final Id sourceId;
+ private final String connectionType;
+
+
+ @Inject
+ public ConnectionRefFilter( @Assisted( "sourceId" ) final Id sourceId,
+ @Assisted( "connectionType" ) final String connectionType ) {
+ this.sourceId = sourceId;
+ this.connectionType = connectionType;
+ }
+
+
+ @Override
+ public Observable<FilterResult<ConnectionRef>> call( final Observable<FilterResult<Id>> filterResultObservable ) {
+
+ return filterResultObservable.map( targetResult -> {
+
+ final Id targetId = targetResult.getValue();
+ final ConnectionRef ref =
+ new ConnectionRefImpl( sourceId.getType(), sourceId.getUuid(), connectionType, targetId.getType(),
+ targetId.getUuid() );
+
+ return new FilterResult<>( ref, Optional.<EdgePath>absent() );
+ } );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ConnectionRefResumeFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ConnectionRefResumeFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ConnectionRefResumeFilter.java
new file mode 100644
index 0000000..5c3a93e
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ConnectionRefResumeFilter.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.collect;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.persistence.ConnectedEntityRef;
+import org.apache.usergrid.persistence.ConnectionRef;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+
+import com.google.common.base.Optional;
+
+import rx.Observable;
+
+
+/**
+ * A filter that is used when we can potentially serialize pages via cursor. This will filter the first result, only if
+ * it matches the Id that was set. This is a 1.0 compatibility implementation, and should be removed when services no
+ * longer depend on connection refs.
+ */
+public class ConnectionRefResumeFilter extends AbstractPathFilter<ConnectionRef, ConnectionRef, Id> {
+
+
+ @Override
+ public Observable<FilterResult<ConnectionRef>> call(
+ final Observable<FilterResult<ConnectionRef>> filterResultObservable ) {
+
+ //filter only the first id, then map into our path for our next pass
+
+
+ return filterResultObservable.skipWhile( filterResult -> {
+
+ final Optional<Id> startFromCursor = getSeekValue();
+
+
+ if ( !startFromCursor.isPresent() ) {
+ return false;
+ }
+
+ final ConnectedEntityRef ref = filterResult.getValue().getTargetRefs();
+
+ final Id entityId = startFromCursor.get();
+
+ return entityId.getUuid().equals( ref.getUuid() ) && entityId.getType().equals( ref.getType() );
+ } ).map( filterResult -> {
+
+
+ final ConnectionRef entity = filterResult.getValue();
+
+ final String type = entity.getTargetRefs().getType();
+ final UUID uuid = entity.getTargetRefs().getUuid();
+
+ final Id entityId = new SimpleId( uuid, type );
+
+ return createFilterResult( entity, entityId, filterResult.getPath() );
+ } );
+ }
+
+
+ @Override
+ protected CursorSerializer<Id> getCursorSerializer() {
+ return IdCursorSerializer.INSTANCE;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/EntityResumeFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/EntityResumeFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/EntityResumeFilter.java
index 2917b61..f545631 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/EntityResumeFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/EntityResumeFilter.java
@@ -22,7 +22,6 @@ package org.apache.usergrid.corepersistence.pipeline.read.collect;
import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.Filter;
import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
@@ -36,7 +35,7 @@ import rx.Observable;
* A filter that is used when we can potentially serialize pages via cursor. This will filter the first result, only if
* it matches the Id that was set
*/
-public class EntityResumeFilter extends AbstractPathFilter<Entity, Entity, Id> implements Filter<Entity, Entity> {
+public class EntityResumeFilter extends AbstractPathFilter<Entity, Entity, Id> {
@Override