You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/06/16 19:58:45 UTC

[4/4] incubator-usergrid git commit: First pass of refactor to allow concurrent tree execution.

First pass of refactor to allow concurrent tree execution.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/9b21332d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/9b21332d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/9b21332d

Branch: refs/heads/USERGRID-752
Commit: 9b21332d1db5160f280fd74a3a64d82eca05e5cd
Parents: 107a465
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Jun 16 11:58:39 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Jun 16 11:58:39 2015 -0600

----------------------------------------------------------------------
 stack/awscluster/aws.properties                 | 19 +++++++
 .../apache/usergrid/persistence/Results.java    | 10 ++--
 .../persistence/cassandra/CassandraService.java | 45 ++++++++-------
 .../persistence/cassandra/QueryProcessor.java   | 13 +++--
 .../cassandra/index/ConnectedIndexScanner.java  | 38 +++----------
 .../cassandra/index/IndexBucketScanner.java     | 40 ++++++--------
 .../cassandra/index/IndexScanner.java           |  4 +-
 .../persistence/geo/GeoIndexSearcher.java       |  2 +
 .../persistence/query/ir/SearchVisitor.java     |  5 +-
 .../result/CollectionSearchVisitorFactory.java  | 35 ++++++++++++
 .../result/ConnectionSearchVisitorFactory.java  | 56 +++++++++++++++++++
 .../query/ir/result/GatherIterator.java         | 58 ++++++++++++++++++++
 .../query/ir/result/SearchVisitorFactory.java   | 37 +++++++++++++
 13 files changed, 274 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b21332d/stack/awscluster/aws.properties
----------------------------------------------------------------------
diff --git a/stack/awscluster/aws.properties b/stack/awscluster/aws.properties
new file mode 100644
index 0000000..182168a
--- /dev/null
+++ b/stack/awscluster/aws.properties
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  The ASF licenses this file to You
+# under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.  For additional information regarding
+# copyright in this work, please see the NOTICE file in the top level
+# directory of this distribution.
+#
+accesskey=<REDACTED>
+secretkey=<REDACTED>

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b21332d/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java b/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java
index 64e081e..d64489d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java
@@ -33,7 +33,7 @@ import javax.xml.bind.annotation.XmlRootElement;
 import org.codehaus.jackson.map.annotate.JsonSerialize;
 import org.codehaus.jackson.map.annotate.JsonSerialize.Inclusion;
 import org.apache.usergrid.persistence.cassandra.QueryProcessor;
-import org.apache.usergrid.persistence.query.ir.SearchVisitor;
+import org.apache.usergrid.persistence.query.ir.result.SearchVisitorFactory;
 import org.apache.usergrid.utils.MapUtils;
 import org.apache.usergrid.utils.StringUtils;
 
