You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/07/30 20:44:25 UTC
[02/35] incubator-usergrid git commit: Second pass with visitor
factory generation
Second pass with visitor factory generation
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/aa31768c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/aa31768c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/aa31768c
Branch: refs/heads/ug2-doc-update
Commit: aa31768cb8f6826692e4bb06f3459e1c7af4827f
Parents: 9b21332
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Jun 16 18:47:34 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Jun 16 18:47:34 2015 -0600
----------------------------------------------------------------------
.../cassandra/RelationManagerImpl.java | 336 +------------------
.../cassandra/index/IndexBucketScanner.java | 4 +-
.../persistence/query/ir/SearchVisitor.java | 24 +-
.../result/CollectionSearchVisitorFactory.java | 47 ++-
.../result/ConnectionSearchVisitorFactory.java | 43 ++-
.../ir/result/SearchCollectionVisitor.java | 168 ++++++++++
.../ir/result/SearchConnectionVisitor.java | 213 ++++++++++++
7 files changed, 492 insertions(+), 343 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa31768c/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/RelationManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/RelationManagerImpl.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/RelationManagerImpl.java
index 456add3..0e5011f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/RelationManagerImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/RelationManagerImpl.java
@@ -20,8 +20,6 @@ package org.apache.usergrid.persistence.cassandra;
import java.nio.ByteBuffer;
import java.util.AbstractMap;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -34,6 +32,7 @@ import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;
+
import org.apache.usergrid.persistence.CollectionRef;
import org.apache.usergrid.persistence.ConnectedEntityRef;
import org.apache.usergrid.persistence.ConnectionRef;
@@ -52,30 +51,17 @@ import org.apache.usergrid.persistence.SimpleCollectionRef;
import org.apache.usergrid.persistence.SimpleEntityRef;
import org.apache.usergrid.persistence.SimpleRoleRef;
import org.apache.usergrid.persistence.cassandra.IndexUpdate.IndexEntry;
-import org.apache.usergrid.persistence.cassandra.index.ConnectedIndexScanner;
import org.apache.usergrid.persistence.cassandra.index.IndexBucketScanner;
import org.apache.usergrid.persistence.cassandra.index.IndexScanner;
-import org.apache.usergrid.persistence.cassandra.index.NoOpIndexScanner;
import org.apache.usergrid.persistence.entities.Group;
-import org.apache.usergrid.persistence.geo.CollectionGeoSearch;
-import org.apache.usergrid.persistence.geo.ConnectionGeoSearch;
import org.apache.usergrid.persistence.geo.EntityLocationRef;
-import org.apache.usergrid.persistence.geo.model.Point;
import org.apache.usergrid.persistence.hector.CountingMutator;
-import org.apache.usergrid.persistence.query.ir.AllNode;
-import org.apache.usergrid.persistence.query.ir.NameIdentifierNode;
-import org.apache.usergrid.persistence.query.ir.QueryNode;
import org.apache.usergrid.persistence.query.ir.QuerySlice;
-import org.apache.usergrid.persistence.query.ir.SearchVisitor;
-import org.apache.usergrid.persistence.query.ir.WithinNode;
import org.apache.usergrid.persistence.query.ir.result.CollectionResultsLoaderFactory;
-import org.apache.usergrid.persistence.query.ir.result.ConnectionIndexSliceParser;
import org.apache.usergrid.persistence.query.ir.result.ConnectionResultsLoaderFactory;
import org.apache.usergrid.persistence.query.ir.result.ConnectionTypesIterator;
-import org.apache.usergrid.persistence.query.ir.result.EmptyIterator;
-import org.apache.usergrid.persistence.query.ir.result.GeoIterator;
-import org.apache.usergrid.persistence.query.ir.result.SliceIterator;
-import org.apache.usergrid.persistence.query.ir.result.StaticIdIterator;
+import org.apache.usergrid.persistence.query.ir.result.SearchCollectionVisitor;
+import org.apache.usergrid.persistence.query.ir.result.SearchConnectionVisitor;
import org.apache.usergrid.persistence.query.ir.result.UUIDIndexSliceParser;
import org.apache.usergrid.persistence.schema.CollectionInfo;
import org.apache.usergrid.utils.IndexUtils;
@@ -83,9 +69,6 @@ import org.apache.usergrid.utils.MapUtils;
import com.yammer.metrics.annotation.Metered;
-import me.prettyprint.cassandra.serializers.ByteBufferSerializer;
-import me.prettyprint.cassandra.serializers.StringSerializer;
-import me.prettyprint.cassandra.serializers.UUIDSerializer;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.beans.DynamicComposite;
import me.prettyprint.hector.api.beans.HColumn;
@@ -94,9 +77,7 @@ import me.prettyprint.hector.api.mutation.Mutator;
import static java.lang.String.CASE_INSENSITIVE_ORDER;
import static java.util.Arrays.asList;
-
import static org.apache.usergrid.persistence.Schema.COLLECTION_ROLES;
-import static org.apache.usergrid.persistence.Schema.DICTIONARY_COLLECTIONS;
import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTED_ENTITIES;
import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTED_TYPES;
import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTING_ENTITIES;
@@ -1374,41 +1355,8 @@ public class RelationManagerImpl implements RelationManager {
}
- private IndexScanner searchIndex( Object indexKey, QuerySlice slice, int pageSize ) throws Exception {
-
- DynamicComposite[] range = slice.getRange();
- Object keyPrefix = key( indexKey, slice.getPropertyName() );
- IndexScanner scanner =
- new IndexBucketScanner( cass, indexBucketLocator, ENTITY_INDEX, applicationId, IndexType.CONNECTION,
- keyPrefix, range[0], range[1], slice.isReversed(), pageSize, slice.hasCursor(), slice.getPropertyName() );
-
- return scanner;
- }
-
-
- /**
- * Search the collection index using all the buckets for the given collection
- *
- * @param indexKey The index key to read
- * @param slice Slice set in the query
- * @param collectionName The name of the collection to search
- * @param pageSize The page size to load when iterating
- */
- private IndexScanner searchIndexBuckets( Object indexKey, QuerySlice slice, String collectionName, int pageSize )
- throws Exception {
-
- DynamicComposite[] range = slice.getRange();
-
- Object keyPrefix = key( indexKey, slice.getPropertyName() );
-
- IndexScanner scanner =
- new IndexBucketScanner( cass, indexBucketLocator, ENTITY_INDEX, applicationId, IndexType.COLLECTION,
- keyPrefix, range[0], range[1], slice.isReversed(), pageSize, slice.hasCursor(), collectionName );
-
- return scanner;
- }
@SuppressWarnings("unchecked")
@@ -1772,7 +1720,7 @@ public class RelationManagerImpl implements RelationManager {
// we have something to search with, visit our tree and evaluate the
// results
QueryProcessor qp = new QueryProcessor( query, collection, em, factory );
- SearchCollectionVisitor visitor = new SearchCollectionVisitor( qp );
+ SearchCollectionVisitor visitor = new SearchCollectionVisitor( this, qp );
return qp.getResults( visitor );
}
@@ -1966,7 +1914,7 @@ public class RelationManagerImpl implements RelationManager {
final ConnectionResultsLoaderFactory factory = new ConnectionResultsLoaderFactory( connectionRef );
QueryProcessor qp = new QueryProcessor( query, null, em, factory );
- SearchConnectionVisitor visitor = new SearchConnectionVisitor( qp, connectionRef, true );
+ SearchConnectionVisitor visitor = new SearchConnectionVisitor( this, qp, connectionRef, true );
return qp.getResults( visitor );
}
@@ -2008,7 +1956,7 @@ public class RelationManagerImpl implements RelationManager {
final ConnectionResultsLoaderFactory factory = new ConnectionResultsLoaderFactory( connectionRef );
QueryProcessor qp = new QueryProcessor( query, null, em, factory );
- SearchConnectionVisitor visitor = new SearchConnectionVisitor( qp, connectionRef, false );
+ SearchConnectionVisitor visitor = new SearchConnectionVisitor( this, qp, connectionRef, false );
return qp.getResults( visitor );
}
@@ -2047,7 +1995,7 @@ public class RelationManagerImpl implements RelationManager {
final ConnectionResultsLoaderFactory factory = new ConnectionResultsLoaderFactory( connectionRef );
QueryProcessor qp = new QueryProcessor( query, null, em, factory );
- SearchConnectionVisitor visitor = new SearchConnectionVisitor( qp, connectionRef, true );
+ SearchConnectionVisitor visitor = new SearchConnectionVisitor( this, qp, connectionRef, true );
return qp.getResults( visitor );
}
@@ -2059,274 +2007,4 @@ public class RelationManagerImpl implements RelationManager {
}
- private static final UUIDIndexSliceParser UUID_PARSER = new UUIDIndexSliceParser();
-
-
- /**
- * Simple search visitor that performs all the joining
- *
- * @author tnine
- */
- private class SearchCollectionVisitor extends SearchVisitor {
-
- private final CollectionInfo collection;
-
-
- /**
- * @param queryProcessor
- */
- public SearchCollectionVisitor( QueryProcessor queryProcessor ) {
- super( queryProcessor );
- this.collection = queryProcessor.getCollectionInfo();
- }
-
-
- /* (non-Javadoc)
- * @see org.apache.usergrid.persistence.query.ir.SearchVisitor#secondaryIndexScan(org.apache.usergrid.persistence.query.ir
- * .QueryNode, org.apache.usergrid.persistence.query.ir.QuerySlice)
- */
- @Override
- protected IndexScanner secondaryIndexScan( QueryNode node, QuerySlice slice ) throws Exception {
- // NOTE we explicitly do not append the slice value here. This
- // is done in the searchIndex method below
- Object indexKey = key( headEntity.getUuid(), collection.getName() );
-
- // update the cursor and order before we perform the slice
- // operation. Should be done after subkeying since this can
- // change the hash value of the slice
- queryProcessor.applyCursorAndSort( slice );
-
- IndexScanner columns = null;
-
- // nothing left to search for this range
- if ( slice.isComplete() ) {
- columns = new NoOpIndexScanner();
- }
- // perform the search
- else {
- columns = searchIndexBuckets( indexKey, slice, collection.getName(),
- queryProcessor.getPageSizeHint( node ) );
- }
-
- return columns;
- }
-
-
- public void visit( AllNode node ) throws Exception {
-
- String collectionName = collection.getName();
-
- QuerySlice slice = node.getSlice();
-
- queryProcessor.applyCursorAndSort( slice );
-
- UUID startId = null;
-
- if ( slice.hasCursor() ) {
- startId = UUID_PARSER.parse( slice.getCursor() ).getUUID();
- }
-
-
- IndexScanner indexScanner = cass.getIdList( cass.getApplicationKeyspace( applicationId ),
- key( headEntity.getUuid(), DICTIONARY_COLLECTIONS, collectionName ), startId, null,
- queryProcessor.getPageSizeHint( node ), query.isReversed(), indexBucketLocator, applicationId,
- collectionName, node.isForceKeepFirst() );
-
- this.results.push( new SliceIterator( slice, indexScanner, UUID_PARSER ) );
- }
-
-
- /*
- * (non-Javadoc)
- *
- * @see org.apache.usergrid.persistence.query.ir.NodeVisitor#visit(org.apache.usergrid.
- * persistence.query.ir.WithinNode)
- */
- @Override
- public void visit( WithinNode node ) throws Exception {
-
- QuerySlice slice = node.getSlice();
-
- queryProcessor.applyCursorAndSort( slice );
-
- GeoIterator itr = new GeoIterator(
- new CollectionGeoSearch( em, indexBucketLocator, cass, headEntity, collection.getName() ),
- query.getLimit(), slice, node.getPropertyName(),
- new Point( node.getLattitude(), node.getLongitude() ), node.getDistance() );
-
- results.push( itr );
- }
-
-
- @Override
- public void visit( NameIdentifierNode nameIdentifierNode ) throws Exception {
- EntityRef ref = em.getAlias( headEntity.getUuid(), collection.getType(), nameIdentifierNode.getName() );
-
- if ( ref == null ) {
- this.results.push( new EmptyIterator() );
- return;
- }
-
- this.results.push( new StaticIdIterator( ref.getUuid() ) );
- }
- }
-
-
- /**
- * Simple search visitor that performs all the joining
- *
- * @author tnine
- */
- private class SearchConnectionVisitor extends SearchVisitor {
-
- private final ConnectionRefImpl connection;
-
- /** True if we should search from source->target edges. False if we should search from target<-source edges */
- private final boolean outgoing;
-
-
- /**
- * @param queryProcessor They query processor to use
- * @param connection The connection refernce
- * @param outgoing The direction to search. True if we should search from source->target edges. False if we
- * should search from target<-source edges
- */
- public SearchConnectionVisitor( QueryProcessor queryProcessor, ConnectionRefImpl connection,
- boolean outgoing ) {
- super( queryProcessor );
- this.connection = connection;
- this.outgoing = outgoing;
- }
-
-
- /* (non-Javadoc)
- * @see org.apache.usergrid.persistence.query.ir.SearchVisitor#secondaryIndexScan(org.apache.usergrid.persistence.query.ir
- * .QueryNode, org.apache.usergrid.persistence.query.ir.QuerySlice)
- */
- @Override
- protected IndexScanner secondaryIndexScan( QueryNode node, QuerySlice slice ) throws Exception {
-
- UUID id = ConnectionRefImpl.getIndexId( ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE, headEntity,
- connection.getConnectionType(), connection.getConnectedEntityType(), new ConnectedEntityRef[0] );
-
- Object key = key( id, INDEX_CONNECTIONS );
-
- // update the cursor and order before we perform the slice
- // operation
- queryProcessor.applyCursorAndSort( slice );
-
- IndexScanner columns = null;
-
- if ( slice.isComplete() ) {
- columns = new NoOpIndexScanner();
- }
- else {
- columns = searchIndex( key, slice, queryProcessor.getPageSizeHint( node ) );
- }
-
- return columns;
- }
-
-
- /*
- * (non-Javadoc)
- *
- * @see org.apache.usergrid.persistence.query.ir.NodeVisitor#visit(org.apache.usergrid.
- * persistence.query.ir.WithinNode)
- */
- @Override
- public void visit( WithinNode node ) throws Exception {
-
- QuerySlice slice = node.getSlice();
-
- queryProcessor.applyCursorAndSort( slice );
-
- GeoIterator itr =
- new GeoIterator( new ConnectionGeoSearch( em, indexBucketLocator, cass, connection.getIndexId() ),
- query.getLimit(), slice, node.getPropertyName(),
- new Point( node.getLattitude(), node.getLongitude() ), node.getDistance() );
-
- results.push( itr );
- }
-
-
- @Override
- public void visit( AllNode node ) throws Exception {
- QuerySlice slice = node.getSlice();
-
- queryProcessor.applyCursorAndSort( slice );
-
- int size = queryProcessor.getPageSizeHint( node );
-
- ByteBuffer start = null;
-
- if ( slice.hasCursor() ) {
- start = slice.getCursor();
- }
-
-
- boolean skipFirst = node.isForceKeepFirst() ? false : slice.hasCursor();
-
- UUID entityIdToUse;
-
- //change our type depending on which direction we're loading
- String dictionaryType;
-
- //the target type
- String targetType;
-
- //this is on the "source" side of the edge
- if ( outgoing ) {
- entityIdToUse = connection.getConnectingEntityId();
- dictionaryType = DICTIONARY_CONNECTED_ENTITIES;
- targetType = connection.getConnectedEntityType();
- }
-
- //we're on the target side of the edge
- else {
- entityIdToUse = connection.getConnectedEntityId();
- dictionaryType = DICTIONARY_CONNECTING_ENTITIES;
- targetType = connection.getConnectingEntityType();
- }
-
- final String connectionType = connection.getConnectionType();
-
-
- final ConnectionIndexSliceParser connectionParser = new ConnectionIndexSliceParser( targetType );
-
-
- final Iterator<String> connectionTypes;
-
- //use the provided connection type
- if ( connectionType != null ) {
- connectionTypes = Collections.singleton( connectionType ).iterator();
- }
-
- //we need to iterate all connection types
- else {
- connectionTypes = new ConnectionTypesIterator( cass, applicationId, entityIdToUse, outgoing, size );
- }
-
- IndexScanner connectionScanner =
- new ConnectedIndexScanner( cass, dictionaryType, applicationId, entityIdToUse, connectionTypes,
- start, slice.isReversed(), size, skipFirst );
-
- this.results.push( new SliceIterator( slice, connectionScanner, connectionParser ) );
- }
-
-
- @Override
- public void visit( NameIdentifierNode nameIdentifierNode ) throws Exception {
- //TODO T.N. USERGRID-1919 actually validate this is connected
- EntityRef ref =
- em.getAlias( applicationId, connection.getConnectedEntityType(), nameIdentifierNode.getName() );
-
- if ( ref == null ) {
- this.results.push( new EmptyIterator() );
- return;
- }
-
- this.results.push( new StaticIdIterator( ref.getUuid() ) );
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa31768c/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/IndexBucketScanner.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/IndexBucketScanner.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/IndexBucketScanner.java
index 8a9b709..668ab02 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/IndexBucketScanner.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/IndexBucketScanner.java
@@ -71,8 +71,8 @@ public class IndexBucketScanner implements IndexScanner {
public IndexBucketScanner( CassandraService cass, ApplicationCF columnFamily,
- UUID applicationId, Object keyPrefix, Object start, Object finish,
- boolean reversed, int pageSize, boolean skipFirst, String bucket) {
+ UUID applicationId, Object keyPrefix, String bucket, Object start, Object finish,
+ boolean reversed, int pageSize, boolean skipFirst) {
this.cass = cass;
this.applicationId = applicationId;
this.keyPrefix = keyPrefix;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa31768c/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/SearchVisitor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/SearchVisitor.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/SearchVisitor.java
index 0038689..0512cb2 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/SearchVisitor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/SearchVisitor.java
@@ -18,10 +18,13 @@ package org.apache.usergrid.persistence.query.ir;
import java.util.Stack;
+import java.util.UUID;
import org.apache.usergrid.persistence.EntityManager;
import org.apache.usergrid.persistence.EntityRef;
+import org.apache.usergrid.persistence.IndexBucketLocator;
import org.apache.usergrid.persistence.Query;
+import org.apache.usergrid.persistence.cassandra.CassandraService;
import org.apache.usergrid.persistence.cassandra.QueryProcessor;
import org.apache.usergrid.persistence.cassandra.index.IndexScanner;
import org.apache.usergrid.persistence.cassandra.index.NoOpIndexScanner;
@@ -58,11 +61,27 @@ public abstract class SearchVisitor implements NodeVisitor {
protected final String bucket;
+ protected final EntityRef headEntity;
+ protected final CassandraService cassandraService;
+ protected final IndexBucketLocator indexBucketLocator;
+ protected final UUID applicationId;
+
/**
+ * @param cassandraService
+ * @param indexBucketLocator
+ * @param applicationId
+ * @param headEntity
* @param queryProcessor
*/
- public SearchVisitor( QueryProcessor queryProcessor, final String bucket ) {
+ public SearchVisitor( final CassandraService cassandraService, final IndexBucketLocator indexBucketLocator,
+ final UUID applicationId, final EntityRef headEntity, QueryProcessor queryProcessor, final String bucket ) {
+
+
+ this.cassandraService = cassandraService;
+ this.indexBucketLocator = indexBucketLocator;
+ this.applicationId = applicationId;
+ this.headEntity = headEntity;
this.query = queryProcessor.getQuery();
this.queryProcessor = queryProcessor;
this.em = queryProcessor.getEntityManager();
@@ -70,6 +89,9 @@ public abstract class SearchVisitor implements NodeVisitor {
}
+
+
+
/** Return the results if they exist, null otherwise */
public ResultIterator getResults() {
return results.isEmpty() ? null : results.pop();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa31768c/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/CollectionSearchVisitorFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/CollectionSearchVisitorFactory.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/CollectionSearchVisitorFactory.java
index 1fd1604..1637f2d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/CollectionSearchVisitorFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/CollectionSearchVisitorFactory.java
@@ -17,19 +17,62 @@
package org.apache.usergrid.persistence.query.ir.result;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import org.apache.usergrid.persistence.EntityRef;
+import org.apache.usergrid.persistence.IndexBucketLocator;
+import org.apache.usergrid.persistence.cassandra.CassandraService;
import org.apache.usergrid.persistence.cassandra.QueryProcessor;
import org.apache.usergrid.persistence.query.ir.SearchVisitor;
+/**
+ * Creates collection visitors per shard
+ */
public class CollectionSearchVisitorFactory implements SearchVisitorFactory {
+ private final CassandraService cassandraService;
+ private final IndexBucketLocator indexBucketLocator;
+ private final QueryProcessor queryProcessor;
+ private final UUID applicationId;
+ private final EntityRef headEntity;
+ private final String collectionName;
+
+
+ public CollectionSearchVisitorFactory( final CassandraService cassandraService,
+ final IndexBucketLocator indexBucketLocator,
+ final QueryProcessor queryProcessor, final UUID applicationId,
+ final EntityRef headEntity, final String collectionName ) {
+ this.cassandraService = cassandraService;
+ this.indexBucketLocator = indexBucketLocator;
+ this.queryProcessor = queryProcessor;
+ this.applicationId = applicationId;
+ this.headEntity = headEntity;
+ this.collectionName = collectionName;
+ }
+
@Override
public Collection<SearchVisitor> createVisitors() {
-// SearchConnectionVisitor visitor = new SearchConnectionVisitor( qp, connectionRef, true );
- return null;
+ final List<String> buckets =
+ indexBucketLocator.getBuckets( applicationId, IndexBucketLocator.IndexType.CONNECTION, collectionName );
+
+
+ final List<SearchVisitor> visitors = new ArrayList<SearchVisitor>( buckets.size() );
+
+ for ( final String bucket : buckets ) {
+
+ final SearchVisitor searchVisitor =
+ new SearchCollectionVisitor( cassandraService, indexBucketLocator, queryProcessor, applicationId,
+ headEntity, bucket );
+ visitors.add( searchVisitor );
+ }
+
+
+ return visitors;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa31768c/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/ConnectionSearchVisitorFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/ConnectionSearchVisitorFactory.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/ConnectionSearchVisitorFactory.java
index 371f79b..c1ec724 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/ConnectionSearchVisitorFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/ConnectionSearchVisitorFactory.java
@@ -17,40 +17,65 @@
package org.apache.usergrid.persistence.query.ir.result;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
-import org.apache.usergrid.persistence.ConnectionRef;
+import org.apache.usergrid.persistence.EntityRef;
import org.apache.usergrid.persistence.IndexBucketLocator;
+import org.apache.usergrid.persistence.cassandra.CassandraService;
+import org.apache.usergrid.persistence.cassandra.ConnectionRefImpl;
import org.apache.usergrid.persistence.cassandra.QueryProcessor;
import org.apache.usergrid.persistence.query.ir.SearchVisitor;
public class ConnectionSearchVisitorFactory implements SearchVisitorFactory {
- private final
+ private final CassandraService cassandraService;
private final IndexBucketLocator indexBucketLocator;
private final QueryProcessor queryProcessor;
- private final ConnectionRef connectionRef;
+ private final UUID applicationId;
+ private final EntityRef headEntity;
+ private final ConnectionRefImpl connectionRef;
private final boolean outgoing;
+ private final String[] prefix;
- private ConnectionSearchVisitorFactory( final IndexBucketLocator indexBucketLocator,
- final QueryProcessor queryProcessor, final ConnectionRef connectionRef,
- final boolean outgoing ){
-// SearchConnectionVisitor visitor = new SearchConnectionVisitor( indexBucketLocator, qp, connectionRef, true );
+ private ConnectionSearchVisitorFactory( final CassandraService cassandraService,
+ final IndexBucketLocator indexBucketLocator,
+ final QueryProcessor queryProcessor, final UUID applicationId,
+ final EntityRef headEntity, ConnectionRefImpl connectionRef,
+ boolean outgoing, final String... prefix ) {
+ this.applicationId = applicationId;
+ this.cassandraService = cassandraService;
this.indexBucketLocator = indexBucketLocator;
this.queryProcessor = queryProcessor;
+ this.headEntity = headEntity;
this.connectionRef = connectionRef;
this.outgoing = outgoing;
+ this.prefix = prefix;
}
@Override
public Collection<SearchVisitor> createVisitors() {
- indexBucketLocator.getBuckets( )
+ final List<String> buckets =
+ indexBucketLocator.getBuckets( applicationId, IndexBucketLocator.IndexType.CONNECTION, prefix );
- return null;
+ final List<SearchVisitor> visitors = new ArrayList<SearchVisitor>( buckets.size() );
+
+ for ( final String bucket : buckets ) {
+
+ final SearchVisitor searchVisitor =
+ new SearchConnectionVisitor( cassandraService, indexBucketLocator, queryProcessor, applicationId,
+ headEntity, connectionRef, outgoing, bucket );
+ visitors.add( searchVisitor );
+ }
+
+
+ return visitors;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa31768c/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchCollectionVisitor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchCollectionVisitor.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchCollectionVisitor.java
new file mode 100644
index 0000000..61315e4
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchCollectionVisitor.java
@@ -0,0 +1,168 @@
+package org.apache.usergrid.persistence.query.ir.result;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.EntityRef;
+import org.apache.usergrid.persistence.IndexBucketLocator;
+import org.apache.usergrid.persistence.cassandra.CassandraService;
+import org.apache.usergrid.persistence.cassandra.QueryProcessor;
+import org.apache.usergrid.persistence.cassandra.index.IndexBucketScanner;
+import org.apache.usergrid.persistence.cassandra.index.IndexScanner;
+import org.apache.usergrid.persistence.cassandra.index.NoOpIndexScanner;
+import org.apache.usergrid.persistence.geo.CollectionGeoSearch;
+import org.apache.usergrid.persistence.geo.model.Point;
+import org.apache.usergrid.persistence.query.ir.AllNode;
+import org.apache.usergrid.persistence.query.ir.NameIdentifierNode;
+import org.apache.usergrid.persistence.query.ir.QueryNode;
+import org.apache.usergrid.persistence.query.ir.QuerySlice;
+import org.apache.usergrid.persistence.query.ir.SearchVisitor;
+import org.apache.usergrid.persistence.query.ir.WithinNode;
+import org.apache.usergrid.persistence.schema.CollectionInfo;
+
+import me.prettyprint.hector.api.beans.DynamicComposite;
+
+import static org.apache.usergrid.persistence.Schema.DICTIONARY_COLLECTIONS;
+import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_INDEX;
+import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.key;
+
+
+/**
+ * Simple search visitor that performs all the joining
+ *
+ * @author tnine
+ */
+public class SearchCollectionVisitor extends SearchVisitor {
+
+
+ private static final UUIDIndexSliceParser UUID_PARSER = new UUIDIndexSliceParser();
+
+
+ private final CollectionInfo collection;
+
+
+ /**
+ * @param queryProcessor
+ */
+ public SearchCollectionVisitor( final CassandraService cassandraService,
+ final IndexBucketLocator indexBucketLocator, final QueryProcessor queryProcessor,
+ final UUID applicationId, final EntityRef headEntity, final String bucket ) {
+ super( cassandraService, indexBucketLocator, applicationId, headEntity, queryProcessor, bucket );
+ this.collection = queryProcessor.getCollectionInfo();
+ }
+
+
+ /* (non-Javadoc)
+ * @see org.apache.usergrid.persistence.query.ir.SearchVisitor#secondaryIndexScan(org.apache.usergrid.persistence
+ * .query.ir
+ * .QueryNode, org.apache.usergrid.persistence.query.ir.QuerySlice)
+ */
+ @Override
+ protected IndexScanner secondaryIndexScan( QueryNode node, QuerySlice slice ) throws Exception {
+ // NOTE we explicitly do not append the slice value here. This
+ // is done in the searchIndex method below
+ Object indexKey = key( headEntity.getUuid(), collection.getName() );
+
+ // update the cursor and order before we perform the slice
+ // operation. Should be done after subkeying since this can
+ // change the hash value of the slice
+ queryProcessor.applyCursorAndSort( slice );
+
+ IndexScanner columns = null;
+
+ // nothing left to search for this range
+ if ( slice.isComplete() ) {
+ columns = new NoOpIndexScanner();
+ }
+ // perform the search
+ else {
+ columns =
+ searchIndexBuckets( indexKey, slice, collection.getName(), queryProcessor.getPageSizeHint( node ) );
+ }
+
+ return columns;
+ }
+
+
+ public void visit( AllNode node ) throws Exception {
+
+ String collectionName = collection.getName();
+
+ QuerySlice slice = node.getSlice();
+
+ queryProcessor.applyCursorAndSort( slice );
+
+ UUID startId = null;
+
+ if ( slice.hasCursor() ) {
+ startId = UUID_PARSER.parse( slice.getCursor() ).getUUID();
+ }
+
+
+ IndexScanner indexScanner = cassandraService
+ .getIdList( cassandraService.getApplicationKeyspace( applicationId ),
+ key( headEntity.getUuid(), DICTIONARY_COLLECTIONS, collectionName ), startId, null,
+ queryProcessor.getPageSizeHint( node ), query.isReversed(), indexBucketLocator, applicationId,
+ collectionName, node.isForceKeepFirst() );
+
+ this.results.push( new SliceIterator( slice, indexScanner, UUID_PARSER ) );
+ }
+
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.usergrid.persistence.query.ir.NodeVisitor#visit(org.apache.usergrid.
+ * persistence.query.ir.WithinNode)
+ */
+ @Override
+ public void visit( WithinNode node ) throws Exception {
+
+ QuerySlice slice = node.getSlice();
+
+ queryProcessor.applyCursorAndSort( slice );
+
+ GeoIterator itr = new GeoIterator(
+ new CollectionGeoSearch( em, indexBucketLocator, cassandraService, headEntity, collection.getName() ),
+ query.getLimit(), slice, node.getPropertyName(), new Point( node.getLattitude(), node.getLongitude() ),
+ node.getDistance() );
+
+ results.push( itr );
+ }
+
+
+ @Override
+ public void visit( NameIdentifierNode nameIdentifierNode ) throws Exception {
+ EntityRef ref = em.getAlias( headEntity.getUuid(), collection.getType(), nameIdentifierNode.getName() );
+
+ if ( ref == null ) {
+ this.results.push( new EmptyIterator() );
+ return;
+ }
+
+ this.results.push( new StaticIdIterator( ref.getUuid() ) );
+ }
+
+
+ /**
+ * Search the collection index using all the buckets for the given collection
+ *
+ * @param indexKey The index key to read
+ * @param slice Slice set in the query
+ * @param collectionName The name of the collection to search
+ * @param pageSize The page size to load when iterating
+ */
+ private IndexScanner searchIndexBuckets( Object indexKey, QuerySlice slice, String collectionName, int pageSize )
+ throws Exception {
+
+ DynamicComposite[] range = slice.getRange();
+
+ Object keyPrefix = key( indexKey, slice.getPropertyName() );
+
+ IndexScanner scanner =
+ new IndexBucketScanner( cassandraService, ENTITY_INDEX, applicationId, keyPrefix, bucket, range[0],
+ range[1], slice.isReversed(), pageSize, slice.hasCursor() );
+
+ return scanner;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa31768c/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchConnectionVisitor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchConnectionVisitor.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchConnectionVisitor.java
new file mode 100644
index 0000000..da99cdf
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchConnectionVisitor.java
@@ -0,0 +1,213 @@
+package org.apache.usergrid.persistence.query.ir.result;
+
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.ConnectedEntityRef;
+import org.apache.usergrid.persistence.EntityRef;
+import org.apache.usergrid.persistence.IndexBucketLocator;
+import org.apache.usergrid.persistence.cassandra.CassandraService;
+import org.apache.usergrid.persistence.cassandra.ConnectionRefImpl;
+import org.apache.usergrid.persistence.cassandra.QueryProcessor;
+import org.apache.usergrid.persistence.cassandra.index.ConnectedIndexScanner;
+import org.apache.usergrid.persistence.cassandra.index.IndexBucketScanner;
+import org.apache.usergrid.persistence.cassandra.index.IndexScanner;
+import org.apache.usergrid.persistence.cassandra.index.NoOpIndexScanner;
+import org.apache.usergrid.persistence.geo.ConnectionGeoSearch;
+import org.apache.usergrid.persistence.geo.model.Point;
+import org.apache.usergrid.persistence.query.ir.AllNode;
+import org.apache.usergrid.persistence.query.ir.NameIdentifierNode;
+import org.apache.usergrid.persistence.query.ir.QueryNode;
+import org.apache.usergrid.persistence.query.ir.QuerySlice;
+import org.apache.usergrid.persistence.query.ir.SearchVisitor;
+import org.apache.usergrid.persistence.query.ir.WithinNode;
+
+import me.prettyprint.hector.api.beans.DynamicComposite;
+
+import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTED_ENTITIES;
+import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTING_ENTITIES;
+import static org.apache.usergrid.persistence.Schema.INDEX_CONNECTIONS;
+import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_INDEX;
+import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.key;
+
+
+/**
+ * Simple search visitor that performs all the joining
+ *
+ * @author tnine
+ */
+public class SearchConnectionVisitor extends SearchVisitor {
+
+ private final ConnectionRefImpl connection;
+
+ /** True if we should search from source->target edges. False if we should search from target<-source edges */
+ private final boolean outgoing;
+
+
+
+ /**
+ * @param queryProcessor They query processor to use
+ * @param applicationId
+ * @param connection The connection refernce
+ * @param outgoing The direction to search. True if we should search from source->target edges. False if we
+ */
+ public SearchConnectionVisitor( final CassandraService cassandraService,
+ final IndexBucketLocator indexBucketLocator, final QueryProcessor queryProcessor,
+ final UUID applicationId, final EntityRef headEntity, ConnectionRefImpl connection,
+ boolean outgoing, final String bucket ) {
+ super( cassandraService, indexBucketLocator, applicationId, headEntity, queryProcessor, bucket );
+ this.connection = connection;
+ this.outgoing = outgoing;
+ }
+
+
+ /* (non-Javadoc)
+ * @see org.apache.usergrid.persistence.query.ir.SearchVisitor#secondaryIndexScan(org.apache.usergrid.persistence
+ * .query.ir
+ * .QueryNode, org.apache.usergrid.persistence.query.ir.QuerySlice)
+ */
+ @Override
+ protected IndexScanner secondaryIndexScan( QueryNode node, QuerySlice slice ) throws Exception {
+
+ UUID
+ id = ConnectionRefImpl.getIndexId( ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE, headEntity,
+ connection.getConnectionType(), connection.getConnectedEntityType(), new ConnectedEntityRef[0] );
+
+ Object key = key( id, INDEX_CONNECTIONS );
+
+ // update the cursor and order before we perform the slice
+ // operation
+ queryProcessor.applyCursorAndSort( slice );
+
+ IndexScanner columns = null;
+
+ if ( slice.isComplete() ) {
+ columns = new NoOpIndexScanner();
+ }
+ else {
+ columns = searchIndex( key, slice, queryProcessor.getPageSizeHint( node ), bucket );
+ }
+
+ return columns;
+ }
+
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.usergrid.persistence.query.ir.NodeVisitor#visit(org.apache.usergrid.
+ * persistence.query.ir.WithinNode)
+ */
+ @Override
+ public void visit( WithinNode node ) throws Exception {
+
+ QuerySlice slice = node.getSlice();
+
+ queryProcessor.applyCursorAndSort( slice );
+
+ GeoIterator itr =
+ new GeoIterator( new ConnectionGeoSearch( em, indexBucketLocator, cassandraService, connection.getIndexId() ),
+ query.getLimit(), slice, node.getPropertyName(),
+ new Point( node.getLattitude(), node.getLongitude() ), node.getDistance() );
+
+ results.push( itr );
+ }
+
+
+ @Override
+ public void visit( AllNode node ) throws Exception {
+ QuerySlice slice = node.getSlice();
+
+ queryProcessor.applyCursorAndSort( slice );
+
+ int size = queryProcessor.getPageSizeHint( node );
+
+ ByteBuffer start = null;
+
+ if ( slice.hasCursor() ) {
+ start = slice.getCursor();
+ }
+
+
+ boolean skipFirst = node.isForceKeepFirst() ? false : slice.hasCursor();
+
+ UUID entityIdToUse;
+
+ //change our type depending on which direction we're loading
+ String dictionaryType;
+
+ //the target type
+ String targetType;
+
+ //this is on the "source" side of the edge
+ if ( outgoing ) {
+ entityIdToUse = connection.getConnectingEntityId();
+ dictionaryType = DICTIONARY_CONNECTED_ENTITIES;
+ targetType = connection.getConnectedEntityType();
+ }
+
+ //we're on the target side of the edge
+ else {
+ entityIdToUse = connection.getConnectedEntityId();
+ dictionaryType = DICTIONARY_CONNECTING_ENTITIES;
+ targetType = connection.getConnectingEntityType();
+ }
+
+ final String connectionType = connection.getConnectionType();
+
+
+ final ConnectionIndexSliceParser connectionParser = new ConnectionIndexSliceParser( targetType );
+
+
+ final Iterator<String> connectionTypes;
+
+ //use the provided connection type
+ if ( connectionType != null ) {
+ connectionTypes = Collections.singleton( connectionType ).iterator();
+ }
+
+ //we need to iterate all connection types
+ else {
+ connectionTypes = new ConnectionTypesIterator( cassandraService, applicationId, entityIdToUse, outgoing, size );
+ }
+
+ IndexScanner connectionScanner =
+ new ConnectedIndexScanner( cassandraService, dictionaryType, applicationId, entityIdToUse, connectionTypes,
+ start, slice.isReversed(), size, skipFirst );
+
+ this.results.push( new SliceIterator( slice, connectionScanner, connectionParser ) );
+ }
+
+
+ @Override
+ public void visit( NameIdentifierNode nameIdentifierNode ) throws Exception {
+ //TODO T.N. USERGRID-1919 actually validate this is connected
+ EntityRef ref =
+ em.getAlias( applicationId, connection.getConnectedEntityType(), nameIdentifierNode.getName() );
+
+ if ( ref == null ) {
+ this.results.push( new EmptyIterator() );
+ return;
+ }
+
+ this.results.push( new StaticIdIterator( ref.getUuid() ) );
+ }
+
+ private IndexScanner searchIndex( Object indexKey, QuerySlice slice, int pageSize, final String shardBucket ) throws Exception {
+
+ DynamicComposite[] range = slice.getRange();
+
+ Object keyPrefix = key( indexKey, slice.getPropertyName() );
+
+
+
+ IndexScanner scanner =
+ new IndexBucketScanner( cassandraService, ENTITY_INDEX, applicationId,
+ keyPrefix, shardBucket, range[0], range[1], slice.isReversed(), pageSize, slice.hasCursor());
+
+ return scanner;
+ }
+}