You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/07/30 19:19:44 UTC
[06/25] incubator-usergrid git commit: Fixes resumption and off by 1
bugs
Fixes resumption and off by 1 bugs
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/c1157449
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/c1157449
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/c1157449
Branch: refs/heads/master
Commit: c1157449c9c2a32662423c0e14a1f73ef64042b4
Parents: aa79488
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Jun 25 10:16:07 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Jun 25 11:14:52 2015 -0600
----------------------------------------------------------------------
.../cassandra/index/CassandraColumnUtils.java | 72 ++++++++++++++++++
.../cassandra/index/ConnectedIndexScanner.java | 31 ++++----
.../cassandra/index/IndexBucketScanner.java | 77 ++++++--------------
.../ir/result/ConnectionIndexSliceParser.java | 10 ++-
4 files changed, 120 insertions(+), 70 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c1157449/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/CassandraColumnUtils.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/CassandraColumnUtils.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/CassandraColumnUtils.java
new file mode 100644
index 0000000..8cc1ccd
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/CassandraColumnUtils.java
@@ -0,0 +1,72 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.cassandra.index;
+
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.cassandra.utils.FBUtilities;
+
+import me.prettyprint.hector.api.beans.HColumn;
+
+
+/**
+ * Utilities for dealing with pagination in Cassandra
+ */
+public class CassandraColumnUtils {
+
+ /**
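+ * Returns true if the remaining bytes of the two buffers are identical, false otherwise. Both buffers must be array-backed: array() is called directly and position() is used as the array index, so a non-zero arrayOffset() is not accounted for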
+ */
+ public static boolean equals( final ByteBuffer first, final ByteBuffer second ) {
+ int firstLength = first.remaining();
+ int firstPosition = first.position();
+
+ int secondLength = second.remaining();
+ int secondPosition = second.position();
+
+ final int compare = FBUtilities
+ .compareUnsigned( first.array(), second.array(), firstPosition, secondPosition, firstLength,
+ secondLength );
+
+ return compare == 0;
+ }
+
+
+ /**
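+ * Removes the first column from the list, in place, if its name contains the same bytes as the scan start value
+ * @param columns the columns to inspect; if null or empty, nothing is removed
+ * @param startScan the byte buffer to compare against the first column's name; if null, nothing is removed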
+ */
+ public static void maybeRemoveFirst(final List<HColumn<ByteBuffer, ByteBuffer>> columns, final ByteBuffer startScan){
+ //Remove the first element, since it needs to be skipped AFTER the size check. Otherwise it will fail.
+ //We only want to skip it if its byte value is the same as our expected start. Since we aren't stateful, we can't
+ //be sure our start even comes back, and we don't want to erroneously remove columns.
+ if ( columns != null && columns.size() > 0 && startScan != null) {
+ final ByteBuffer returnedBuffer = columns.get( 0 ).getName();
+
+ //the byte buffers are the same as our seek (which may or may not be the case in the first seek)
+ if( CassandraColumnUtils.equals( startScan, returnedBuffer ) ){
+ columns.remove( 0 );
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c1157449/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/ConnectedIndexScanner.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/ConnectedIndexScanner.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/ConnectedIndexScanner.java
index 4b1a049..aa3ddfb 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/ConnectedIndexScanner.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/ConnectedIndexScanner.java
@@ -18,14 +18,12 @@ package org.apache.usergrid.persistence.cassandra.index;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.Iterator;
-import java.util.LinkedHashSet;
import java.util.List;
-import java.util.Set;
import java.util.UUID;
import org.springframework.util.Assert;
+
import org.apache.usergrid.persistence.cassandra.CassandraService;
import com.yammer.metrics.annotation.Metered;
@@ -64,7 +62,7 @@ public class ConnectedIndexScanner implements IndexScanner {
/**
* Iterator for our results from the last page load
*/
- private List<HColumn<ByteBuffer, ByteBuffer>> lastResults;
+ private List<HColumn<ByteBuffer, ByteBuffer>> results;
/**
* True if our last load loaded a full page size.
@@ -122,12 +120,12 @@ public class ConnectedIndexScanner implements IndexScanner {
return false;
}
- boolean skipFirst = this.skipFirst && start == scanStart;
+ boolean shouldDropFirst = skipFirst && start == scanStart;
int totalSelectSize = pageSize + 1;
//we're discarding the first, so increase our total size by 1 since this value will be inclusive in the seek
- if ( skipFirst ) {
+ if ( shouldDropFirst ) {
totalSelectSize++;
}
@@ -135,7 +133,7 @@ public class ConnectedIndexScanner implements IndexScanner {
//go through each connection type until we exhaust the result sets
while ( currentConnectionType != null ) {
- final int lastResultsSize = lastResults == null? 0: lastResults.size();
+ final int lastResultsSize = results == null ? 0 : results.size();
//only load a delta size to get this next page
final int selectSize = totalSelectSize - lastResultsSize;
@@ -150,7 +148,7 @@ public class ConnectedIndexScanner implements IndexScanner {
final int resultSize = results.size();
- lastResults = results;
+ this.results = results;
// we loaded a full page, there might be more
@@ -177,16 +175,17 @@ public class ConnectedIndexScanner implements IndexScanner {
}
//remove the first element, we need to skip it
- if ( skipFirst && lastResults != null && lastResults.size() > 0) {
- lastResults.remove( 0 );
+ if ( shouldDropFirst ) {
+
+ results.remove( 0 );
}
- if ( hasMore && lastResults !=null && lastResults.size() > 0 ) {
+ if ( hasMore && results != null && results.size() > 0 ) {
// set the bytebuffer for the next pass
- start = lastResults.remove( lastResults.size() - 1 ).getName();
+ start = results.remove( results.size() - 1 ).getName();
}
- return lastResults != null && lastResults.size() > 0;
+ return results != null && results.size() > 0;
}
@@ -213,7 +212,7 @@ public class ConnectedIndexScanner implements IndexScanner {
// "next page" pointer
// Our currently buffered results don't exist or don't have a next. Try to
// load them again if they're less than the page size
- if ( lastResults == null && hasMore ) {
+ if ( results == null && hasMore ) {
try {
return load();
}
@@ -234,9 +233,9 @@ public class ConnectedIndexScanner implements IndexScanner {
@Override
@Metered( group = "core", name = "IndexBucketScanner_load" )
public List<HColumn<ByteBuffer, ByteBuffer>> next() {
- List<HColumn<ByteBuffer, ByteBuffer>> returnVal = lastResults;
+ List<HColumn<ByteBuffer, ByteBuffer>> returnVal = results;
- lastResults = null;
+ results = null;
return returnVal;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c1157449/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/IndexBucketScanner.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/IndexBucketScanner.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/IndexBucketScanner.java
index bf61846..0efa735 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/IndexBucketScanner.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/IndexBucketScanner.java
@@ -22,12 +22,9 @@ import java.util.Iterator;
import java.util.List;
import java.util.UUID;
-import org.apache.cassandra.utils.FBUtilities;
-
import org.apache.usergrid.persistence.cassandra.ApplicationCF;
import org.apache.usergrid.persistence.cassandra.CassandraService;
-import com.google.common.primitives.UnsignedBytes;
import com.yammer.metrics.annotation.Metered;
import me.prettyprint.hector.api.beans.HColumn;
@@ -47,18 +44,19 @@ public class IndexBucketScanner<T> implements IndexScanner {
private final UUID applicationId;
private final Object keyPrefix;
private final ApplicationCF columnFamily;
- private final Object finish;
+ private final ByteBuffer finish;
private final boolean reversed;
private final int pageSize;
private final boolean skipFirst;
private final String bucket;
private final StartToBytes<T> scanStartSerializer;
+ private final T initialStartValue;
/** Pointer to our next start read */
private ByteBuffer start;
- /** Set to the original value to start scanning from */
- private T initialStartValue;
+ private boolean resumedFromCursor = false;
+
/** Iterator for our results from the last page load */
private List<HColumn<ByteBuffer, ByteBuffer>> lastResults;
@@ -68,6 +66,7 @@ public class IndexBucketScanner<T> implements IndexScanner {
+
public IndexBucketScanner( CassandraService cass, ApplicationCF columnFamily, StartToBytes<T> scanStartSerializer,
UUID applicationId, Object keyPrefix, String bucket, T start, T finish,
boolean reversed, int pageSize, boolean skipFirst) {
@@ -76,17 +75,18 @@ public class IndexBucketScanner<T> implements IndexScanner {
this.applicationId = applicationId;
this.keyPrefix = keyPrefix;
this.columnFamily = columnFamily;
- this.finish = finish;
this.reversed = reversed;
this.skipFirst = skipFirst;
this.bucket = bucket;
- //we always add 1 to the page size. This is because we pop the last column for the next page of results
- this.pageSize = pageSize+1;
+ this.pageSize = pageSize;
//the initial value set when we started scanning
+
+ this.finish = scanStartSerializer.toBytes( finish );
this.initialStartValue = start;
- this.start = scanStartSerializer.toBytes( initialStartValue );
+
+ reset();
}
@@ -97,6 +97,7 @@ public class IndexBucketScanner<T> implements IndexScanner {
public void reset() {
hasMore = true;
start = scanStartSerializer.toBytes( initialStartValue );
+ resumedFromCursor = start != null && skipFirst;
}
@@ -118,81 +119,51 @@ public class IndexBucketScanner<T> implements IndexScanner {
//if we skip the first we need to set the load to page size +2, since we'll discard the first
//and start paging at the next entity, otherwise we'll just load the page size we need
- int selectSize = pageSize;
+ int selectSize = pageSize+1;
//we purposefully use instance equality. If it's a pointer to the same value, we need to increase by 1
//since we'll be skipping the first value
- final boolean shouldCheckFirst =
- //we should skip the value it's a cursor resume OR it's a previous page from a stateful iterator
- (this.skipFirst && initialStartValue != null) || start != null;
+ if(resumedFromCursor){
+ selectSize++;
+ }
final List<HColumn<ByteBuffer, ByteBuffer>>
- resultsTree = cass.getColumns( cass.getApplicationKeyspace( applicationId ), columnFamily, rowKey,
+ results = cass.getColumns( cass.getApplicationKeyspace( applicationId ), columnFamily, rowKey,
start, finish, selectSize, reversed );
//remove the first element, it's from a cursor value and we don't want to retain it
// we loaded a full page, there might be more
- if ( resultsTree.size() == selectSize ) {
+ if ( results.size() == selectSize ) {
hasMore = true;
// set the bytebuffer for the next pass
- start = resultsTree.get( resultsTree.size() - 1 ).getName();
+ start = results.remove( results.size() - 1 ).getName();
}
else {
hasMore = false;
}
+
+
//remove the first element since it needs to be skipped AFTER the size check. Otherwise it will fail
//we only want to skip if our byte value are the same as our expected start. Since we aren't stateful you can't
//be sure your start even comes back, and you don't want to erroneously remove columns
- if ( shouldCheckFirst && resultsTree.size() > 0 && start != null) {
- final int startIndex = start.position();
- final int startLength = start.remaining();
-
-
-
- final ByteBuffer returnedBuffer = resultsTree.get( 0 ).getName();
- final int returnedIndex = returnedBuffer.position();
- final int returnedLength = returnedBuffer.remaining();
-
-
- final int compare = FBUtilities.compareUnsigned( start.array(), returnedBuffer.array(), startIndex, returnedIndex, startLength, returnedLength ) ;
-
- //the byte buffers are the same as our seek (which may or may not be the case in the first seek)
- if(compare == 0){
- resultsTree.remove( 0 );
- }
+ if ( resumedFromCursor ) {
+ CassandraColumnUtils.maybeRemoveFirst( results, scanStartSerializer.toBytes( initialStartValue ) );
+ resumedFromCursor = false;
}
- lastResults = resultsTree;
+ lastResults = results;
return lastResults != null && lastResults.size() > 0;
}
- /**
- * Returns true if the 2 byte buffers contain the same bytes, false otherwise
- * @param first
- * @param second
- * @return
- */
- private boolean compareBuffer(final ByteBuffer first, final ByteBuffer second){
- int firstLength = first.remaining();
- int firstPosition = first.position();
- int secondLength = second.remaining();
- int secondPosition = second.position();
-
- final int compare = FBUtilities.compareUnsigned( first.array(), second.array(), firstPosition, secondPosition, firstLength, secondLength);
-
- return compare == 0;
-
-
- }
/*
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c1157449/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/ConnectionIndexSliceParser.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/ConnectionIndexSliceParser.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/ConnectionIndexSliceParser.java
index 2bb83b4..a669d8c 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/ConnectionIndexSliceParser.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/ConnectionIndexSliceParser.java
@@ -23,6 +23,8 @@ import java.util.UUID;
import org.apache.usergrid.persistence.Schema;
import org.apache.usergrid.persistence.cassandra.index.DynamicCompositeComparator;
+import com.fasterxml.uuid.UUIDComparator;
+
import me.prettyprint.hector.api.beans.DynamicComposite;
@@ -94,7 +96,13 @@ public class ConnectionIndexSliceParser implements SliceParser {
return 1;
}
- return connectedType.compareTo( ((ConnectionColumn)o).connectedType );
+ final int compare = UUIDComparator.staticCompare( uuid, o.getUUID() );
+
+ if(compare == 0){
+ return connectedType.compareTo( ((ConnectionColumn)o).connectedType );
+ }
+
+ return compare;
}
}
}