You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/06/24 21:20:25 UTC
[1/3] incubator-usergrid git commit: WIP overwrite
Repository: incubator-usergrid
Updated Branches:
refs/heads/USERGRID-752 18850b1db -> c55d261ec
WIP overwrite
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/02b10292
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/02b10292
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/02b10292
Branch: refs/heads/USERGRID-752
Commit: 02b10292554f9900037cb063a055f5dc820b495e
Parents: 18850b1
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Jun 19 10:41:46 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Jun 19 10:41:46 2015 -0600
----------------------------------------------------------------------
.../persistence/cassandra/CassandraService.java | 4 +-
.../index/DynamicCompositeStartToBytes.java | 48 +++++++++++++++
.../cassandra/index/IndexBucketScanner.java | 62 ++++++++++++++------
.../cassandra/index/StartToBytes.java | 34 +++++++++++
.../cassandra/index/UUIDStartToBytes.java | 41 +++++++++++++
.../ir/result/SearchCollectionVisitor.java | 3 +-
.../ir/result/SearchConnectionVisitor.java | 3 +-
.../query/ir/result/SliceIterator.java | 16 +++++
8 files changed, 191 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/02b10292/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CassandraService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CassandraService.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CassandraService.java
index 10c7151..12ea338 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CassandraService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CassandraService.java
@@ -39,6 +39,7 @@ import org.apache.usergrid.persistence.IndexBucketLocator;
import org.apache.usergrid.persistence.IndexBucketLocator.IndexType;
import org.apache.usergrid.persistence.cassandra.index.IndexBucketScanner;
import org.apache.usergrid.persistence.cassandra.index.IndexScanner;
+import org.apache.usergrid.persistence.cassandra.index.UUIDStartToBytes;
import org.apache.usergrid.persistence.hector.CountingMutator;
import me.prettyprint.cassandra.connection.HConnectionManager;
@@ -1044,7 +1045,7 @@ public class CassandraService {
IndexScanner scanner =
- new IndexBucketScanner( this, ENTITY_ID_SETS, applicationId, key, bucket, start,
+ new IndexBucketScanner<UUID>( this, ENTITY_ID_SETS, UUIDStartToBytes.INSTANCE, applicationId, key, bucket, start,
finish, reversed, count, skipFirst );
return scanner;
@@ -1062,4 +1063,5 @@ public class CassandraService {
}
cluster = null;
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/02b10292/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/DynamicCompositeStartToBytes.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/DynamicCompositeStartToBytes.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/DynamicCompositeStartToBytes.java
new file mode 100644
index 0000000..072e8ca
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/DynamicCompositeStartToBytes.java
@@ -0,0 +1,48 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+package org.apache.usergrid.persistence.cassandra.index;
+
+
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
+import me.prettyprint.cassandra.serializers.UUIDSerializer;
+import me.prettyprint.hector.api.beans.DynamicComposite;
+
+
+/**
+ * Converts a {@link DynamicComposite} scan-start value to bytes for {@link StartToBytes}.
+ *
+ * A null start is passed through as null; any other value is returned as the
+ * result of {@link DynamicComposite#serialize()}.
+ *
+ * The constructor is private, so the class is used through the shared {@link #INSTANCE}.
+ */
+public class DynamicCompositeStartToBytes implements StartToBytes<DynamicComposite> {
+
+ public static final DynamicCompositeStartToBytes INSTANCE = new DynamicCompositeStartToBytes();
+
+ private DynamicCompositeStartToBytes(){}
+
+
+ @Override
+ public ByteBuffer toBytes( final DynamicComposite toBytes ) {
+
+ if(toBytes == null){
+ return null;
+ }
+
+ return toBytes.serialize();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/02b10292/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/IndexBucketScanner.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/IndexBucketScanner.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/IndexBucketScanner.java
index 389f140..620d9b4 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/IndexBucketScanner.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/IndexBucketScanner.java
@@ -18,16 +18,12 @@ package org.apache.usergrid.persistence.cassandra.index;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
-import java.util.NavigableSet;
-import java.util.Set;
-import java.util.TreeSet;
import java.util.UUID;
-import org.apache.usergrid.persistence.IndexBucketLocator;
-import org.apache.usergrid.persistence.IndexBucketLocator.IndexType;
+import org.apache.cassandra.utils.FBUtilities;
+
import org.apache.usergrid.persistence.cassandra.ApplicationCF;
import org.apache.usergrid.persistence.cassandra.CassandraService;
@@ -44,7 +40,7 @@ import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtil
*
* @author tnine
*/
-public class IndexBucketScanner implements IndexScanner {
+public class IndexBucketScanner<T> implements IndexScanner {
private final CassandraService cass;
private final UUID applicationId;
@@ -55,12 +51,13 @@ public class IndexBucketScanner implements IndexScanner {
private final int pageSize;
private final boolean skipFirst;
private final String bucket;
+ private final StartToBytes<T> scanStartSerializer;
/** Pointer to our next start read */
- private Object start;
+ private ByteBuffer start;
/** Set to the original value to start scanning from */
- private Object scanStart;
+ private T initialStartValue;
/** Iterator for our results from the last page load */
private List<HColumn<ByteBuffer, ByteBuffer>> lastResults;
@@ -70,14 +67,14 @@ public class IndexBucketScanner implements IndexScanner {
- public IndexBucketScanner( CassandraService cass, ApplicationCF columnFamily,
- UUID applicationId, Object keyPrefix, String bucket, Object start, Object finish,
+ public IndexBucketScanner( CassandraService cass, ApplicationCF columnFamily, StartToBytes<T> scanStartSerializer,
+ UUID applicationId, Object keyPrefix, String bucket, T start, T finish,
boolean reversed, int pageSize, boolean skipFirst) {
this.cass = cass;
+ this.scanStartSerializer = scanStartSerializer;
this.applicationId = applicationId;
this.keyPrefix = keyPrefix;
this.columnFamily = columnFamily;
- this.start = start;
this.finish = finish;
this.reversed = reversed;
this.skipFirst = skipFirst;
@@ -85,7 +82,10 @@ public class IndexBucketScanner implements IndexScanner {
//we always add 1 to the page size. This is because we pop the last column for the next page of results
this.pageSize = pageSize+1;
- this.scanStart = start;
+
+ //the initial value set when we started scanning
+ this.initialStartValue = start;
+ this.start = scanStartSerializer.toBytes( initialStartValue );
}
@@ -95,7 +95,7 @@ public class IndexBucketScanner implements IndexScanner {
@Override
public void reset() {
hasMore = true;
- start = scanStart;
+ start = scanStartSerializer.toBytes( initialStartValue );
}
@@ -122,7 +122,7 @@ public class IndexBucketScanner implements IndexScanner {
//we purposefully use instance equality. If it's a pointer to the same value, we need to increase by 1
//since we'll be skipping the first value
- final boolean firstPageSkipFirst = this.skipFirst && start == scanStart;
+ final boolean firstPageSkipFirst = this.skipFirst && start == initialStartValue;
if(firstPageSkipFirst){
selectSize++;
@@ -149,8 +149,14 @@ public class IndexBucketScanner implements IndexScanner {
}
//remove the first element since it needs to be skipped AFTER the size check. Otherwise it will fail
- if ( firstPageSkipFirst && resultsTree.size() > 0 ) {
- resultsTree.remove( 0 );
+ if ( firstPageSkipFirst && start != null && resultsTree.size() > 0 ) {
+
+ final ByteBuffer resultsName = resultsTree.get( 0 ).getName();
+
+ if ( compareBuffer( start, resultsName ) ) {
+
+ resultsTree.remove( 0 );
+ }
}
lastResults = resultsTree;
@@ -159,6 +165,27 @@ public class IndexBucketScanner implements IndexScanner {
}
+ /**
+ * Returns true if the 2 byte buffers contain the same bytes, false otherwise.
+ *
+ * Only the remaining bytes of each buffer (from position(), for remaining() bytes) are
+ * compared, using FBUtilities.compareUnsigned on the buffers' backing arrays.
+ *
+ * NOTE(review): the offsets passed are position() values applied directly to array(); arrayOffset()
+ * is not added, and array() requires an accessible backing array. This presumably assumes heap
+ * buffers with a zero array offset - confirm against the buffers returned by the column reads.
+ *
+ * @param first the first buffer to compare; must not be null
+ * @param second the second buffer to compare; must not be null
+ * @return true when the unsigned comparison of the two byte ranges yields 0
+ */
+ private boolean compareBuffer(final ByteBuffer first, final ByteBuffer second){
+ int firstLength = first.remaining();
+ int firstPosition = first.position();
+
+ int secondLength = second.remaining();
+ int secondPosition = second.position();
+
+ final int compare = FBUtilities.compareUnsigned( first.array(), second.array(), firstPosition, secondPosition, firstLength, secondLength);
+
+ return compare == 0;
+
+
+ }
+
+
/*
* (non-Javadoc)
*
@@ -235,4 +262,5 @@ public class IndexBucketScanner implements IndexScanner {
public boolean isReversed() {
return this.reversed;
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/02b10292/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/StartToBytes.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/StartToBytes.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/StartToBytes.java
new file mode 100644
index 0000000..c6516b8
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/StartToBytes.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.persistence.cassandra.index;
+
+
+import java.nio.ByteBuffer;
+
+
+/**
+ * Converts a scan-start value of type T into a ByteBuffer.
+ *
+ * IndexBucketScanner calls this on its initial start value to obtain the byte position
+ * it begins reading from, both on construction and on reset().
+ */
+public interface StartToBytes<T> {
+
+ /**
+ * Convert the start scanning type to bytes
+ * @param toBytes the start value to convert. NOTE(review): IndexBucketScanner appears to pass
+ * its start value through unchecked, so this may be null - confirm each implementation tolerates it
+ * @return the start value as a ByteBuffer
+ */
+ ByteBuffer toBytes(final T toBytes);
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/02b10292/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/UUIDStartToBytes.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/UUIDStartToBytes.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/UUIDStartToBytes.java
new file mode 100644
index 0000000..2cdf9fd
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/UUIDStartToBytes.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.persistence.cassandra.index;
+
+
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
+import me.prettyprint.cassandra.serializers.UUIDSerializer;
+
+
+/**
+ * Converts a UUID scan-start value to bytes for {@link StartToBytes}.
+ *
+ * The conversion is delegated to Hector's UUIDSerializer (toByteBuffer). No null check is
+ * made here; a null UUID is handed to the serializer as-is.
+ *
+ * The constructor is private, so the class is used through the shared {@link #INSTANCE}.
+ */
+public class UUIDStartToBytes implements StartToBytes<UUID> {
+
+ public static final UUIDStartToBytes INSTANCE = new UUIDStartToBytes();
+
+ private UUIDStartToBytes(){}
+
+ private static final UUIDSerializer UUID_SERIALIZER = UUIDSerializer.get();
+
+ @Override
+ public ByteBuffer toBytes( final UUID toBytes ) {
+ return UUID_SERIALIZER.toByteBuffer( toBytes );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/02b10292/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchCollectionVisitor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchCollectionVisitor.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchCollectionVisitor.java
index 895b9f0..c20baf4 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchCollectionVisitor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchCollectionVisitor.java
@@ -7,6 +7,7 @@ import org.apache.usergrid.persistence.EntityRef;
import org.apache.usergrid.persistence.IndexBucketLocator;
import org.apache.usergrid.persistence.cassandra.CassandraService;
import org.apache.usergrid.persistence.cassandra.QueryProcessor;
+import org.apache.usergrid.persistence.cassandra.index.DynamicCompositeStartToBytes;
import org.apache.usergrid.persistence.cassandra.index.IndexBucketScanner;
import org.apache.usergrid.persistence.cassandra.index.IndexScanner;
import org.apache.usergrid.persistence.cassandra.index.NoOpIndexScanner;
@@ -159,7 +160,7 @@ public class SearchCollectionVisitor extends SearchVisitor {
Object keyPrefix = key( indexKey, slice.getPropertyName() );
IndexScanner scanner =
- new IndexBucketScanner( cassandraService, ENTITY_INDEX, applicationId, keyPrefix, bucket, range[0],
+ new IndexBucketScanner( cassandraService, ENTITY_INDEX, DynamicCompositeStartToBytes.INSTANCE, applicationId, keyPrefix, bucket, range[0],
range[1], slice.isReversed(), pageSize, slice.hasCursor() );
return scanner;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/02b10292/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchConnectionVisitor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchConnectionVisitor.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchConnectionVisitor.java
index 74ff19f..6f833db 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchConnectionVisitor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchConnectionVisitor.java
@@ -13,6 +13,7 @@ import org.apache.usergrid.persistence.cassandra.CassandraService;
import org.apache.usergrid.persistence.cassandra.ConnectionRefImpl;
import org.apache.usergrid.persistence.cassandra.QueryProcessor;
import org.apache.usergrid.persistence.cassandra.index.ConnectedIndexScanner;
+import org.apache.usergrid.persistence.cassandra.index.DynamicCompositeStartToBytes;
import org.apache.usergrid.persistence.cassandra.index.IndexBucketScanner;
import org.apache.usergrid.persistence.cassandra.index.IndexScanner;
import org.apache.usergrid.persistence.cassandra.index.NoOpIndexScanner;
@@ -205,7 +206,7 @@ public class SearchConnectionVisitor extends SearchVisitor {
IndexScanner scanner =
- new IndexBucketScanner( cassandraService, ENTITY_INDEX, applicationId,
+ new IndexBucketScanner( cassandraService, ENTITY_INDEX, DynamicCompositeStartToBytes.INSTANCE, applicationId,
keyPrefix, shardBucket, range[0], range[1], slice.isReversed(), pageSize, slice.hasCursor());
return scanner;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/02b10292/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SliceIterator.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SliceIterator.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SliceIterator.java
index 3f052da..995d6c7 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SliceIterator.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SliceIterator.java
@@ -125,12 +125,28 @@ public class SliceIterator implements ResultIterator {
parsedCols.clear();
+ boolean compared = false;
+
while ( results.hasNext() ) {
ByteBuffer colName = results.next().getName().duplicate();
ScanColumn parsed = parser.parse( colName, isReversed );
+ //we might want to skip this slice, we're not sure, so we have to check them
+ if(!compared){
+ compared = true;
+
+ if(slice.hasCursor()) {
+ ScanColumn column = parser.parse( slice.getCursor(), isReversed );
+
+ //this is our resume column, drop it
+ if(column.getUUID().equals( parsed.getUUID() )){
+ continue;
+ }
+ }
+ }
+
//skip this value, the parser has discarded it
if ( parsed == null ) {
continue;
[2/3] incubator-usergrid git commit: WIP overwrite
Posted by to...@apache.org.
WIP overwrite
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/9489231e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/9489231e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/9489231e
Branch: refs/heads/USERGRID-752
Commit: 9489231e4f2b27e8f0330754d0053f19c2f979ce
Parents: 02b1029
Author: Todd Nine <tn...@apigee.com>
Authored: Mon Jun 22 18:06:14 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon Jun 22 18:06:14 2015 -0600
----------------------------------------------------------------------
.../cassandra/index/IndexBucketScanner.java | 27 ++++++++++-----
.../persistence/query/ir/SearchVisitor.java | 2 +-
.../query/ir/result/AbstractScanColumn.java | 26 ++------------
.../query/ir/result/SliceIterator.java | 33 +++++-------------
.../persistence/query/ir/result/UUIDColumn.java | 36 +++++---------------
.../query/ir/result/UUIDIndexSliceParser.java | 2 +-
.../query/ir/result/UnionIterator.java | 18 +++++-----
.../usergrid/persistence/CollectionIT.java | 2 +-
8 files changed, 51 insertions(+), 95 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9489231e/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/IndexBucketScanner.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/IndexBucketScanner.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/IndexBucketScanner.java
index 620d9b4..bf61846 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/IndexBucketScanner.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/IndexBucketScanner.java
@@ -27,6 +27,7 @@ import org.apache.cassandra.utils.FBUtilities;
import org.apache.usergrid.persistence.cassandra.ApplicationCF;
import org.apache.usergrid.persistence.cassandra.CassandraService;
+import com.google.common.primitives.UnsignedBytes;
import com.yammer.metrics.annotation.Metered;
import me.prettyprint.hector.api.beans.HColumn;
@@ -122,12 +123,10 @@ public class IndexBucketScanner<T> implements IndexScanner {
//we purposefully use instance equality. If it's a pointer to the same value, we need to increase by 1
//since we'll be skipping the first value
- final boolean firstPageSkipFirst = this.skipFirst && start == initialStartValue;
-
- if(firstPageSkipFirst){
- selectSize++;
- }
+ final boolean shouldCheckFirst =
+ //we should skip the value if it's a cursor resume OR it's a previous page from a stateful iterator
+ (this.skipFirst && initialStartValue != null) || start != null;
final List<HColumn<ByteBuffer, ByteBuffer>>
resultsTree = cass.getColumns( cass.getApplicationKeyspace( applicationId ), columnFamily, rowKey,
@@ -140,7 +139,6 @@ public class IndexBucketScanner<T> implements IndexScanner {
if ( resultsTree.size() == selectSize ) {
hasMore = true;
-
// set the bytebuffer for the next pass
start = resultsTree.get( resultsTree.size() - 1 ).getName();
}
@@ -149,12 +147,23 @@ public class IndexBucketScanner<T> implements IndexScanner {
}
//remove the first element since it needs to be skipped AFTER the size check. Otherwise it will fail
- if ( firstPageSkipFirst && start != null && resultsTree.size() > 0 ) {
+ //we only want to skip if our byte values are the same as our expected start. Since we aren't stateful you can't
+ //be sure your start even comes back, and you don't want to erroneously remove columns
+ if ( shouldCheckFirst && resultsTree.size() > 0 && start != null) {
+ final int startIndex = start.position();
+ final int startLength = start.remaining();
+
+
+
+ final ByteBuffer returnedBuffer = resultsTree.get( 0 ).getName();
+ final int returnedIndex = returnedBuffer.position();
+ final int returnedLength = returnedBuffer.remaining();
- final ByteBuffer resultsName = resultsTree.get( 0 ).getName();
- if ( compareBuffer( start, resultsName ) ) {
+ final int compare = FBUtilities.compareUnsigned( start.array(), returnedBuffer.array(), startIndex, returnedIndex, startLength, returnedLength ) ;
+ //the byte buffers are the same as our seek (which may or may not be the case in the first seek)
+ if(compare == 0){
resultsTree.remove( 0 );
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9489231e/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/SearchVisitor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/SearchVisitor.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/SearchVisitor.java
index 271fc67..fc2df4d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/SearchVisitor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/SearchVisitor.java
@@ -160,7 +160,7 @@ public abstract class SearchVisitor implements NodeVisitor {
final int nodeId = node.getId();
- UnionIterator union = new UnionIterator( queryProcessor.getPageSizeHint( node ), nodeId, queryProcessor.getCursorCache( nodeId ) );
+ UnionIterator union = new UnionIterator( queryProcessor.getPageSizeHint( node ), nodeId );
if ( left != null ) {
union.addIterator( left );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9489231e/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/AbstractScanColumn.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/AbstractScanColumn.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/AbstractScanColumn.java
index d5d130e..2a359be 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/AbstractScanColumn.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/AbstractScanColumn.java
@@ -58,13 +58,13 @@ public abstract class AbstractScanColumn implements ScanColumn {
if ( this == o ) {
return true;
}
- if ( !( o instanceof AbstractScanColumn ) ) {
+ if ( !( o instanceof ScanColumn ) ) {
return false;
}
- AbstractScanColumn that = ( AbstractScanColumn ) o;
+ ScanColumn that = ( ScanColumn ) o;
- return uuid.equals( that.uuid );
+ return uuid.equals( that.getUUID() );
}
@@ -94,24 +94,4 @@ public abstract class AbstractScanColumn implements ScanColumn {
return child;
}
-
- /**
- * Comparator for comparing children. A null safe call
- * @param otherScanColumn
- * @return
- */
- protected int compareChildren( final ScanColumn otherScanColumn ) {
-
- if ( otherScanColumn == null ) {
- return 1;
- }
-
- final ScanColumn otherChild = otherScanColumn.getChild();
-
- if ( child != null && otherChild != null ) {
- return child.compareTo( otherChild );
- }
-
- return 0;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9489231e/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SliceIterator.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SliceIterator.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SliceIterator.java
index 995d6c7..a386159 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SliceIterator.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SliceIterator.java
@@ -26,6 +26,7 @@ import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.usergrid.persistence.cassandra.CursorCache;
import org.apache.usergrid.persistence.cassandra.index.IndexScanner;
import org.apache.usergrid.persistence.exceptions.QueryIterationException;
@@ -125,27 +126,11 @@ public class SliceIterator implements ResultIterator {
parsedCols.clear();
- boolean compared = false;
-
while ( results.hasNext() ) {
- ByteBuffer colName = results.next().getName().duplicate();
-
- ScanColumn parsed = parser.parse( colName, isReversed );
-
- //we might want to skip this slice, we're not sure, so we have to check them
- if(!compared){
- compared = true;
-
- if(slice.hasCursor()) {
- ScanColumn column = parser.parse( slice.getCursor(), isReversed );
+ final ByteBuffer colName = results.next().getName().duplicate();
- //this is our resume column, drop it
- if(column.getUUID().equals( parsed.getUUID() )){
- continue;
- }
- }
- }
+ final ScanColumn parsed = parser.parse( colName, isReversed );
//skip this value, the parser has discarded it
if ( parsed == null ) {
@@ -231,22 +216,22 @@ public class SliceIterator implements ResultIterator {
//this is a bug
if ( scanner.hasNext() ) {
logger.error(
- "An iterator attempted to access a slice that was not iterated over. This will result in the" +
- " cursor construction failing" );
+ "An iterator attempted to access a slice that was not iterated over. This will result in the"
+ + " cursor construction failing" );
throw new QueryIterationException(
- "An iterator attempted to access a slice that was not iterated over. This will result in the" +
- " cursor construction failing" );
+ "An iterator attempted to access a slice that was not iterated over. This will result in the"
+ + " cursor construction failing" );
}
final ByteBuffer sliceCursor = slice.getCursor();
//we've never loaded anything, just re-use the existing slice
- if (last == null && sliceCursor != null ) {
+ if ( last == null && sliceCursor != null ) {
bytes = sliceCursor;
}
//use the last column we loaded. This way our scan returns nothing next time since start == finish
- else if(last != null) {
+ else if ( last != null ) {
bytes = last.getCursorValue();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9489231e/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/UUIDColumn.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/UUIDColumn.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/UUIDColumn.java
index a5e09c1..640b391 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/UUIDColumn.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/UUIDColumn.java
@@ -4,53 +4,35 @@ package org.apache.usergrid.persistence.query.ir.result;
import java.nio.ByteBuffer;
import java.util.UUID;
+import org.apache.usergrid.persistence.cassandra.Serializers;
import org.apache.usergrid.utils.UUIDUtils;
/**
* Used as a comparator for columns
*/
-class UUIDColumn implements ScanColumn{
+class UUIDColumn extends AbstractScanColumn{
- private final UUID uuid;
private final int compareReversed;
- private ScanColumn child;
-
- UUIDColumn( final UUID uuid, final int compareReversed ) {
- this.uuid = uuid;
+ protected UUIDColumn( final UUID uuid, final ByteBuffer columnNameBuffer, final int compareReversed ) {
+ super( uuid, columnNameBuffer );
this.compareReversed = compareReversed;
}
- @Override
- public UUID getUUID() {
- return uuid;
- }
-
-
- @Override
- public ByteBuffer getCursorValue() {
- return null;
- }
-
-
- @Override
- public void setChild( final ScanColumn childColumn ) {
- this.child = childColumn;
+ public UUIDColumn( final UUID uuid, final int compareReversed ) {
+ super(uuid, Serializers.ue.toByteBuffer( uuid ));
+ this.compareReversed = compareReversed;
}
- @Override
- public ScanColumn getChild() {
- return child;
- }
@Override
public int compareTo( final ScanColumn other ) {
-
return UUIDUtils.compare( uuid, other.getUUID() ) * compareReversed;
-
}
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9489231e/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/UUIDIndexSliceParser.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/UUIDIndexSliceParser.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/UUIDIndexSliceParser.java
index 91644ce..2c5b1ba 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/UUIDIndexSliceParser.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/UUIDIndexSliceParser.java
@@ -33,6 +33,6 @@ public class UUIDIndexSliceParser implements SliceParser {
@Override
public ScanColumn parse( final ByteBuffer columnNameBytes, final boolean isReversed ) {
final int compare = isReversed? -1: 1;
- return new UUIDColumn( ue.fromByteBuffer( columnNameBytes.duplicate() ), compare );
+ return new UUIDColumn( ue.fromByteBuffer( columnNameBytes.duplicate() ), columnNameBytes, compare );
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9489231e/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/UnionIterator.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/UnionIterator.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/UnionIterator.java
index 635ca97..aca9852 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/UnionIterator.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/UnionIterator.java
@@ -133,7 +133,7 @@ public class UnionIterator extends MultiIterator {
private final List<ScanColumn> list;
- private UUIDColumn min;
+// private UUIDColumn min;
public SortedColumnList( final int maxSize, final UUID minUuid ) {
@@ -141,9 +141,9 @@ public class UnionIterator extends MultiIterator {
this.list = new ArrayList<ScanColumn>( maxSize );
this.maxSize = maxSize;
- if ( minUuid != null ) {
- min = new UUIDColumn( minUuid, 1 ) ;
- }
+// if ( minUuid != null ) {
+// min = new UUIDColumn( minUuid, 1 ) ;
+// }
}
@@ -152,9 +152,9 @@ public class UnionIterator extends MultiIterator {
*/
public void add( ScanColumn col ) {
//less than our min, don't add
- if ( min != null && min.compareTo( col ) >= 0 ) {
- return;
- }
+// if ( min != null && min.compareTo( col ) >= 0 ) {
+// return;
+// }
int index = Collections.binarySearch( this.list, col );
@@ -215,7 +215,7 @@ public class UnionIterator extends MultiIterator {
}
final UUID oldMin = this.list.get( size - 1 ).getUUID();
- min = new UUIDColumn( oldMin, 1 );
+// min = new UUIDColumn( oldMin, 1 );
}
@@ -228,7 +228,7 @@ public class UnionIterator extends MultiIterator {
public void reset(){
clear();
- this.min = null;
+// this.min = null;
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9489231e/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
index ac58bd7..99fed21 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
@@ -1444,7 +1444,7 @@ public class CollectionIT extends AbstractCoreIT {
Query query = Query.fromQL( s );
Results r = em.searchCollection( em.getApplicationRef(), "loveobjects", query );
- assertTrue( r.size() == 1 );
+ assertEquals(1, r.size() );
String username = ( String ) ( ( Map ) r.getEntities().get( 0 ).getProperty( "Recipient" ) ).get( "Username" );
// selection results should be a list of lists
[3/3] incubator-usergrid git commit: Wraps connection all scanners
with shard filters to ensure we only process entities relevant to our shards
Posted by to...@apache.org.
Wraps connection all scanners with shard filters to ensure we only process entities relevant to our shards
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/c55d261e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/c55d261e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/c55d261e
Branch: refs/heads/USERGRID-752
Commit: c55d261ecbeb46a87abe101df9ac0f3ff1b8a186
Parents: 9489231
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Jun 24 13:20:22 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Jun 24 13:20:22 2015 -0600
----------------------------------------------------------------------
.../cassandra/RelationManagerImpl.java | 12 +-
.../cassandra/index/ConnectedIndexScanner.java | 11 +-
.../persistence/query/ir/SearchVisitor.java | 2 +-
.../result/ConnectionSearchVisitorFactory.java | 2 +-
.../query/ir/result/GatherIterator.java | 6 -
.../ir/result/SearchConnectionVisitor.java | 10 +-
.../query/ir/result/SliceIterator.java | 25 +++-
.../query/ir/result/SliceShardIterator.java | 118 +++++++++++++++++++
.../query/ir/result/UnionIterator.java | 94 ++++++++++++---
9 files changed, 236 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c55d261e/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/RelationManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/RelationManagerImpl.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/RelationManagerImpl.java
index 977dad0..ebe2aec 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/RelationManagerImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/RelationManagerImpl.java
@@ -1910,10 +1910,6 @@ public class RelationManagerImpl implements RelationManager {
ConnectionRefImpl connectionRef =
new ConnectionRefImpl( sourceEntity, connectionType, new SimpleEntityRef( connectedEntityType, null ) );
- // EntityRef connectedEntity) {
- // ConnectionRefImpl connectionRef = new ConnectionRefImpl(new ConnectedEntityRefImpl(connectionType,
- // connectedEntityType, null, true), sourceEntity );
-
final ConnectionResultsLoaderFactory factory = new ConnectionResultsLoaderFactory( connectionRef );
@@ -1921,8 +1917,6 @@ public class RelationManagerImpl implements RelationManager {
ConnectionSearchVisitorFactory collectionSearchVisitorFactory = new ConnectionSearchVisitorFactory( cass, indexBucketLocator, qp, applicationId, headEntity, connectionRef, true, "" );
-// SearchConnectionVisitor visitor = new SearchConnectionVisitor( this, qp, connectionRef, true );
-
return qp.getResults( collectionSearchVisitorFactory );
}
@@ -1967,8 +1961,6 @@ public class RelationManagerImpl implements RelationManager {
ConnectionSearchVisitorFactory collectionSearchVisitorFactory = new ConnectionSearchVisitorFactory( cass, indexBucketLocator, qp, applicationId, headEntity, connectionRef, false, "" );
-// SearchConnectionVisitor visitor = new SearchConnectionVisitor( this, qp, connectionRef, false );
-
return qp.getResults( collectionSearchVisitorFactory );
}
@@ -2007,9 +1999,7 @@ public class RelationManagerImpl implements RelationManager {
QueryProcessor qp = new QueryProcessor( query, null, em, factory );
- ConnectionSearchVisitorFactory collectionSearchVisitorFactory = new ConnectionSearchVisitorFactory( cass, indexBucketLocator, qp, applicationId, headEntity, connectionRef, false, "" );
-
-// SearchConnectionVisitor visitor = new SearchConnectionVisitor( this, qp, connectionRef, true );
+ ConnectionSearchVisitorFactory collectionSearchVisitorFactory = new ConnectionSearchVisitorFactory( cass, indexBucketLocator, qp, applicationId, headEntity, connectionRef, true, "" );
return qp.getResults( collectionSearchVisitorFactory );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c55d261e/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/ConnectedIndexScanner.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/ConnectedIndexScanner.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/ConnectedIndexScanner.java
index 6c6152a..4b1a049 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/ConnectedIndexScanner.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/ConnectedIndexScanner.java
@@ -18,6 +18,7 @@ package org.apache.usergrid.persistence.cassandra.index;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
@@ -134,14 +135,16 @@ public class ConnectedIndexScanner implements IndexScanner {
//go through each connection type until we exhaust the result sets
while ( currentConnectionType != null ) {
+ final int lastResultsSize = lastResults == null? 0: lastResults.size();
+
//only load a delta size to get this next page
- int selectSize = totalSelectSize - lastResults.size();
+ final int selectSize = totalSelectSize - lastResultsSize;
- Object key = key( entityId, dictionaryType, currentConnectionType );
+ final Object key = key( entityId, dictionaryType, currentConnectionType );
- List<HColumn<ByteBuffer, ByteBuffer>> results =
+ final List<HColumn<ByteBuffer, ByteBuffer>> results =
cass.getColumns( cass.getApplicationKeyspace( applicationId ), ENTITY_COMPOSITE_DICTIONARIES, key,
start, null, selectSize, reversed );
@@ -180,7 +183,7 @@ public class ConnectedIndexScanner implements IndexScanner {
if ( hasMore && lastResults !=null && lastResults.size() > 0 ) {
// set the bytebuffer for the next pass
- lastResults.remove( lastResults.size() - 1 );
+ start = lastResults.remove( lastResults.size() - 1 ).getName();
}
return lastResults != null && lastResults.size() > 0;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c55d261e/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/SearchVisitor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/SearchVisitor.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/SearchVisitor.java
index fc2df4d..271fc67 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/SearchVisitor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/SearchVisitor.java
@@ -160,7 +160,7 @@ public abstract class SearchVisitor implements NodeVisitor {
final int nodeId = node.getId();
- UnionIterator union = new UnionIterator( queryProcessor.getPageSizeHint( node ), nodeId );
+ UnionIterator union = new UnionIterator( queryProcessor.getPageSizeHint( node ), nodeId, queryProcessor.getCursorCache( nodeId ) );
if ( left != null ) {
union.addIterator( left );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c55d261e/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/ConnectionSearchVisitorFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/ConnectionSearchVisitorFactory.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/ConnectionSearchVisitorFactory.java
index 54eeb01..6a340b9 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/ConnectionSearchVisitorFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/ConnectionSearchVisitorFactory.java
@@ -41,7 +41,7 @@ public class ConnectionSearchVisitorFactory implements SearchVisitorFactory {
private final boolean outgoing;
private final String[] prefix;
-
+ //cass, indexBucketLocator, qp, applicationId, headEntity, connectionRef, false, ""
public ConnectionSearchVisitorFactory( final CassandraService cassandraService,
final IndexBucketLocator indexBucketLocator,
final QueryProcessor queryProcessor, final UUID applicationId,
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c55d261e/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/GatherIterator.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/GatherIterator.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/GatherIterator.java
index 5bc91e0..eaae6b2 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/GatherIterator.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/GatherIterator.java
@@ -171,12 +171,6 @@ public class GatherIterator implements ResultIterator {
results.add(next);
cursorMap.put( next.getUUID(), iterator );
-// //results are too large, trim them
-// if(results.size() > pageSize){
-// final ScanColumn toRemove = results.pollLast();
-// cursorMap.remove( toRemove.getUUID() );
-// }
-
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c55d261e/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchConnectionVisitor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchConnectionVisitor.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchConnectionVisitor.java
index 6f833db..ff08245 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchConnectionVisitor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchConnectionVisitor.java
@@ -53,7 +53,7 @@ public class SearchConnectionVisitor extends SearchVisitor {
* @param queryProcessor They query processor to use
* @param applicationId
* @param connection The connection refernce
- * @param outgoing The direction to search. True if we should search from source->target edges. False if we
+ * @param outgoing The direction to search. True if we should search from source->target edges. False if we are searching target<-source
*/
public SearchConnectionVisitor( final CassandraService cassandraService,
final IndexBucketLocator indexBucketLocator, final QueryProcessor queryProcessor,
@@ -120,6 +120,9 @@ public class SearchConnectionVisitor extends SearchVisitor {
@Override
public void visit( AllNode node ) throws Exception {
+
+ //todo, use a cache for this...
+
QuerySlice slice = node.getSlice();
queryProcessor.applyCursorAndSort( slice );
@@ -179,7 +182,10 @@ public class SearchConnectionVisitor extends SearchVisitor {
new ConnectedIndexScanner( cassandraService, dictionaryType, applicationId, entityIdToUse, connectionTypes,
start, slice.isReversed(), size, skipFirst );
- this.results.push( new SliceIterator( slice, connectionScanner, connectionParser ) );
+ //we have to create our wrapper to validate that the data we read is correct for our shard
+ final SliceShardIterator.ShardBucketValidator validator = new SliceShardIterator.ShardBucketValidator(indexBucketLocator, bucket, applicationId, IndexBucketLocator.IndexType.CONNECTION, "" );
+
+ this.results.push( new SliceShardIterator( validator, slice, connectionScanner, connectionParser ));
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c55d261e/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SliceIterator.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SliceIterator.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SliceIterator.java
index a386159..4f5bff5 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SliceIterator.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SliceIterator.java
@@ -46,10 +46,10 @@ public class SliceIterator implements ResultIterator {
private final LinkedHashMap<UUID, ScanColumn> cols;
private final QuerySlice slice;
- private final SliceParser parser;
- private final IndexScanner scanner;
+ protected final SliceParser parser;
+ protected final IndexScanner scanner;
private final int pageSize;
- private final boolean isReversed;
+ protected final boolean isReversed;
/**
@@ -128,9 +128,9 @@ public class SliceIterator implements ResultIterator {
while ( results.hasNext() ) {
- final ByteBuffer colName = results.next().getName().duplicate();
+ final HColumn<ByteBuffer, ByteBuffer> column = results.next();
- final ScanColumn parsed = parser.parse( colName, isReversed );
+ final ScanColumn parsed = parse(column);
//skip this value, the parser has discarded it
if ( parsed == null ) {
@@ -151,6 +151,21 @@ public class SliceIterator implements ResultIterator {
}
+ /**
+ * Parses the column. If the column should be discarded, null should be returned
+ * @param column
+ * @return
+ */
+ protected ScanColumn parse( HColumn<ByteBuffer, ByteBuffer> column){
+
+ final ByteBuffer colName = column.getName().duplicate();
+
+ final ScanColumn parsed = parser.parse( colName, isReversed );
+
+ return parsed;
+ }
+
+
/*
* (non-Javadoc)
*
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c55d261e/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SliceShardIterator.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SliceShardIterator.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SliceShardIterator.java
new file mode 100644
index 0000000..279ba2c
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SliceShardIterator.java
@@ -0,0 +1,118 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+package org.apache.usergrid.persistence.query.ir.result;
+
+
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.IndexBucketLocator;
+import org.apache.usergrid.persistence.cassandra.index.IndexScanner;
+import org.apache.usergrid.persistence.query.ir.QuerySlice;
+
+import me.prettyprint.hector.api.beans.HColumn;
+
+
+/**
+ * An iterator that will check if the parsed column is part of this shard. This is required due to a legacy storage
+ * format
+ *
+ * Connections are not sharded by target entity. As a result, we get data partition mismatches when performing
+ * intersections and seeks. This is meant to discard target entities that are not part of the current shard
+ *
+ * @author tnine
+ */
+public class SliceShardIterator extends SliceIterator {
+
+ private static final Logger logger = LoggerFactory.getLogger( SliceShardIterator.class );
+
+ private final ShardBucketValidator shardBucketValidator;
+
+
+ /**
+ * @param slice The slice used in the scanner
+ * @param scanner The scanner to use to read the cols
+ * @param parser The parser for the scanner results
+ */
+ public SliceShardIterator( final ShardBucketValidator shardBucketValidator, final QuerySlice slice,
+ final IndexScanner scanner, final SliceParser parser ) {
+ super( slice, scanner, parser );
+
+ this.shardBucketValidator = shardBucketValidator;
+ }
+
+
+ /**
+ * Parses the column. If the column should be discarded, null should be returned
+ */
+ protected ScanColumn parse( HColumn<ByteBuffer, ByteBuffer> column ) {
+
+ final ByteBuffer colName = column.getName().duplicate();
+
+ final ScanColumn parsed = parser.parse( colName, isReversed );
+
+ if(parsed == null){
+ return null;
+ }
+
+ final UUID entityId = parsed.getUUID();
+
+
+ //not for our current processing shard, discard
+ if(!shardBucketValidator.isInShard( entityId )){
+ return null;
+ }
+
+ return parsed;
+ }
+
+
+ /**
+ * Class that performs validation on an entity to ensure it's in the shard we expect
+ */
+ public static final class ShardBucketValidator {
+ private final IndexBucketLocator indexBucketLocator;
+ private final String expectedBucket;
+ private final UUID applicationId;
+ private final IndexBucketLocator.IndexType type;
+ private final String[] components;
+
+
+ public ShardBucketValidator( final IndexBucketLocator indexBucketLocator, final String expectedBucket,
+ final UUID applicationId, final IndexBucketLocator.IndexType type,
+ final String... components ) {
+ this.indexBucketLocator = indexBucketLocator;
+ this.expectedBucket = expectedBucket;
+ this.applicationId = applicationId;
+ this.type = type;
+ this.components = components;
+ }
+
+
+ public boolean isInShard( final UUID entityId ) {
+ //look up the bucket for this entity and compare it to the bucket we expect
+ final String shard = indexBucketLocator.getBucket( applicationId, type, entityId, components );
+
+ return expectedBucket.equals( shard );
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c55d261e/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/UnionIterator.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/UnionIterator.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/UnionIterator.java
index aca9852..b6e44d8 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/UnionIterator.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/UnionIterator.java
@@ -28,6 +28,8 @@ import java.util.UUID;
import org.apache.usergrid.persistence.cassandra.CursorCache;
+import com.fasterxml.uuid.UUIDComparator;
+
import static org.apache.usergrid.persistence.cassandra.Serializers.*;
/**
@@ -131,19 +133,19 @@ public class UnionIterator extends MultiIterator {
private final int maxSize;
- private final List<ScanColumn> list;
+ private final List<UnionScanColumn> list;
-// private UUIDColumn min;
+ private UnionScanColumn min;
public SortedColumnList( final int maxSize, final UUID minUuid ) {
//we need to allocate the extra space if required
- this.list = new ArrayList<ScanColumn>( maxSize );
+ this.list = new ArrayList<UnionScanColumn>( maxSize );
this.maxSize = maxSize;
-// if ( minUuid != null ) {
-// min = new UUIDColumn( minUuid, 1 ) ;
-// }
+ if ( minUuid != null ) {
+ min = new UnionScanColumn(new UUIDColumn( minUuid, 1 )) ;
+ }
}
@@ -151,12 +153,15 @@ public class UnionIterator extends MultiIterator {
* Add the column to this list
*/
public void add( ScanColumn col ) {
+
+ final UnionScanColumn unionScanColumn = new UnionScanColumn(col);
+
//less than our min, don't add
-// if ( min != null && min.compareTo( col ) >= 0 ) {
-// return;
-// }
+ if ( min != null && min.compareTo( unionScanColumn ) >= 0 ) {
+ return;
+ }
- int index = Collections.binarySearch( this.list, col );
+ int index = Collections.binarySearch( this.list, unionScanColumn );
//already present
if ( index > -1 ) {
@@ -170,7 +175,7 @@ public class UnionIterator extends MultiIterator {
return;
}
- this.list.add( index, col );
+ this.list.add( index, unionScanColumn );
final int size = this.list.size();
@@ -214,8 +219,7 @@ public class UnionIterator extends MultiIterator {
return;
}
- final UUID oldMin = this.list.get( size - 1 ).getUUID();
-// min = new UUIDColumn( oldMin, 1 );
+ min = this.list.get( size - 1 );
}
@@ -228,7 +232,69 @@ public class UnionIterator extends MultiIterator {
public void reset(){
clear();
-// this.min = null;
+ this.min = null;
+ }
+ }
+
+ private static class UnionScanColumn implements ScanColumn{
+
+ private final ScanColumn delegate;
+ private ScanColumn child;
+
+
+ private UnionScanColumn( final ScanColumn delegate ) {
+ super();
+ this.delegate = delegate;}
+
+
+ @Override
+ public int compareTo( final ScanColumn o ) {
+ return UUIDComparator.staticCompare( delegate.getUUID(), o.getUUID() );
+ }
+
+
+ @Override
+ public UUID getUUID() {
+ return delegate.getUUID();
+ }
+
+
+ @Override
+ public ByteBuffer getCursorValue() {
+ return ue.toByteBuffer( delegate.getUUID() );
+ }
+
+
+ @Override
+ public void setChild( final ScanColumn childColumn ) {
+ //intentionally a no-op, since child is on the delegate
+ }
+
+
+ @Override
+ public ScanColumn getChild() {
+ return delegate.getChild();
+ }
+
+
+ @Override
+ public boolean equals( final Object o ) {
+ if ( this == o ) {
+ return true;
+ }
+ if ( !( o instanceof UnionScanColumn ) ) {
+ return false;
+ }
+
+ final UnionScanColumn that = ( UnionScanColumn ) o;
+
+ return delegate.getUUID().equals( that.delegate.getUUID() );
+ }
+
+
+ @Override
+ public int hashCode() {
+ return delegate.getUUID().hashCode();
}
}
}