You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/05/26 23:17:09 UTC
[05/11] incubator-usergrid git commit: Updates pipeline and fixes
connectionref querying
Updates pipeline and fixes connectionref querying
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/aa9153ac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/aa9153ac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/aa9153ac
Branch: refs/heads/two-dot-o-dev
Commit: aa9153ac84a2a7e68dd0c9144ab867c6f75c68a0
Parents: 6d54dff
Author: Todd Nine <tn...@apigee.com>
Authored: Thu May 21 17:10:21 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu May 21 17:10:21 2015 -0600
----------------------------------------------------------------------
.../corepersistence/CpRelationManager.java | 21 ++--
.../pipeline/FilterPipeline.java | 107 ------------------
.../corepersistence/pipeline/Pipeline.java | 110 +++++++++++++++++++
.../pipeline/PipelineModule.java | 9 +-
.../pipeline/builder/CandidateBuilder.java | 12 +-
.../pipeline/builder/ConnectionBuilder.java | 37 -------
.../pipeline/builder/ConnectionRefBuilder.java | 6 +-
.../pipeline/builder/EntityBuilder.java | 13 ++-
.../pipeline/builder/IdBuilder.java | 25 ++---
.../pipeline/builder/PipelineBuilder.java | 6 +-
.../results/ConnectionRefQueryExecutor.java | 60 ++++++++++
.../results/EntityQueryExecutor.java | 84 ++++++++++++++
.../results/ObservableQueryExecutor.java | 52 +++------
.../corepersistence/results/QueryExecutor.java | 1 +
14 files changed, 320 insertions(+), 223 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa9153ac/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 1c34929..be143ce 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -31,12 +31,12 @@ import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;
import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
-import org.apache.usergrid.corepersistence.pipeline.FilterPipeline;
import org.apache.usergrid.corepersistence.pipeline.builder.EntityBuilder;
import org.apache.usergrid.corepersistence.pipeline.builder.IdBuilder;
import org.apache.usergrid.corepersistence.pipeline.builder.PipelineBuilderFactory;
-import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
+import org.apache.usergrid.corepersistence.results.ConnectionRefQueryExecutor;
+import org.apache.usergrid.corepersistence.results.EntityQueryExecutor;
import org.apache.usergrid.corepersistence.results.ObservableQueryExecutor;
import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
import org.apache.usergrid.corepersistence.util.CpNamingUtils;
@@ -640,11 +640,11 @@ public class CpRelationManager implements RelationManager {
}
else {
final String entityType = collection.getType();
- results = pipelineBuilder.searchCollection( collectionName, entityType, query.getQl().get() ).loadEntities();
+ results = pipelineBuilder.searchCollection( collectionName, query.getQl().get() , entityType).loadEntities();
}
- return new ObservableQueryExecutor( results.build() ).next();
+ return new EntityQueryExecutor( results.build() ).next();
}
@@ -936,21 +936,24 @@ public class CpRelationManager implements RelationManager {
if(query.getResultsLevel() == Level.REFS){
- final Observable<ResultsPage<ConnectionRef>> results;
+ final IdBuilder traversedIds;
if(query.isGraphSearch()){
- results = pipelineBuilder.traverseConnection( connection, entityType ).loadConnectionRefs( cpHeadEntity.getId(), connection ).build();
+ traversedIds = pipelineBuilder.traverseConnection( connection, entityType );
}
else
{
- results = pipelineBuilder.searchConnection( connection, query.getQl().get(),entityType) .loadIds().loadConnectionRefs( cpHeadEntity.getId(), connection ).build();
+ traversedIds = pipelineBuilder.searchConnection( connection, query.getQl().get(), entityType ).loadIds();
}
- throw new UnsupportedOperationException( "Implement me" );
+ final Observable<ResultsPage<ConnectionRef>> results = traversedIds.loadConnectionRefs(
+ cpHeadEntity.getId(), connection ).build();
+
+ return new ConnectionRefQueryExecutor( results ).next();
}
@@ -978,7 +981,7 @@ public class CpRelationManager implements RelationManager {
- return new ObservableQueryExecutor( results ).next();
+ return new EntityQueryExecutor( results ).next();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa9153ac/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/FilterPipeline.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/FilterPipeline.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/FilterPipeline.java
deleted file mode 100644
index 089f47d..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/FilterPipeline.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline;
-
-
-import org.apache.usergrid.corepersistence.pipeline.cursor.RequestCursor;
-import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.util.ValidationUtils;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-
-import rx.Observable;
-
-
-/**
- * Pipeline for applying our UG domain specific filters.
- *
- * Modeled after an observable, with typing to allow input of specific filters
- *
- * @param InputType the input type in the current pipeline state
- */
-public class FilterPipeline<InputType> {
-
-
- private int idCount = 0;
-
- private final ApplicationScope applicationScope;
-
-
- private final RequestCursor requestCursor;
- private int limit;
-
- //Generics hell, intentionally without a generic, we check at the filter level
- private Observable currentObservable;
-
-
- /**
- * Create our filter pipeline
- */
- public FilterPipeline( final ApplicationScope applicationScope, final Optional<String> cursor, final int limit ) {
-
-
- ValidationUtils.validateApplicationScope( applicationScope );
- Preconditions.checkNotNull( cursor, "cursor optional is required" );
- Preconditions.checkArgument( limit > 0, "limit must be > 0" );
-
-
- this.applicationScope = applicationScope;
-
- //init our cursor to empty
- this.requestCursor = new RequestCursor( cursor );
-
- //set the default limit
- this.limit = limit;
-
- //set our observable to start at the application
- final FilterResult<Id> filter = new FilterResult<>( applicationScope.getApplication(), Optional.absent() );
- this.currentObservable = Observable.just( filter );
- }
-
-
- public <OutputType> FilterPipeline<OutputType> withFilter(
- final PipelineOperation<? super InputType, ? extends OutputType> filter ) {
-
-
-
- final PipelineContext context = new PipelineContext( applicationScope, requestCursor, limit, idCount );
-
- filter.setContext( context );
-
- //done for clarity
- idCount++;
-
- return ( FilterPipeline<OutputType> ) this;
- }
-
-
-
- /**
- * Return the observable of the filter pipeline
- */
- public Observable<InputType> execute() {
- return currentObservable;
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa9153ac/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
new file mode 100644
index 0000000..dc95178
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline;
+
+
+import org.apache.usergrid.corepersistence.pipeline.cursor.RequestCursor;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.util.ValidationUtils;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+
+import rx.Observable;
+
+
+/**
+ * Pipeline for applying our UG domain specific filters.
+ *
+ * Modeled after an observable, with typing to allow input of specific filters
+ *
+ * @param <InputType> the input type in the current pipeline state
+ */
+public class Pipeline<InputType> {
+
+
+ private int idCount = 0;
+
+ private final ApplicationScope applicationScope;
+
+
+ private final RequestCursor requestCursor;
+ private int limit;
+
+ //Generics hell, intentionally without a generic, we check at the filter level
+ private Observable currentObservable;
+
+
+ /**
+ * Create our filter pipeline
+ */
+ public Pipeline( final ApplicationScope applicationScope, final Optional<String> cursor, final int limit ) {
+
+
+ ValidationUtils.validateApplicationScope( applicationScope );
+ Preconditions.checkNotNull( cursor, "cursor optional is required" );
+ Preconditions.checkArgument( limit > 0, "limit must be > 0" );
+
+
+ this.applicationScope = applicationScope;
+
+ //init our cursor to empty
+ this.requestCursor = new RequestCursor( cursor );
+
+ //set the default limit
+ this.limit = limit;
+
+ //set our observable to start at the application
+ final FilterResult<Id> filter = new FilterResult<>( applicationScope.getApplication(), Optional.absent() );
+ this.currentObservable = Observable.just( filter );
+ }
+
+
+ public <OutputType> Pipeline<OutputType> withFilter(
+ final PipelineOperation<? super InputType, ? extends OutputType> filter ) {
+
+
+
+ final PipelineContext context = new PipelineContext( applicationScope, requestCursor, limit, idCount );
+
+ filter.setContext( context );
+
+ //update the observable
+ this.currentObservable = currentObservable.compose( filter );
+
+ //done for clarity
+ idCount++;
+
+ return ( Pipeline<OutputType> ) this;
+ }
+
+
+
+ /**
+ * Return the observable of the filter pipeline
+ */
+ public Observable<InputType> execute() {
+ return currentObservable;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa9153ac/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineModule.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineModule.java
index 8ec8704..93df066 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineModule.java
@@ -20,6 +20,7 @@
package org.apache.usergrid.corepersistence.pipeline;
+import org.apache.usergrid.corepersistence.pipeline.builder.PipelineBuilderFactory;
import org.apache.usergrid.corepersistence.pipeline.read.FilterFactory;
import com.google.inject.AbstractModule;
@@ -33,15 +34,11 @@ public class PipelineModule extends AbstractModule {
@Override
protected void configure() {
- //Use Guice to create the builder since we don't really need to do anything
- //other than DI when creating the filters
-// bind( FilterFactory.class ).to( ReadFilterFactoryImpl.class );
-
-
-// install( new Factory)
//Use Guice to create the builder since we don't really need to do anything
//other than DI when creating the filters
install( new FactoryModuleBuilder().build( FilterFactory.class ) );
+
+ install( new FactoryModuleBuilder().build( PipelineBuilderFactory.class ) );
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa9153ac/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/CandidateBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/CandidateBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/CandidateBuilder.java
index 5cb2eab..9354127 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/CandidateBuilder.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/CandidateBuilder.java
@@ -21,7 +21,7 @@ package org.apache.usergrid.corepersistence.pipeline.builder;
import org.apache.usergrid.corepersistence.pipeline.read.FilterFactory;
-import org.apache.usergrid.corepersistence.pipeline.FilterPipeline;
+import org.apache.usergrid.corepersistence.pipeline.Pipeline;
import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
import org.apache.usergrid.corepersistence.pipeline.read.search.Candidate;
import org.apache.usergrid.persistence.model.entity.Entity;
@@ -31,13 +31,13 @@ import org.apache.usergrid.persistence.model.entity.Id;
public class CandidateBuilder {
- private final FilterPipeline<FilterResult<Candidate>> filterPipeline;
+ private final Pipeline<FilterResult<Candidate>> pipeline;
private final FilterFactory filterFactory;
- public CandidateBuilder( final FilterPipeline<FilterResult<Candidate>> filterPipeline,
+ public CandidateBuilder( final Pipeline<FilterResult<Candidate>> pipeline,
final FilterFactory filterFactory ) {
- this.filterPipeline = filterPipeline;
+ this.pipeline = pipeline;
this.filterFactory = filterFactory;
}
@@ -48,7 +48,7 @@ public class CandidateBuilder {
*/
public IdBuilder loadIds(){
- final FilterPipeline<FilterResult<Id>> newFilter = filterPipeline.withFilter( filterFactory.candidateResultsIdVerifyFilter() );
+ final Pipeline<FilterResult<Id>> newFilter = pipeline.withFilter( filterFactory.candidateResultsIdVerifyFilter() );
return new IdBuilder( newFilter, filterFactory );
}
@@ -60,7 +60,7 @@ public class CandidateBuilder {
*/
public EntityBuilder loadEntities(){
- final FilterPipeline<FilterResult<Entity>> newFilter = filterPipeline.withFilter( filterFactory.candidateEntityFilter() );
+ final Pipeline<FilterResult<Entity>> newFilter = pipeline.withFilter( filterFactory.candidateEntityFilter() );
return new EntityBuilder(newFilter );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa9153ac/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/ConnectionBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/ConnectionBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/ConnectionBuilder.java
deleted file mode 100644
index b4ea94e..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/ConnectionBuilder.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.builder;
-
-
-import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
-import org.apache.usergrid.persistence.ConnectionRef;
-
-import rx.Observable;
-
-
-public class ConnectionBuilder {
-
-
-
- public Observable<ResultsPage<ConnectionRef>> build(){
- return null;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa9153ac/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/ConnectionRefBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/ConnectionRefBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/ConnectionRefBuilder.java
index 6c0ebc8..362f2c6 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/ConnectionRefBuilder.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/ConnectionRefBuilder.java
@@ -20,7 +20,7 @@
package org.apache.usergrid.corepersistence.pipeline.builder;
-import org.apache.usergrid.corepersistence.pipeline.FilterPipeline;
+import org.apache.usergrid.corepersistence.pipeline.Pipeline;
import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
import org.apache.usergrid.corepersistence.pipeline.read.collect.ResultsPageCollector;
@@ -36,9 +36,9 @@ import rx.Observable;
public class ConnectionRefBuilder {
- private final FilterPipeline<FilterResult<ConnectionRef>> connectionRefFilter;
+ private final Pipeline<FilterResult<ConnectionRef>> connectionRefFilter;
- public ConnectionRefBuilder( final FilterPipeline<FilterResult<ConnectionRef>> connectionRefFilter ) {
+ public ConnectionRefBuilder( final Pipeline<FilterResult<ConnectionRef>> connectionRefFilter ) {
this.connectionRefFilter = connectionRefFilter;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa9153ac/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/EntityBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/EntityBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/EntityBuilder.java
index 07b4586..b120c56 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/EntityBuilder.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/EntityBuilder.java
@@ -20,9 +20,11 @@
package org.apache.usergrid.corepersistence.pipeline.builder;
-import org.apache.usergrid.corepersistence.pipeline.FilterPipeline;
+import org.apache.usergrid.corepersistence.pipeline.Pipeline;
import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
+import org.apache.usergrid.corepersistence.pipeline.read.collect.EntityResumeFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.collect.ResultsPageCollector;
import org.apache.usergrid.persistence.model.entity.Entity;
import rx.Observable;
@@ -33,11 +35,11 @@ import rx.Observable;
*/
public class EntityBuilder {
- private final FilterPipeline<FilterResult<Entity>> filterPipeline;
+ private final Pipeline<FilterResult<Entity>> pipeline;
- public EntityBuilder( final FilterPipeline<FilterResult<Entity>> filterPipeline ) {
- this.filterPipeline = filterPipeline;
+ public EntityBuilder( final Pipeline<FilterResult<Entity>> pipeline ) {
+ this.pipeline = pipeline;
}
@@ -46,6 +48,7 @@ public class EntityBuilder {
* @return
*/
public Observable<ResultsPage<Entity>> build(){
- return null;
+ //we must add our resume filter so we drop our previous page first element if it's present
+ return pipeline.withFilter( new EntityResumeFilter() ).withFilter( new ResultsPageCollector<>() ).execute();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa9153ac/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java
index 12a89ba..4291ea9 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java
@@ -22,12 +22,11 @@ package org.apache.usergrid.corepersistence.pipeline.builder;
import org.apache.usergrid.corepersistence.pipeline.PipelineOperation;
import org.apache.usergrid.corepersistence.pipeline.read.FilterFactory;
-import org.apache.usergrid.corepersistence.pipeline.FilterPipeline;
+import org.apache.usergrid.corepersistence.pipeline.Pipeline;
import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
import org.apache.usergrid.corepersistence.pipeline.read.collect.ConnectionRefFilter;
import org.apache.usergrid.corepersistence.pipeline.read.collect.ConnectionRefResumeFilter;
import org.apache.usergrid.corepersistence.pipeline.read.search.Candidate;
-import org.apache.usergrid.corepersistence.pipeline.read.traverse.ReadGraphConnectionByTypeFilter;
import org.apache.usergrid.persistence.ConnectionRef;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
@@ -42,11 +41,11 @@ public class IdBuilder {
private final FilterFactory filterFactory;
- private final FilterPipeline<FilterResult<Id>> filterPipeline;
+ private final Pipeline<FilterResult<Id>> pipeline;
- public IdBuilder( final FilterPipeline<FilterResult<Id>> filterPipeline, final FilterFactory filterFactory ) {
- this.filterPipeline = filterPipeline;
+ public IdBuilder( final Pipeline<FilterResult<Id>> pipeline, final FilterFactory filterFactory ) {
+ this.pipeline = pipeline;
this.filterFactory = filterFactory;
}
@@ -56,8 +55,8 @@ public class IdBuilder {
* @return
*/
public EntityBuilder loadEntities() {
- final FilterPipeline<FilterResult<Entity>> pipeline =
- filterPipeline.withFilter( filterFactory.entityLoadFilter() );
+ final Pipeline<FilterResult<Entity>> pipeline =
+ this.pipeline.withFilter( filterFactory.entityLoadFilter() );
return new EntityBuilder( pipeline );
}
@@ -69,8 +68,8 @@ public class IdBuilder {
* @return
*/
public IdBuilder traverseCollection( final String collectionName ) {
- final FilterPipeline<FilterResult<Id>> newFilter =
- filterPipeline.withFilter( filterFactory.readGraphCollectionFilter( collectionName ) );
+ final Pipeline<FilterResult<Id>> newFilter =
+ pipeline.withFilter( filterFactory.readGraphCollectionFilter( collectionName ) );
return new IdBuilder( newFilter, filterFactory );
}
@@ -93,7 +92,7 @@ public class IdBuilder {
}
- return new IdBuilder( filterPipeline.withFilter(filter ), filterFactory );
+ return new IdBuilder( pipeline.withFilter(filter ), filterFactory );
}
@@ -106,7 +105,7 @@ public class IdBuilder {
*/
public CandidateBuilder searchCollection( final String collectionName, final String ql, final String entityType ) {
- final FilterPipeline<FilterResult<Candidate>> newFilter = filterPipeline.withFilter( filterFactory.searchCollectionFilter(
+ final Pipeline<FilterResult<Candidate>> newFilter = pipeline.withFilter( filterFactory.searchCollectionFilter(
ql, collectionName, entityType ) );
return new CandidateBuilder( newFilter, filterFactory );
@@ -123,7 +122,7 @@ public class IdBuilder {
public CandidateBuilder searchConnection( final String connectionName, final String ql , final Optional<String> entityType) {
- final FilterPipeline<FilterResult<Candidate>> newFilter = filterPipeline.withFilter( filterFactory.searchConnectionFilter(
+ final Pipeline<FilterResult<Candidate>> newFilter = pipeline.withFilter( filterFactory.searchConnectionFilter(
ql, connectionName, entityType ) );
return new CandidateBuilder( newFilter, filterFactory );
@@ -139,7 +138,7 @@ public class IdBuilder {
@Deprecated
public ConnectionRefBuilder loadConnectionRefs(final Id sourceId, final String connectionType){
- final FilterPipeline<FilterResult<ConnectionRef>> connectionRefFilter = filterPipeline.withFilter( new ConnectionRefFilter(sourceId, connectionType ) ).withFilter(
+ final Pipeline<FilterResult<ConnectionRef>> connectionRefFilter = pipeline.withFilter( new ConnectionRefFilter(sourceId, connectionType ) ).withFilter(
new ConnectionRefResumeFilter() );
return new ConnectionRefBuilder(connectionRefFilter);
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa9153ac/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/PipelineBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/PipelineBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/PipelineBuilder.java
index 488e9c1..f1a44ea 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/PipelineBuilder.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/PipelineBuilder.java
@@ -21,7 +21,7 @@ package org.apache.usergrid.corepersistence.pipeline.builder;
import org.apache.usergrid.corepersistence.pipeline.read.FilterFactory;
-import org.apache.usergrid.corepersistence.pipeline.FilterPipeline;
+import org.apache.usergrid.corepersistence.pipeline.Pipeline;
import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.model.entity.Id;
@@ -91,9 +91,9 @@ public class PipelineBuilder {
*/
@Deprecated
public IdBuilder fromId(final Id entityId){
- FilterPipeline<FilterResult<Id>> filterPipeline = new FilterPipeline( applicationScope, this.cursor,limit ).withFilter( filterFactory.getEntityIdFilter( entityId ) );
+ Pipeline<FilterResult<Id>> pipeline = new Pipeline( applicationScope, this.cursor,limit ).withFilter( filterFactory.getEntityIdFilter( entityId ) );
- return new IdBuilder( filterPipeline, filterFactory );
+ return new IdBuilder( pipeline, filterFactory );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa9153ac/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionRefQueryExecutor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionRefQueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionRefQueryExecutor.java
new file mode 100644
index 0000000..798c9c7
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionRefQueryExecutor.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.results;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
+import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
+import org.apache.usergrid.persistence.ConnectionRef;
+import org.apache.usergrid.persistence.EntityFactory;
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import rx.Observable;
+
+
+/**
+ * Processes our result pages of connection refs and turns them into {@link Results} for 1.0 compatibility
+ */
+@Deprecated//Required for 1.0 compatibility
+public class ConnectionRefQueryExecutor extends ObservableQueryExecutor<ConnectionRef> {
+
+
+ public ConnectionRefQueryExecutor( final Observable<ResultsPage<ConnectionRef>> resultsObservable ) {
+ super( resultsObservable );
+ }
+
+
+ @Override
+ protected Results createResults( final ResultsPage resultsPage ) {
+ final List<ConnectionRef> connectionRefs = resultsPage.getEntityList();
+
+ final Results results = Results.fromConnections(connectionRefs);
+
+ return results;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa9153ac/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/EntityQueryExecutor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/EntityQueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/EntityQueryExecutor.java
new file mode 100644
index 0000000..bc9001e
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/EntityQueryExecutor.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.results;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
+import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
+import org.apache.usergrid.persistence.EntityFactory;
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import rx.Observable;
+
+
+/**
+ * Query executor for entities: maps every model Entity in a ResultsPage to an org.apache.usergrid.persistence.Entity and wraps the mapped list in a Results via Results.fromEntities
+ */
+@Deprecated//Required for 1.0 compatibility
+public class EntityQueryExecutor extends ObservableQueryExecutor<Entity> {
+
+
+ public EntityQueryExecutor( final Observable<ResultsPage<Entity>> resultsObservable ) {
+ super( resultsObservable );
+ }
+
+
+ @Override
+ protected Results createResults( final ResultsPage resultsPage ) {
+
+ final List<Entity> entityList = resultsPage.getEntityList();
+ final List<org.apache.usergrid.persistence.Entity> resultsEntities = new ArrayList<>( entityList.size() );
+
+
+ for ( final Entity entity : entityList ) {
+ resultsEntities.add( mapEntity( entity ) );
+ }
+
+ final Results results = Results.fromEntities( resultsEntities );
+
+ return results;
+ }
+
+
+ /**
+ * Creates an org.apache.usergrid.persistence.Entity with the UUID and type of the given entity's Id, then adds the properties returned by CpEntityMapUtils.toMap
+ * @param cpEntity the model entity to convert
+ * @return the newly created entity with the mapped properties added
+ */
+ private org.apache.usergrid.persistence.Entity mapEntity( final Entity cpEntity ) {
+
+
+ final Id entityId = cpEntity.getId();
+
+ org.apache.usergrid.persistence.Entity entity =
+ EntityFactory.newEntity( entityId.getUuid(), entityId.getType() );
+
+ Map<String, Object> entityMap = CpEntityMapUtils.toMap( cpEntity );
+ entity.addProperties( entityMap );
+
+ return entity;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa9153ac/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
index c779bb7..ff44416 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
@@ -20,9 +20,7 @@
package org.apache.usergrid.corepersistence.results;
-import java.util.ArrayList;
import java.util.Iterator;
-import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
@@ -42,53 +40,42 @@ import rx.Observable;
* Our proxy to allow us to subscribe to observable results, then return them as an iterator. A bridge for 2.0 -> 1.0
* code. This should not be used on any new code, and will eventually be deleted
*/
-public class ObservableQueryExecutor implements QueryExecutor {
+@Deprecated//Required for 1.0 compatibility
+public abstract class ObservableQueryExecutor<T> implements QueryExecutor {
private final Observable<Results> resultsObservable;
public Iterator<Results> iterator;
- public ObservableQueryExecutor( final Observable<ResultsPage<Entity>> resultsObservable) {
- //map to our old results objects, return a default empty if required
- this.resultsObservable = resultsObservable.map( resultsPage -> createResults( resultsPage ) ).defaultIfEmpty( new Results() );
+ public ObservableQueryExecutor( final Observable<ResultsPage<T>> resultsObservable ) {
+ //map to our old results objects, return a default empty if required
+ this.resultsObservable = resultsObservable.map( resultsPage -> createResultsInternal( resultsPage ) )
+ .defaultIfEmpty( new Results() );
}
/**
- *
- * @param cpEntity
+ * Transforms a page of results into a legacy Results object; each subclass supplies the conversion for its own result type
+ * @param resultsPage
* @return
*/
- private org.apache.usergrid.persistence.Entity mapEntity( final Entity cpEntity ) {
+ protected abstract Results createResults( final ResultsPage resultsPage );
- final Id entityId = cpEntity.getId();
- org.apache.usergrid.persistence.Entity entity =
- EntityFactory.newEntity( entityId.getUuid(), entityId.getType() );
-
- Map<String, Object> entityMap = CpEntityMapUtils.toMap( cpEntity );
- entity.addProperties( entityMap );
-
- return entity;
- }
-
- private Results createResults( final ResultsPage resultsPage ){
-
- final List<Entity> entityList = resultsPage.getEntityList();
- final List<org.apache.usergrid.persistence.Entity> resultsEntities = new ArrayList<>( entityList.size() );
-
-
- for(final Entity entity: entityList){
- resultsEntities.add( mapEntity( entity ) );
- }
+ /**
+ * Legacy bridge: builds the Results through createResults, then adds the cursor when the page has more results
+ * @param resultsPage
+ * @return
+ */
+ private Results createResultsInternal( final ResultsPage resultsPage ) {
- final Results results = Results.fromEntities( resultsEntities );
+ final Results results = createResults( resultsPage );
//add the cursor if our limit is the same
- if(resultsPage.hasMoreResults()) {
+ if ( resultsPage.hasMoreResults() ) {
final Optional<String> cursor = resultsPage.getResponseCursor().encodeAsString();
if ( cursor.isPresent() ) {
@@ -96,11 +83,10 @@ public class ObservableQueryExecutor implements QueryExecutor {
}
}
return results;
-
-
}
+
@Override
public Iterator<Results> iterator() {
return this;
@@ -130,6 +116,4 @@ public class ObservableQueryExecutor implements QueryExecutor {
return next;
}
-
-
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa9153ac/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/QueryExecutor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/QueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/QueryExecutor.java
index 3afb77f..6bdf162 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/QueryExecutor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/QueryExecutor.java
@@ -31,6 +31,7 @@ import org.apache.usergrid.persistence.Results;
*
* QueryExecutor.next() should always return a non-null Results object
*/
+@Deprecated//Required for 1.0 compatibility
public interface QueryExecutor extends Iterable<Results>, Iterator<Results> {