You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/07/30 20:44:25 UTC

[02/35] incubator-usergrid git commit: Second pass with visitor factory generation

Second pass with visitor factory generation


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/aa31768c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/aa31768c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/aa31768c

Branch: refs/heads/ug2-doc-update
Commit: aa31768cb8f6826692e4bb06f3459e1c7af4827f
Parents: 9b21332
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Jun 16 18:47:34 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Jun 16 18:47:34 2015 -0600

----------------------------------------------------------------------
 .../cassandra/RelationManagerImpl.java          | 336 +------------------
 .../cassandra/index/IndexBucketScanner.java     |   4 +-
 .../persistence/query/ir/SearchVisitor.java     |  24 +-
 .../result/CollectionSearchVisitorFactory.java  |  47 ++-
 .../result/ConnectionSearchVisitorFactory.java  |  43 ++-
 .../ir/result/SearchCollectionVisitor.java      | 168 ++++++++++
 .../ir/result/SearchConnectionVisitor.java      | 213 ++++++++++++
 7 files changed, 492 insertions(+), 343 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa31768c/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/RelationManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/RelationManagerImpl.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/RelationManagerImpl.java
index 456add3..0e5011f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/RelationManagerImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/RelationManagerImpl.java
@@ -20,8 +20,6 @@ package org.apache.usergrid.persistence.cassandra;
 import java.nio.ByteBuffer;
 import java.util.AbstractMap;
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -34,6 +32,7 @@ import java.util.UUID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.util.Assert;
+
 import org.apache.usergrid.persistence.CollectionRef;
 import org.apache.usergrid.persistence.ConnectedEntityRef;
 import org.apache.usergrid.persistence.ConnectionRef;
@@ -52,30 +51,17 @@ import org.apache.usergrid.persistence.SimpleCollectionRef;
 import org.apache.usergrid.persistence.SimpleEntityRef;
 import org.apache.usergrid.persistence.SimpleRoleRef;
 import org.apache.usergrid.persistence.cassandra.IndexUpdate.IndexEntry;
-import org.apache.usergrid.persistence.cassandra.index.ConnectedIndexScanner;
 import org.apache.usergrid.persistence.cassandra.index.IndexBucketScanner;
 import org.apache.usergrid.persistence.cassandra.index.IndexScanner;
-import org.apache.usergrid.persistence.cassandra.index.NoOpIndexScanner;
 import org.apache.usergrid.persistence.entities.Group;
-import org.apache.usergrid.persistence.geo.CollectionGeoSearch;
-import org.apache.usergrid.persistence.geo.ConnectionGeoSearch;
 import org.apache.usergrid.persistence.geo.EntityLocationRef;
-import org.apache.usergrid.persistence.geo.model.Point;
 import org.apache.usergrid.persistence.hector.CountingMutator;
-import org.apache.usergrid.persistence.query.ir.AllNode;
-import org.apache.usergrid.persistence.query.ir.NameIdentifierNode;
-import org.apache.usergrid.persistence.query.ir.QueryNode;
 import org.apache.usergrid.persistence.query.ir.QuerySlice;
-import org.apache.usergrid.persistence.query.ir.SearchVisitor;
-import org.apache.usergrid.persistence.query.ir.WithinNode;
 import org.apache.usergrid.persistence.query.ir.result.CollectionResultsLoaderFactory;
-import org.apache.usergrid.persistence.query.ir.result.ConnectionIndexSliceParser;
 import org.apache.usergrid.persistence.query.ir.result.ConnectionResultsLoaderFactory;
 import org.apache.usergrid.persistence.query.ir.result.ConnectionTypesIterator;
-import org.apache.usergrid.persistence.query.ir.result.EmptyIterator;
-import org.apache.usergrid.persistence.query.ir.result.GeoIterator;
-import org.apache.usergrid.persistence.query.ir.result.SliceIterator;
-import org.apache.usergrid.persistence.query.ir.result.StaticIdIterator;
+import org.apache.usergrid.persistence.query.ir.result.SearchCollectionVisitor;
+import org.apache.usergrid.persistence.query.ir.result.SearchConnectionVisitor;
 import org.apache.usergrid.persistence.query.ir.result.UUIDIndexSliceParser;
 import org.apache.usergrid.persistence.schema.CollectionInfo;
 import org.apache.usergrid.utils.IndexUtils;
@@ -83,9 +69,6 @@ import org.apache.usergrid.utils.MapUtils;
 
 import com.yammer.metrics.annotation.Metered;
 
-import me.prettyprint.cassandra.serializers.ByteBufferSerializer;
-import me.prettyprint.cassandra.serializers.StringSerializer;
-import me.prettyprint.cassandra.serializers.UUIDSerializer;
 import me.prettyprint.hector.api.Keyspace;
 import me.prettyprint.hector.api.beans.DynamicComposite;
 import me.prettyprint.hector.api.beans.HColumn;
@@ -94,9 +77,7 @@ import me.prettyprint.hector.api.mutation.Mutator;
 import static java.lang.String.CASE_INSENSITIVE_ORDER;
 import static java.util.Arrays.asList;
 
-
 import static org.apache.usergrid.persistence.Schema.COLLECTION_ROLES;
