You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/07/30 20:44:29 UTC

[06/35] incubator-usergrid git commit: Fixes resumption and off by 1 bugs

Fixes resumption and off by 1 bugs


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/c1157449
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/c1157449
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/c1157449

Branch: refs/heads/ug2-doc-update
Commit: c1157449c9c2a32662423c0e14a1f73ef64042b4
Parents: aa79488
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Jun 25 10:16:07 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Jun 25 11:14:52 2015 -0600

----------------------------------------------------------------------
 .../cassandra/index/CassandraColumnUtils.java   | 72 ++++++++++++++++++
 .../cassandra/index/ConnectedIndexScanner.java  | 31 ++++----
 .../cassandra/index/IndexBucketScanner.java     | 77 ++++++--------------
 .../ir/result/ConnectionIndexSliceParser.java   | 10 ++-
 4 files changed, 120 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c1157449/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/CassandraColumnUtils.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/CassandraColumnUtils.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/CassandraColumnUtils.java
new file mode 100644
index 0000000..8cc1ccd
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/CassandraColumnUtils.java
@@ -0,0 +1,72 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *      http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.cassandra.index;
+
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.cassandra.utils.FBUtilities;
+
+import me.prettyprint.hector.api.beans.HColumn;
+
+
+/**
+ * Utilities for dealing with pagination in Cassandra: column-name comparison and dropping a repeated start column
+ */
+public class CassandraColumnUtils {
+
+    /**
+     * Returns true if the remaining bytes of the 2 byte buffers are the same, false otherwise. NOTE(review): reads array() at position() without arrayOffset() - presumably callers pass heap buffers with a zero offset; confirm
+     */
+    public static boolean equals( final ByteBuffer first, final ByteBuffer second ) {
+        int firstLength = first.remaining();
+        int firstPosition = first.position();
+
+        int secondLength = second.remaining();
+        int secondPosition = second.position();
+
+        final int compare = FBUtilities
+                .compareUnsigned( first.array(), second.array(), firstPosition, secondPosition, firstLength,
+                        secondLength );
+
+        return compare == 0;
+    }
+
+
+    /**
+     * Removes the first column from the list when its name has the same bytes as the scan start
+     * @param columns the columns to check; modified in place. Nothing is removed when null or empty
+     * @param startScan the column name the scan was started from. Nothing is removed when null
+     */
+    public static void maybeRemoveFirst(final  List<HColumn<ByteBuffer, ByteBuffer>> columns, final ByteBuffer startScan){
+        //The first element is removed AFTER the size check (presumably the caller's full-page check), otherwise that
+        //check would fail.  We only skip it if its bytes are the same as our expected start: since we aren't stateful
+        //we can't be sure our start even comes back, and we don't want to erroneously remove columns
+        if ( columns != null && columns.size() > 0  && startScan != null) {
+            final ByteBuffer returnedBuffer = columns.get( 0 ).getName();
+
+            //the first returned name equals our seek value (which may or may not be the case in the first seek), so drop it
+            if( CassandraColumnUtils.equals( startScan, returnedBuffer ) ){
+                columns.remove( 0 );
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c1157449/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/ConnectedIndexScanner.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/ConnectedIndexScanner.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/ConnectedIndexScanner.java
index 4b1a049..aa3ddfb 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/ConnectedIndexScanner.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/ConnectedIndexScanner.java
@@ -18,14 +18,12 @@ package org.apache.usergrid.persistence.cassandra.index;
 
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.LinkedHashSet;
 import java.util.List;
-import java.util.Set;
 import java.util.UUID;
 
 import org.springframework.util.Assert;
+
 import org.apache.usergrid.persistence.cassandra.CassandraService;
 
 import com.yammer.metrics.annotation.Metered;
@@ -64,7 +62,7 @@ public class ConnectedIndexScanner implements IndexScanner {
     /**
      * Iterator for our results from the last page load
      */
-    private List<HColumn<ByteBuffer, ByteBuffer>> lastResults;
+    private List<HColumn<ByteBuffer, ByteBuffer>> results;
 
     /**
      * True if our last load loaded a full page size.
@@ -122,12 +120,12 @@ public class ConnectedIndexScanner implements IndexScanner {
             return false;
         }
 
-        boolean skipFirst = this.skipFirst && start == scanStart;
+        boolean shouldDropFirst = skipFirst && start == scanStart;
 
         int totalSelectSize = pageSize + 1;
 
         //we're discarding the first, so increase our total size by 1 since this value will be inclusive in the seek
-        if ( skipFirst ) {
+        if ( shouldDropFirst ) {
             totalSelectSize++;
         }
 
@@ -135,7 +133,7 @@ public class ConnectedIndexScanner implements IndexScanner {
         //go through each connection type until we exhaust the result sets
         while ( currentConnectionType != null ) {
 
-            final int lastResultsSize = lastResults == null? 0: lastResults.size();
+            final int lastResultsSize = results == null ? 0 : results.size();
 
             //only load a delta size to get this next page
             final int selectSize = totalSelectSize - lastResultsSize;
@@ -150,7 +148,7 @@ public class ConnectedIndexScanner implements IndexScanner {
 
             final int resultSize = results.size();
 
-            lastResults = results;
+            this.results = results;
 
 
             // we loaded a full page, there might be more
@@ -177,16 +175,17 @@ public class ConnectedIndexScanner implements IndexScanner {
         }
 
         //remove the first element, we need to skip it
-        if ( skipFirst && lastResults != null && lastResults.size() > 0) {
-            lastResults.remove( 0  );
+        if ( shouldDropFirst  ) {
+
+          results.remove( 0 );
         }
 
-        if ( hasMore && lastResults !=null && lastResults.size() > 0 ) {
+        if ( hasMore && results != null && results.size() > 0 ) {
             // set the bytebuffer for the next pass
-            start = lastResults.remove( lastResults.size() - 1 ).getName();
+            start = results.remove( results.size() - 1 ).getName();
         }
 
-        return lastResults != null && lastResults.size() > 0;
+        return results != null && results.size() > 0;
     }
 
 
@@ -213,7 +212,7 @@ public class ConnectedIndexScanner implements IndexScanner {
         // "next page" pointer
         // Our currently buffered results don't exist or don't have a next. Try to
         // load them again if they're less than the page size
-        if ( lastResults == null && hasMore ) {
+        if ( results == null && hasMore ) {
             try {
                 return load();
             }
@@ -234,9 +233,9 @@ public class ConnectedIndexScanner implements IndexScanner {
     @Override
     @Metered( group = "core", name = "IndexBucketScanner_load" )
     public List<HColumn<ByteBuffer, ByteBuffer>> next() {
-        List<HColumn<ByteBuffer, ByteBuffer>> returnVal = lastResults;
+        List<HColumn<ByteBuffer, ByteBuffer>> returnVal = results;
 
-        lastResults = null;
+        results = null;
 
         return returnVal;
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c1157449/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/IndexBucketScanner.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/IndexBucketScanner.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/IndexBucketScanner.java
index bf61846..0efa735 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/IndexBucketScanner.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/IndexBucketScanner.java
@@ -22,12 +22,9 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.UUID;
 
-import org.apache.cassandra.utils.FBUtilities;
-
 import org.apache.usergrid.persistence.cassandra.ApplicationCF;
 import org.apache.usergrid.persistence.cassandra.CassandraService;
 
-import com.google.common.primitives.UnsignedBytes;
 import com.yammer.metrics.annotation.Metered;
 
 import me.prettyprint.hector.api.beans.HColumn;
@@ -47,18 +44,19 @@ public class IndexBucketScanner<T> implements IndexScanner {
     private final UUID applicationId;
     private final Object keyPrefix;
     private final ApplicationCF columnFamily;
-    private final Object finish;
+    private final ByteBuffer finish;
     private final boolean reversed;
     private final int pageSize;
     private final boolean skipFirst;
     private final String bucket;
     private final StartToBytes<T> scanStartSerializer;
+    private final T initialStartValue;
 
     /** Pointer to our next start read */
     private ByteBuffer start;
 
-    /** Set to the original value to start scanning from */
-    private T initialStartValue;
+    private boolean resumedFromCursor = false;
+
 
     /** Iterator for our results from the last page load */
     private List<HColumn<ByteBuffer, ByteBuffer>> lastResults;
@@ -68,6 +66,7 @@ public class IndexBucketScanner<T> implements IndexScanner {
 
 
 
+
     public IndexBucketScanner( CassandraService cass, ApplicationCF columnFamily, StartToBytes<T> scanStartSerializer,
                                UUID applicationId, Object keyPrefix, String bucket,  T start, T finish,
                                boolean reversed, int pageSize, boolean skipFirst) {
@@ -76,17 +75,18 @@ public class IndexBucketScanner<T> implements IndexScanner {
         this.applicationId = applicationId;
         this.keyPrefix = keyPrefix;
         this.columnFamily = columnFamily;
-        this.finish = finish;
         this.reversed = reversed;
         this.skipFirst = skipFirst;
         this.bucket = bucket;
 
-        //we always add 1 to the page size.  This is because we pop the last column for the next page of results
-        this.pageSize = pageSize+1;
+        this.pageSize = pageSize;
 
         //the initial value set when we started scanning
+
+        this.finish = scanStartSerializer.toBytes( finish );
         this.initialStartValue = start;
-        this.start = scanStartSerializer.toBytes( initialStartValue );
+
+        reset();
     }
 
 
@@ -97,6 +97,7 @@ public class IndexBucketScanner<T> implements IndexScanner {
     public void reset() {
         hasMore = true;
         start = scanStartSerializer.toBytes( initialStartValue );
+        resumedFromCursor = start != null && skipFirst;
     }
 
 
@@ -118,81 +119,51 @@ public class IndexBucketScanner<T> implements IndexScanner {
 
         //if we skip the first we need to set the load to page size +2, since we'll discard the first
         //and start paging at the next entity, otherwise we'll just load the page size we need
-        int selectSize = pageSize;
+        int selectSize = pageSize+1;
 
         //we purposefully use instance equality.  If it's a pointer to the same value, we need to increase by 1
         //since we'll be skipping the first value
 
 
-        final boolean shouldCheckFirst =
-                //we should skip the value it's a cursor resume OR it's a previous page from a stateful iterator
-                (this.skipFirst &&  initialStartValue != null) || start != null;
+        if(resumedFromCursor){
+            selectSize++;
+        }
 
         final List<HColumn<ByteBuffer, ByteBuffer>>
-                resultsTree = cass.getColumns( cass.getApplicationKeyspace( applicationId ), columnFamily, rowKey,
+                results = cass.getColumns( cass.getApplicationKeyspace( applicationId ), columnFamily, rowKey,
                 start, finish, selectSize, reversed );
 
         //remove the first element, it's from a cursor value and we don't want to retain it
 
 
         // we loaded a full page, there might be more
-        if ( resultsTree.size() == selectSize ) {
+        if ( results.size() == selectSize ) {
             hasMore = true;
 
             // set the bytebuffer for the next pass
-            start = resultsTree.get( resultsTree.size() - 1 ).getName();
+            start = results.remove( results.size() - 1 ).getName();
         }
         else {
             hasMore = false;
         }
 
+
+
         //remove the first element since it needs to be skipped AFTER the size check. Otherwise it will fail
         //we only want to skip if our byte value are the same as our expected start.  Since we aren't stateful you can't
         //be sure your start even comes back, and you don't want to erroneously remove columns
-        if ( shouldCheckFirst && resultsTree.size() > 0  && start != null) {
-            final int startIndex = start.position();
-            final int startLength = start.remaining();
-
-
-
-            final ByteBuffer returnedBuffer = resultsTree.get( 0 ).getName();
-            final int returnedIndex = returnedBuffer.position();
-            final int returnedLength = returnedBuffer.remaining();
-
-
-            final int compare = FBUtilities.compareUnsigned( start.array(), returnedBuffer.array(),  startIndex, returnedIndex, startLength, returnedLength ) ;
-
-            //the byte buffers are the same as our seek (which may or may not be the case in the first seek)
-            if(compare == 0){
-                resultsTree.remove( 0 );
-            }
+        if ( resumedFromCursor ) {
+            CassandraColumnUtils.maybeRemoveFirst( results, scanStartSerializer.toBytes( initialStartValue ) );
+            resumedFromCursor = false;
         }
 
-        lastResults = resultsTree;
+        lastResults = results;
 
         return lastResults != null && lastResults.size() > 0;
     }
 
 
-    /**
-     * Returns true if the 2 byte buffers contain the same bytes, false otherwise
-     * @param first
-     * @param second
-     * @return
-     */
-    private boolean compareBuffer(final ByteBuffer first, final ByteBuffer second){
-        int firstLength = first.remaining();
-        int firstPosition = first.position();
 
-        int secondLength = second.remaining();
-        int secondPosition = second.position();
-
-        final int compare = FBUtilities.compareUnsigned( first.array(), second.array(),  firstPosition, secondPosition, firstLength, secondLength);
-
-        return compare == 0;
-
-
-    }
 
 
     /*

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c1157449/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/ConnectionIndexSliceParser.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/ConnectionIndexSliceParser.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/ConnectionIndexSliceParser.java
index 2bb83b4..a669d8c 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/ConnectionIndexSliceParser.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/ConnectionIndexSliceParser.java
@@ -23,6 +23,8 @@ import java.util.UUID;
 import org.apache.usergrid.persistence.Schema;
 import org.apache.usergrid.persistence.cassandra.index.DynamicCompositeComparator;
 
+import com.fasterxml.uuid.UUIDComparator;
+
 import me.prettyprint.hector.api.beans.DynamicComposite;
 
 
@@ -94,7 +96,13 @@ public class ConnectionIndexSliceParser implements SliceParser {
                 return 1;
             }
 
-            return connectedType.compareTo( ((ConnectionColumn)o).connectedType );
+            final int compare =  UUIDComparator.staticCompare( uuid, o.getUUID() );
+
+            if(compare == 0){
+                return connectedType.compareTo( ((ConnectionColumn)o).connectedType );
+            }
+
+            return compare;
         }
     }
 }