You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/06/24 21:20:25 UTC

[1/3] incubator-usergrid git commit: WIP overwrite

Repository: incubator-usergrid
Updated Branches:
  refs/heads/USERGRID-752 18850b1db -> c55d261ec


WIP overwrite


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/02b10292
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/02b10292
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/02b10292

Branch: refs/heads/USERGRID-752
Commit: 02b10292554f9900037cb063a055f5dc820b495e
Parents: 18850b1
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Jun 19 10:41:46 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Jun 19 10:41:46 2015 -0600

----------------------------------------------------------------------
 .../persistence/cassandra/CassandraService.java |  4 +-
 .../index/DynamicCompositeStartToBytes.java     | 48 +++++++++++++++
 .../cassandra/index/IndexBucketScanner.java     | 62 ++++++++++++++------
 .../cassandra/index/StartToBytes.java           | 34 +++++++++++
 .../cassandra/index/UUIDStartToBytes.java       | 41 +++++++++++++
 .../ir/result/SearchCollectionVisitor.java      |  3 +-
 .../ir/result/SearchConnectionVisitor.java      |  3 +-
 .../query/ir/result/SliceIterator.java          | 16 +++++
 8 files changed, 191 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/02b10292/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CassandraService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CassandraService.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CassandraService.java
index 10c7151..12ea338 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CassandraService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CassandraService.java
@@ -39,6 +39,7 @@ import org.apache.usergrid.persistence.IndexBucketLocator;
 import org.apache.usergrid.persistence.IndexBucketLocator.IndexType;
 import org.apache.usergrid.persistence.cassandra.index.IndexBucketScanner;
 import org.apache.usergrid.persistence.cassandra.index.IndexScanner;
+import org.apache.usergrid.persistence.cassandra.index.UUIDStartToBytes;
 import org.apache.usergrid.persistence.hector.CountingMutator;
 
 import me.prettyprint.cassandra.connection.HConnectionManager;