-import static org.apache.usergrid.persistence.Schema.DICTIONARY_COLLECTIONS;
 import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTED_ENTITIES;
 import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTED_TYPES;
 import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTING_ENTITIES;
@@ -1374,41 +1355,8 @@ public class RelationManagerImpl implements RelationManager {
     }
 
 
-    private IndexScanner searchIndex( Object indexKey, QuerySlice slice, int pageSize ) throws Exception {
-
-        DynamicComposite[] range = slice.getRange();
 
-        Object keyPrefix = key( indexKey, slice.getPropertyName() );
 
-        IndexScanner scanner =
-                new IndexBucketScanner( cass, indexBucketLocator, ENTITY_INDEX, applicationId, IndexType.CONNECTION,
-                        keyPrefix, range[0], range[1], slice.isReversed(), pageSize, slice.hasCursor(), slice.getPropertyName() );
-
-        return scanner;
-    }
-
-
-    /**
-     * Search the collection index using all the buckets for the given collection
-     *
-     * @param indexKey The index key to read
-     * @param slice Slice set in the query
-     * @param collectionName The name of the collection to search
-     * @param pageSize The page size to load when iterating
-     */
-    private IndexScanner searchIndexBuckets( Object indexKey, QuerySlice slice, String collectionName, int pageSize )
-            throws Exception {
-
-        DynamicComposite[] range = slice.getRange();
-
-        Object keyPrefix = key( indexKey, slice.getPropertyName() );
-
-        IndexScanner scanner =
-                new IndexBucketScanner( cass, indexBucketLocator, ENTITY_INDEX, applicationId, IndexType.COLLECTION,
-                        keyPrefix, range[0], range[1], slice.isReversed(), pageSize, slice.hasCursor(), collectionName );
-
-        return scanner;
-    }
 
 
     @SuppressWarnings("unchecked")
@@ -1772,7 +1720,7 @@ public class RelationManagerImpl implements RelationManager {
         // we have something to search with, visit our tree and evaluate the
         // results
         QueryProcessor qp = new QueryProcessor( query, collection, em, factory );
-        SearchCollectionVisitor visitor = new SearchCollectionVisitor( qp );
+        SearchCollectionVisitor visitor = new SearchCollectionVisitor( this, qp );
 
         return qp.getResults( visitor );
     }
@@ -1966,7 +1914,7 @@ public class RelationManagerImpl implements RelationManager {
         final ConnectionResultsLoaderFactory factory = new ConnectionResultsLoaderFactory( connectionRef );
 
         QueryProcessor qp = new QueryProcessor( query, null, em, factory );
-        SearchConnectionVisitor visitor = new SearchConnectionVisitor( qp, connectionRef, true );
+        SearchConnectionVisitor visitor = new SearchConnectionVisitor( this, qp, connectionRef, true );
 
         return qp.getResults( visitor );
     }
@@ -2008,7 +1956,7 @@ public class RelationManagerImpl implements RelationManager {
         final ConnectionResultsLoaderFactory factory = new ConnectionResultsLoaderFactory( connectionRef );
 
         QueryProcessor qp = new QueryProcessor( query, null, em, factory );
-        SearchConnectionVisitor visitor = new SearchConnectionVisitor( qp, connectionRef, false );
+        SearchConnectionVisitor visitor = new SearchConnectionVisitor( this, qp, connectionRef, false );
 
         return qp.getResults( visitor );
 	}
@@ -2047,7 +1995,7 @@ public class RelationManagerImpl implements RelationManager {
         final ConnectionResultsLoaderFactory factory = new ConnectionResultsLoaderFactory( connectionRef );
 
         QueryProcessor qp = new QueryProcessor( query, null, em, factory );
-        SearchConnectionVisitor visitor = new SearchConnectionVisitor( qp, connectionRef, true );
+        SearchConnectionVisitor visitor = new SearchConnectionVisitor( this, qp, connectionRef, true );
 
         return qp.getResults( visitor );
     }
@@ -2059,274 +2007,4 @@ public class RelationManagerImpl implements RelationManager {
     }
 
 