@@ -85,7 +85,7 @@ public class Results implements Iterable<Entity> {
     String dataName;
 
     private QueryProcessor queryProcessor;
-    private SearchVisitor searchVisitor;
+    private SearchVisitorFactory searchVisitorFactory;
 
 
     public Results() {
@@ -1273,8 +1273,8 @@ public class Results implements Iterable<Entity> {
     }
 
 
-    public void setSearchVisitor( SearchVisitor searchVisitor ) {
-        this.searchVisitor = searchVisitor;
+    public void setSearchVisitorFactory( SearchVisitorFactory searchVisitor ) {
+        this.searchVisitorFactory = searchVisitor;
     }
 
 
@@ -1288,6 +1288,6 @@ public class Results implements Iterable<Entity> {
         q.setCursor( getCursor() );
         queryProcessor.setQuery( q );
 
-        return queryProcessor.getResults( searchVisitor );
+        return queryProcessor.getResults( searchVisitorFactory );
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b21332d/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CassandraService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CassandraService.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CassandraService.java
index 43bfd2d..58d5d0f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CassandraService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CassandraService.java
@@ -1027,27 +1027,30 @@ public class CassandraService {
      *
      * @throws Exception the exception
      */
-    public IndexScanner getIdList( Keyspace ko, Object key, UUID start, UUID finish, int count, boolean reversed,
-                                   IndexBucketLocator locator, UUID applicationId, String collectionName, boolean keepFirst )
-            throws Exception {
-
-        if ( count <= 0 ) {
-            count = DEFAULT_COUNT;
-        }
-
-        if ( NULL_ID.equals( start ) ) {
-            start = null;
-        }
-
-
-        final boolean skipFirst = start != null && !keepFirst;
-
-        IndexScanner scanner =
-                new IndexBucketScanner( this, locator, ENTITY_ID_SETS, applicationId, IndexType.COLLECTION, key, start,
-                        finish, reversed, count, skipFirst, collectionName );
-
-        return scanner;
-    }
+       public IndexScanner getIdList( Keyspace ko, Object key, UUID start, UUID finish, int count, boolean reversed,
+                                      IndexBucketLocator locator, UUID applicationId, String collectionName, boolean keepFirst )
+               throws Exception {
+
+           //TODO, refactor this
+           throw new UnsupportedOperationException( "Implement me" );
+
+   //        if ( count <= 0 ) {
+   //            count = DEFAULT_COUNT;
+   //        }
+   //
+   //        if ( NULL_ID.equals( start ) ) {
+   //            start = null;
+   //        }
+   //
+   //
+   //        final boolean skipFirst = start != null && !keepFirst;
+   //
+   //        IndexScanner scanner =
+   //                new IndexBucketScanner( this, locator, ENTITY_ID_SETS, applicationId, IndexType.COLLECTION, key, start,
+   //                        finish, reversed, count, skipFirst, collectionName );
+   //
+   //        return scanner;
+       }
 
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b21332d/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/QueryProcessor.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/QueryProcessor.java
index c2b65b7..78392a4 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/QueryProcessor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/QueryProcessor.java
@@ -19,7 +19,6 @@ package org.apache.usergrid.persistence.cassandra;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Stack;
 import java.util.UUID;
@@ -47,14 +46,15 @@ import org.apache.usergrid.persistence.query.ir.OrNode;
 import org.apache.usergrid.persistence.query.ir.OrderByNode;
 import org.apache.usergrid.persistence.query.ir.QueryNode;
 import org.apache.usergrid.persistence.query.ir.QuerySlice;
-import org.apache.usergrid.persistence.query.ir.SearchVisitor;
 import org.apache.usergrid.persistence.query.ir.SliceNode;
 import org.apache.usergrid.persistence.query.ir.UuidIdentifierNode;
 import org.apache.usergrid.persistence.query.ir.WithinNode;
+import org.apache.usergrid.persistence.query.ir.result.GatherIterator;
 import org.apache.usergrid.persistence.query.ir.result.ResultIterator;
 import org.apache.usergrid.persistence.query.ir.result.ResultsLoader;
 import org.apache.usergrid.persistence.query.ir.result.ResultsLoaderFactory;
 import org.apache.usergrid.persistence.query.ir.result.ScanColumn;
+import org.apache.usergrid.persistence.query.ir.result.SearchVisitorFactory;
 import org.apache.usergrid.persistence.query.tree.AndOperand;
 import org.apache.usergrid.persistence.query.tree.ContainsOperand;
 import org.apache.usergrid.persistence.query.tree.Equal;
@@ -138,6 +138,7 @@ public class QueryProcessor {
         if ( rootOperand != null ) {
             // visit the tree
 
+            //TODO, evaluate for each shard and generate a tree in this class?
             TreeEvaluator visitor = new TreeEvaluator();
 
             rootOperand.visit( visitor );
@@ -259,16 +260,16 @@ public class QueryProcessor {
     /**
      * Return the iterator results, ordered if required
      */
-    public Results getResults( SearchVisitor visitor ) throws Exception {
+    public Results getResults( final SearchVisitorFactory searchVisitorFactory ) throws Exception {
         // if we have no order by just load the results
 
         if ( rootNode == null ) {
             return null;
         }
 
-        rootNode.visit( visitor );
+        //use the gather iterator to collect all the results from each search visitor
 
-        ResultIterator itr = visitor.getResults();
+        ResultIterator itr = new GatherIterator(rootNode, searchVisitorFactory.createVisitors()  );
 
         List<ScanColumn> entityIds = new ArrayList<ScanColumn>( Math.min( size, Query.MAX_LIMIT ) );
 
@@ -304,7 +305,7 @@ public class QueryProcessor {
 
         results.setQuery( query );
         results.setQueryProcessor( this );
-        results.setSearchVisitor( visitor );
+        results.setSearchVisitorFactory( searchVisitorFactory );
 
         return results;
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b21332d/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/ConnectedIndexScanner.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/ConnectedIndexScanner.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/ConnectedIndexScanner.java
index b412df8..27689ce 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/ConnectedIndexScanner.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/ConnectedIndexScanner.java
@@ -63,7 +63,7 @@ public class ConnectedIndexScanner implements IndexScanner {
     /**
      * Iterator for our results from the last page load
      */
-    private LinkedHashSet<HColumn<ByteBuffer, ByteBuffer>> lastResults;
+    private List<HColumn<ByteBuffer, ByteBuffer>> lastResults;
 
     /**
      * True if our last load loaded a full page size.
@@ -131,16 +131,6 @@ public class ConnectedIndexScanner implements IndexScanner {
         }
 
 
-        lastResults = new LinkedHashSet<HColumn<ByteBuffer, ByteBuffer>>();
-
-
-        //cleanup columns for later logic
-        //pointer to the first col we load
-        HColumn<ByteBuffer, ByteBuffer> first = null;
-
-        //pointer to the last column we load
-        HColumn<ByteBuffer, ByteBuffer> last = null;
-
         //go through each connection type until we exhaust the result sets
         while ( currentConnectionType != null ) {
 
@@ -157,16 +147,7 @@ public class ConnectedIndexScanner implements IndexScanner {
 
             final int resultSize = results.size();
 
-            if(resultSize > 0){
-
-                last = results.get( resultSize -1 );
-
-                if(first == null ){
-                    first = results.get( 0 );
-                }
-            }
-
-            lastResults.addAll( results );
+            lastResults = results;
 
 
             // we loaded a full page, there might be more
@@ -193,14 +174,13 @@ public class ConnectedIndexScanner implements IndexScanner {
         }
 
         //remove the first element, we need to skip it
-        if ( skipFirst && first != null) {
-            lastResults.remove( first  );
+        if ( skipFirst && lastResults.size() > 0) {
+            lastResults.remove( 0  );
         }
 
-        if ( hasMore && last != null ) {
+        if ( hasMore && lastResults.size() > 0 ) {
             // set the bytebuffer for the next pass
-            start = last.getName();
-            lastResults.remove( last );
+            lastResults.remove( lastResults.size() - 1 );
         }
 
         return lastResults != null && lastResults.size() > 0;
@@ -213,7 +193,7 @@ public class ConnectedIndexScanner implements IndexScanner {
      * @see java.lang.Iterable#iterator()
      */
     @Override
-    public Iterator<Set<HColumn<ByteBuffer, ByteBuffer>>> iterator() {
+    public Iterator<List<HColumn<ByteBuffer, ByteBuffer>>> iterator() {
         return this;
     }
 
@@ -250,8 +230,8 @@ public class ConnectedIndexScanner implements IndexScanner {
      */
     @Override
     @Metered( group = "core", name = "IndexBucketScanner_load" )
-    public Set<HColumn<ByteBuffer, ByteBuffer>> next() {
-        Set<HColumn<ByteBuffer, ByteBuffer>> returnVal = lastResults;
+    public List<HColumn<ByteBuffer, ByteBuffer>> next() {
+        List<HColumn<ByteBuffer, ByteBuffer>> returnVal = lastResults;
 
         lastResults = null;
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b21332d/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/IndexBucketScanner.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/IndexBucketScanner.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/IndexBucketScanner.java
index b2ca591..8a9b709 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/IndexBucketScanner.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/IndexBucketScanner.java
@@ -47,16 +47,14 @@ import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtil
 public class IndexBucketScanner implements IndexScanner {
 
     private final CassandraService cass;
-    private final IndexBucketLocator indexBucketLocator;
     private final UUID applicationId;
     private final Object keyPrefix;
     private final ApplicationCF columnFamily;
     private final Object finish;
     private final boolean reversed;
     private final int pageSize;
-    private final String[] indexPath;
-    private final IndexType indexType;
     private final boolean skipFirst;
+    private final String bucket;
 
     /** Pointer to our next start read */
     private Object start;
@@ -65,18 +63,17 @@ public class IndexBucketScanner implements IndexScanner {
     private Object scanStart;
 
     /** Iterator for our results from the last page load */
-    private TreeSet<HColumn<ByteBuffer, ByteBuffer>> lastResults;
+    private List<HColumn<ByteBuffer, ByteBuffer>> lastResults;
 
     /** True if our last load loaded a full page size. */
     private boolean hasMore = true;
 
 
 
-    public IndexBucketScanner( CassandraService cass, IndexBucketLocator locator, ApplicationCF columnFamily,
-                               UUID applicationId, IndexType indexType, Object keyPrefix, Object start, Object finish,
-                               boolean reversed, int pageSize, boolean skipFirst, String... indexPath) {
+    public IndexBucketScanner( CassandraService cass, ApplicationCF columnFamily,
+                               UUID applicationId, Object keyPrefix, Object start, Object finish,
+                               boolean reversed, int pageSize, boolean skipFirst, String bucket) {
         this.cass = cass;
-        this.indexBucketLocator = locator;
         this.applicationId = applicationId;
         this.keyPrefix = keyPrefix;
         this.columnFamily = columnFamily;
@@ -84,11 +81,10 @@ public class IndexBucketScanner implements IndexScanner {
         this.finish = finish;
         this.reversed = reversed;
         this.skipFirst = skipFirst;
+        this.bucket = bucket;
 
         //we always add 1 to the page size.  This is because we pop the last column for the next page of results
         this.pageSize = pageSize+1;
-        this.indexPath = indexPath;
-        this.indexType = indexType;
         this.scanStart = start;
     }
 
@@ -117,13 +113,7 @@ public class IndexBucketScanner implements IndexScanner {
             return false;
         }
 
-        List<String> keys = indexBucketLocator.getBuckets( applicationId, indexType, indexPath );
-
-        List<Object> cassKeys = new ArrayList<Object>( keys.size() );
-
-        for ( String bucket : keys ) {
-            cassKeys.add( key( keyPrefix, bucket ) );
-        }
+       final Object rowKey =  key( keyPrefix, bucket );
 
         //if we skip the first we need to set the load to page size +2, since we'll discard the first
         //and start paging at the next entity, otherwise we'll just load the page size we need
@@ -138,8 +128,10 @@ public class IndexBucketScanner implements IndexScanner {
             selectSize++;
         }
 
-        TreeSet<HColumn<ByteBuffer, ByteBuffer>> resultsTree = IndexMultiBucketSetLoader
-                .load( cass, columnFamily, applicationId, cassKeys, start, finish, selectSize, reversed );
+
+        final List<HColumn<ByteBuffer, ByteBuffer>>
+                resultsTree = cass.getColumns( cass.getApplicationKeyspace( applicationId ), columnFamily, rowKey,
+                start, finish, selectSize, reversed );
 
         //remove the first element, it's from a cursor value and we don't want to retain it
 
@@ -150,7 +142,7 @@ public class IndexBucketScanner implements IndexScanner {
 
 
             // set the bytebuffer for the next pass
-            start = resultsTree.pollLast().getName();
+            start = resultsTree.get( resultsTree.size() - 1 ).getName();
         }
         else {
             hasMore = false;
@@ -158,7 +150,7 @@ public class IndexBucketScanner implements IndexScanner {
 
         //remove the first element since it needs to be skipped AFTER the size check. Otherwise it will fail
         if ( firstPageSkipFirst ) {
-            resultsTree.pollFirst();
+            resultsTree.remove( 0 );
         }
 
         lastResults = resultsTree;
@@ -173,7 +165,7 @@ public class IndexBucketScanner implements IndexScanner {
      * @see java.lang.Iterable#iterator()
      */
     @Override
-    public Iterator<Set<HColumn<ByteBuffer, ByteBuffer>>> iterator() {
+    public Iterator<List<HColumn<ByteBuffer, ByteBuffer>>> iterator() {
         return this;
     }
 
@@ -210,8 +202,8 @@ public class IndexBucketScanner implements IndexScanner {
      */
     @Override
     @Metered(group = "core", name = "IndexBucketScanner_load")
-    public NavigableSet<HColumn<ByteBuffer, ByteBuffer>> next() {
-        NavigableSet<HColumn<ByteBuffer, ByteBuffer>> returnVal = lastResults;
+    public List<HColumn<ByteBuffer, ByteBuffer>> next() {
+        List<HColumn<ByteBuffer, ByteBuffer>> returnVal = lastResults;
 
         lastResults = null;
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b21332d/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/IndexScanner.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/IndexScanner.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/IndexScanner.java
index a938ca3..1307361 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/IndexScanner.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/IndexScanner.java
@@ -19,7 +19,7 @@ package org.apache.usergrid.persistence.cassandra.index;
 
 import java.nio.ByteBuffer;
 import java.util.Iterator;
-import java.util.Set;
+import java.util.List;
 
 import me.prettyprint.hector.api.beans.HColumn;
 
@@ -30,7 +30,7 @@ import me.prettyprint.hector.api.beans.HColumn;
  * @author tnine
  */
 public interface IndexScanner
-        extends Iterable<Set<HColumn<ByteBuffer, ByteBuffer>>>, Iterator<Set<HColumn<ByteBuffer, ByteBuffer>>> {
+        extends Iterable<List<HColumn<ByteBuffer, ByteBuffer>>>, Iterator<List<HColumn<ByteBuffer, ByteBuffer>>> {
 
     /** Reset the scanner back to the start */
     public void reset();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b21332d/stack/core/src/main/java/org/apache/usergrid/persistence/geo/GeoIndexSearcher.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/geo/GeoIndexSearcher.java b/stack/core/src/main/java/org/apache/usergrid/persistence/geo/GeoIndexSearcher.java
index 4bc160d..0bbace0 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/geo/GeoIndexSearcher.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/geo/GeoIndexSearcher.java
@@ -328,6 +328,8 @@ public abstract class GeoIndexSearcher {
 
             // add buckets for each geoCell
 
+            //TODO, use merge logic here
+
             for ( String indexBucket : locator.getBuckets( appId, IndexType.GEO, geoCell ) ) {
                 keys.add( key( key, DICTIONARY_GEOCELL, geoCell, indexBucket ) );
             }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b21332d/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/SearchVisitor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/SearchVisitor.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/SearchVisitor.java
index e8c189a..0038689 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/SearchVisitor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/SearchVisitor.java
@@ -56,14 +56,17 @@ public abstract class SearchVisitor implements NodeVisitor {
 
     protected final Stack<ResultIterator> results = new Stack<ResultIterator>();
 
+    protected final String bucket;
+
 
     /**
      * @param queryProcessor
      */
-    public SearchVisitor( QueryProcessor queryProcessor ) {
+    public SearchVisitor( QueryProcessor queryProcessor, final String bucket ) {
         this.query = queryProcessor.getQuery();
         this.queryProcessor = queryProcessor;
         this.em = queryProcessor.getEntityManager();
+        this.bucket = bucket;
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b21332d/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/CollectionSearchVisitorFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/CollectionSearchVisitorFactory.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/CollectionSearchVisitorFactory.java
new file mode 100644
index 0000000..1fd1604
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/CollectionSearchVisitorFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.persistence.query.ir.result;
+
+
+import java.util.Collection;
+
+import org.apache.usergrid.persistence.cassandra.QueryProcessor;
+import org.apache.usergrid.persistence.query.ir.SearchVisitor;
+
+
+public class CollectionSearchVisitorFactory implements SearchVisitorFactory {
+
+
+    @Override
+    public Collection<SearchVisitor> createVisitors() {
+//        SearchConnectionVisitor visitor = new SearchConnectionVisitor( qp, connectionRef, true );
+
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b21332d/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/ConnectionSearchVisitorFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/ConnectionSearchVisitorFactory.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/ConnectionSearchVisitorFactory.java
new file mode 100644
index 0000000..371f79b
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/ConnectionSearchVisitorFactory.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.persistence.query.ir.result;
+
+
+import java.util.Collection;
+
+import org.apache.usergrid.persistence.ConnectionRef;
+import org.apache.usergrid.persistence.IndexBucketLocator;
+import org.apache.usergrid.persistence.cassandra.QueryProcessor;
+import org.apache.usergrid.persistence.query.ir.SearchVisitor;
+
+
+public class ConnectionSearchVisitorFactory implements SearchVisitorFactory {
+
+    private final
+    private final IndexBucketLocator indexBucketLocator;
+    private final QueryProcessor queryProcessor;
+    private final ConnectionRef connectionRef;
+    private final boolean outgoing;
+
+    private ConnectionSearchVisitorFactory( final IndexBucketLocator indexBucketLocator,
+                                            final QueryProcessor queryProcessor, final ConnectionRef connectionRef,
+                                            final boolean outgoing ){
+//        SearchConnectionVisitor visitor = new SearchConnectionVisitor( indexBucketLocator, qp, connectionRef, true  );
+
+        this.indexBucketLocator = indexBucketLocator;
+        this.queryProcessor = queryProcessor;
+        this.connectionRef = connectionRef;
+        this.outgoing = outgoing;
+    }
+
+
+    @Override
+    public Collection<SearchVisitor> createVisitors() {
+
+        indexBucketLocator.getBuckets(  )
+
+
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b21332d/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/GatherIterator.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/GatherIterator.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/GatherIterator.java
new file mode 100644
index 0000000..48e2301
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/GatherIterator.java
@@ -0,0 +1,58 @@
+package org.apache.usergrid.persistence.query.ir.result;
+
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.cassandra.CursorCache;
+import org.apache.usergrid.persistence.query.ir.QueryNode;
+import org.apache.usergrid.persistence.query.ir.SearchVisitor;
+
+
+/**
+ * Used to gather results from multiple sub iterators
+ */
+public class GatherIterator implements ResultIterator {
+
+
+    private final Collection<SearchVisitor> searchVisitors;
+    private final QueryNode rootNode;
+
+
+    public GatherIterator(  final QueryNode rootNode, final Collection<SearchVisitor> searchVisitors) {
+        this.rootNode = rootNode;
+        this.searchVisitors = searchVisitors;
+    }
+
+
+    @Override
+    public void reset() {
+        throw new UnsupportedOperationException( "Gather iterators cannot be reset" );
+    }
+
+
+    @Override
+    public void finalizeCursor( final CursorCache cache, final UUID lastValue ) {
+        //find the last value in the tree, and return its cursor
+    }
+
+
+    @Override
+    public Iterator<Set<ScanColumn>> iterator() {
+        return this;
+    }
+
+
+    @Override
+    public boolean hasNext() {
+        return false;
+    }
+
+
+    @Override
+    public Set<ScanColumn> next() {
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b21332d/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchVisitorFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchVisitorFactory.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchVisitorFactory.java
new file mode 100644
index 0000000..26603b4
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchVisitorFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.persistence.query.ir.result;
+
+
+import java.util.Collection;
+
+import org.apache.usergrid.persistence.query.ir.SearchVisitor;
+
+
+/**
+ * A factory for generating search visitors for a shard
+ */
+public interface SearchVisitorFactory {
+
+    /**
+     * Creates a collection of search visitors.  Each visitor will evaluate the tree, and the results will be aggregated together
+     *
+     * @return 1 or more search visitors to visit our IR tree and return searches
+     */
+    Collection<SearchVisitor> createVisitors();
+}
+