You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/07/14 02:06:33 UTC
[1/2] incubator-usergrid git commit: Refactored iterator to use a
cursor and invoke service with each page to perform seeking. Otherwise the
subscription will run infinitely.
Repository: incubator-usergrid
Updated Branches:
refs/heads/queryfix [created] cc80ba926
Refactored iterator to use a cursor and invoke service with each page to perform seeking. Otherwise the subscription will run infinitely.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/35a7be37
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/35a7be37
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/35a7be37
Branch: refs/heads/queryfix
Commit: 35a7be3742e1f305d644d9291541639b4f4f6ebc
Parents: 48689eb
Author: Todd Nine <tn...@apigee.com>
Authored: Mon Jul 13 17:13:27 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon Jul 13 17:49:12 2015 -0600
----------------------------------------------------------------------
.../usergrid/corepersistence/CoreModule.java | 15 +-
.../corepersistence/CpEntityManager.java | 12 +-
.../corepersistence/CpEntityManagerFactory.java | 9 +-
.../corepersistence/CpRelationManager.java | 210 +++++++++----------
.../results/ConnectionRefQueryExecutor.java | 8 +-
.../results/EntityQueryExecutor.java | 11 +-
.../results/ObservableQueryExecutor.java | 71 ++++---
.../service/CollectionSearch.java | 87 ++++++++
.../service/CollectionService.java | 38 ++++
.../service/CollectionServiceImpl.java | 76 +++++++
.../service/ConnectionSearch.java | 87 ++++++++
.../service/ConnectionService.java | 49 +++++
.../service/ConnectionServiceImpl.java | 120 +++++++++++
.../persistence/core/astyanax/CassandraFig.java | 2 +-
.../rx/ObservableToBlockingIteratorFactory.java | 125 -----------
.../persistence/core/rx/OrderedMergeTest.java | 68 ++----
16 files changed, 663 insertions(+), 325 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/35a7be37/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
index dd0e1ab..d31099b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
@@ -34,6 +34,10 @@ import org.apache.usergrid.corepersistence.rx.impl.AllEntitiesInSystemImpl;
import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable;
import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservableImpl;
import org.apache.usergrid.corepersistence.rx.impl.AllNodesInGraphImpl;
+import org.apache.usergrid.corepersistence.service.CollectionService;
+import org.apache.usergrid.corepersistence.service.CollectionServiceImpl;
+import org.apache.usergrid.corepersistence.service.ConnectionService;
+import org.apache.usergrid.corepersistence.service.ConnectionServiceImpl;
import org.apache.usergrid.persistence.collection.guice.CollectionModule;
import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
import org.apache.usergrid.persistence.core.guice.CommonModule;
@@ -65,9 +69,6 @@ public class CoreModule extends AbstractModule {
protected void configure() {
-// //See TODO, this is fugly
-// bind(EntityManagerFactory.class).toProvider( lazyEntityManagerFactoryProvider );
-
install( new CommonModule());
install( new CollectionModule() {
/**
@@ -156,6 +157,14 @@ public class CoreModule extends AbstractModule {
//install our pipeline modules
install(new PipelineModule());
+ /**
+ * Install our service operations
+ */
+
+ bind( CollectionService.class).to( CollectionServiceImpl.class );
+
+ bind( ConnectionService.class).to( ConnectionServiceImpl.class);
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/35a7be37/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 6184616..0bf23e0 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -36,6 +36,8 @@ import java.util.UUID;
import com.google.common.base.Optional;
import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
+import org.apache.usergrid.corepersistence.service.CollectionService;
+import org.apache.usergrid.corepersistence.service.ConnectionService;
import org.apache.usergrid.persistence.index.EntityIndex;
import org.apache.usergrid.persistence.index.IndexLocationStrategy;
import org.apache.usergrid.persistence.index.IndexRefreshCommand;
@@ -196,6 +198,10 @@ public class CpEntityManager implements EntityManager {
private final PipelineBuilderFactory pipelineBuilderFactory;
+ private final CollectionService collectionService;
+ private final ConnectionService connectionService;
+
+
private final GraphManagerFactory graphManagerFactory;
private boolean skipAggregateCounters;
@@ -241,6 +247,8 @@ public class CpEntityManager implements EntityManager {
final EntityManagerFig entityManagerFig,
final PipelineBuilderFactory pipelineBuilderFactory ,
final GraphManagerFactory graphManagerFactory,
+ final CollectionService collectionService,
+ final ConnectionService connectionService,
final UUID applicationId ) {
this.entityManagerFig = entityManagerFig;
@@ -254,6 +262,8 @@ public class CpEntityManager implements EntityManager {
Preconditions.checkNotNull( graphManagerFactory, "graphManagerFactory must not be null" );
this.pipelineBuilderFactory = pipelineBuilderFactory;
this.graphManagerFactory = graphManagerFactory;
+ this.connectionService = connectionService;
+ this.collectionService = collectionService;
@@ -778,7 +788,7 @@ public class CpEntityManager implements EntityManager {
Preconditions.checkNotNull( entityRef, "entityRef cannot be null" );
CpRelationManager relationManager =
- new CpRelationManager( managerCache, pipelineBuilderFactory, indexService, this, entityManagerFig, applicationId, entityRef );
+ new CpRelationManager( managerCache, pipelineBuilderFactory, indexService, collectionService, connectionService, this, entityManagerFig, applicationId, entityRef );
return relationManager;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/35a7be37/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index 048c558..81202ec 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -34,6 +34,8 @@ import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
import org.apache.usergrid.corepersistence.index.ReIndexRequestBuilder;
import org.apache.usergrid.corepersistence.index.ReIndexService;
import org.apache.usergrid.corepersistence.pipeline.builder.PipelineBuilderFactory;
+import org.apache.usergrid.corepersistence.service.CollectionService;
+import org.apache.usergrid.corepersistence.service.ConnectionService;
import org.apache.usergrid.corepersistence.util.CpNamingUtils;
import org.apache.usergrid.exception.ConflictException;
import org.apache.usergrid.persistence.AbstractEntity;
@@ -119,6 +121,8 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
private final MetricsFactory metricsFactory;
private final AsyncEventService indexService;
private final PipelineBuilderFactory pipelineBuilderFactory;
+ private final CollectionService collectionService;
+ private final ConnectionService connectionService;
private final GraphManagerFactory graphManagerFactory;
public CpEntityManagerFactory( final CassandraService cassandraService, final CounterUtils counterUtils,
@@ -137,6 +141,9 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
this.applicationIdCache = injector.getInstance(ApplicationIdCacheFactory.class).getInstance(
getManagementEntityManager() );
+ this.collectionService = injector.getInstance( CollectionService.class );
+ this.connectionService = injector.getInstance( ConnectionService.class );
+
}
@@ -192,7 +199,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
private EntityManager _getEntityManager( UUID applicationId ) {
EntityManager em = new CpEntityManager(cassandraService, counterUtils, indexService, managerCache,
- metricsFactory, entityManagerFig, pipelineBuilderFactory, graphManagerFactory, applicationId );
+ metricsFactory, entityManagerFig, pipelineBuilderFactory, graphManagerFactory, collectionService, connectionService, applicationId );
return em;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/35a7be37/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 3df09e6..2c460a7 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -26,18 +26,19 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
-import org.apache.usergrid.persistence.index.EntityIndex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;
import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
-import org.apache.usergrid.corepersistence.pipeline.builder.EntityBuilder;
-import org.apache.usergrid.corepersistence.pipeline.builder.IdBuilder;
import org.apache.usergrid.corepersistence.pipeline.builder.PipelineBuilderFactory;
import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
import org.apache.usergrid.corepersistence.results.ConnectionRefQueryExecutor;
import org.apache.usergrid.corepersistence.results.EntityQueryExecutor;
+import org.apache.usergrid.corepersistence.service.CollectionSearch;
+import org.apache.usergrid.corepersistence.service.CollectionService;
+import org.apache.usergrid.corepersistence.service.ConnectionSearch;
+import org.apache.usergrid.corepersistence.service.ConnectionService;
import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
import org.apache.usergrid.corepersistence.util.CpNamingUtils;
import org.apache.usergrid.persistence.ConnectedEntityRef;
@@ -62,11 +63,10 @@ import org.apache.usergrid.persistence.graph.GraphManager;
import org.apache.usergrid.persistence.graph.SearchByEdge;
import org.apache.usergrid.persistence.graph.SearchByEdgeType;
import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
-import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
+import org.apache.usergrid.persistence.index.EntityIndex;
import org.apache.usergrid.persistence.index.EntityIndexBatch;
-import org.apache.usergrid.persistence.index.IndexEdge;
import org.apache.usergrid.persistence.index.SearchEdge;
import org.apache.usergrid.persistence.index.query.Identifier;
import org.apache.usergrid.persistence.model.entity.Id;
@@ -85,8 +85,6 @@ import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createColle
import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createCollectionSearchEdge;
import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createConnectionEdge;
import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createConnectionSearchByEdge;
-import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createSearchEdgeFromSource;
-import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromSource;
import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getNameFromEdgeType;
import static org.apache.usergrid.persistence.Schema.COLLECTION_ROLES;
import static org.apache.usergrid.persistence.Schema.PROPERTY_INACTIVITY;
@@ -125,11 +123,14 @@ public class CpRelationManager implements RelationManager {
private final PipelineBuilderFactory pipelineBuilderFactory;
+ private final CollectionService collectionService;
+ private final ConnectionService connectionService;
- public CpRelationManager( final ManagerCache managerCache,
- final PipelineBuilderFactory pipelineBuilderFactory, final AsyncEventService indexService,
- final EntityManager em, final EntityManagerFig entityManagerFig, final UUID applicationId,
+ public CpRelationManager( final ManagerCache managerCache, final PipelineBuilderFactory pipelineBuilderFactory,
+ final AsyncEventService indexService, final CollectionService collectionService,
+ final ConnectionService connectionService, final EntityManager em,
+ final EntityManagerFig entityManagerFig, final UUID applicationId,
final EntityRef headEntity ) {
@@ -149,6 +150,8 @@ public class CpRelationManager implements RelationManager {
this.applicationScope = CpNamingUtils.getApplicationScope( applicationId );
this.pipelineBuilderFactory = pipelineBuilderFactory;
+ this.collectionService = collectionService;
+ this.connectionService = connectionService;
if ( logger.isDebugEnabled() ) {
logger.debug( "Loading head entity {}:{} from app {}", new Object[] {
@@ -266,9 +269,9 @@ public class CpRelationManager implements RelationManager {
} );
GraphManager gm = managerCache.getGraphManager( applicationScope );
- Observable<Edge> edges = gm.loadEdgeVersions(
- CpNamingUtils.createEdgeFromConnectionType(new SimpleId(headEntity.getUuid(), headEntity.getType()), connectionType, entityId)
- );
+ Observable<Edge> edges = gm.loadEdgeVersions( CpNamingUtils
+ .createEdgeFromConnectionType( new SimpleId( headEntity.getUuid(), headEntity.getType() ), connectionType,
+ entityId ) );
return edges.toBlocking().firstOrDefault( null ) != null;
}
@@ -286,9 +289,9 @@ public class CpRelationManager implements RelationManager {
} );
GraphManager gm = managerCache.getGraphManager( applicationScope );
- Observable<Edge> edges = gm.loadEdgeVersions(
- CpNamingUtils.createEdgeFromCollectionName(new SimpleId(headEntity.getUuid(), headEntity.getType()), collectionName, entityId)
- );
+ Observable<Edge> edges = gm.loadEdgeVersions( CpNamingUtils
+ .createEdgeFromCollectionName( new SimpleId( headEntity.getUuid(), headEntity.getType() ), collectionName,
+ entityId ) );
return edges.toBlocking().firstOrDefault( null ) != null;
}
@@ -346,15 +349,16 @@ public class CpRelationManager implements RelationManager {
@Override
public Entity addToCollection( String collectionName, EntityRef itemRef ) throws Exception {
- Preconditions.checkNotNull(itemRef,"itemref is null");
+ Preconditions.checkNotNull( itemRef, "itemref is null" );
CollectionInfo collection = getDefaultSchema().getCollection( headEntity.getType(), collectionName );
- if ( ( collection != null && collection.getType()!=null ) && !collection.getType().equals( itemRef.getType() ) ) {
+ if ( ( collection != null && collection.getType() != null ) && !collection.getType()
+ .equals( itemRef.getType() ) ) {
return null;
}
Id entityId = new SimpleId( itemRef.getUuid(), itemRef.getType() );
- org.apache.usergrid.persistence.model.entity.Entity memberEntity = ( ( CpEntityManager ) em ).load(entityId);
+ org.apache.usergrid.persistence.model.entity.Entity memberEntity = ( ( CpEntityManager ) em ).load( entityId );
// don't fetch entity if we've already got one
@@ -371,7 +375,6 @@ public class CpRelationManager implements RelationManager {
}
-
if ( memberEntity == null ) {
throw new RuntimeException(
"Unable to load entity uuid=" + itemRef.getUuid() + " type=" + itemRef.getType() );
@@ -388,33 +391,28 @@ public class CpRelationManager implements RelationManager {
final Edge edge = createCollectionEdge( cpHeadEntity.getId(), collectionName, memberEntity.getId() );
final String linkedCollection = collection.getLinkedCollection();
- GraphManager gm = managerCache.getGraphManager(applicationScope);
+ GraphManager gm = managerCache.getGraphManager( applicationScope );
+
+ gm.writeEdge( edge ).doOnNext( writtenEdge -> {
+ if ( logger.isDebugEnabled() ) {
+ logger.debug( "Wrote edge {}", writtenEdge );
+ }
+ } ).filter( writtenEdge -> linkedCollection != null ).flatMap( writtenEdge -> {
+ final String pluralType = InflectionUtils.pluralize( cpHeadEntity.getId().getType() );
+ final Edge reverseEdge = createCollectionEdge( memberEntity.getId(), pluralType, cpHeadEntity.getId() );
- gm.writeEdge( edge )
- .doOnNext( writtenEdge -> {
- if (logger.isDebugEnabled()) {
- logger.debug("Wrote edge {}", writtenEdge);
- }
- })
- .filter(writtenEdge -> linkedCollection != null )
- .flatMap(writtenEdge -> {
- final String pluralType = InflectionUtils.pluralize( cpHeadEntity.getId().getType() );
- final Edge reverseEdge = createCollectionEdge( memberEntity.getId(), pluralType, cpHeadEntity.getId() );
-
- //reverse
- return gm.writeEdge(reverseEdge).doOnNext(reverseEdgeWritten -> {
- indexService.queueNewEdge(applicationScope, cpHeadEntity, reverseEdge);
- });
- })
- .doOnCompleted(() -> {
- indexService.queueNewEdge(applicationScope, memberEntity, edge);
- if (logger.isDebugEnabled()) {
- logger.debug("Added entity {}:{} to collection {}", new Object[]{
- itemRef.getUuid().toString(), itemRef.getType(), collectionName
- });
- }
- })
- .toBlocking().lastOrDefault( null );
+ //reverse
+ return gm.writeEdge( reverseEdge ).doOnNext( reverseEdgeWritten -> {
+ indexService.queueNewEdge( applicationScope, cpHeadEntity, reverseEdge );
+ } );
+ } ).doOnCompleted( () -> {
+ indexService.queueNewEdge( applicationScope, memberEntity, edge );
+ if ( logger.isDebugEnabled() ) {
+ logger.debug( "Added entity {}:{} to collection {}", new Object[] {
+ itemRef.getUuid().toString(), itemRef.getType(), collectionName
+ } );
+ }
+ } ).toBlocking().lastOrDefault( null );
//check if we need to reverse our edges
@@ -430,7 +428,6 @@ public class CpRelationManager implements RelationManager {
}
-
@Override
public Entity createItemInCollection( String collectionName, String itemType, Map<String, Object> properties )
throws Exception {
@@ -521,11 +518,9 @@ public class CpRelationManager implements RelationManager {
//run our delete
gm.loadEdgeVersions(
- CpNamingUtils.createEdgeFromCollectionName(cpHeadEntity.getId(), collectionName, memberEntity.getId())
- )
- .flatMap(edge -> gm.markEdge(edge))
- .flatMap(edge -> gm.deleteEdge(edge))
- .toBlocking().lastOrDefault(null);
+ CpNamingUtils.createEdgeFromCollectionName( cpHeadEntity.getId(), collectionName, memberEntity.getId() ) )
+ .flatMap( edge -> gm.markEdge( edge ) ).flatMap( edge -> gm.deleteEdge( edge ) ).toBlocking()
+ .lastOrDefault( null );
/**
@@ -620,26 +615,22 @@ public class CpRelationManager implements RelationManager {
query.setEntityType( collection.getType() );
- query = adjustQuery( query );
-
-
- final IdBuilder pipelineBuilder =
- pipelineBuilderFactory.create( applicationScope ).withCursor( query.getCursor() )
- .withLimit( query.getLimit() ).fromId( cpHeadEntity.getId() );
-
+ final Query toExecute = adjustQuery( query );
+ final Id ownerId = headEntity.asId();
- final EntityBuilder results;
+ //wire the callback so we can get each page
+ return new EntityQueryExecutor( toExecute.getCursor() ) {
+ @Override
+ protected Observable<ResultsPage<org.apache.usergrid.persistence.model.entity.Entity>> buildNewResultsPage(
+ final Optional<String> cursor ) {
- if ( query.isGraphSearch() ) {
- results = pipelineBuilder.traverseCollection( collectionName ).loadEntities();
- }
- else {
- final String entityType = collection.getType();
- results = pipelineBuilder.searchCollection( collectionName, query.getQl().get() , entityType).loadEntities();
- }
+ final CollectionSearch search =
+ new CollectionSearch( applicationScope, ownerId, collectionName, collection.getType(), toExecute.getLimit(),
+ toExecute.getQl(), cursor );
-
- return new EntityQueryExecutor( results.build() ).next();
+ return collectionService.searchCollection( search );
+ }
+ }.next();
}
@@ -658,10 +649,13 @@ public class CpRelationManager implements RelationManager {
if ( found ) {
break;
}
- Thread.sleep(sleepTime);
- }while (!found && length <= maxLength);
- if(logger.isInfoEnabled()){
- logger.info(String.format("Consistent Search finished in %s, results=%s, expected=%s...dumping stack",length, results.size(),expectedResults));
+ Thread.sleep( sleepTime );
+ }
+ while ( !found && length <= maxLength );
+ if ( logger.isInfoEnabled() ) {
+ logger.info( String
+ .format( "Consistent Search finished in %s, results=%s, expected=%s...dumping stack", length,
+ results.size(), expectedResults ) );
}
return results;
}
@@ -692,7 +686,8 @@ public class CpRelationManager implements RelationManager {
}
final Id entityId = new SimpleId( connectedEntityRef.getUuid(), connectedEntityRef.getType() );
- final org.apache.usergrid.persistence.model.entity.Entity targetEntity = ( ( CpEntityManager ) em ).load( entityId );
+ final org.apache.usergrid.persistence.model.entity.Entity targetEntity =
+ ( ( CpEntityManager ) em ).load( entityId );
// create graph edge connection from head entity to member entity
final Edge edge = createConnectionEdge( cpHeadEntity.getId(), connectionType, targetEntity.getId() );
@@ -778,9 +773,9 @@ public class CpRelationManager implements RelationManager {
final SearchByEdge search = createConnectionSearchByEdge( sourceId, connectionType, targetEntity.getId() );
//delete all the edges and queue their processing
- gm.loadEdgeVersions( search ).flatMap( returnedEdge -> gm.markEdge( returnedEdge ) ).doOnNext(
- returnedEdge -> indexService.queueDeleteEdge( applicationScope, returnedEdge ) ).toBlocking()
- .lastOrDefault( null );
+ gm.loadEdgeVersions( search ).flatMap( returnedEdge -> gm.markEdge( returnedEdge ) )
+ .doOnNext( returnedEdge -> indexService.queueDeleteEdge( applicationScope, returnedEdge ) ).toBlocking()
+ .lastOrDefault( null );
}
@@ -880,9 +875,9 @@ public class CpRelationManager implements RelationManager {
headEntity = em.validate( headEntity );
- query = adjustQuery( query );
+ final Query toExecute = adjustQuery( query );
- final Optional<String> entityType = Optional.fromNullable(query.getEntityType()) ;
+ final Optional<String> entityType = Optional.fromNullable( query.getEntityType() );
//set startid -- graph | es query filter -- load entities filter (verifies exists) --> results page collector
// -> 1.0 results
@@ -893,53 +888,40 @@ public class CpRelationManager implements RelationManager {
//startid -- eq query candiddate -- candidate id verify --> filter to connection ref --> connection ref
// collector
-
- final IdBuilder
- pipelineBuilder = pipelineBuilderFactory.create( applicationScope ).withCursor( query.getCursor() ).withLimit( query.getLimit() ).fromId(
- cpHeadEntity.getId() );
+ final Id sourceId = headEntity.asId();
if ( query.getResultsLevel() == Level.REFS || query.getResultsLevel() == Level.IDS ) {
- final IdBuilder traversedIds;
-
- if ( query.isGraphSearch() ) {
- traversedIds = pipelineBuilder.traverseConnection( connection, entityType );
- }
- else {
- traversedIds =
- pipelineBuilder.searchConnection( connection, query.getQl().get(), entityType ).loadIds();
- }
-
- //create connection refs
-
- final Observable<ResultsPage<ConnectionRef>> results =
- traversedIds.loadConnectionRefs( cpHeadEntity.getId(), connection ).build();
-
- return new ConnectionRefQueryExecutor( results ).next();
- }
-
-
-
-
- //we want to load all entities
-
- final Observable<ResultsPage<org.apache.usergrid.persistence.model.entity.Entity>> results;
+ return new ConnectionRefQueryExecutor( toExecute.getCursor() ) {
+ @Override
+ protected Observable<ResultsPage<ConnectionRef>> buildNewResultsPage( final Optional<String> cursor ) {
- if ( query.isGraphSearch() ) {
- results = pipelineBuilder.traverseConnection( connection, entityType ).loadEntities().build();
- }
- else {
+ //we need the callback so as we get a new cursor, we execute a new search and re-initialize our builders
- results = pipelineBuilder.searchConnection( connection, query.getQl().get() , entityType).loadEntities().build();
+ final ConnectionSearch search =
+ new ConnectionSearch( applicationScope, sourceId, entityType, connection, toExecute.getLimit(),
+ toExecute.getQl(), cursor );
+ return connectionService.searchConnectionAsRefs( search );
+ }
+ }.next();
}
+ return new EntityQueryExecutor( toExecute.getCursor() ) {
+ @Override
+ protected Observable<ResultsPage<org.apache.usergrid.persistence.model.entity.Entity>> buildNewResultsPage(
+ final Optional<String> cursor ) {
-
- return new EntityQueryExecutor( results ).next();
+ //we need the callback so as we get a new cursor, we execute a new search and re-initialize our builders
+ final ConnectionSearch search =
+ new ConnectionSearch( applicationScope, sourceId, entityType, connection, toExecute.getLimit(),
+ toExecute.getQl(), cursor );
+ return connectionService.searchConnection( search );
+ }
+ }.next();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/35a7be37/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionRefQueryExecutor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionRefQueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionRefQueryExecutor.java
index 3dfd98a..cd66dad 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionRefQueryExecutor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionRefQueryExecutor.java
@@ -32,6 +32,8 @@ import org.apache.usergrid.persistence.Results;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
+import com.google.common.base.Optional;
+
import rx.Observable;
@@ -39,11 +41,11 @@ import rx.Observable;
* Processes our results of connection refs
*/
@Deprecated//Required for 1.0 compatibility
-public class ConnectionRefQueryExecutor extends ObservableQueryExecutor<ConnectionRef> {
+public abstract class ConnectionRefQueryExecutor extends ObservableQueryExecutor<ConnectionRef> {
- public ConnectionRefQueryExecutor( final Observable<ResultsPage<ConnectionRef>> resultsObservable ) {
- super( resultsObservable );
+ protected ConnectionRefQueryExecutor( final Optional<String> startCursor ) {
+ super( startCursor );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/35a7be37/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/EntityQueryExecutor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/EntityQueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/EntityQueryExecutor.java
index 0e18e31..5e80d24 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/EntityQueryExecutor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/EntityQueryExecutor.java
@@ -31,6 +31,8 @@ import org.apache.usergrid.persistence.Results;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
+import com.google.common.base.Optional;
+
import rx.Observable;
@@ -38,11 +40,11 @@ import rx.Observable;
* Processes our results of entities
*/
@Deprecated//Required for 1.0 compatibility
-public class EntityQueryExecutor extends ObservableQueryExecutor<Entity> {
+public abstract class EntityQueryExecutor extends ObservableQueryExecutor<Entity> {
- public EntityQueryExecutor( final Observable<ResultsPage<Entity>> resultsObservable ) {
- super( resultsObservable );
+ protected EntityQueryExecutor( final Optional<String> startCursor ) {
+ super( startCursor );
}
@@ -81,4 +83,7 @@ public class EntityQueryExecutor extends ObservableQueryExecutor<Entity> {
return entity;
}
+
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/35a7be37/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
index 548e584..7b31d19 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
@@ -21,21 +21,14 @@ package org.apache.usergrid.corepersistence.results;
import java.util.Iterator;
-import java.util.Map;
import java.util.NoSuchElementException;
import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
-import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
-import org.apache.usergrid.persistence.EntityFactory;
import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.core.rx.ObservableToBlockingIteratorFactory;
-import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.entity.Id;
import com.google.common.base.Optional;
import rx.Observable;
-import rx.schedulers.Schedulers;
/**
@@ -45,19 +38,19 @@ import rx.schedulers.Schedulers;
@Deprecated//Required for 1.0 compatibility
public abstract class ObservableQueryExecutor<T> implements QueryExecutor {
- private final Observable<Results> resultsObservable;
- public Iterator<Results> iterator;
+ private Results results;
+ private Optional<String> cursor;
+ private boolean complete;
- public ObservableQueryExecutor( final Observable<ResultsPage<T>> resultsObservable ) {
- //map to our old results objects, return a default empty if required
- this.resultsObservable = resultsObservable.map( resultsPage -> createResultsInternal( resultsPage ) )
- .defaultIfEmpty(new Results())
- .subscribeOn(Schedulers.io());
+ protected ObservableQueryExecutor(final Optional<String> startCursor){
+ this.cursor = startCursor;
}
+
+
/**
* Transform the results
* @param resultsPage
@@ -66,6 +59,14 @@ public abstract class ObservableQueryExecutor<T> implements QueryExecutor {
protected abstract Results createResults( final ResultsPage<T> resultsPage );
+ /**
+ * Build new results page from the cursor
+ * @param cursor
+ * @return
+ */
+ protected abstract Observable<ResultsPage<T>> buildNewResultsPage(final Optional<String> cursor);
+
+
/**
* Legacy to transform our results page to a new results
@@ -90,6 +91,8 @@ public abstract class ObservableQueryExecutor<T> implements QueryExecutor {
+
+
@Override
public Iterator<Results> iterator() {
return this;
@@ -99,13 +102,11 @@ public abstract class ObservableQueryExecutor<T> implements QueryExecutor {
@Override
public boolean hasNext() {
- if ( iterator == null ) {
- iterator = ObservableToBlockingIteratorFactory.toIterator( resultsObservable );
+ if ( !complete && results == null) {
+ advance();
}
- boolean hasNext = iterator.hasNext();
-
- return hasNext;
+ return results != null;
}
@@ -114,16 +115,38 @@ public abstract class ObservableQueryExecutor<T> implements QueryExecutor {
if ( !hasNext() ) {
throw new NoSuchElementException( "No more results present" );
}
- final Results next = iterator.next();
+
+ final Results next = results;
+
+ results = null;
next.setQueryExecutor( this );
return next;
}
- @Override
- protected void finalize() throws Throwable {
- resultsObservable.unsubscribeOn(Schedulers.io());
- super.finalize();
+ private void advance(){
+ //map to our old results objects, return a default empty if required
+ final Observable<Results>
+ observable = buildNewResultsPage( cursor ).map( resultsPage -> createResultsInternal( resultsPage ) ).defaultIfEmpty(
+ new Results() );
+
+ //take the first from our observable
+ final Results resultsPage = observable.take(1).toBlocking().first();
+
+ //set the results for the iterator
+ this.results = resultsPage;
+
+ //set the complete flag
+ this.complete = !resultsPage.hasCursor();
+
+ //if not comlete, set our cursor for the next iteration
+ if(!complete){
+ this.cursor = Optional.of( results.getCursor());
+ }else{
+ this.cursor = Optional.absent();
+ }
}
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/35a7be37/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionSearch.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionSearch.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionSearch.java
new file mode 100644
index 0000000..ab8a8bc
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionSearch.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.corepersistence.service;
+
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Optional;
+
+
+/**
+ * Bean for input on searching a collection
+ */
+public class CollectionSearch {
+
+ private final ApplicationScope applicationScope;
+ private final Id collectionOwnerId;
+ private final String collectionName;
+ private final String entityType;
+ private final int limit;
+ private final Optional<String> query;
+ private final Optional<String> cursor;
+
+
+ public CollectionSearch( final ApplicationScope applicationScope, final Id collectionOwnerId, final String
+ collectionName,
+ final String entityType, final int limit, final Optional<String> query, final Optional<String> cursor ) {
+ this.applicationScope = applicationScope;
+ this.collectionOwnerId = collectionOwnerId;
+ this.collectionName = collectionName;
+ this.entityType = entityType;
+ this.limit = limit;
+ this.query = query;
+ this.cursor = cursor;
+ }
+
+
+ public ApplicationScope getApplicationScope() {
+ return applicationScope;
+ }
+
+
+ public String getCollectionName() {
+ return collectionName;
+ }
+
+
+ public Optional<String> getCursor() {
+ return cursor;
+ }
+
+
+ public Optional<String> getQuery() {
+ return query;
+ }
+
+
+ public int getLimit() {
+ return limit;
+ }
+
+
+ public String getEntityType() {
+ return entityType;
+ }
+
+
+ public Id getCollectionOwnerId() {
+ return collectionOwnerId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/35a7be37/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionService.java
new file mode 100644
index 0000000..eef741a
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionService.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.corepersistence.service;
+
+
+import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
+import org.apache.usergrid.persistence.model.entity.Entity;
+
+import rx.Observable;
+
+
+/**
+ * Interface for operating on collections
+ */
+public interface CollectionService {
+
+ /**
+ * Search a collection and return an observable of results pages
+ * @param search The search to perform
+ * @return An observable with results page entries for the stream
+ */
+ Observable<ResultsPage<Entity>> searchCollection(final CollectionSearch search);
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/35a7be37/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionServiceImpl.java
new file mode 100644
index 0000000..fa79d09
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionServiceImpl.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.corepersistence.service;
+
+
+import org.apache.usergrid.corepersistence.pipeline.builder.EntityBuilder;
+import org.apache.usergrid.corepersistence.pipeline.builder.IdBuilder;
+import org.apache.usergrid.corepersistence.pipeline.builder.PipelineBuilderFactory;
+import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Entity;
+
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+import rx.Observable;
+
+
+/**
+ * Implementation of the collection service
+ */
+@Singleton
+public class CollectionServiceImpl implements CollectionService {
+
+
+ private final PipelineBuilderFactory pipelineBuilderFactory;
+
+
+ @Inject
+ public CollectionServiceImpl( final PipelineBuilderFactory pipelineBuilderFactory ) {
+ this.pipelineBuilderFactory = pipelineBuilderFactory;
+ }
+
+
+ @Override
+ public Observable<ResultsPage<Entity>> searchCollection( final CollectionSearch search ) {
+
+
+ final ApplicationScope applicationScope = search.getApplicationScope();
+ final String collectionName = search.getCollectionName();
+ final Optional<String> query = search.getQuery();
+
+ final IdBuilder pipelineBuilder =
+ pipelineBuilderFactory.create( applicationScope ).withCursor( search.getCursor() )
+ .withLimit( search.getLimit() ).fromId( search.getCollectionOwnerId() );
+
+
+ final EntityBuilder results;
+
+ if ( !query.isPresent()) {
+ results = pipelineBuilder.traverseCollection( collectionName ).loadEntities();
+ }
+ else {
+ results = pipelineBuilder.searchCollection( collectionName, query.get(),search.getEntityType()).loadEntities();
+ }
+
+
+ return results.build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/35a7be37/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionSearch.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionSearch.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionSearch.java
new file mode 100644
index 0000000..51f6768
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionSearch.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.corepersistence.service;
+
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Optional;
+
+
+/**
+ * Bean for input on searching a connection
+ */
+public class ConnectionSearch {
+
+ private final ApplicationScope applicationScope;
+ private final Id sourceNodeId;
+ private final Optional<String> entityType;
+ private final String connectionName;
+ private final int limit;
+ private final Optional<String> query;
+ private final Optional<String> cursor;
+
+
+ public ConnectionSearch( final ApplicationScope applicationScope, final Id sourceNodeId, final Optional<String> entityType,
+ final String connectionName, final int limit, final Optional<String> query, final
+ Optional<String> cursor ) {
+ this.applicationScope = applicationScope;
+ this.sourceNodeId = sourceNodeId;
+ this.entityType = entityType;
+ this.connectionName = connectionName;
+ this.limit = limit;
+ this.query = query;
+ this.cursor = cursor;
+ }
+
+
+ public ApplicationScope getApplicationScope() {
+ return applicationScope;
+ }
+
+
+ public String getConnectionName() {
+ return connectionName;
+ }
+
+
+ public Optional<String> getCursor() {
+ return cursor;
+ }
+
+
+ public int getLimit() {
+ return limit;
+ }
+
+
+ public Optional<String> getQuery() {
+ return query;
+ }
+
+
+ public Id getSourceNodeId() {
+ return sourceNodeId;
+ }
+
+
+ public Optional<String> getEntityType() {
+ return entityType;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/35a7be37/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionService.java
new file mode 100644
index 0000000..71a25c9
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionService.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.corepersistence.service;
+
+
+import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
+import org.apache.usergrid.persistence.ConnectionRef;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import rx.Observable;
+
+
+/**
+ * Interface for operating on connections
+ */
+public interface ConnectionService {
+
+
+ /**
+ * Search a collection and return an observable of results pages
+ * @param search The search to perform
+ * @return An observable with results page entries for the stream
+ */
+ Observable<ResultsPage<Entity>> searchConnection(final ConnectionSearch search);
+
+
+ /**
+ * Search the connections and return ids instead of entities in results pages
+ * @param search
+ * @return
+ */
+ Observable<ResultsPage<ConnectionRef>> searchConnectionAsRefs( final ConnectionSearch search );
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/35a7be37/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java
new file mode 100644
index 0000000..c7e0fee
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.corepersistence.service;
+
+
+import org.apache.usergrid.corepersistence.pipeline.builder.EntityBuilder;
+import org.apache.usergrid.corepersistence.pipeline.builder.IdBuilder;
+import org.apache.usergrid.corepersistence.pipeline.builder.PipelineBuilderFactory;
+import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
+import org.apache.usergrid.persistence.ConnectionRef;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+import rx.Observable;
+
+
+@Singleton
+public class ConnectionServiceImpl implements ConnectionService {
+
+ private final PipelineBuilderFactory pipelineBuilderFactory;
+
+
+ @Inject
+ public ConnectionServiceImpl( final PipelineBuilderFactory pipelineBuilderFactory ) {
+ this.pipelineBuilderFactory = pipelineBuilderFactory;
+ }
+
+
+ @Override
+ public Observable<ResultsPage<Entity>> searchConnection( final ConnectionSearch search ) {
+ //set startid -- graph | es query filter -- load entities filter (verifies exists) --> results page collector
+ // -> 1.0 results
+
+ // startid -- graph edge load -- entity load (verify) from ids -> results page collector
+ // startid -- eq query candiddate -- entity load (verify) from canddiates -> results page collector
+
+ //startid -- graph edge load -- entity id verify --> filter to connection ref --> connection ref collector
+ //startid -- eq query candiddate -- candidate id verify --> filter to connection ref --> connection ref
+ // collector
+
+
+ final Optional<String> query = search.getQuery();
+
+ final IdBuilder pipelineBuilder =
+ pipelineBuilderFactory.create( search.getApplicationScope() ).withCursor( search.getCursor() )
+ .withLimit( search.getLimit() ).fromId( search.getSourceNodeId() );
+
+
+ //we want to load all entities
+
+ final EntityBuilder results;
+
+
+ if ( !query.isPresent() ) {
+ results =
+ pipelineBuilder.traverseConnection( search.getConnectionName(), search.getEntityType() ).loadEntities();
+ }
+
+ else {
+
+ results =
+ pipelineBuilder.searchConnection( search.getConnectionName(), query.get(), search.getEntityType() )
+ .loadEntities();
+ }
+
+
+ return results.build();
+ }
+
+
+ @Override
+ public Observable<ResultsPage<ConnectionRef>> searchConnectionAsRefs( final ConnectionSearch search ) {
+
+ final Optional<String> query = search.getQuery();
+
+ final Id sourceNodeId = search.getSourceNodeId();
+
+ final IdBuilder pipelineBuilder =
+ pipelineBuilderFactory.create( search.getApplicationScope() ).withCursor( search.getCursor() )
+ .withLimit( search.getLimit() ).fromId( sourceNodeId );
+
+
+ final IdBuilder traversedIds;
+ final String connectionName = search.getConnectionName();
+
+ if ( !query.isPresent() ) {
+ traversedIds = pipelineBuilder.traverseConnection( connectionName, search.getEntityType() );
+ }
+ else {
+ traversedIds =
+ pipelineBuilder.searchConnection( connectionName, query.get(), search.getEntityType() ).loadIds();
+ }
+
+ //create connection refs
+
+ final Observable<ResultsPage<ConnectionRef>> results =
+ traversedIds.loadConnectionRefs( sourceNodeId, connectionName ).build();
+
+ return results;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/35a7be37/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraFig.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraFig.java
index 7cdc996..0426e37 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraFig.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraFig.java
@@ -48,7 +48,7 @@ public interface CassandraFig extends GuicyFig {
String getHosts();
@Key( "cassandra.version" )
- @Default( "1.2" )
+ @Default( "2.1" )
String getVersion();
@Key( "cassandra.cluster_name" )
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/35a7be37/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableToBlockingIteratorFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableToBlockingIteratorFactory.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableToBlockingIteratorFactory.java
deleted file mode 100644
index 9807749..0000000
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableToBlockingIteratorFactory.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- *
- * * Licensed to the Apache Software Foundation (ASF) under one or more
- * * contributor license agreements. The ASF licenses this file to You
- * * under the Apache License, Version 2.0 (the "License"); you may not
- * * use this file except in compliance with the License.
- * * You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License. For additional information regarding
- * * copyright in this work, please see the NOTICE file in the top level
- * * directory of this distribution.
- *
- */
-package org.apache.usergrid.persistence.core.rx;
-
-
-
-import rx.Notification;
-import rx.Observable;
-import rx.Subscriber;
-import rx.Subscription;
-import rx.exceptions.Exceptions;
-
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-
-/**
- * Returns an Iterator that iterates over all items emitted by a specified Observable.
- * This blocks with an array blocking queue of 1
- * <p>
- * <img width="640" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.toIterator.png" alt="">
- * <p>
- *
- * @see <a href="https://github.com/ReactiveX/RxJava/issues/50">Issue #50</a>
- */
-public final class ObservableToBlockingIteratorFactory {
- private ObservableToBlockingIteratorFactory() {
- throw new IllegalStateException("No instances!");
- }
-
- /**
- * Returns an iterator that iterates all values of the observable.
- *
- * @param <T>
- * the type of source.
- * @return the iterator that could be used to iterate over the elements of the observable.
- */
- public static <T> Iterator<T> toIterator(Observable<? extends T> source) {
- final BlockingQueue<Notification<? extends T>> notifications = new ArrayBlockingQueue<>(1);
-
- // using subscribe instead of unsafeSubscribe since this is a BlockingObservable "final subscribe"
- final Subscription subscription = source.materialize().subscribe(new Subscriber<Notification<? extends T>>() {
- @Override
- public void onCompleted() {
- // ignore
- }
-
- @Override
- public void onError(Throwable e) {
- try{
- notifications.put(Notification.<T>createOnError(e));
- }catch (Exception t){
-
- }
- }
-
- @Override
- public void onNext(Notification<? extends T> args) {
- try{
- notifications.put(args);
- }catch (Exception t){
-
- }
- }
- });
-
- return new Iterator<T>() {
- private Notification<? extends T> buf;
-
- @Override
- public boolean hasNext() {
- if (buf == null) {
- buf = take();
- }
- if (buf.isOnError()) {
- throw Exceptions.propagate(buf.getThrowable());
- }
- return !buf.isOnCompleted();
- }
-
- @Override
- public T next() {
- if (hasNext()) {
- T result = buf.getValue();
- buf = null;
- return result;
- }
- throw new NoSuchElementException();
- }
-
- private Notification<? extends T> take() {
- try {
- return notifications.take();
- } catch (InterruptedException e) {
- subscription.unsubscribe();
- throw Exceptions.propagate(e);
- }
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException("Read-only iterator");
- }
- };
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/35a7be37/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/OrderedMergeTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/OrderedMergeTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/OrderedMergeTest.java
index 649ac7a..6d8a466 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/OrderedMergeTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/OrderedMergeTest.java
@@ -19,23 +19,19 @@
package org.apache.usergrid.persistence.core.rx;
-import java.util.*;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import rx.Notification;
import rx.Observable;
-import rx.Observable.Transformer;
import rx.Subscriber;
-import rx.Subscription;
-import rx.exceptions.Exceptions;
import rx.schedulers.Schedulers;
import static org.junit.Assert.assertEquals;
@@ -156,20 +152,20 @@ public class OrderedMergeTest {
List<Integer> expected1List = Arrays.asList( 5, 3, 2, 0 );
- Observable<Integer> expected1 = Observable.from(expected1List);
+ Observable<Integer> expected1 = Observable.from( expected1List );
- List<Integer> expected2List = Arrays.asList(10, 7, 6, 4);
+ List<Integer> expected2List = Arrays.asList( 10, 7, 6, 4 );
- Observable<Integer> expected2 = Observable.from(expected2List);
+ Observable<Integer> expected2 = Observable.from( expected2List );
- List<Integer> expected3List = Arrays.asList(9, 8, 1);
+ List<Integer> expected3List = Arrays.asList( 9, 8, 1 );
- Observable<Integer> expected3 = Observable.from(expected3List);
+ Observable<Integer> expected3 = Observable.from( expected3List );
//set our buffer size to 2. We should easily exceed this since every observable has more than 2 elements
Observable<Integer> ordered =
- OrderedMerge.orderedMerge(new ReverseIntegerComparator(), 2, expected1, expected2, expected3);
+ OrderedMerge.orderedMerge( new ReverseIntegerComparator(), 2, expected1, expected2, expected3 );
final CountDownLatch latch = new CountDownLatch( 1 );
final List<Integer> results = new ArrayList();
@@ -208,9 +204,9 @@ public class OrderedMergeTest {
/**
* Since we're on the same thread, we should blow up before we begin producing elements our size
*/
- assertEquals(0, results.size());
+ assertEquals( 0, results.size() );
- assertTrue("An exception was thrown", errorThrown[0]);
+ assertTrue( "An exception was thrown", errorThrown[0] );
}
@@ -247,14 +243,14 @@ public class OrderedMergeTest {
@Override
public void onError(final Throwable e) {
e.printStackTrace();
- fail("An error was thrown ");
+ fail( "An error was thrown " );
}
@Override
public void onNext(final Integer integer) {
log.info("onNext invoked with {}", integer);
- results.add(integer);
+ results.add( integer );
}
});
@@ -262,7 +258,7 @@ public class OrderedMergeTest {
List<Integer> expected = Arrays.asList( 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0 );
- assertEquals(expected.size(), results.size());
+ assertEquals( expected.size(), results.size() );
for ( int i = 0; i < expected.size(); i++ ) {
@@ -310,7 +306,7 @@ public class OrderedMergeTest {
@Override
public void onError( final Throwable e ) {
- log.error("Expected error thrown", e);
+ log.error( "Expected error thrown", e );
if ( e.getMessage().contains( "The maximum queue size of 2 has been reached" ) ) {
errorThrown[0] = true;
@@ -322,14 +318,14 @@ public class OrderedMergeTest {
@Override
public void onNext( final Integer integer ) {
- log.info("onNext invoked with {}", integer);
+ log.info( "onNext invoked with {}", integer );
}
} );
latch.await();
- assertTrue("An exception was thrown", errorThrown[0]);
+ assertTrue( "An exception was thrown", errorThrown[0] );
}
@@ -539,34 +535,6 @@ public class OrderedMergeTest {
}
}
- @Test
- public void obsIterator() {
- Iterator<Object> iterator = ObservableToBlockingIteratorFactory.toIterator(Observable.create(subscriber -> {
- int count = 0;
- while (!subscriber.isUnsubscribed()) {
- //pull from source
- for (int i = 0; i < 10 && !subscriber.isUnsubscribed(); i++) {
- //emit
- log.info("loop " + count);
- subscriber.onNext(count++);
- }
- }
-
- subscriber.onCompleted();
- })
- .onBackpressureBlock(1)
- .doOnNext(o -> {
- log.info("iteration " + o);
- }).subscribeOn(Schedulers.io()));
- //never
- Object it =iterator.next();
- it = iterator.next();
- log.info("iterate");
- it = iterator.next();
- log.info("iterate");
-
- Object size = it;
- }
[2/2] incubator-usergrid git commit: Adds validation and fixes
initialization ordering bugs.
Posted by to...@apache.org.
Adds validation and fixes initialization ordering bugs.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/cc80ba92
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/cc80ba92
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/cc80ba92
Branch: refs/heads/queryfix
Commit: cc80ba9261364b5b7da5c7fffd6bdedc75625ad3
Parents: 35a7be3
Author: Todd Nine <tn...@apigee.com>
Authored: Mon Jul 13 18:06:28 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon Jul 13 18:06:28 2015 -0600
----------------------------------------------------------------------
.../usergrid/corepersistence/CpEntityManager.java | 11 +++++------
.../corepersistence/CpEntityManagerFactory.java | 10 +++++-----
.../corepersistence/CpRelationManager.java | 17 ++++++++++-------
3 files changed, 20 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cc80ba92/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 0bf23e0..b530715 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -196,8 +196,6 @@ public class CpEntityManager implements EntityManager {
private final AsyncEventService indexService;
- private final PipelineBuilderFactory pipelineBuilderFactory;
-
private final CollectionService collectionService;
private final ConnectionService connectionService;
@@ -245,7 +243,6 @@ public class CpEntityManager implements EntityManager {
public CpEntityManager( final CassandraService cass, final CounterUtils counterUtils, final AsyncEventService indexService, final ManagerCache managerCache,
final MetricsFactory metricsFactory,
final EntityManagerFig entityManagerFig,
- final PipelineBuilderFactory pipelineBuilderFactory ,
final GraphManagerFactory graphManagerFactory,
final CollectionService collectionService,
final ConnectionService connectionService,
@@ -258,9 +255,11 @@ public class CpEntityManager implements EntityManager {
Preconditions.checkNotNull( managerCache, "managerCache must not be null" );
Preconditions.checkNotNull( applicationId, "applicationId must not be null" );
Preconditions.checkNotNull( indexService, "indexService must not be null" );
- Preconditions.checkNotNull( pipelineBuilderFactory, "pipelineBuilderFactory must not be null" );
+
Preconditions.checkNotNull( graphManagerFactory, "graphManagerFactory must not be null" );
- this.pipelineBuilderFactory = pipelineBuilderFactory;
+ Preconditions.checkNotNull( connectionService, "connectionService must not be null" );
+ Preconditions.checkNotNull( collectionService, "collectionService must not be null" );
+
this.graphManagerFactory = graphManagerFactory;
this.connectionService = connectionService;
this.collectionService = collectionService;
@@ -788,7 +787,7 @@ public class CpEntityManager implements EntityManager {
Preconditions.checkNotNull( entityRef, "entityRef cannot be null" );
CpRelationManager relationManager =
- new CpRelationManager( managerCache, pipelineBuilderFactory, indexService, collectionService, connectionService, this, entityManagerFig, applicationId, entityRef );
+ new CpRelationManager( managerCache, indexService, collectionService, connectionService, this, entityManagerFig, applicationId, entityRef );
return relationManager;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cc80ba92/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index 81202ec..a679407 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -120,7 +120,6 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
private final ReIndexService reIndexService;
private final MetricsFactory metricsFactory;
private final AsyncEventService indexService;
- private final PipelineBuilderFactory pipelineBuilderFactory;
private final CollectionService collectionService;
private final ConnectionService connectionService;
private final GraphManagerFactory graphManagerFactory;
@@ -136,13 +135,14 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
this.managerCache = injector.getInstance( ManagerCache.class );
this.metricsFactory = injector.getInstance( MetricsFactory.class );
this.indexService = injector.getInstance( AsyncEventService.class );
- this.pipelineBuilderFactory = injector.getInstance( PipelineBuilderFactory.class );
this.graphManagerFactory = injector.getInstance( GraphManagerFactory.class );
+ this.collectionService = injector.getInstance( CollectionService.class );
+ this.connectionService = injector.getInstance( ConnectionService.class );
+
+ //this line always needs to be last due to the temporary cicular dependency until spring is removed
this.applicationIdCache = injector.getInstance(ApplicationIdCacheFactory.class).getInstance(
getManagementEntityManager() );
- this.collectionService = injector.getInstance( CollectionService.class );
- this.connectionService = injector.getInstance( ConnectionService.class );
}
@@ -199,7 +199,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
private EntityManager _getEntityManager( UUID applicationId ) {
EntityManager em = new CpEntityManager(cassandraService, counterUtils, indexService, managerCache,
- metricsFactory, entityManagerFig, pipelineBuilderFactory, graphManagerFactory, collectionService, connectionService, applicationId );
+ metricsFactory, entityManagerFig, graphManagerFactory, collectionService, connectionService, applicationId );
return em;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cc80ba92/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 2c460a7..d92b97f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -121,13 +121,11 @@ public class CpRelationManager implements RelationManager {
private final AsyncEventService indexService;
- private final PipelineBuilderFactory pipelineBuilderFactory;
-
private final CollectionService collectionService;
private final ConnectionService connectionService;
- public CpRelationManager( final ManagerCache managerCache, final PipelineBuilderFactory pipelineBuilderFactory,
+ public CpRelationManager( final ManagerCache managerCache,
final AsyncEventService indexService, final CollectionService collectionService,
final ConnectionService connectionService, final EntityManager em,
final EntityManagerFig entityManagerFig, final UUID applicationId,
@@ -139,6 +137,9 @@ public class CpRelationManager implements RelationManager {
Assert.notNull( headEntity, "Head entity cannot be null" );
Assert.notNull( headEntity.getUuid(), "Head entity uuid cannot be null" );
Assert.notNull( indexService, "indexService cannot be null" );
+ Assert.notNull( collectionService, "collectionService cannot be null" );
+ Assert.notNull( connectionService, "connectionService cannot be null" );
+
this.entityManagerFig = entityManagerFig;
// TODO: this assert should not be failing
@@ -149,7 +150,6 @@ public class CpRelationManager implements RelationManager {
this.managerCache = managerCache;
this.applicationScope = CpNamingUtils.getApplicationScope( applicationId );
- this.pipelineBuilderFactory = pipelineBuilderFactory;
this.collectionService = collectionService;
this.connectionService = connectionService;
@@ -616,6 +616,7 @@ public class CpRelationManager implements RelationManager {
query.setEntityType( collection.getType() );
final Query toExecute = adjustQuery( query );
+ final Optional<String> queryString = query.isGraphSearch()? Optional.<String>absent(): query.getQl();
final Id ownerId = headEntity.asId();
//wire the callback so we can get each page
@@ -626,7 +627,7 @@ public class CpRelationManager implements RelationManager {
final CollectionSearch search =
new CollectionSearch( applicationScope, ownerId, collectionName, collection.getType(), toExecute.getLimit(),
- toExecute.getQl(), cursor );
+ queryString, cursor );
return collectionService.searchCollection( search );
}
@@ -890,6 +891,8 @@ public class CpRelationManager implements RelationManager {
final Id sourceId = headEntity.asId();
+ final Optional<String> queryString = query.isGraphSearch()? Optional.<String>absent(): query.getQl();
+
if ( query.getResultsLevel() == Level.REFS || query.getResultsLevel() == Level.IDS ) {
@@ -903,7 +906,7 @@ public class CpRelationManager implements RelationManager {
final ConnectionSearch search =
new ConnectionSearch( applicationScope, sourceId, entityType, connection, toExecute.getLimit(),
- toExecute.getQl(), cursor );
+ queryString, cursor );
return connectionService.searchConnectionAsRefs( search );
}
}.next();
@@ -918,7 +921,7 @@ public class CpRelationManager implements RelationManager {
//we need the callback so as we get a new cursor, we execute a new search and re-initialize our builders
final ConnectionSearch search =
new ConnectionSearch( applicationScope, sourceId, entityType, connection, toExecute.getLimit(),
- toExecute.getQl(), cursor );
+ queryString, cursor );
return connectionService.searchConnection( search );
}
}.next();