-    private static final UUIDIndexSliceParser UUID_PARSER = new UUIDIndexSliceParser();
-
-
-    /**
-     * Simple search visitor that performs all the joining
-     *
-     * @author tnine
-     */
-    private class SearchCollectionVisitor extends SearchVisitor {
-
-        private final CollectionInfo collection;
-
-
-        /**
-         * @param queryProcessor
-         */
-        public SearchCollectionVisitor( QueryProcessor queryProcessor ) {
-            super( queryProcessor );
-            this.collection = queryProcessor.getCollectionInfo();
-        }
-
-
-        /* (non-Javadoc)
-     * @see org.apache.usergrid.persistence.query.ir.SearchVisitor#secondaryIndexScan(org.apache.usergrid.persistence.query.ir
-     * .QueryNode, org.apache.usergrid.persistence.query.ir.QuerySlice)
-     */
-        @Override
-        protected IndexScanner secondaryIndexScan( QueryNode node, QuerySlice slice ) throws Exception {
-            // NOTE we explicitly do not append the slice value here. This
-            // is done in the searchIndex method below
-            Object indexKey = key( headEntity.getUuid(), collection.getName() );
-
-            // update the cursor and order before we perform the slice
-            // operation. Should be done after subkeying since this can
-            // change the hash value of the slice
-            queryProcessor.applyCursorAndSort( slice );
-
-            IndexScanner columns = null;
-
-            // nothing left to search for this range
-            if ( slice.isComplete() ) {
-                columns = new NoOpIndexScanner();
-            }
-            // perform the search
-            else {
-                columns = searchIndexBuckets( indexKey, slice, collection.getName(),
-                        queryProcessor.getPageSizeHint( node ) );
-            }
-
-            return columns;
-        }
-
-
-        public void visit( AllNode node ) throws Exception {
-
-            String collectionName = collection.getName();
-
-            QuerySlice slice = node.getSlice();
-
-            queryProcessor.applyCursorAndSort( slice );
-
-            UUID startId = null;
-
-            if ( slice.hasCursor() ) {
-                startId = UUID_PARSER.parse( slice.getCursor() ).getUUID();
-            }
-
-
-            IndexScanner indexScanner = cass.getIdList( cass.getApplicationKeyspace( applicationId ),
-                    key( headEntity.getUuid(), DICTIONARY_COLLECTIONS, collectionName ), startId, null,
-                    queryProcessor.getPageSizeHint( node ), query.isReversed(), indexBucketLocator, applicationId,
-                    collectionName, node.isForceKeepFirst() );
-
-            this.results.push( new SliceIterator( slice, indexScanner, UUID_PARSER ) );
-        }
-
-
-        /*
-     * (non-Javadoc)
-     * 
-     * @see org.apache.usergrid.persistence.query.ir.NodeVisitor#visit(org.apache.usergrid.
-     * persistence.query.ir.WithinNode)
-     */
-        @Override
-        public void visit( WithinNode node ) throws Exception {
-
-            QuerySlice slice = node.getSlice();
-
-            queryProcessor.applyCursorAndSort( slice );
-
-            GeoIterator itr = new GeoIterator(
-                    new CollectionGeoSearch( em, indexBucketLocator, cass, headEntity, collection.getName() ),
-                    query.getLimit(), slice, node.getPropertyName(),
-                    new Point( node.getLattitude(), node.getLongitude() ), node.getDistance() );
-
-            results.push( itr );
-        }
-
-
-        @Override
-        public void visit( NameIdentifierNode nameIdentifierNode ) throws Exception {
-            EntityRef ref = em.getAlias( headEntity.getUuid(), collection.getType(), nameIdentifierNode.getName() );
-
-            if ( ref == null ) {
-                this.results.push( new EmptyIterator() );
-                return;
-            }
-
-            this.results.push( new StaticIdIterator( ref.getUuid() ) );
-        }
-    }
-
-
-    /**
-     * Simple search visitor that performs all the joining
-     *
-     * @author tnine
-     */
-    private class SearchConnectionVisitor extends SearchVisitor {
-
-        private final ConnectionRefImpl connection;
-
-        /** True if we should search from source->target edges.  False if we should search from target<-source edges */
-        private final boolean outgoing;
-
-
-        /**
-         * @param queryProcessor They query processor to use
-         * @param connection The connection refernce
-         * @param outgoing The direction to search.  True if we should search from source->target edges.  False if we
-         * should search from target<-source edges
-         */
-        public SearchConnectionVisitor( QueryProcessor queryProcessor, ConnectionRefImpl connection,
-                                        boolean outgoing ) {
-            super( queryProcessor );
-            this.connection = connection;
-            this.outgoing = outgoing;
-        }
-
-
-        /* (non-Javadoc)
-     * @see org.apache.usergrid.persistence.query.ir.SearchVisitor#secondaryIndexScan(org.apache.usergrid.persistence.query.ir
-     * .QueryNode, org.apache.usergrid.persistence.query.ir.QuerySlice)
-     */
-        @Override
-        protected IndexScanner secondaryIndexScan( QueryNode node, QuerySlice slice ) throws Exception {
-
-            UUID id = ConnectionRefImpl.getIndexId( ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE, headEntity,
-                    connection.getConnectionType(), connection.getConnectedEntityType(), new ConnectedEntityRef[0] );
-
-            Object key = key( id, INDEX_CONNECTIONS );
-
-            // update the cursor and order before we perform the slice
-            // operation
-            queryProcessor.applyCursorAndSort( slice );
-
-            IndexScanner columns = null;
-
-            if ( slice.isComplete() ) {
-                columns = new NoOpIndexScanner();
-            }
-            else {
-                columns = searchIndex( key, slice, queryProcessor.getPageSizeHint( node ) );
-            }
-
-            return columns;
-        }
-
-
-        /*
-     * (non-Javadoc)
-     * 
-     * @see org.apache.usergrid.persistence.query.ir.NodeVisitor#visit(org.apache.usergrid.
-     * persistence.query.ir.WithinNode)
-     */
-        @Override
-        public void visit( WithinNode node ) throws Exception {
-
-            QuerySlice slice = node.getSlice();
-
-            queryProcessor.applyCursorAndSort( slice );
-
-            GeoIterator itr =
-                    new GeoIterator( new ConnectionGeoSearch( em, indexBucketLocator, cass, connection.getIndexId() ),
-                            query.getLimit(), slice, node.getPropertyName(),
-                            new Point( node.getLattitude(), node.getLongitude() ), node.getDistance() );
-
-            results.push( itr );
-        }
-
-
-        @Override
-        public void visit( AllNode node ) throws Exception {
-            QuerySlice slice = node.getSlice();
-
-            queryProcessor.applyCursorAndSort( slice );
-
-            int size = queryProcessor.getPageSizeHint( node );
-
-            ByteBuffer start = null;
-
-            if ( slice.hasCursor() ) {
-                start = slice.getCursor();
-            }
-
-
-            boolean skipFirst = node.isForceKeepFirst() ? false : slice.hasCursor();
-
-            UUID entityIdToUse;
-
-            //change our type depending on which direction we're loading
-            String dictionaryType;
-
-            //the target type
-            String targetType;
-
-            //this is on the "source" side of the edge
-            if ( outgoing ) {
-                entityIdToUse = connection.getConnectingEntityId();
-                dictionaryType = DICTIONARY_CONNECTED_ENTITIES;
-                targetType = connection.getConnectedEntityType();
-            }
-
-            //we're on the target side of the edge
-            else {
-                entityIdToUse = connection.getConnectedEntityId();
-                dictionaryType = DICTIONARY_CONNECTING_ENTITIES;
-                targetType = connection.getConnectingEntityType();
-            }
-
-            final String connectionType = connection.getConnectionType();
-
-
-            final ConnectionIndexSliceParser connectionParser = new ConnectionIndexSliceParser( targetType );
-
-
-            final Iterator<String> connectionTypes;
-
-            //use the provided connection type
-            if ( connectionType != null ) {
-                connectionTypes = Collections.singleton( connectionType ).iterator();
-            }
-
-            //we need to iterate all connection types
-            else {
-                connectionTypes = new ConnectionTypesIterator( cass, applicationId, entityIdToUse, outgoing, size );
-            }
-
-            IndexScanner connectionScanner =
-                    new ConnectedIndexScanner( cass, dictionaryType, applicationId, entityIdToUse, connectionTypes,
-                            start, slice.isReversed(), size, skipFirst );
-
-            this.results.push( new SliceIterator( slice, connectionScanner, connectionParser ) );
-        }
-
-
-        @Override
-        public void visit( NameIdentifierNode nameIdentifierNode ) throws Exception {
-            //TODO T.N. USERGRID-1919 actually validate this is connected
-            EntityRef ref =
-                    em.getAlias( applicationId, connection.getConnectedEntityType(), nameIdentifierNode.getName() );
-
-            if ( ref == null ) {
-                this.results.push( new EmptyIterator() );
-                return;
-            }
-
-            this.results.push( new StaticIdIterator( ref.getUuid() ) );
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa31768c/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/IndexBucketScanner.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/IndexBucketScanner.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/IndexBucketScanner.java
index 8a9b709..668ab02 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/IndexBucketScanner.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/IndexBucketScanner.java
@@ -71,8 +71,8 @@ public class IndexBucketScanner implements IndexScanner {
 
 
     public IndexBucketScanner( CassandraService cass, ApplicationCF columnFamily,
-                               UUID applicationId, Object keyPrefix, Object start, Object finish,
-                               boolean reversed, int pageSize, boolean skipFirst, String bucket) {
+                               UUID applicationId, Object keyPrefix, String bucket,  Object start, Object finish,
+                               boolean reversed, int pageSize, boolean skipFirst) {
         this.cass = cass;
         this.applicationId = applicationId;
         this.keyPrefix = keyPrefix;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa31768c/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/SearchVisitor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/SearchVisitor.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/SearchVisitor.java
index 0038689..0512cb2 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/SearchVisitor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/SearchVisitor.java
@@ -18,10 +18,13 @@ package org.apache.usergrid.persistence.query.ir;
 
 
 import java.util.Stack;
+import java.util.UUID;
 
 import org.apache.usergrid.persistence.EntityManager;
 import org.apache.usergrid.persistence.EntityRef;
+import org.apache.usergrid.persistence.IndexBucketLocator;
 import org.apache.usergrid.persistence.Query;
+import org.apache.usergrid.persistence.cassandra.CassandraService;
 import org.apache.usergrid.persistence.cassandra.QueryProcessor;
 import org.apache.usergrid.persistence.cassandra.index.IndexScanner;
 import org.apache.usergrid.persistence.cassandra.index.NoOpIndexScanner;
@@ -58,11 +61,27 @@ public abstract class SearchVisitor implements NodeVisitor {
 
     protected final String bucket;
 
+    protected final EntityRef headEntity;
+    protected final CassandraService cassandraService;
+    protected final IndexBucketLocator indexBucketLocator;
+    protected final UUID applicationId;
+
 
     /**
+     * @param cassandraService
+     * @param indexBucketLocator
+     * @param applicationId
+     * @param headEntity
      * @param queryProcessor
      */
-    public SearchVisitor( QueryProcessor queryProcessor, final String bucket ) {
+    public SearchVisitor( final CassandraService cassandraService, final IndexBucketLocator indexBucketLocator,
+                          final UUID applicationId, final EntityRef headEntity, QueryProcessor queryProcessor, final String bucket ) {
+
+
+        this.cassandraService = cassandraService;
+        this.indexBucketLocator = indexBucketLocator;
+        this.applicationId = applicationId;
+        this.headEntity = headEntity;
         this.query = queryProcessor.getQuery();
         this.queryProcessor = queryProcessor;
         this.em = queryProcessor.getEntityManager();
@@ -70,6 +89,9 @@ public abstract class SearchVisitor implements NodeVisitor {
     }
 
 
+
+
+
     /** Return the results if they exist, null otherwise */
     public ResultIterator getResults() {
         return results.isEmpty() ? null : results.pop();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa31768c/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/CollectionSearchVisitorFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/CollectionSearchVisitorFactory.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/CollectionSearchVisitorFactory.java
index 1fd1604..1637f2d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/CollectionSearchVisitorFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/CollectionSearchVisitorFactory.java
@@ -17,19 +17,62 @@
 package org.apache.usergrid.persistence.query.ir.result;
 
 
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
 
+import org.apache.usergrid.persistence.EntityRef;
+import org.apache.usergrid.persistence.IndexBucketLocator;
+import org.apache.usergrid.persistence.cassandra.CassandraService;
 import org.apache.usergrid.persistence.cassandra.QueryProcessor;
 import org.apache.usergrid.persistence.query.ir.SearchVisitor;
 
 
+/**
+ * Creates collection visitors per shard
+ */
 public class CollectionSearchVisitorFactory implements SearchVisitorFactory {
 
+    private final CassandraService cassandraService;
+    private final IndexBucketLocator indexBucketLocator;
+    private final QueryProcessor queryProcessor;
+    private final UUID applicationId;
+    private final EntityRef headEntity;
+    private final String collectionName;
+
+
+    public CollectionSearchVisitorFactory( final CassandraService cassandraService,
+                                           final IndexBucketLocator indexBucketLocator,
+                                           final QueryProcessor queryProcessor, final UUID applicationId,
+                                           final EntityRef headEntity, final String collectionName ) {
+        this.cassandraService = cassandraService;
+        this.indexBucketLocator = indexBucketLocator;
+        this.queryProcessor = queryProcessor;
+        this.applicationId = applicationId;
+        this.headEntity = headEntity;
+        this.collectionName = collectionName;
+    }
+
 
     @Override
     public Collection<SearchVisitor> createVisitors() {
-//        SearchConnectionVisitor visitor = new SearchConnectionVisitor( qp, connectionRef, true );
 
-        return null;
+        final List<String> buckets =
+                indexBucketLocator.getBuckets( applicationId, IndexBucketLocator.IndexType.CONNECTION, collectionName );
+
+
+        final List<SearchVisitor> visitors = new ArrayList<SearchVisitor>( buckets.size() );
+
+        for ( final String bucket : buckets ) {
+
+            final SearchVisitor searchVisitor =
+                    new SearchCollectionVisitor( cassandraService, indexBucketLocator, queryProcessor, applicationId,
+                            headEntity, bucket );
+            visitors.add( searchVisitor );
+        }
+
+
+        return visitors;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa31768c/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/ConnectionSearchVisitorFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/ConnectionSearchVisitorFactory.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/ConnectionSearchVisitorFactory.java
index 371f79b..c1ec724 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/ConnectionSearchVisitorFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/ConnectionSearchVisitorFactory.java
@@ -17,40 +17,65 @@
 package org.apache.usergrid.persistence.query.ir.result;
 
 
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
 
-import org.apache.usergrid.persistence.ConnectionRef;
+import org.apache.usergrid.persistence.EntityRef;
 import org.apache.usergrid.persistence.IndexBucketLocator;
+import org.apache.usergrid.persistence.cassandra.CassandraService;
+import org.apache.usergrid.persistence.cassandra.ConnectionRefImpl;
 import org.apache.usergrid.persistence.cassandra.QueryProcessor;
 import org.apache.usergrid.persistence.query.ir.SearchVisitor;
 
 
 public class ConnectionSearchVisitorFactory implements SearchVisitorFactory {
 
-    private final
+    private final CassandraService cassandraService;
     private final IndexBucketLocator indexBucketLocator;
     private final QueryProcessor queryProcessor;
-    private final ConnectionRef connectionRef;
+    private final UUID applicationId;
+    private final EntityRef headEntity;
+    private final ConnectionRefImpl connectionRef;
     private final boolean outgoing;
+    private final String[] prefix;
 
-    private ConnectionSearchVisitorFactory( final IndexBucketLocator indexBucketLocator,
-                                            final QueryProcessor queryProcessor, final ConnectionRef connectionRef,
-                                            final boolean outgoing ){
-//        SearchConnectionVisitor visitor = new SearchConnectionVisitor( indexBucketLocator, qp, connectionRef, true  );
 
+    private ConnectionSearchVisitorFactory( final CassandraService cassandraService,
+                                            final IndexBucketLocator indexBucketLocator,
+                                            final QueryProcessor queryProcessor, final UUID applicationId,
+                                            final EntityRef headEntity, ConnectionRefImpl connectionRef,
+                                            boolean outgoing, final String... prefix ) {
+        this.applicationId = applicationId;
+        this.cassandraService = cassandraService;
         this.indexBucketLocator = indexBucketLocator;
         this.queryProcessor = queryProcessor;
+        this.headEntity = headEntity;
         this.connectionRef = connectionRef;
         this.outgoing = outgoing;
+        this.prefix = prefix;
     }
 
 
     @Override
     public Collection<SearchVisitor> createVisitors() {
 
-        indexBucketLocator.getBuckets(  )
+        final List<String> buckets =
+                indexBucketLocator.getBuckets( applicationId, IndexBucketLocator.IndexType.CONNECTION, prefix );
 
 
-        return null;
+        final List<SearchVisitor> visitors = new ArrayList<SearchVisitor>( buckets.size() );
+
+        for ( final String bucket : buckets ) {
+
+            final SearchVisitor searchVisitor =
+                    new SearchConnectionVisitor( cassandraService, indexBucketLocator, queryProcessor, applicationId,
+                            headEntity, connectionRef, outgoing, bucket );
+            visitors.add( searchVisitor );
+        }
+
+
+        return visitors;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa31768c/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchCollectionVisitor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchCollectionVisitor.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchCollectionVisitor.java
new file mode 100644
index 0000000..61315e4
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchCollectionVisitor.java
@@ -0,0 +1,168 @@
+package org.apache.usergrid.persistence.query.ir.result;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.EntityRef;
+import org.apache.usergrid.persistence.IndexBucketLocator;
+import org.apache.usergrid.persistence.cassandra.CassandraService;
+import org.apache.usergrid.persistence.cassandra.QueryProcessor;
+import org.apache.usergrid.persistence.cassandra.index.IndexBucketScanner;
+import org.apache.usergrid.persistence.cassandra.index.IndexScanner;
+import org.apache.usergrid.persistence.cassandra.index.NoOpIndexScanner;
+import org.apache.usergrid.persistence.geo.CollectionGeoSearch;
+import org.apache.usergrid.persistence.geo.model.Point;
+import org.apache.usergrid.persistence.query.ir.AllNode;
+import org.apache.usergrid.persistence.query.ir.NameIdentifierNode;
+import org.apache.usergrid.persistence.query.ir.QueryNode;
+import org.apache.usergrid.persistence.query.ir.QuerySlice;
+import org.apache.usergrid.persistence.query.ir.SearchVisitor;
+import org.apache.usergrid.persistence.query.ir.WithinNode;
+import org.apache.usergrid.persistence.schema.CollectionInfo;
+
+import me.prettyprint.hector.api.beans.DynamicComposite;
+
+import static org.apache.usergrid.persistence.Schema.DICTIONARY_COLLECTIONS;
+import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_INDEX;
+import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.key;
+
+
+/**
+ * Simple search visitor that performs all the joining
+ *
+ * @author tnine
+ */
+public class SearchCollectionVisitor extends SearchVisitor {
+
+
+    private static final UUIDIndexSliceParser UUID_PARSER = new UUIDIndexSliceParser();
+
+
+    private final CollectionInfo collection;
+
+
+    /**
+     * @param queryProcessor The query processor to use; also supplies the collection info to search
+     */
+    public SearchCollectionVisitor( final CassandraService cassandraService,
+                                    final IndexBucketLocator indexBucketLocator, final QueryProcessor queryProcessor,
+                                    final UUID applicationId, final EntityRef headEntity, final String bucket ) {
+        super( cassandraService, indexBucketLocator, applicationId, headEntity, queryProcessor, bucket );
+        this.collection = queryProcessor.getCollectionInfo();
+    }
+
+
+    /* (non-Javadoc)
+ * @see org.apache.usergrid.persistence.query.ir.SearchVisitor#secondaryIndexScan(
+ * org.apache.usergrid.persistence.query.ir.QueryNode,
+ * org.apache.usergrid.persistence.query.ir.QuerySlice)
+ */
+    @Override
+    protected IndexScanner secondaryIndexScan( QueryNode node, QuerySlice slice ) throws Exception {
+        // NOTE we explicitly do not append the slice value here. This
+        // is done in the searchIndexBuckets method below
+        Object indexKey = key( headEntity.getUuid(), collection.getName() );
+
+        // update the cursor and order before we perform the slice
+        // operation. Should be done after subkeying since this can
+        // change the hash value of the slice
+        queryProcessor.applyCursorAndSort( slice );
+
+        IndexScanner columns = null;
+
+        // nothing left to search for this range
+        if ( slice.isComplete() ) {
+            columns = new NoOpIndexScanner();
+        }
+        // perform the search
+        else {
+            columns =
+                    searchIndexBuckets( indexKey, slice, collection.getName(), queryProcessor.getPageSizeHint( node ) );
+        }
+
+        return columns;
+    }
+
+
+    public void visit( AllNode node ) throws Exception {
+
+        String collectionName = collection.getName();
+
+        QuerySlice slice = node.getSlice();
+
+        queryProcessor.applyCursorAndSort( slice );
+
+        UUID startId = null;
+
+        if ( slice.hasCursor() ) {
+            startId = UUID_PARSER.parse( slice.getCursor() ).getUUID();
+        }
+
+
+        IndexScanner indexScanner = cassandraService
+                .getIdList( cassandraService.getApplicationKeyspace( applicationId ),
+                        key( headEntity.getUuid(), DICTIONARY_COLLECTIONS, collectionName ), startId, null,
+                        queryProcessor.getPageSizeHint( node ), query.isReversed(), indexBucketLocator, applicationId,
+                        collectionName, node.isForceKeepFirst() );
+
+        this.results.push( new SliceIterator( slice, indexScanner, UUID_PARSER ) );
+    }
+
+
+    /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.usergrid.persistence.query.ir.NodeVisitor#visit(org.apache.usergrid.
+ * persistence.query.ir.WithinNode)
+ */
+    @Override
+    public void visit( WithinNode node ) throws Exception {
+
+        QuerySlice slice = node.getSlice();
+
+        queryProcessor.applyCursorAndSort( slice );
+
+        GeoIterator itr = new GeoIterator(
+                new CollectionGeoSearch( em, indexBucketLocator, cassandraService, headEntity, collection.getName() ),
+                query.getLimit(), slice, node.getPropertyName(), new Point( node.getLattitude(), node.getLongitude() ),
+                node.getDistance() );
+
+        results.push( itr );
+    }
+
+
+    @Override
+    public void visit( NameIdentifierNode nameIdentifierNode ) throws Exception {
+        EntityRef ref = em.getAlias( headEntity.getUuid(), collection.getType(), nameIdentifierNode.getName() );
+
+        if ( ref == null ) {
+            this.results.push( new EmptyIterator() );
+            return;
+        }
+
+        this.results.push( new StaticIdIterator( ref.getUuid() ) );
+    }
+
+
+    /**
+     * Search the collection index using all the buckets for the given collection
+     *
+     * @param indexKey The index key to read
+     * @param slice Slice set in the query
+     * @param collectionName The name of the collection to search
+     * @param pageSize The page size to load when iterating
+     */
+    private IndexScanner searchIndexBuckets( Object indexKey, QuerySlice slice, String collectionName, int pageSize )
+            throws Exception {
+
+        DynamicComposite[] range = slice.getRange();
+
+        Object keyPrefix = key( indexKey, slice.getPropertyName() );
+
+        IndexScanner scanner =
+                new IndexBucketScanner( cassandraService, ENTITY_INDEX, applicationId, keyPrefix, bucket, range[0],
+                        range[1], slice.isReversed(), pageSize, slice.hasCursor() );
+
+        return scanner;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa31768c/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchConnectionVisitor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchConnectionVisitor.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchConnectionVisitor.java
new file mode 100644
index 0000000..da99cdf
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchConnectionVisitor.java
@@ -0,0 +1,213 @@
+package org.apache.usergrid.persistence.query.ir.result;
+
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.ConnectedEntityRef;
+import org.apache.usergrid.persistence.EntityRef;
+import org.apache.usergrid.persistence.IndexBucketLocator;
+import org.apache.usergrid.persistence.cassandra.CassandraService;
+import org.apache.usergrid.persistence.cassandra.ConnectionRefImpl;
+import org.apache.usergrid.persistence.cassandra.QueryProcessor;
+import org.apache.usergrid.persistence.cassandra.index.ConnectedIndexScanner;
+import org.apache.usergrid.persistence.cassandra.index.IndexBucketScanner;
+import org.apache.usergrid.persistence.cassandra.index.IndexScanner;
+import org.apache.usergrid.persistence.cassandra.index.NoOpIndexScanner;
+import org.apache.usergrid.persistence.geo.ConnectionGeoSearch;
+import org.apache.usergrid.persistence.geo.model.Point;
+import org.apache.usergrid.persistence.query.ir.AllNode;
+import org.apache.usergrid.persistence.query.ir.NameIdentifierNode;
+import org.apache.usergrid.persistence.query.ir.QueryNode;
+import org.apache.usergrid.persistence.query.ir.QuerySlice;
+import org.apache.usergrid.persistence.query.ir.SearchVisitor;
+import org.apache.usergrid.persistence.query.ir.WithinNode;
+
+import me.prettyprint.hector.api.beans.DynamicComposite;
+
+import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTED_ENTITIES;
+import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTING_ENTITIES;
+import static org.apache.usergrid.persistence.Schema.INDEX_CONNECTIONS;
+import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_INDEX;
+import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.key;
+
+
+/**
+ * Simple search visitor that performs all the joining
+ *
+ * @author tnine
+ */
+public class SearchConnectionVisitor extends SearchVisitor {
+
+    private final ConnectionRefImpl connection;
+
+    /** True if we should search from source->target edges.  False if we should search from target<-source edges */
+    private final boolean outgoing;
+
+
+
+    /**
+     * @param queryProcessor The query processor to use
+     * @param applicationId The application id
+     * @param connection The connection reference
+     * @param outgoing The direction to search.  True for source->target edges, false for target<-source edges
+     */
+    public SearchConnectionVisitor( final CassandraService cassandraService,
+                                    final IndexBucketLocator indexBucketLocator, final QueryProcessor queryProcessor,
+                                    final UUID applicationId, final EntityRef headEntity, ConnectionRefImpl connection,
+                                    boolean outgoing, final String bucket ) {
+        super( cassandraService, indexBucketLocator, applicationId, headEntity, queryProcessor, bucket );
+        this.connection = connection;
+        this.outgoing = outgoing;
+    }
+
+
+    /* (non-Javadoc)
+ * @see org.apache.usergrid.persistence.query.ir.SearchVisitor#secondaryIndexScan(
+ * org.apache.usergrid.persistence.query.ir.QueryNode,
+ * org.apache.usergrid.persistence.query.ir.QuerySlice)
+ */
+    @Override
+    protected IndexScanner secondaryIndexScan( QueryNode node, QuerySlice slice ) throws Exception {
+
+        UUID
+                id = ConnectionRefImpl.getIndexId( ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE, headEntity,
+                connection.getConnectionType(), connection.getConnectedEntityType(), new ConnectedEntityRef[0] );
+
+        Object key = key( id, INDEX_CONNECTIONS );
+
+        // update the cursor and order before we perform the slice
+        // operation
+        queryProcessor.applyCursorAndSort( slice );
+
+        IndexScanner columns = null;
+
+        if ( slice.isComplete() ) {
+            columns = new NoOpIndexScanner();
+        }
+        else {
+            columns = searchIndex( key, slice, queryProcessor.getPageSizeHint( node ), bucket );
+        }
+
+        return columns;
+    }
+
+
+    /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.usergrid.persistence.query.ir.NodeVisitor#visit(org.apache.usergrid.
+ * persistence.query.ir.WithinNode)
+ */
+    @Override
+    public void visit( WithinNode node ) throws Exception {
+
+        QuerySlice slice = node.getSlice();
+
+        queryProcessor.applyCursorAndSort( slice );
+
+        GeoIterator itr =
+                new GeoIterator( new ConnectionGeoSearch( em, indexBucketLocator, cassandraService, connection.getIndexId() ),
+                        query.getLimit(), slice, node.getPropertyName(),
+                        new Point( node.getLattitude(), node.getLongitude() ), node.getDistance() );
+
+        results.push( itr );
+    }
+
+
+    @Override
+    public void visit( AllNode node ) throws Exception {
+        QuerySlice slice = node.getSlice();
+
+        queryProcessor.applyCursorAndSort( slice );
+
+        int size = queryProcessor.getPageSizeHint( node );
+
+        ByteBuffer start = null;
+
+        if ( slice.hasCursor() ) {
+            start = slice.getCursor();
+        }
+
+
+        boolean skipFirst = node.isForceKeepFirst() ? false : slice.hasCursor();
+
+        UUID entityIdToUse;
+
+        //change our type depending on which direction we're loading
+        String dictionaryType;
+
+        //the target type
+        String targetType;
+
+        //this is on the "source" side of the edge
+        if ( outgoing ) {
+            entityIdToUse = connection.getConnectingEntityId();
+            dictionaryType = DICTIONARY_CONNECTED_ENTITIES;
+            targetType = connection.getConnectedEntityType();
+        }
+
+        //we're on the target side of the edge
+        else {
+            entityIdToUse = connection.getConnectedEntityId();
+            dictionaryType = DICTIONARY_CONNECTING_ENTITIES;
+            targetType = connection.getConnectingEntityType();
+        }
+
+        final String connectionType = connection.getConnectionType();
+
+
+        final ConnectionIndexSliceParser connectionParser = new ConnectionIndexSliceParser( targetType );
+
+
+        final Iterator<String> connectionTypes;
+
+        //use the provided connection type
+        if ( connectionType != null ) {
+            connectionTypes = Collections.singleton( connectionType ).iterator();
+        }
+
+        //we need to iterate all connection types
+        else {
+            connectionTypes = new ConnectionTypesIterator( cassandraService, applicationId, entityIdToUse, outgoing, size );
+        }
+
+        IndexScanner connectionScanner =
+                new ConnectedIndexScanner( cassandraService, dictionaryType, applicationId, entityIdToUse, connectionTypes,
+                        start, slice.isReversed(), size, skipFirst );
+
+        this.results.push( new SliceIterator( slice, connectionScanner, connectionParser ) );
+    }
+
+
+    @Override
+    public void visit( NameIdentifierNode nameIdentifierNode ) throws Exception {
+        //TODO T.N. USERGRID-1919 actually validate this is connected
+        EntityRef ref =
+                em.getAlias( applicationId, connection.getConnectedEntityType(), nameIdentifierNode.getName() );
+
+        if ( ref == null ) {
+            this.results.push( new EmptyIterator() );
+            return;
+        }
+
+        this.results.push( new StaticIdIterator( ref.getUuid() ) );
+    }
+
+    private IndexScanner searchIndex( Object indexKey, QuerySlice slice, int pageSize, final String shardBucket ) throws Exception {
+
+          DynamicComposite[] range = slice.getRange();
+
+          Object keyPrefix = key( indexKey, slice.getPropertyName() );
+
+
+
+          IndexScanner scanner =
+                  new IndexBucketScanner( cassandraService, ENTITY_INDEX, applicationId,
+                          keyPrefix, shardBucket, range[0], range[1], slice.isReversed(), pageSize, slice.hasCursor());
+
+          return scanner;
+      }
+}