@@ -1044,7 +1045,7 @@ public class CassandraService {
 
 
            IndexScanner scanner =
-                   new IndexBucketScanner( this, ENTITY_ID_SETS, applicationId, key, bucket, start,
+                   new IndexBucketScanner<UUID>( this, ENTITY_ID_SETS, UUIDStartToBytes.INSTANCE, applicationId, key, bucket, start,
                            finish, reversed, count, skipFirst );
 
            return scanner;
@@ -1062,4 +1063,5 @@ public class CassandraService {
     	}
     	cluster = null;
     }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/02b10292/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/DynamicCompositeStartToBytes.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/DynamicCompositeStartToBytes.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/DynamicCompositeStartToBytes.java
new file mode 100644
index 0000000..072e8ca
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/DynamicCompositeStartToBytes.java
@@ -0,0 +1,48 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *      http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+package org.apache.usergrid.persistence.cassandra.index;
+
+
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
+import me.prettyprint.cassandra.serializers.UUIDSerializer;
+import me.prettyprint.hector.api.beans.DynamicComposite;
+
+
+/**
+ * Converts a DynamicComposite to bytes
+ */
+public class DynamicCompositeStartToBytes implements StartToBytes<DynamicComposite> {
+
+    public static final DynamicCompositeStartToBytes INSTANCE = new DynamicCompositeStartToBytes();
+
+    private DynamicCompositeStartToBytes(){}
+
+
+    @Override
+    public ByteBuffer toBytes( final DynamicComposite toBytes ) {
+
+        if(toBytes == null){
+            return null;
+        }
+
+        return toBytes.serialize();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/02b10292/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/IndexBucketScanner.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/IndexBucketScanner.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/IndexBucketScanner.java
index 389f140..620d9b4 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/IndexBucketScanner.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/IndexBucketScanner.java
@@ -18,16 +18,12 @@ package org.apache.usergrid.persistence.cassandra.index;
 
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
-import java.util.NavigableSet;
-import java.util.Set;
-import java.util.TreeSet;
 import java.util.UUID;
 
-import org.apache.usergrid.persistence.IndexBucketLocator;
-import org.apache.usergrid.persistence.IndexBucketLocator.IndexType;
+import org.apache.cassandra.utils.FBUtilities;
+
 import org.apache.usergrid.persistence.cassandra.ApplicationCF;
 import org.apache.usergrid.persistence.cassandra.CassandraService;
 
@@ -44,7 +40,7 @@ import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtil
  *
  * @author tnine
  */
-public class IndexBucketScanner implements IndexScanner {
+public class IndexBucketScanner<T> implements IndexScanner {
 
     private final CassandraService cass;
     private final UUID applicationId;
@@ -55,12 +51,13 @@ public class IndexBucketScanner implements IndexScanner {
     private final int pageSize;
     private final boolean skipFirst;
     private final String bucket;
+    private final StartToBytes<T> scanStartSerializer;
 
     /** Pointer to our next start read */
-    private Object start;
+    private ByteBuffer start;
 
     /** Set to the original value to start scanning from */
-    private Object scanStart;
+    private T initialStartValue;
 
     /** Iterator for our results from the last page load */
     private List<HColumn<ByteBuffer, ByteBuffer>> lastResults;
@@ -70,14 +67,14 @@ public class IndexBucketScanner implements IndexScanner {
 
 
 
-    public IndexBucketScanner( CassandraService cass, ApplicationCF columnFamily,
-                               UUID applicationId, Object keyPrefix, String bucket,  Object start, Object finish,
+    public IndexBucketScanner( CassandraService cass, ApplicationCF columnFamily, StartToBytes<T> scanStartSerializer,
+                               UUID applicationId, Object keyPrefix, String bucket,  T start, T finish,
                                boolean reversed, int pageSize, boolean skipFirst) {
         this.cass = cass;
+        this.scanStartSerializer = scanStartSerializer;
         this.applicationId = applicationId;
         this.keyPrefix = keyPrefix;
         this.columnFamily = columnFamily;
-        this.start = start;
         this.finish = finish;
         this.reversed = reversed;
         this.skipFirst = skipFirst;
@@ -85,7 +82,10 @@ public class IndexBucketScanner implements IndexScanner {
 
         //we always add 1 to the page size.  This is because we pop the last column for the next page of results
         this.pageSize = pageSize+1;
-        this.scanStart = start;
+
+        //the initial value set when we started scanning
+        this.initialStartValue = start;
+        this.start = scanStartSerializer.toBytes( initialStartValue );
     }
 
 
@@ -95,7 +95,7 @@ public class IndexBucketScanner implements IndexScanner {
     @Override
     public void reset() {
         hasMore = true;
-        start = scanStart;
+        start = scanStartSerializer.toBytes( initialStartValue );
     }
 
 
@@ -122,7 +122,7 @@ public class IndexBucketScanner implements IndexScanner {
         //we purposefully use instance equality.  If it's a pointer to the same value, we need to increase by 1
         //since we'll be skipping the first value
 
-        final boolean firstPageSkipFirst = this.skipFirst &&  start == scanStart;
+        final boolean firstPageSkipFirst = this.skipFirst &&  start == initialStartValue;
 
         if(firstPageSkipFirst){
             selectSize++;
@@ -149,8 +149,14 @@ public class IndexBucketScanner implements IndexScanner {
         }
 
         //remove the first element since it needs to be skipped AFTER the size check. Otherwise it will fail
-        if ( firstPageSkipFirst && resultsTree.size() > 0 ) {
-            resultsTree.remove( 0 );
+        if ( firstPageSkipFirst && start != null && resultsTree.size() > 0 ) {
+
+            final ByteBuffer resultsName = resultsTree.get( 0 ).getName();
+
+            if ( compareBuffer( start, resultsName ) ) {
+
+                resultsTree.remove( 0 );
+            }
         }
 
         lastResults = resultsTree;
@@ -159,6 +165,27 @@ public class IndexBucketScanner implements IndexScanner {
     }
 
 
+    /**
+     * Returns true if the 2 byte buffers contain the same bytes, false otherwise
+     * @param first
+     * @param second
+     * @return
+     */
+    private boolean compareBuffer(final ByteBuffer first, final ByteBuffer second){
+        int firstLength = first.remaining();
+        int firstPosition = first.position();
+
+        int secondLength = second.remaining();
+        int secondPosition = second.position();
+
+        final int compare = FBUtilities.compareUnsigned( first.array(), second.array(),  firstPosition, secondPosition, firstLength, secondLength);
+
+        return compare == 0;
+
+
+    }
+
+
     /*
      * (non-Javadoc)
      *
@@ -235,4 +262,5 @@ public class IndexBucketScanner implements IndexScanner {
     public boolean isReversed() {
         return this.reversed;
     }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/02b10292/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/StartToBytes.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/StartToBytes.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/StartToBytes.java
new file mode 100644
index 0000000..c6516b8
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/StartToBytes.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.persistence.cassandra.index;
+
+
+import java.nio.ByteBuffer;
+
+
+/**
+ * Parse our index values into byte buffer
+ */
+public interface StartToBytes<T> {
+
+    /**
+     * Convert the start scanning type to bytes
+     * @param toBytes
+     * @return
+     */
+    ByteBuffer toBytes(final T toBytes);
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/02b10292/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/UUIDStartToBytes.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/UUIDStartToBytes.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/UUIDStartToBytes.java
new file mode 100644
index 0000000..2cdf9fd
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/UUIDStartToBytes.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.persistence.cassandra.index;
+
+
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
+import me.prettyprint.cassandra.serializers.UUIDSerializer;
+
+
+/**
+ * Converts a UUID to bytes
+ */
+public class UUIDStartToBytes implements StartToBytes<UUID> {
+
+    public static final UUIDStartToBytes INSTANCE = new UUIDStartToBytes();
+
+    private UUIDStartToBytes(){}
+
+    private static final UUIDSerializer UUID_SERIALIZER = UUIDSerializer.get();
+
+    @Override
+    public ByteBuffer toBytes( final UUID toBytes ) {
+        return UUID_SERIALIZER.toByteBuffer( toBytes );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/02b10292/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchCollectionVisitor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchCollectionVisitor.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchCollectionVisitor.java
index 895b9f0..c20baf4 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchCollectionVisitor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchCollectionVisitor.java
@@ -7,6 +7,7 @@ import org.apache.usergrid.persistence.EntityRef;
 import org.apache.usergrid.persistence.IndexBucketLocator;
 import org.apache.usergrid.persistence.cassandra.CassandraService;
 import org.apache.usergrid.persistence.cassandra.QueryProcessor;
+import org.apache.usergrid.persistence.cassandra.index.DynamicCompositeStartToBytes;
 import org.apache.usergrid.persistence.cassandra.index.IndexBucketScanner;
 import org.apache.usergrid.persistence.cassandra.index.IndexScanner;
 import org.apache.usergrid.persistence.cassandra.index.NoOpIndexScanner;
@@ -159,7 +160,7 @@ public class SearchCollectionVisitor extends SearchVisitor {
         Object keyPrefix = key( indexKey, slice.getPropertyName() );
 
         IndexScanner scanner =
-                new IndexBucketScanner( cassandraService, ENTITY_INDEX, applicationId, keyPrefix, bucket, range[0],
+                new IndexBucketScanner( cassandraService, ENTITY_INDEX, DynamicCompositeStartToBytes.INSTANCE, applicationId, keyPrefix, bucket, range[0],
                         range[1], slice.isReversed(), pageSize, slice.hasCursor() );
 
         return scanner;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/02b10292/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchConnectionVisitor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchConnectionVisitor.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchConnectionVisitor.java
index 74ff19f..6f833db 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchConnectionVisitor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchConnectionVisitor.java
@@ -13,6 +13,7 @@ import org.apache.usergrid.persistence.cassandra.CassandraService;
 import org.apache.usergrid.persistence.cassandra.ConnectionRefImpl;
 import org.apache.usergrid.persistence.cassandra.QueryProcessor;
 import org.apache.usergrid.persistence.cassandra.index.ConnectedIndexScanner;
+import org.apache.usergrid.persistence.cassandra.index.DynamicCompositeStartToBytes;
 import org.apache.usergrid.persistence.cassandra.index.IndexBucketScanner;
 import org.apache.usergrid.persistence.cassandra.index.IndexScanner;
 import org.apache.usergrid.persistence.cassandra.index.NoOpIndexScanner;
@@ -205,7 +206,7 @@ public class SearchConnectionVisitor extends SearchVisitor {
 
 
           IndexScanner scanner =
-                  new IndexBucketScanner( cassandraService, ENTITY_INDEX, applicationId,
+                  new IndexBucketScanner( cassandraService, ENTITY_INDEX,  DynamicCompositeStartToBytes.INSTANCE, applicationId,
                           keyPrefix, shardBucket, range[0], range[1], slice.isReversed(), pageSize, slice.hasCursor());
 
           return scanner;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/02b10292/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SliceIterator.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SliceIterator.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SliceIterator.java
index 3f052da..995d6c7 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SliceIterator.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SliceIterator.java
@@ -125,12 +125,28 @@ public class SliceIterator implements ResultIterator {
 
         parsedCols.clear();
 
+        boolean compared = false;
+
         while ( results.hasNext() ) {
 
             ByteBuffer colName = results.next().getName().duplicate();
 
             ScanColumn parsed = parser.parse( colName, isReversed );
 
+            //we might want to skip this slice, we're not sure, so we have to check them
+           if(!compared){
+               compared = true;
+
+               if(slice.hasCursor()) {
+                   ScanColumn column = parser.parse( slice.getCursor(), isReversed );
+
+                   //this is our resume column, drop it
+                   if(column.getUUID().equals( parsed.getUUID() )){
+                        continue;
+                   }
+               }
+           }
+
             //skip this value, the parser has discarded it
             if ( parsed == null ) {
                 continue;


[2/3] incubator-usergrid git commit: WIP overwrite

Posted by to...@apache.org.
WIP overwrite


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/9489231e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/9489231e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/9489231e

Branch: refs/heads/USERGRID-752
Commit: 9489231e4f2b27e8f0330754d0053f19c2f979ce
Parents: 02b1029
Author: Todd Nine <tn...@apigee.com>
Authored: Mon Jun 22 18:06:14 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon Jun 22 18:06:14 2015 -0600

----------------------------------------------------------------------
 .../cassandra/index/IndexBucketScanner.java     | 27 ++++++++++-----
 .../persistence/query/ir/SearchVisitor.java     |  2 +-
 .../query/ir/result/AbstractScanColumn.java     | 26 ++------------
 .../query/ir/result/SliceIterator.java          | 33 +++++-------------
 .../persistence/query/ir/result/UUIDColumn.java | 36 +++++---------------
 .../query/ir/result/UUIDIndexSliceParser.java   |  2 +-
 .../query/ir/result/UnionIterator.java          | 18 +++++-----
 .../usergrid/persistence/CollectionIT.java      |  2 +-
 8 files changed, 51 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9489231e/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/IndexBucketScanner.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/IndexBucketScanner.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/IndexBucketScanner.java
index 620d9b4..bf61846 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/IndexBucketScanner.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/IndexBucketScanner.java
@@ -27,6 +27,7 @@ import org.apache.cassandra.utils.FBUtilities;
 import org.apache.usergrid.persistence.cassandra.ApplicationCF;
 import org.apache.usergrid.persistence.cassandra.CassandraService;
 
+import com.google.common.primitives.UnsignedBytes;
 import com.yammer.metrics.annotation.Metered;
 
 import me.prettyprint.hector.api.beans.HColumn;
@@ -122,12 +123,10 @@ public class IndexBucketScanner<T> implements IndexScanner {
         //we purposefully use instance equality.  If it's a pointer to the same value, we need to increase by 1
         //since we'll be skipping the first value
 
-        final boolean firstPageSkipFirst = this.skipFirst &&  start == initialStartValue;
-
-        if(firstPageSkipFirst){
-            selectSize++;
-        }
 
+        final boolean shouldCheckFirst =
+                //we should skip the value it's a cursor resume OR it's a previous page from a stateful iterator
+                (this.skipFirst &&  initialStartValue != null) || start != null;
 
         final List<HColumn<ByteBuffer, ByteBuffer>>
                 resultsTree = cass.getColumns( cass.getApplicationKeyspace( applicationId ), columnFamily, rowKey,
@@ -140,7 +139,6 @@ public class IndexBucketScanner<T> implements IndexScanner {
         if ( resultsTree.size() == selectSize ) {
             hasMore = true;
 
-
             // set the bytebuffer for the next pass
             start = resultsTree.get( resultsTree.size() - 1 ).getName();
         }
@@ -149,12 +147,23 @@ public class IndexBucketScanner<T> implements IndexScanner {
         }
 
         //remove the first element since it needs to be skipped AFTER the size check. Otherwise it will fail
-        if ( firstPageSkipFirst && start != null && resultsTree.size() > 0 ) {
+        //we only want to skip if our byte value are the same as our expected start.  Since we aren't stateful you can't
+        //be sure your start even comes back, and you don't want to erroneously remove columns
+        if ( shouldCheckFirst && resultsTree.size() > 0  && start != null) {
+            final int startIndex = start.position();
+            final int startLength = start.remaining();
+
+
+
+            final ByteBuffer returnedBuffer = resultsTree.get( 0 ).getName();
+            final int returnedIndex = returnedBuffer.position();
+            final int returnedLength = returnedBuffer.remaining();
 
-            final ByteBuffer resultsName = resultsTree.get( 0 ).getName();
 
-            if ( compareBuffer( start, resultsName ) ) {
+            final int compare = FBUtilities.compareUnsigned( start.array(), returnedBuffer.array(),  startIndex, returnedIndex, startLength, returnedLength ) ;
 
+            //the byte buffers are the same as our seek (which may or may not be the case in the first seek)
+            if(compare == 0){
                 resultsTree.remove( 0 );
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9489231e/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/SearchVisitor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/SearchVisitor.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/SearchVisitor.java
index 271fc67..fc2df4d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/SearchVisitor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/SearchVisitor.java
@@ -160,7 +160,7 @@ public abstract class SearchVisitor implements NodeVisitor {
 
         final int nodeId = node.getId();
 
-        UnionIterator union = new UnionIterator( queryProcessor.getPageSizeHint( node ), nodeId, queryProcessor.getCursorCache( nodeId ) );
+        UnionIterator union = new UnionIterator( queryProcessor.getPageSizeHint( node ), nodeId );
 
         if ( left != null ) {
             union.addIterator( left );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9489231e/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/AbstractScanColumn.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/AbstractScanColumn.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/AbstractScanColumn.java
index d5d130e..2a359be 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/AbstractScanColumn.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/AbstractScanColumn.java
@@ -58,13 +58,13 @@ public abstract class AbstractScanColumn implements ScanColumn {
         if ( this == o ) {
             return true;
         }
-        if ( !( o instanceof AbstractScanColumn ) ) {
+        if ( !( o instanceof ScanColumn ) ) {
             return false;
         }
 
-        AbstractScanColumn that = ( AbstractScanColumn ) o;
+        ScanColumn that = ( ScanColumn ) o;
 
-        return uuid.equals( that.uuid );
+        return uuid.equals( that.getUUID() );
     }
 
 
@@ -94,24 +94,4 @@ public abstract class AbstractScanColumn implements ScanColumn {
         return child;
     }
 
-
-    /**
-     * Comparator for comparing children.  A null safe call
-     * @param otherScanColumn
-     * @return
-     */
-    protected int compareChildren( final ScanColumn otherScanColumn ) {
-
-        if ( otherScanColumn == null ) {
-            return 1;
-        }
-
-        final ScanColumn otherChild = otherScanColumn.getChild();
-
-        if ( child != null && otherChild != null ) {
-            return child.compareTo( otherChild );
-        }
-
-        return 0;
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9489231e/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SliceIterator.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SliceIterator.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SliceIterator.java
index 995d6c7..a386159 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SliceIterator.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SliceIterator.java
@@ -26,6 +26,7 @@ import java.util.UUID;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.usergrid.persistence.cassandra.CursorCache;
 import org.apache.usergrid.persistence.cassandra.index.IndexScanner;
 import org.apache.usergrid.persistence.exceptions.QueryIterationException;
@@ -125,27 +126,11 @@ public class SliceIterator implements ResultIterator {
 
         parsedCols.clear();
 
-        boolean compared = false;
-
         while ( results.hasNext() ) {
 
-            ByteBuffer colName = results.next().getName().duplicate();
-
-            ScanColumn parsed = parser.parse( colName, isReversed );
-
-            //we might want to skip this slice, we're not sure, so we have to check them
-           if(!compared){
-               compared = true;
-
-               if(slice.hasCursor()) {
-                   ScanColumn column = parser.parse( slice.getCursor(), isReversed );
+            final ByteBuffer colName = results.next().getName().duplicate();
 
-                   //this is our resume column, drop it
-                   if(column.getUUID().equals( parsed.getUUID() )){
-                        continue;
-                   }
-               }
-           }
+            final ScanColumn parsed = parser.parse( colName, isReversed );
 
             //skip this value, the parser has discarded it
             if ( parsed == null ) {
@@ -231,22 +216,22 @@ public class SliceIterator implements ResultIterator {
             //this is a bug
             if ( scanner.hasNext() ) {
                 logger.error(
-                        "An iterator attempted to access a slice that was not iterated over.  This will result in the" +
-                                " cursor construction failing" );
+                        "An iterator attempted to access a slice that was not iterated over.  This will result in the"
+                                + " cursor construction failing" );
                 throw new QueryIterationException(
-                        "An iterator attempted to access a slice that was not iterated over.  This will result in the" +
-                                " cursor construction failing" );
+                        "An iterator attempted to access a slice that was not iterated over.  This will result in the"
+                                + " cursor construction failing" );
             }
 
             final ByteBuffer sliceCursor = slice.getCursor();
 
             //we've never loaded anything, just re-use the existing slice
-            if (last == null && sliceCursor != null ) {
+            if ( last == null && sliceCursor != null ) {
                 bytes = sliceCursor;
             }
 
             //use the last column we loaded.  This way our scan returns nothing next time since start == finish
-            else if(last != null) {
+            else if ( last != null ) {
                 bytes = last.getCursorValue();
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9489231e/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/UUIDColumn.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/UUIDColumn.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/UUIDColumn.java
index a5e09c1..640b391 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/UUIDColumn.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/UUIDColumn.java
@@ -4,53 +4,35 @@ package org.apache.usergrid.persistence.query.ir.result;
 import java.nio.ByteBuffer;
 import java.util.UUID;
 
+import org.apache.usergrid.persistence.cassandra.Serializers;
 import org.apache.usergrid.utils.UUIDUtils;
 
 
 /**
  * Used as a comparator for columns
  */
-class UUIDColumn implements  ScanColumn{
+class UUIDColumn extends AbstractScanColumn{
 
-    private final UUID uuid;
     private final int compareReversed;
-    private ScanColumn child;
 
-
-    UUIDColumn( final UUID uuid, final int compareReversed ) {
-        this.uuid = uuid;
+    protected UUIDColumn( final UUID uuid, final ByteBuffer columnNameBuffer, final int compareReversed  ) {
+        super( uuid, columnNameBuffer );
         this.compareReversed = compareReversed;
     }
 
 
-    @Override
-    public UUID getUUID() {
-        return uuid;
-    }
-
-
-    @Override
-    public ByteBuffer getCursorValue() {
-        return null;
-    }
-
-
-    @Override
-    public void setChild( final ScanColumn childColumn ) {
-        this.child = childColumn;
+    public UUIDColumn( final UUID uuid, final int compareReversed ) {
+        super(uuid, Serializers.ue.toByteBuffer( uuid ));
+        this.compareReversed = compareReversed;
     }
 
 
-    @Override
-    public ScanColumn getChild() {
-        return child;
-    }
 
 
     @Override
     public int compareTo( final ScanColumn other ) {
-
         return  UUIDUtils.compare( uuid, other.getUUID() ) * compareReversed;
-
     }
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9489231e/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/UUIDIndexSliceParser.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/UUIDIndexSliceParser.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/UUIDIndexSliceParser.java
index 91644ce..2c5b1ba 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/UUIDIndexSliceParser.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/UUIDIndexSliceParser.java
@@ -33,6 +33,6 @@ public class UUIDIndexSliceParser implements SliceParser {
     @Override
     public ScanColumn parse( final ByteBuffer columnNameBytes, final boolean isReversed ) {
         final int compare = isReversed? -1: 1;
-        return new UUIDColumn( ue.fromByteBuffer( columnNameBytes.duplicate() ), compare );
+        return new UUIDColumn( ue.fromByteBuffer( columnNameBytes.duplicate() ), columnNameBytes,  compare );
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9489231e/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/UnionIterator.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/UnionIterator.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/UnionIterator.java
index 635ca97..aca9852 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/UnionIterator.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/UnionIterator.java
@@ -133,7 +133,7 @@ public class UnionIterator extends MultiIterator {
 
         private final List<ScanColumn> list;
 
-        private UUIDColumn min;
+//        private UUIDColumn min;
 
 
         public SortedColumnList( final int maxSize, final UUID minUuid ) {
@@ -141,9 +141,9 @@ public class UnionIterator extends MultiIterator {
             this.list = new ArrayList<ScanColumn>( maxSize );
             this.maxSize = maxSize;
 
-            if ( minUuid != null ) {
-                min = new UUIDColumn( minUuid, 1 ) ;
-            }
+//            if ( minUuid != null ) {
+//                min = new UUIDColumn( minUuid, 1 ) ;
+//            }
         }
 
 
@@ -152,9 +152,9 @@ public class UnionIterator extends MultiIterator {
          */
         public void add( ScanColumn col ) {
             //less than our min, don't add
-            if ( min != null && min.compareTo( col ) >= 0 ) {
-                return;
-            }
+//            if ( min != null && min.compareTo( col ) >= 0 ) {
+//                return;
+//            }
 
             int index = Collections.binarySearch( this.list, col );
 
@@ -215,7 +215,7 @@ public class UnionIterator extends MultiIterator {
             }
 
             final UUID oldMin = this.list.get( size - 1 ).getUUID();
-            min = new UUIDColumn( oldMin, 1 );
+//            min = new UUIDColumn( oldMin, 1 );
         }
 
 
@@ -228,7 +228,7 @@ public class UnionIterator extends MultiIterator {
 
         public void reset(){
             clear();
-            this.min = null;
+//            this.min = null;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9489231e/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
index ac58bd7..99fed21 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
@@ -1444,7 +1444,7 @@ public class CollectionIT extends AbstractCoreIT {
         Query query = Query.fromQL( s );
 
         Results r = em.searchCollection( em.getApplicationRef(), "loveobjects", query );
-        assertTrue( r.size() == 1 );
+        assertEquals(1,  r.size() );
 
         String username = ( String ) ( ( Map ) r.getEntities().get( 0 ).getProperty( "Recipient" ) ).get( "Username" );
         // selection results should be a list of lists


[3/3] incubator-usergrid git commit: Wraps connection all scanners with shard filters to ensure we only process entities relevant to our shards

Posted by to...@apache.org.
Wraps connection all scanners with shard filters to ensure we only process entities relevant to our shards


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/c55d261e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/c55d261e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/c55d261e

Branch: refs/heads/USERGRID-752
Commit: c55d261ecbeb46a87abe101df9ac0f3ff1b8a186
Parents: 9489231
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Jun 24 13:20:22 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Jun 24 13:20:22 2015 -0600

----------------------------------------------------------------------
 .../cassandra/RelationManagerImpl.java          |  12 +-
 .../cassandra/index/ConnectedIndexScanner.java  |  11 +-
 .../persistence/query/ir/SearchVisitor.java     |   2 +-
 .../result/ConnectionSearchVisitorFactory.java  |   2 +-
 .../query/ir/result/GatherIterator.java         |   6 -
 .../ir/result/SearchConnectionVisitor.java      |  10 +-
 .../query/ir/result/SliceIterator.java          |  25 +++-
 .../query/ir/result/SliceShardIterator.java     | 118 +++++++++++++++++++
 .../query/ir/result/UnionIterator.java          |  94 ++++++++++++---
 9 files changed, 236 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c55d261e/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/RelationManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/RelationManagerImpl.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/RelationManagerImpl.java
index 977dad0..ebe2aec 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/RelationManagerImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/RelationManagerImpl.java
@@ -1910,10 +1910,6 @@ public class RelationManagerImpl implements RelationManager {
 
         ConnectionRefImpl connectionRef =
                 new ConnectionRefImpl( sourceEntity, connectionType, new SimpleEntityRef( connectedEntityType, null ) );
-        //        EntityRef connectedEntity) {
-        //    ConnectionRefImpl connectionRef = new ConnectionRefImpl(new ConnectedEntityRefImpl(connectionType,
-        // connectedEntityType, null, true), sourceEntity );
-
 
         final ConnectionResultsLoaderFactory factory = new ConnectionResultsLoaderFactory( connectionRef );
 
@@ -1921,8 +1917,6 @@ public class RelationManagerImpl implements RelationManager {
 
         ConnectionSearchVisitorFactory collectionSearchVisitorFactory = new ConnectionSearchVisitorFactory( cass, indexBucketLocator, qp, applicationId, headEntity, connectionRef, true, "" );
 
-//        SearchConnectionVisitor visitor = new SearchConnectionVisitor( this, qp, connectionRef, true );
-
         return qp.getResults( collectionSearchVisitorFactory );
     }
 
@@ -1967,8 +1961,6 @@ public class RelationManagerImpl implements RelationManager {
 
         ConnectionSearchVisitorFactory collectionSearchVisitorFactory = new ConnectionSearchVisitorFactory( cass, indexBucketLocator, qp, applicationId, headEntity, connectionRef, false, "" );
 
-//        SearchConnectionVisitor visitor = new SearchConnectionVisitor( this, qp, connectionRef, false );
-
         return qp.getResults( collectionSearchVisitorFactory );
 	}
 
@@ -2007,9 +1999,7 @@ public class RelationManagerImpl implements RelationManager {
 
         QueryProcessor qp = new QueryProcessor( query, null, em, factory );
 
-        ConnectionSearchVisitorFactory collectionSearchVisitorFactory = new ConnectionSearchVisitorFactory( cass, indexBucketLocator, qp, applicationId, headEntity, connectionRef, false, "" );
-
-//        SearchConnectionVisitor visitor = new SearchConnectionVisitor( this, qp, connectionRef, true );
+        ConnectionSearchVisitorFactory collectionSearchVisitorFactory = new ConnectionSearchVisitorFactory( cass, indexBucketLocator, qp, applicationId, headEntity, connectionRef, true, "" );
 
         return qp.getResults( collectionSearchVisitorFactory );
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c55d261e/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/ConnectedIndexScanner.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/ConnectedIndexScanner.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/ConnectedIndexScanner.java
index 6c6152a..4b1a049 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/ConnectedIndexScanner.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/ConnectedIndexScanner.java
@@ -18,6 +18,7 @@ package org.apache.usergrid.persistence.cassandra.index;
 
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -134,14 +135,16 @@ public class ConnectedIndexScanner implements IndexScanner {
         //go through each connection type until we exhaust the result sets
         while ( currentConnectionType != null ) {
 
+            final int lastResultsSize = lastResults == null? 0: lastResults.size();
+
             //only load a delta size to get this next page
-            int selectSize = totalSelectSize - lastResults.size();
+            final int selectSize = totalSelectSize - lastResultsSize;
 
 
-            Object key = key( entityId, dictionaryType, currentConnectionType );
+            final Object key = key( entityId, dictionaryType, currentConnectionType );
 
 
-            List<HColumn<ByteBuffer, ByteBuffer>> results =
+            final List<HColumn<ByteBuffer, ByteBuffer>> results =
                     cass.getColumns( cass.getApplicationKeyspace( applicationId ), ENTITY_COMPOSITE_DICTIONARIES, key,
                             start, null, selectSize, reversed );
 
@@ -180,7 +183,7 @@ public class ConnectedIndexScanner implements IndexScanner {
 
         if ( hasMore && lastResults !=null && lastResults.size() > 0 ) {
             // set the bytebuffer for the next pass
-            lastResults.remove( lastResults.size() - 1 );
+            start = lastResults.remove( lastResults.size() - 1 ).getName();
         }
 
         return lastResults != null && lastResults.size() > 0;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c55d261e/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/SearchVisitor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/SearchVisitor.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/SearchVisitor.java
index fc2df4d..271fc67 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/SearchVisitor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/SearchVisitor.java
@@ -160,7 +160,7 @@ public abstract class SearchVisitor implements NodeVisitor {
 
         final int nodeId = node.getId();
 
-        UnionIterator union = new UnionIterator( queryProcessor.getPageSizeHint( node ), nodeId );
+        UnionIterator union = new UnionIterator( queryProcessor.getPageSizeHint( node ), nodeId, queryProcessor.getCursorCache( nodeId ) );
 
         if ( left != null ) {
             union.addIterator( left );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c55d261e/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/ConnectionSearchVisitorFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/ConnectionSearchVisitorFactory.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/ConnectionSearchVisitorFactory.java
index 54eeb01..6a340b9 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/ConnectionSearchVisitorFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/ConnectionSearchVisitorFactory.java
@@ -41,7 +41,7 @@ public class ConnectionSearchVisitorFactory implements SearchVisitorFactory {
     private final boolean outgoing;
     private final String[] prefix;
 
-
+                   //cass, indexBucketLocator, qp, applicationId, headEntity, connectionRef, false, ""
     public ConnectionSearchVisitorFactory( final CassandraService cassandraService,
                                             final IndexBucketLocator indexBucketLocator,
                                             final QueryProcessor queryProcessor, final UUID applicationId,

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c55d261e/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/GatherIterator.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/GatherIterator.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/GatherIterator.java
index 5bc91e0..eaae6b2 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/GatherIterator.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/GatherIterator.java
@@ -171,12 +171,6 @@ public class GatherIterator implements ResultIterator {
             results.add(next);
             cursorMap.put( next.getUUID(), iterator );
 
-//            //results are too large, trim them
-//            if(results.size() > pageSize){
-//               final ScanColumn toRemove =  results.pollLast();
-//                cursorMap.remove( toRemove.getUUID() );
-//            }
-
         }
 
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c55d261e/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchConnectionVisitor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchConnectionVisitor.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchConnectionVisitor.java
index 6f833db..ff08245 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchConnectionVisitor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchConnectionVisitor.java
@@ -53,7 +53,7 @@ public class SearchConnectionVisitor extends SearchVisitor {
      * @param queryProcessor They query processor to use
      * @param applicationId
      * @param connection The connection refernce
-     * @param outgoing The direction to search.  True if we should search from source->target edges.  False if we
+     * @param outgoing The direction to search.  True if we should search from source->target edges.  False if we are searching target<-source
      */
     public SearchConnectionVisitor( final CassandraService cassandraService,
                                     final IndexBucketLocator indexBucketLocator, final QueryProcessor queryProcessor,
@@ -120,6 +120,9 @@ public class SearchConnectionVisitor extends SearchVisitor {
 
     @Override
     public void visit( AllNode node ) throws Exception {
+
+        //todo, use a cache for this...
+
         QuerySlice slice = node.getSlice();
 
         queryProcessor.applyCursorAndSort( slice );
@@ -179,7 +182,10 @@ public class SearchConnectionVisitor extends SearchVisitor {
                 new ConnectedIndexScanner( cassandraService, dictionaryType, applicationId, entityIdToUse, connectionTypes,
                         start, slice.isReversed(), size, skipFirst );
 
-        this.results.push( new SliceIterator( slice, connectionScanner, connectionParser ) );
+        //we have to create our wrapper so validate the data we read is correct for our shard
+        final SliceShardIterator.ShardBucketValidator validator = new SliceShardIterator.ShardBucketValidator(indexBucketLocator, bucket, applicationId, IndexBucketLocator.IndexType.CONNECTION, "" );
+
+        this.results.push(  new SliceShardIterator( validator, slice, connectionScanner, connectionParser ));
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c55d261e/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SliceIterator.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SliceIterator.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SliceIterator.java
index a386159..4f5bff5 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SliceIterator.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SliceIterator.java
@@ -46,10 +46,10 @@ public class SliceIterator implements ResultIterator {
 
     private final LinkedHashMap<UUID, ScanColumn> cols;
     private final QuerySlice slice;
-    private final SliceParser parser;
-    private final IndexScanner scanner;
+    protected final SliceParser parser;
+    protected final IndexScanner scanner;
     private final int pageSize;
-    private final boolean isReversed;
+    protected final boolean isReversed;
 
 
     /**
@@ -128,9 +128,9 @@ public class SliceIterator implements ResultIterator {
 
         while ( results.hasNext() ) {
 
-            final ByteBuffer colName = results.next().getName().duplicate();
+            final HColumn<ByteBuffer, ByteBuffer> column = results.next();
 
-            final ScanColumn parsed = parser.parse( colName, isReversed );
+            final ScanColumn parsed = parse(column);
 
             //skip this value, the parser has discarded it
             if ( parsed == null ) {
@@ -151,6 +151,21 @@ public class SliceIterator implements ResultIterator {
     }
 
 
+    /**
+     * Parses the column.  If the column should be discarded, null should be returned
+     * @param column
+     * @return
+     */
+    protected ScanColumn parse( HColumn<ByteBuffer, ByteBuffer> column){
+
+        final ByteBuffer colName = column.getName().duplicate();
+
+        final ScanColumn parsed = parser.parse( colName, isReversed );
+
+        return parsed;
+    }
+
+
     /*
      * (non-Javadoc)
      *

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c55d261e/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SliceShardIterator.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SliceShardIterator.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SliceShardIterator.java
new file mode 100644
index 0000000..279ba2c
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SliceShardIterator.java
@@ -0,0 +1,118 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *      http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+package org.apache.usergrid.persistence.query.ir.result;
+
+
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.IndexBucketLocator;
+import org.apache.usergrid.persistence.cassandra.index.IndexScanner;
+import org.apache.usergrid.persistence.query.ir.QuerySlice;
+
+import me.prettyprint.hector.api.beans.HColumn;
+
+
+/**
+ * An iterator that will check if the parsed column is part of this shard.  This is required due to a legacy storage
+ * format
+ *
+ * Connections are not sharded by target entity, as a result, we get data partition mismatches when performing
+ * intersections and seeks.  This is meant to discard target entities that are not part of the current shard
+ *
+ * @author tnine
+ */
+public class SliceShardIterator extends SliceIterator {
+
+    private static final Logger logger = LoggerFactory.getLogger( SliceShardIterator.class );
+
+    private final ShardBucketValidator shardBucketValidator;
+
+
+    /**
+     * @param slice The slice used in the scanner
+     * @param scanner The scanner to use to read the cols
+     * @param parser The parser for the scanner results
+     */
+    public SliceShardIterator( final ShardBucketValidator shardBucketValidator, final QuerySlice slice,
+                               final IndexScanner scanner, final SliceParser parser ) {
+        super( slice, scanner, parser );
+
+        this.shardBucketValidator = shardBucketValidator;
+    }
+
+
+    /**
+     * Parses the column.  If the column should be discarded, null should be returned
+     */
+    protected ScanColumn parse( HColumn<ByteBuffer, ByteBuffer> column ) {
+
+        final ByteBuffer colName = column.getName().duplicate();
+
+        final ScanColumn parsed = parser.parse( colName, isReversed );
+
+        if(parsed == null){
+            return null;
+        }
+
+        final UUID entityId = parsed.getUUID();
+
+
+        //not for our current processing shard, discard
+        if(!shardBucketValidator.isInShard( entityId )){
+            return null;
+        }
+
+        return parsed;
+    }
+
+
+    /**
+     * Class that performs validation on an entity to ensure it's in the shard we expecte
+     */
+    public static final class ShardBucketValidator {
+        private final IndexBucketLocator indexBucketLocator;
+        private final String expectedBucket;
+        private final UUID applicationId;
+        private final IndexBucketLocator.IndexType type;
+        private final String[] components;
+
+
+        public ShardBucketValidator( final IndexBucketLocator indexBucketLocator, final String expectedBucket,
+                                     final UUID applicationId, final IndexBucketLocator.IndexType type,
+                                     final String... components ) {
+            this.indexBucketLocator = indexBucketLocator;
+            this.expectedBucket = expectedBucket;
+            this.applicationId = applicationId;
+            this.type = type;
+            this.components = components;
+        }
+
+
+        public boolean isInShard( final UUID entityId ) {
+            //not for our current processing shard, discard
+            final String shard = indexBucketLocator.getBucket( applicationId, type, entityId, components );
+
+            return expectedBucket.equals( shard );
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c55d261e/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/UnionIterator.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/UnionIterator.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/UnionIterator.java
index aca9852..b6e44d8 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/UnionIterator.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/UnionIterator.java
@@ -28,6 +28,8 @@ import java.util.UUID;
 
 import org.apache.usergrid.persistence.cassandra.CursorCache;
 
+import com.fasterxml.uuid.UUIDComparator;
+
 import static org.apache.usergrid.persistence.cassandra.Serializers.*;
 
 /**
@@ -131,19 +133,19 @@ public class UnionIterator extends MultiIterator {
 
         private final int maxSize;
 
-        private final List<ScanColumn> list;
+        private final List<UnionScanColumn> list;
 
-//        private UUIDColumn min;
+        private UnionScanColumn min;
 
 
         public SortedColumnList( final int maxSize, final UUID minUuid ) {
             //we need to allocate the extra space if required
-            this.list = new ArrayList<ScanColumn>( maxSize );
+            this.list = new ArrayList<UnionScanColumn>( maxSize );
             this.maxSize = maxSize;
 
-//            if ( minUuid != null ) {
-//                min = new UUIDColumn( minUuid, 1 ) ;
-//            }
+            if ( minUuid != null ) {
+                min = new UnionScanColumn(new UUIDColumn( minUuid, 1 )) ;
+            }
         }
 
 
@@ -151,12 +153,15 @@ public class UnionIterator extends MultiIterator {
          * Add the column to this list
          */
         public void add( ScanColumn col ) {
+
+            final UnionScanColumn unionScanColumn = new UnionScanColumn(col);
+
             //less than our min, don't add
-//            if ( min != null && min.compareTo( col ) >= 0 ) {
-//                return;
-//            }
+            if ( min != null && min.compareTo( unionScanColumn ) >= 0 ) {
+                return;
+            }
 
-            int index = Collections.binarySearch( this.list, col );
+            int index = Collections.binarySearch( this.list, unionScanColumn );
 
             //already present
             if ( index > -1 ) {
@@ -170,7 +175,7 @@ public class UnionIterator extends MultiIterator {
                 return;
             }
 
-            this.list.add( index, col );
+            this.list.add( index, unionScanColumn );
 
             final int size = this.list.size();
 
@@ -214,8 +219,7 @@ public class UnionIterator extends MultiIterator {
                 return;
             }
 
-            final UUID oldMin = this.list.get( size - 1 ).getUUID();
-//            min = new UUIDColumn( oldMin, 1 );
+            min = this.list.get( size - 1 );
         }
 
 
@@ -228,7 +232,69 @@ public class UnionIterator extends MultiIterator {
 
         public void reset(){
             clear();
-//            this.min = null;
+            this.min = null;
+        }
+    }
+
+    private static class UnionScanColumn implements ScanColumn{
+
+        private final ScanColumn delegate;
+        private ScanColumn child;
+
+
+        private UnionScanColumn( final ScanColumn delegate ) {
+            super();
+            this.delegate = delegate;}
+
+
+        @Override
+        public int compareTo( final ScanColumn o ) {
+            return UUIDComparator.staticCompare( delegate.getUUID(), o.getUUID() );
+        }
+
+
+        @Override
+        public UUID getUUID() {
+            return delegate.getUUID();
+        }
+
+
+        @Override
+        public ByteBuffer getCursorValue() {
+            return ue.toByteBuffer( delegate.getUUID() );
+        }
+
+
+        @Override
+        public void setChild( final ScanColumn childColumn ) {
+           //intentionally a no-op, since child is on the delegate
+        }
+
+
+        @Override
+        public ScanColumn getChild() {
+            return delegate.getChild();
+        }
+
+
+        @Override
+        public boolean equals( final Object o ) {
+            if ( this == o ) {
+                return true;
+            }
+            if ( !( o instanceof UnionScanColumn ) ) {
+                return false;
+            }
+
+            final UnionScanColumn that = ( UnionScanColumn ) o;
+
+            return delegate.getUUID().equals( that.delegate.getUUID() );
+        }
+
+
+        @Override
+        public int hashCode() {
+            return delegate.getUUID().hashCode();
         }
     }
 }