You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/01/30 16:21:07 UTC
[31/34] update to master
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9fec2baa/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/ConnectedIndexScanner.java
----------------------------------------------------------------------
diff --cc stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/ConnectedIndexScanner.java
index 8a2d3d3,0000000..181430f
mode 100644,000000..100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/ConnectedIndexScanner.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/ConnectedIndexScanner.java
@@@ -1,231 -1,0 +1,280 @@@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.persistence.cassandra.index;
+
+
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.cassandra.CassandraService;
+import org.springframework.util.Assert;
+
+import com.yammer.metrics.annotation.Metered;
+
+import me.prettyprint.hector.api.beans.HColumn;
+import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_COMPOSITE_DICTIONARIES;
+import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.key;
+
+
- /** @author tnine */
++/**
++ * @author tnine
++ */
+public class ConnectedIndexScanner implements IndexScanner {
+
+ private final CassandraService cass;
+ private final UUID applicationId;
+ private final boolean reversed;
+ private final int pageSize;
+ private final String dictionaryType;
+ private final UUID entityId;
+ private final Iterator<String> connectionTypes;
++ private final boolean skipFirst;
+
- /** Pointer to our next start read */
++
++ /**
++ * Pointer to our next start read
++ */
+ private ByteBuffer start;
+
- /** Set to the original value to start scanning from */
++ /**
++ * Set to the original value to start scanning from
++ */
+ private ByteBuffer scanStart;
+
- /** Iterator for our results from the last page load */
++ /**
++ * Iterator for our results from the last page load
++ */
+ private LinkedHashSet<HColumn<ByteBuffer, ByteBuffer>> lastResults;
+
- /** True if our last load loaded a full page size. */
++ /**
++ * True if our last load loaded a full page size.
++ */
+ private boolean hasMore = true;
+
+ private String currentConnectionType;
+
+
+ public ConnectedIndexScanner( CassandraService cass, String dictionaryType, UUID applicationId, UUID entityId,
- Iterator<String> connectionTypes, ByteBuffer start, boolean reversed, int pageSize ) {
++ Iterator<String> connectionTypes, ByteBuffer start, boolean reversed, int pageSize,
++ boolean skipFirst ) {
+
+ Assert.notNull( entityId, "Entity id for row key construction must be specified when searching graph indexes" );
+ // create our start and end ranges
+ this.scanStart = start;
+ this.cass = cass;
+ this.applicationId = applicationId;
+ this.entityId = entityId;
+ this.start = scanStart;
+ this.reversed = reversed;
+ this.pageSize = pageSize;
+ this.dictionaryType = dictionaryType;
+ this.connectionTypes = connectionTypes;
++ this.skipFirst = skipFirst;
+
+
+ if ( connectionTypes.hasNext() ) {
+ currentConnectionType = connectionTypes.next();
+ }
+ }
+
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.usergrid.persistence.cassandra.index.IndexScanner#reset()
+ */
+ @Override
+ public void reset() {
+ hasMore = true;
+ start = scanStart;
+ }
+
+
+ /**
+ * Load the next page of columns for the entity, reading one connection type at a time and moving on to the next
+ * type until the page is full or no types remain. Return false if nothing was loaded, true otherwise
+ */
+
+ public boolean load() throws Exception {
+
+ // nothing left to load
+ if ( !hasMore ) {
+ return false;
+ }
+
++ boolean skipFirst = this.skipFirst && start == scanStart;
++
++ int totalSelectSize = pageSize + 1;
++
++ //we're discarding the first, so increase our total size by 1 since this value will be inclusive in the seek
++ if ( skipFirst ) {
++ totalSelectSize++;
++ }
++
+
+ lastResults = new LinkedHashSet<HColumn<ByteBuffer, ByteBuffer>>();
+
++
++ //cleanup columns for later logic
++ //pointer to the first col we load
++ HColumn<ByteBuffer, ByteBuffer> first = null;
++
++ //pointer to the last column we load
++ HColumn<ByteBuffer, ByteBuffer> last = null;
++
+ //go through each connection type until we exhaust the result sets
+ while ( currentConnectionType != null ) {
+
+ //only load a delta size to get this next page
- int selectSize = pageSize + 1 - lastResults.size();
++ int selectSize = totalSelectSize - lastResults.size();
++
+
+ Object key = key( entityId, dictionaryType, currentConnectionType );
+
+
+ List<HColumn<ByteBuffer, ByteBuffer>> results =
+ cass.getColumns( cass.getApplicationKeyspace( applicationId ), ENTITY_COMPOSITE_DICTIONARIES, key,
+ start, null, selectSize, reversed );
+
++ final int resultSize = results.size();
++
++ if(resultSize > 0){
++
++ last = results.get( resultSize -1 );
++
++ if(first == null ){
++ first = results.get( 0 );
++ }
++ }
++
+ lastResults.addAll( results );
+
++
+ // we loaded a full page, there might be more
- if ( results.size() == selectSize ) {
++ if ( resultSize == selectSize ) {
+ hasMore = true;
+
- // set the bytebuffer for the next pass
- start = results.get( results.size() - 1 ).getName();
-
- lastResults.remove( lastResults.size() - 1 );
-
+ //we've loaded a full page
+ break;
+ }
+ else {
+
+ //we're done, there's no more connection types and we've loaded all cols for this type.
+ if ( !connectionTypes.hasNext() ) {
+ hasMore = false;
+ currentConnectionType = null;
+ break;
+ }
+
+ //we have more connection types, but we've reached the end of this type,
+ // keep going in the loop to load the next page
+
+ currentConnectionType = connectionTypes.next();
+ }
+ }
+
++ //remove the first element, we need to skip it
++ if ( skipFirst && first != null) {
++ lastResults.remove( first );
++ }
++
++ if ( hasMore && last != null ) {
++ // set the bytebuffer for the next pass
++ start = last.getName();
++ lastResults.remove( last );
++ }
+
+ return lastResults != null && lastResults.size() > 0;
+ }
+
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.lang.Iterable#iterator()
+ */
+ @Override
+ public Iterator<Set<HColumn<ByteBuffer, ByteBuffer>>> iterator() {
+ return this;
+ }
+
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.util.Iterator#hasNext()
+ */
+ @Override
+ public boolean hasNext() {
+
+ // A new page is loaded only when no page is buffered (next() hands the page out and
+ // nulls lastResults) and the previous load reported that more columns may exist.
+ // NOTE(review): if a page is still buffered this falls through and returns false, so a second
+ // hasNext() call without an intervening next() reports false - confirm callers always pair them.
+ if ( lastResults == null && hasMore ) {
+ try {
+ return load();
+ }
+ catch ( Exception e ) {
+ throw new RuntimeException( "Error loading next page of indexbucket scanner", e );
+ }
+ }
+
+ return false;
+ }
+
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.util.Iterator#next()
+ */
+ @Override
- @Metered(group = "core", name = "IndexBucketScanner_load")
++ @Metered( group = "core", name = "IndexBucketScanner_load" )
+ public Set<HColumn<ByteBuffer, ByteBuffer>> next() {
+ Set<HColumn<ByteBuffer, ByteBuffer>> returnVal = lastResults;
+
+ lastResults = null;
+
+ return returnVal;
+ }
+
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.util.Iterator#remove()
+ */
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException( "You can't remove from a result set, only advance" );
+ }
+
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.usergrid.persistence.cassandra.index.IndexScanner#getPageSize()
+ */
+ @Override
+ public int getPageSize() {
+ return pageSize;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9fec2baa/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/IndexBucketScanner.java
----------------------------------------------------------------------
diff --cc stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/IndexBucketScanner.java
index 9250d0d,0000000..0cd6eb8
mode 100644,000000..100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/IndexBucketScanner.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/index/IndexBucketScanner.java
@@@ -1,217 -1,0 +1,238 @@@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.persistence.cassandra.index;
+
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.IndexBucketLocator;
+import org.apache.usergrid.persistence.IndexBucketLocator.IndexType;
+import org.apache.usergrid.persistence.cassandra.ApplicationCF;
+import org.apache.usergrid.persistence.cassandra.CassandraService;
+
+import com.yammer.metrics.annotation.Metered;
+
+import me.prettyprint.hector.api.beans.HColumn;
+import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.key;
+
+
+/**
+ * A simple class to make working with index buckets easier. Scans all buckets and merges the results into a single
+ * column list to allow easy backwards compatibility with existing code
+ *
+ * @author tnine
+ */
+public class IndexBucketScanner implements IndexScanner {
+
+ private final CassandraService cass;
+ private final IndexBucketLocator indexBucketLocator;
+ private final UUID applicationId;
+ private final Object keyPrefix;
+ private final ApplicationCF columnFamily;
+ private final Object finish;
+ private final boolean reversed;
+ private final int pageSize;
+ private final String[] indexPath;
+ private final IndexType indexType;
++ private final boolean skipFirst;
+
+ /** Pointer to our next start read */
+ private Object start;
+
+ /** Set to the original value to start scanning from */
+ private Object scanStart;
+
+ /** Iterator for our results from the last page load */
+ private TreeSet<HColumn<ByteBuffer, ByteBuffer>> lastResults;
+
+ /** True if our last load loaded a full page size. */
+ private boolean hasMore = true;
+
+
++
+ public IndexBucketScanner( CassandraService cass, IndexBucketLocator locator, ApplicationCF columnFamily,
+ UUID applicationId, IndexType indexType, Object keyPrefix, Object start, Object finish,
- boolean reversed, int pageSize, String... indexPath ) {
++ boolean reversed, int pageSize, boolean skipFirst, String... indexPath) {
+ this.cass = cass;
+ this.indexBucketLocator = locator;
+ this.applicationId = applicationId;
+ this.keyPrefix = keyPrefix;
+ this.columnFamily = columnFamily;
+ this.start = start;
+ this.finish = finish;
+ this.reversed = reversed;
- this.pageSize = pageSize;
++ this.skipFirst = skipFirst;
++
++ //we always add 1 to the page size. This is because we pop the last column for the next page of results
++ this.pageSize = pageSize+1;
+ this.indexPath = indexPath;
+ this.indexType = indexType;
+ this.scanStart = start;
+ }
+
+
+ /* (non-Javadoc)
+ * @see org.apache.usergrid.persistence.cassandra.index.IndexScanner#reset()
+ */
+ @Override
+ public void reset() {
+ hasMore = true;
+ start = scanStart;
+ }
+
+
+ /**
+ * Search the collection index using all the buckets for the given collection. Load the next page. Return false if
+ * nothing was loaded, true otherwise
+ *
+ * @return True if at least one column was loaded
+ */
+
+ public boolean load() throws Exception {
+
+ // nothing left to load
+ if ( !hasMore ) {
+ return false;
+ }
+
+ List<String> keys = indexBucketLocator.getBuckets( applicationId, indexType, indexPath );
+
+ List<Object> cassKeys = new ArrayList<Object>( keys.size() );
+
+ for ( String bucket : keys ) {
+ cassKeys.add( key( keyPrefix, bucket ) );
+ }
+
+ //pageSize already includes the extra column that is popped as the next page pointer (the constructor
+ //stores pageSize + 1), so by default we load exactly that many columns
- int selectSize = pageSize + 1;
++ int selectSize = pageSize;
++
++ //we purposefully use instance equality. If it's a pointer to the same value, we need to increase by 1
++ //since we'll be skipping the first value
++
++ final boolean firstPageSkipFirst = this.skipFirst && start == scanStart;
++
++ if(firstPageSkipFirst){
++ selectSize++;
++ }
+
+ TreeSet<HColumn<ByteBuffer, ByteBuffer>> resultsTree = IndexMultiBucketSetLoader
+ .load( cass, columnFamily, applicationId, cassKeys, start, finish, selectSize, reversed );
+
++ //the skipped first element (the cursor value) is removed further down, after the size check
++
++
+ // we loaded a full page, there might be more
+ if ( resultsTree.size() == selectSize ) {
+ hasMore = true;
+
- // set the bytebuffer for the next pass
- start = resultsTree.last().getName();
+
- resultsTree.remove( resultsTree.last() );
++ // set the bytebuffer for the next pass
++ start = resultsTree.pollLast().getName();
+ }
+ else {
+ hasMore = false;
+ }
+
++ //remove the first element since it needs to be skipped AFTER the size check. Otherwise it will fail
++ if ( firstPageSkipFirst ) {
++ resultsTree.pollFirst();
++ }
++
+ lastResults = resultsTree;
+
+ return lastResults != null && lastResults.size() > 0;
+ }
+
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.lang.Iterable#iterator()
+ */
+ @Override
+ public Iterator<Set<HColumn<ByteBuffer, ByteBuffer>>> iterator() {
+ return this;
+ }
+
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.util.Iterator#hasNext()
+ */
+ @Override
+ public boolean hasNext() {
+
+ // A new page is loaded only when no page is buffered (next() hands the page out and
+ // nulls lastResults) and the previous load reported that more columns may exist.
+ // NOTE(review): if a page is still buffered this falls through and returns false, so a second
+ // hasNext() call without an intervening next() reports false - confirm callers always pair them.
+ if ( lastResults == null && hasMore ) {
+ try {
+ return load();
+ }
+ catch ( Exception e ) {
+ throw new RuntimeException( "Error loading next page of indexbucket scanner", e );
+ }
+ }
+
+ return false;
+ }
+
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.util.Iterator#next()
+ */
+ @Override
+ @Metered(group = "core", name = "IndexBucketScanner_load")
+ public NavigableSet<HColumn<ByteBuffer, ByteBuffer>> next() {
+ NavigableSet<HColumn<ByteBuffer, ByteBuffer>> returnVal = lastResults;
+
+ lastResults = null;
+
+ return returnVal;
+ }
+
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.util.Iterator#remove()
+ */
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException( "You can't remove from a result set, only advance" );
+ }
+
+
+ /* (non-Javadoc)
+ * @see org.apache.usergrid.persistence.cassandra.index.IndexScanner#getPageSize()
+ *
+ * NOTE(review): the pageSize field is assigned the constructor argument + 1, so this returns one more
+ * than the page size the caller passed in - confirm that callers expect the padded value.
+ */
+ @Override
+ public int getPageSize() {
+ return pageSize;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9fec2baa/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/SearchVisitor.java
----------------------------------------------------------------------
diff --cc stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/SearchVisitor.java
index 885e055,0000000..fb11c06
mode 100644,000000..100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/SearchVisitor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/SearchVisitor.java
@@@ -1,255 -1,0 +1,254 @@@
+package org.apache.usergrid.persistence.query.ir;
+
+
+import java.util.Stack;
+
+import org.apache.usergrid.persistence.EntityManager;
+import org.apache.usergrid.persistence.EntityRef;
+import org.apache.usergrid.persistence.Query;
+import org.apache.usergrid.persistence.cassandra.QueryProcessor;
+import org.apache.usergrid.persistence.cassandra.index.IndexScanner;
+import org.apache.usergrid.persistence.cassandra.index.NoOpIndexScanner;
+import org.apache.usergrid.persistence.query.ir.result.EmptyIterator;
+import org.apache.usergrid.persistence.query.ir.result.IntersectionIterator;
+import org.apache.usergrid.persistence.query.ir.result.OrderByIterator;
+import org.apache.usergrid.persistence.query.ir.result.ResultIterator;
+import org.apache.usergrid.persistence.query.ir.result.SecondaryIndexSliceParser;
+import org.apache.usergrid.persistence.query.ir.result.SliceIterator;
+import org.apache.usergrid.persistence.query.ir.result.StaticIdIterator;
+import org.apache.usergrid.persistence.query.ir.result.SubtractionIterator;
+import org.apache.usergrid.persistence.query.ir.result.UnionIterator;
+
+
+/**
+ * Simple search visitor that performs all the joining in memory for results.
+ * <p/>
+ * Subclasses will want to implement visiting SliceNode and WithinNode to actually perform the search on the Cassandra
+ * indexes. This class can perform joins on all index entries that conform to the Results object
+ *
+ * @author tnine
+ */
+public abstract class SearchVisitor implements NodeVisitor {
+
+ private static final SecondaryIndexSliceParser COLLECTION_PARSER = new SecondaryIndexSliceParser();
+
+ protected final Query query;
+
+ protected final QueryProcessor queryProcessor;
+
+ protected final EntityManager em;
+
+ protected final Stack<ResultIterator> results = new Stack<ResultIterator>();
+
+
+ /**
+ * @param queryProcessor
+ */
+ public SearchVisitor( QueryProcessor queryProcessor ) {
+ this.query = queryProcessor.getQuery();
+ this.queryProcessor = queryProcessor;
+ this.em = queryProcessor.getEntityManager();
+ }
+
+
+ /** Return the results if they exist, null otherwise */
+ public ResultIterator getResults() {
+ return results.isEmpty() ? null : results.pop();
+ }
+
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.usergrid.persistence.query.ir.NodeVisitor#visit(org.apache.usergrid.
+ * persistence.query.ir.AndNode)
+ */
+ @Override
+ public void visit( AndNode node ) throws Exception {
+ node.getLeft().visit( this );
+ node.getRight().visit( this );
+
+ ResultIterator right = results.pop();
+ ResultIterator left = results.pop();
+
+ /**
+ * NOTE: TN We should always maintain post order traversal of the tree. It
+ * is required for sorting to work correctly
+ */
+ IntersectionIterator intersection = new IntersectionIterator( queryProcessor.getPageSizeHint( node ) );
+ intersection.addIterator( left );
+ intersection.addIterator( right );
+
+ results.push( intersection );
+ }
+
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.usergrid.persistence.query.ir.NodeVisitor#visit(org.apache.usergrid.
+ * persistence.query.ir.NotNode)
+ */
+ @Override
+ public void visit( NotNode node ) throws Exception {
+ node.getSubtractNode().visit( this );
+ ResultIterator not = results.pop();
+
+ node.getKeepNode().visit( this );
+ ResultIterator keep = results.pop();
+
+ SubtractionIterator subtraction = new SubtractionIterator( queryProcessor.getPageSizeHint( node ) );
+ subtraction.setSubtractIterator( not );
+ subtraction.setKeepIterator( keep );
+
+ results.push( subtraction );
+ }
+
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.usergrid.persistence.query.ir.NodeVisitor#visit(org.apache.usergrid.
+ * persistence.query.ir.OrNode)
+ */
+ @Override
+ public void visit( OrNode node ) throws Exception {
+ node.getLeft().visit( this );
+ node.getRight().visit( this );
+
+ ResultIterator right = results.pop();
+ ResultIterator left = results.pop();
+
+ final int nodeId = node.getId();
+
+ UnionIterator union = new UnionIterator( queryProcessor.getPageSizeHint( node ), nodeId, queryProcessor.getCursorCache(nodeId ) );
+
+ if ( left != null ) {
+ union.addIterator( left );
+ }
+ if ( right != null ) {
+ union.addIterator( right );
+ }
+
+ results.push( union );
+ }
+
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.usergrid.persistence.query.ir.NodeVisitor#visit(org.apache.usergrid.persistence
+ * .query.ir.OrderByNode)
+ */
+ @Override
+ public void visit( OrderByNode orderByNode ) throws Exception {
+
+ QuerySlice slice = orderByNode.getFirstPredicate().getAllSlices().iterator().next();
+
+ queryProcessor.applyCursorAndSort( slice );
+
+ QueryNode subOperations = orderByNode.getQueryOperations();
+
+ ResultIterator subResults = null;
+
+ if ( subOperations != null ) {
+ //visit our sub operation
+ subOperations.visit( this );
+
+ subResults = results.pop();
+ }
+
+ ResultIterator orderIterator;
+
+ /**
+ * We have secondary sorts, we need to evaluate the candidate results and sort them in memory
+ */
+ if ( orderByNode.hasSecondarySorts() ) {
+
+ //only order by with no query, start scanning the first field
+ if ( subResults == null ) {
+ QuerySlice firstFieldSlice = new QuerySlice( slice.getPropertyName(), -1 );
+ subResults =
- new SliceIterator( slice, secondaryIndexScan( orderByNode, firstFieldSlice ), COLLECTION_PARSER,
- slice.hasCursor() );
++ new SliceIterator( slice, secondaryIndexScan( orderByNode, firstFieldSlice ), COLLECTION_PARSER );
+ }
+
+ orderIterator = new OrderByIterator( slice, orderByNode.getSecondarySorts(), subResults, em,
+ queryProcessor.getPageSizeHint( orderByNode ) );
+ }
+
+ //we don't have multi field sorting, we can simply do intersection with a single scan range
+ else {
+
+ IndexScanner scanner;
+
+ if ( slice.isComplete() ) {
+ scanner = new NoOpIndexScanner();
+ }
+ else {
+ scanner = secondaryIndexScan( orderByNode, slice );
+ }
+
- SliceIterator joinSlice = new SliceIterator( slice, scanner, COLLECTION_PARSER, slice.hasCursor() );
++ SliceIterator joinSlice = new SliceIterator( slice, scanner, COLLECTION_PARSER);
+
+ IntersectionIterator union = new IntersectionIterator( queryProcessor.getPageSizeHint( orderByNode ) );
+ union.addIterator( joinSlice );
+
+ if ( subResults != null ) {
+ union.addIterator( subResults );
+ }
+
+ orderIterator = union;
+ }
+
+ // now create our intermediate iterator with our real results
+ results.push( orderIterator );
+ }
+
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.usergrid.persistence.query.ir.NodeVisitor#visit(org.apache.usergrid.persistence
+ * .query.ir.SliceNode)
+ */
+ @Override
+ public void visit( SliceNode node ) throws Exception {
+ IntersectionIterator intersections = new IntersectionIterator( queryProcessor.getPageSizeHint( node ) );
+
+ for ( QuerySlice slice : node.getAllSlices() ) {
+ IndexScanner scanner = secondaryIndexScan( node, slice );
+
- intersections.addIterator( new SliceIterator( slice, scanner, COLLECTION_PARSER, slice.hasCursor() ) );
++ intersections.addIterator( new SliceIterator( slice, scanner, COLLECTION_PARSER) );
+ }
+
+ results.push( intersections );
+ }
+
+
+ /**
+ * Create a secondary index scan for the given slice node. DOES NOT apply to the "all" case. This should only
+ * generate a slice for secondary property scanning
+ */
+ protected abstract IndexScanner secondaryIndexScan( QueryNode node, QuerySlice slice ) throws Exception;
+
+
+ @Override
+ public void visit( UuidIdentifierNode uuidIdentifierNode ) {
+ this.results.push( new StaticIdIterator( uuidIdentifierNode.getUuid() ) );
+ }
+
+
+ @Override
+ public void visit( EmailIdentifierNode emailIdentifierNode ) throws Exception {
+ EntityRef user = queryProcessor.getEntityManager().getUserByIdentifier( emailIdentifierNode.getIdentifier() );
+
+ if ( user == null ) {
+ this.results.push( new EmptyIterator() );
+ return;
+ }
+
+ this.results.push( new StaticIdIterator( user.getUuid() ) );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9fec2baa/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/IntersectionIterator.java
----------------------------------------------------------------------
diff --cc stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/IntersectionIterator.java
index 6b463ef,0000000..4a2adfa
mode 100644,000000..100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/IntersectionIterator.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/IntersectionIterator.java
@@@ -1,167 -1,0 +1,169 @@@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.persistence.query.ir.result;
+
+
+import java.util.LinkedHashSet;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.cassandra.CursorCache;
+
+import com.google.common.collect.Sets;
+
+
+/**
+ * An iterator that intersects 1 or more subsets. It makes the assumption that sub iterators iterate from min(uuid) to
+ * max(uuid)
+ *
+ * @author tnine
+ */
+public class IntersectionIterator extends MultiIterator {
+
+
+ /**
+ *
+ */
+ public IntersectionIterator( int pageSize ) {
+ super( pageSize );
+ }
+
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.usergrid.persistence.query.ir.result.ResultIterator#reset()
+ */
+ @Override
+ public void doReset() {
+ for ( ResultIterator itr : iterators ) {
+ itr.reset();
+ }
+ }
+
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.usergrid.persistence.query.ir.result.MergeIterator#advance()
+ */
+ @Override
+ protected Set<ScanColumn> advance() {
+ /**
+ * Advance our sub iterators until the UUID's all line up
+ */
+
+ int size = iterators.size();
+
+ if ( size == 0 ) {
+ return null;
+ }
+
+ // edge case with only 1 iterator
+ if ( size == 1 ) {
+
+ ResultIterator itr = iterators.get( 0 );
+
+ if ( !itr.hasNext() ) {
+ return null;
+ }
+
+ return itr.next();
+ }
+
+ // begin our tree merge of the iterators
+
+ return merge();
+ }
+
+
+ private Set<ScanColumn> merge() {
+
+ Set<ScanColumn> results = new LinkedHashSet<ScanColumn>();
+ ResultIterator rootIterator = iterators.get( 0 );
+
+
+ //we've matched to the end
+ if ( !rootIterator.hasNext() ) {
+ return null;
+ }
+
+
+ //purposely check size first, that way we avoid another round trip if we can
+ while ( results.size() < pageSize && rootIterator.hasNext() ) {
+
+ Set<ScanColumn> intersection = rootIterator.next();
+
+ for ( int i = 1; i < iterators.size(); i++ ) {
+
+ ResultIterator joinIterator = iterators.get( i );
+
+ intersection = merge( intersection, joinIterator );
+
+ //nothing left short circuit, there is no point in advancing to further join iterators
+ if ( intersection.size() == 0 ) {
+ break;
+ }
+ }
+
+ //now add the intermediate results and continue
+ results.addAll( intersection );
+ }
+
+ return results;
+ }
+
+
+ private Set<ScanColumn> merge( Set<ScanColumn> current, ResultIterator child ) {
+
+ Set<ScanColumn> results = new LinkedHashSet<ScanColumn>( pageSize );
+
+ while ( results.size() < pageSize ) {
+ if ( !child.hasNext() ) {
+ // we've iterated to the end, reset for next pass
+ child.reset();
+ return results;
+ }
+
+
+ final Set<ScanColumn> childResults = child.next();
+
- results.addAll( Sets.intersection( current, childResults ) );
++ final Set<ScanColumn> intersection = Sets.intersection( current, childResults );
++
++ results.addAll( intersection );
+ }
+
+ return results;
+ }
+
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.usergrid.persistence.query.ir.result.ResultIterator#finalizeCursor(
+ * org.apache.usergrid.persistence.cassandra.CursorCache)
+ */
+ @Override
+ public void finalizeCursor( CursorCache cache, UUID lastLoaded ) {
+ ResultIterator itr = iterators.get( 0 );
+
+ //We can only create a cursor on our root level value in the intersection iterator.
+ if ( itr != null ) {
+ itr.finalizeCursor( cache, lastLoaded );
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9fec2baa/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/MergeIterator.java
----------------------------------------------------------------------
diff --cc stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/MergeIterator.java
index a349f44,0000000..772cac2
mode 100644,000000..100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/MergeIterator.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/MergeIterator.java
@@@ -1,137 -1,0 +1,149 @@@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.persistence.query.ir.result;
+
+
+import java.util.Iterator;
+import java.util.Set;
+
+
+/**
+ * Base class for {@link ResultIterator}s that hand out results one page (a set of {@link ScanColumn}s) at a time.
+ * Subclasses produce each page in {@link #advance()} and perform their own reset in {@link #doReset()}. This class
+ * buffers the pending page between hasNext() and next(), and keeps the most recent non-empty page so that an
+ * iterator which has loaded exactly one page can replay it on reset() without calling doReset().
+ *
+ * @author tnine
+ */
+public abstract class MergeIterator implements ResultIterator {
+
+
+ /** kept private on purpose so advance must return the correct value */
+ private Set<ScanColumn> next;
+
+ /** Pointer to the last set. Equal to "next" when returned. Used to retain results after "next" is set to null */
+ private Set<ScanColumn> last;
+ /** The size of the pages */
+ protected int pageSize;
+
+ int loadCount = 0; // number of non-empty pages loaded so far, incremented in doAdvance()
+
+
+ /**
+ * @param pageSize The size of the pages
+ */
+ public MergeIterator( int pageSize ) {
+ this.pageSize = pageSize;
+ }
+
+
+ /*
+ * (non-Javadoc)
+ *
+ * This object is its own iterator.
+ *
+ * @see java.lang.Iterable#iterator()
+ */
+ @Override
+ public Iterator<Set<ScanColumn>> iterator() {
+ return this;
+ }
+
+
+ /*
+ * (non-Javadoc)
+ *
+ * Returns true when a non-empty page is buffered, loading one through doAdvance() if necessary.
+ *
+ * @see java.util.Iterator#hasNext()
+ */
+ @Override
+ public boolean hasNext() {
+ //if a non-empty page is already buffered we're done, otherwise try to advance
- if ( next == null ) {
- doAdvance();
++ if(checkNext()){
++ return true;
+ }
+
- boolean results = next != null && next.size() > 0;
+
- if ( results ) {
- last = next;
- loadCount++;
- }
++ doAdvance();
++
++
++ return checkNext();
++ }
++
+
- return results;
++ /**
++ * Single source of logic to check if a next is present.
++ * @return true if "next" is non-null and contains at least one element
++ */
++ protected boolean checkNext(){
++ return next != null && next.size() > 0;
+ }
+
+
+ /** Advance to the next page. A non-empty page is also remembered in "last" and counted in loadCount */
+ protected void doAdvance() {
+ next = advance();
++
++
++ if ( next != null && next.size() > 0 ) {
++ last = next;
++ loadCount++;
++ }
+ }
+
+
+ /*
+ * (non-Javadoc)
+ *
+ * Returns the buffered page and clears the buffer. Only advances when nothing is buffered, so the
+ * returned value is null or empty when advance() produced no values.
+ *
+ * @see java.util.Iterator#next()
+ */
+ @Override
+ public Set<ScanColumn> next() {
+ if ( next == null ) {
+ doAdvance();
+ }
+
+ Set<ScanColumn> returnVal = next;
+
+ next = null;
+
+ return returnVal;
+ }
+
+
+ /*
+ * (non-Javadoc)
+ *
+ * Removal is not supported; always throws UnsupportedOperationException.
+ *
+ * @see java.util.Iterator#remove()
+ */
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException( "You can't remove from a union iterator" );
+ }
+
+
+ /* (non-Javadoc)
+ *
+ * If exactly one non-empty page has been loaded, re-buffers that page instead of calling doReset().
+ * Otherwise drops the "last" pointer and delegates to doReset().
+ *
+ * @see org.apache.usergrid.persistence.query.ir.result.ResultIterator#reset()
+ */
+ @Override
+ public void reset() {
+ if ( loadCount == 1 && last != null ) {
+ next = last;
+ return;
+ }
+ //clean up the last pointer
+ last = null;
+ //reset in the child iterators
+ doReset(); // NOTE(review): "next" and loadCount are left untouched on this path - confirm that is intended
+ }
+
+
+ /** Advance the iterator to the next page. Can return null or an empty set, either of which signals no values */
+ protected abstract Set<ScanColumn> advance();
+
+ /** Perform the reset if required */
+ protected abstract void doReset();
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9fec2baa/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SliceIterator.java
----------------------------------------------------------------------
diff --cc stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SliceIterator.java
index 3829ad8,0000000..54bfe5e
mode 100644,000000..100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SliceIterator.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SliceIterator.java
@@@ -1,251 -1,0 +1,241 @@@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.persistence.query.ir.result;
+
+
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.cassandra.CursorCache;
+import org.apache.usergrid.persistence.cassandra.index.IndexScanner;
+import org.apache.usergrid.persistence.exceptions.QueryIterationException;
+import org.apache.usergrid.persistence.query.ir.QuerySlice;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import me.prettyprint.hector.api.beans.HColumn;
+
+
+/**
+ * A {@link ResultIterator} over a single {@link QuerySlice}. Each page is read from the {@link IndexScanner},
+ * every column name is run through the {@link SliceParser}, and the parsed {@link ScanColumn}s are returned
+ * as one set per page, in the order the scanner produced them.
+ *
+ * @author tnine
+ */
+public class SliceIterator implements ResultIterator {
+
+ private static final Logger logger = LoggerFactory.getLogger( SliceIterator.class );
+
+ private final LinkedHashMap<UUID, ScanColumn> cols; // columns of the current page keyed by UUID, read by finalizeCursor
+ private final QuerySlice slice;
+ private final SliceParser parser;
+ private final IndexScanner scanner;
+ private final int pageSize;
- private final boolean skipFirst;
+
+ /**
+ * Pointer to the page loaded by hasNext() until next() returns it
+ */
+ private Set<ScanColumn> lastResult;
+
+ /**
+ * The pointer to the last set of parsed columns. This single instance is cleared and refilled by every
+ * load(), so a set previously handed out by next() is overwritten by the following load.
+ */
+ private Set<ScanColumn> parsedCols;
+
+ /**
+ * counter that's incremented as we load pages. If pages loaded = 1 when reset, we don't have to reload from cass
+ */
+ private int pagesLoaded = 0;
+
+ /**
+ * Pointer to the last column we parsed
+ */
+ private ScanColumn last;
+
+
+ /**
+ * Creates an iterator whose page size is taken from the scanner.
+ *
+ * @param scanner The scanner to use to read the cols
+ * @param slice The slice used in the scanner
+ * @param parser The parser for the scanner results
- * @param skipFirst True if the first record should be skipped, used with cursors
+ */
- public SliceIterator( QuerySlice slice, IndexScanner scanner, SliceParser parser, boolean skipFirst ) {
++ public SliceIterator( QuerySlice slice, IndexScanner scanner, SliceParser parser ) {
+ this.slice = slice;
+ this.parser = parser;
+ this.scanner = scanner;
- this.skipFirst = skipFirst;
+ this.pageSize = scanner.getPageSize();
+ this.cols = new LinkedHashMap<UUID, ScanColumn>( this.pageSize );
+ this.parsedCols = new LinkedHashSet<ScanColumn>( this.pageSize );
+ }
+
+
+ /*
+ * (non-Javadoc)
+ *
+ * This object is its own iterator.
+ *
+ * @see java.lang.Iterable#iterator()
+ */
+ @Override
+ public Iterator<Set<ScanColumn>> iterator() {
+ return this;
+ }
+
+
+ /*
+ * (non-Javadoc)
+ *
+ * Loads the next page when none is pending; returns true without loading when a page is already pending.
+ *
+ * NOTE(review): load() leaves lastResult pointing at the (empty) page set even when it returns false, so a
+ * second hasNext() call after an empty page returns true - confirm whether callers can hit this.
+ *
+ * @see java.util.Iterator#hasNext()
+ */
+ @Override
+ public boolean hasNext() {
+ if ( lastResult == null ) {
+ return load();
+ }
+
+ return true;
+ }
+
+
+ private boolean load() { // reads and parses the next scanner page; true only if it yielded at least one column
+ if ( !scanner.hasNext() ) {
+ return false;
+ }
+
+ Iterator<HColumn<ByteBuffer, ByteBuffer>> results = scanner.next().iterator();
+
+ cols.clear();
+
- /**
- * Skip the first value, it's from the previous cursor
- */
- if ( skipFirst && pagesLoaded == 0 && results.hasNext() ) {
- results.next();
- }
-
+ parsedCols.clear();
+
+ while ( results.hasNext() ) {
+
+ ByteBuffer colName = results.next().getName().duplicate();
+
+ ScanColumn parsed = parser.parse( colName );
+
+ //skip this value, the parser has discarded it
+ if ( parsed == null ) {
+ continue;
+ }
+
+ last = parsed;
+ cols.put( parsed.getUUID(), parsed );
+ parsedCols.add( parsed );
+ }
+
+
+ pagesLoaded++;
+
+ lastResult = parsedCols;
+
+ return lastResult != null && lastResult.size() > 0;
+ }
+
+
+ /*
+ * (non-Javadoc)
+ *
+ * Returns the pending page and clears the pointer. This method never loads, so it returns null when
+ * no page is pending.
+ *
+ * @see java.util.Iterator#next()
+ */
+ @Override
+ public Set<ScanColumn> next() {
+ Set<ScanColumn> temp = lastResult;
+ lastResult = null;
+ return temp;
+ }
+
+
+ /*
+ * (non-Javadoc)
+ *
+ * Removal is not supported; always throws UnsupportedOperationException.
+ *
+ * @see java.util.Iterator#remove()
+ */
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException( "Remove is not supported" );
+ }
+
+
+ /*
+ * (non-Javadoc)
+ *
+ * When exactly one page has been loaded, makes that page pending again; otherwise resets the scanner.
+ *
+ * @see org.apache.usergrid.persistence.query.ir.result.ResultIterator#reset()
+ */
+ @Override
+ public void reset() {
+ // Do nothing, we'll just return the first page again
+ if ( pagesLoaded == 1 ) {
+ lastResult = parsedCols;
+ return;
+ }
+ scanner.reset();
+ }
+
+
+ /*
+ * (non-Javadoc)
+ *
+ * Stores the cursor value for this slice in the cache under the slice's hash code. The value comes from
+ * the column matching lastLoaded in the current page; if there is no such column and the scanner is
+ * exhausted, it falls back to the slice's existing cursor (nothing was ever parsed) or to the last parsed
+ * column. Nothing is stored when no value can be determined.
+ *
+ * Throws QueryIterationException when lastLoaded is not in the current page while the scanner still has
+ * more data.
+ *
+ * @see
+ * org.apache.usergrid.persistence.query.ir.result.ResultIterator#finalizeCursor()
+ */
+ @Override
+ public void finalizeCursor( CursorCache cache, UUID lastLoaded ) {
+ final int sliceHash = slice.hashCode();
+
+ ByteBuffer bytes = null;
+
+ ScanColumn col = cols.get( lastLoaded );
+
+
+ //the column came from the current page
+ if ( col != null ) {
+ bytes = col.getCursorValue();
+ }
+ else {
+
+ //check if we reached the end of our iterator. If we did, set the last value into the cursor. Otherwise
+ //this is a bug
+ if ( scanner.hasNext() ) {
+ logger.error(
+ "An iterator attempted to access a slice that was not iterated over. This will result in the" +
+ " cursor construction failing" );
+ throw new QueryIterationException(
+ "An iterator attempted to access a slice that was not iterated over. This will result in the" +
+ " cursor construction failing" );
+ }
+
+ final ByteBuffer sliceCursor = slice.getCursor();
+
+ //we've never loaded anything, just re-use the existing slice
+ if (last == null && sliceCursor != null ) {
+ bytes = sliceCursor;
+ }
+
+ //use the last column we loaded. This way our scan returns nothing next time since start == finish
+ else if(last != null) {
+ bytes = last.getCursorValue();
+ }
+ }
+
+
+ if ( bytes == null ) {
+ return;
+ }
+
+ cache.setNextCursor( sliceHash, bytes );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9fec2baa/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/UnionIterator.java
----------------------------------------------------------------------
diff --cc stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/UnionIterator.java
index 9630eda,0000000..ff80e6d
mode 100644,000000..100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/UnionIterator.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/UnionIterator.java
@@@ -1,252 -1,0 +1,267 @@@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.persistence.query.ir.result;
+
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.cassandra.CursorCache;
+import org.apache.usergrid.utils.UUIDUtils;
+
+import me.prettyprint.cassandra.serializers.UUIDSerializer;
+
+
+/**
+ * Simple iterator to perform Unions. Each call to advance() drains every iterator in "iterators" into a
+ * {@link SortedColumnList}, which keeps at most pageSize columns that sort after the current min, and then
+ * marks the largest retained column as the min for the following page.
+ *
+ * @author tnine
+ */
+public class UnionIterator extends MultiIterator {
+
+ private static final ScanColumnComparator COMP = new ScanColumnComparator(); // NOTE(review): not referenced by the outer class as shown; SortedColumnList declares its own COMP
+
+ private static final UUIDSerializer UUID_SERIALIZER = UUIDSerializer.get();
+
+
+ private SortedColumnList list;
+
+ private final int id;
+
+
+ /**
+ * @param pageSize The page size to return
+ * @param id The id assigned to this node, used as the key when the cursor is stored
+ * @param minUuid The serialized UUID that returned values must sort after (exclusive); may be null for no bound
+ */
+ public UnionIterator( int pageSize, int id, ByteBuffer minUuid ) {
+ super( pageSize );
+
+ this.id = id;
+
+ UUID parseMinUuid = null;
+
+ if(minUuid != null) {
+ parseMinUuid = UUID_SERIALIZER.fromByteBuffer( minUuid );
+ }
+
+ list = new SortedColumnList( pageSize, parseMinUuid );
+ }
+
+
+ /*
+ * (non-Javadoc)
+ *
+ * Builds one page: clears the list, adds every page of every iterator to it, resets each iterator once
+ * it is drained, and marks the new min. Returns null when there are no iterators or nothing was retained.
+ *
+ * @see org.apache.usergrid.persistence.query.ir.result.MergeIterator#advance()
+ */
+ @Override
+ protected Set<ScanColumn> advance() {
+
+ int size = iterators.size();
+
+ if ( size == 0 ) {
+ return null;
+ }
+
+
+ list.clear();
+
+ for ( ResultIterator itr : iterators ) {
+
+ while ( itr.hasNext() ) {
+ list.addAll( itr.next() );
+ }
+
+ itr.reset();
+ }
+
+ //mark us for the next page
+ list.mark();
+
+
+ return list.asSet();
+ }
+
+
+ /*
+ * (non-Javadoc)
+ *
+ * Stores the serialized lastLoaded UUID in the cache under this node's id.
+ *
+ * @see
+ * org.apache.usergrid.persistence.query.ir.result.ResultIterator#finalizeCursor(
+ * org.apache.usergrid.persistence.cassandra.CursorCache)
+ */
+ @Override
+ public void finalizeCursor( CursorCache cache, UUID lastLoaded ) {
+
+ ByteBuffer buff = UUIDSerializer.get().toByteBuffer( lastLoaded );
+ cache.setNextCursor( id, buff );
+ //get our scan column and put them in the cache
+ //we finalize the cursor of the min
+ }
+
+
++ @Override
++ public void doReset() {
++ //reset sub iterators if we need to
++ super.doReset();
++
++ list.reset();
++
++ }
++
++
+ /**
+ * A sorted list with a max size. Columns are kept in comparator order without duplicates, and a column
+ * that would land at or beyond the max size is discarded. You can mark the next "min" by
+ * calling the mark method. Only values > min are accepted.
+ */
+ public static final class SortedColumnList {
+
+ private static final ScanColumnComparator COMP = new ScanColumnComparator();
+
+ private final int maxSize;
+
+ private final List<ScanColumn> list;
+
+
+ private ScanColumn min;
+
+
+ public SortedColumnList( final int maxSize, final UUID minUuid ) {
+ //we need to allocate the extra space if required
+ this.list = new ArrayList<ScanColumn>( maxSize );
+ this.maxSize = maxSize;
+
+ if ( minUuid != null ) {
+ min = new AbstractScanColumn( minUuid, null ) {};
+ }
+ }
+
+
+ /**
+ * Add the column to this list, keeping it sorted. The column is ignored when it does not sort after
+ * the current min, is already present, or would be inserted at or beyond the max size.
+ */
+ public void add( ScanColumn col ) {
+ //less than or equal to our min, don't add
+ if ( COMP.compare( min, col ) >= 0 ) {
+ return;
+ }
+
+ int index = Collections.binarySearch( this.list, col, COMP );
+
+ //already present
+ if ( index > -1 ) {
+ return;
+ }
+
+ index = ( index * -1 ) - 1;
+
- //outside the renage
++ //outside the range
+ if ( index >= maxSize ) {
+ return;
+ }
+
+ this.list.add( index, col );
+
+ final int size = this.list.size();
+
+ if ( size > maxSize ) {
+ this.list.subList( maxSize, size ).clear();
+ }
+ }
+
+
+ /**
+ * Add all the elements to this list
+ */
+ public void addAll( final Collection<? extends ScanColumn> cols ) {
+ for ( ScanColumn col : cols ) {
+ add( col );
+ }
+ }
+
+
+ /**
+ * Returns a new set containing the list's elements in list order. If no elements are present, returns null
+ */
+ public Set<ScanColumn> asSet() {
+ if ( this.list.size() == 0 ) {
+ return null;
+ }
+
+ return new LinkedHashSet<ScanColumn>( this.list );
+ }
+
+
+ /**
+ * Mark the last element currently in the list as the new min, so that later adds only accept
+ * columns that sort after it. Does nothing when the list is empty.
+ */
+ public void mark() {
+
+ final int size = this.list.size();
+
+ //we don't have any elements in the list, keep the current min
+ if ( size == 0 ) {
+ return;
+ }
+
+ min = this.list.get( size - 1 );
+ }
+
+
+ /**
+ * Clear the list. The current min is kept.
+ */
+ public void clear() {
+ this.list.clear();
+ }
++
++ public void reset(){ // clears the list and forgets the min
++ clear();
++ this.min = null; // NOTE(review): this also discards the min built from the constructor's minUuid - confirm intended
++ }
+ }
+
+
+ /**
+ * Simple comparator for comparing scan columns. Orders them by time uuid, with null sorting before
+ * any non-null column
+ */
+ private static class ScanColumnComparator implements Comparator<ScanColumn> {
+
+ @Override
+ public int compare( final ScanColumn o1, final ScanColumn o2 ) {
+ if ( o1 == null ) {
+ if ( o2 == null ) {
+ return 0;
+ }
+
+ return -1;
+ }
+
+ else if ( o2 == null ) {
+ return 1;
+ }
+
+ return UUIDUtils.compare( o1.getUUID(), o2.getUUID() );
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9fec2baa/stack/core/src/test/java/org/apache/usergrid/persistence/query/AllInConnectionNoTypeIT.java
----------------------------------------------------------------------
diff --cc stack/core/src/test/java/org/apache/usergrid/persistence/query/AllInConnectionNoTypeIT.java
index ecb74da,0000000..04bcb81
mode 100644,000000..100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/query/AllInConnectionNoTypeIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/query/AllInConnectionNoTypeIT.java
@@@ -1,55 -1,0 +1,56 @@@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.persistence.query;
+
+
+import org.apache.usergrid.CoreApplication;
+import org.apache.usergrid.persistence.Query;
+import org.apache.usergrid.persistence.Results;
+import org.junit.Test;
+
+
+/**
+ * Runs the allIn scenario (defined outside this file, presumably in AbstractIteratingQueryIT) against
+ * connected entities, with the connection type set but the entity type deliberately left unset.
+ *
+ * @author tnine
+ */
+public class AllInConnectionNoTypeIT extends AbstractIteratingQueryIT {
++
+ @Test
+ public void allInConnectionNoType() throws Exception {
+ allIn( new ConnectionNoTypeHelper(app) );
+ }
+
+
+ class ConnectionNoTypeHelper extends ConnectionHelper { // ConnectionHelper variant that clears the entity type before searching
+
+ public ConnectionNoTypeHelper( final CoreApplication app ) {
+ super( app );
+ }
+
+
+ /*
+ * (non-Javadoc)
+ *
+ * Sets the connection type, clears the entity type, and searches the entities connected to rootEntity.
+ *
+ * @see
+ * org.apache.usergrid.persistence.query.SingleOrderByMaxLimitCollection.ConnectionHelper#getResults
+ * (org.apache.usergrid.persistence.Query)
+ */
+ @Override
+ public Results getResults( Query query ) throws Exception {
+ query.setConnectionType( CONNECTION );
+ // don't set it on purpose
+ query.setEntityType( null );
+ return app.getEm().searchConnectedEntities( rootEntity, query );
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9fec2baa/stack/core/src/test/java/org/apache/usergrid/persistence/query/IntersectionUnionPagingIT.java
----------------------------------------------------------------------
diff --cc stack/core/src/test/java/org/apache/usergrid/persistence/query/IntersectionUnionPagingIT.java
index dc69e8c,0000000..143e718
mode 100644,000000..100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/query/IntersectionUnionPagingIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/query/IntersectionUnionPagingIT.java
@@@ -1,163 -1,0 +1,172 @@@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.persistence.query;
+
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
++import org.apache.usergrid.persistence.Entity;
+import org.apache.usergrid.persistence.Query;
+import org.apache.usergrid.persistence.Results;
++import org.apache.usergrid.persistence.cassandra.QueryProcessor;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Pages through queries that AND an equality predicate with an OR of two range predicates, in both operand
+ * orders, and checks that every written entity name is returned exactly once across all pages.
+ *
+ * @author tnine
+ */
+public class IntersectionUnionPagingIT extends AbstractIteratingQueryIT {
+
+ private static final Logger LOG = LoggerFactory.getLogger( IntersectionUnionPagingIT.class );
+
+ private static final String unionScan =
+ "select * where (field1Or > '00000000' OR field2Or > '00000000') AND fieldDate = '0000-00-00'";
+ private static final String scanUnion =
+ "select * where fieldDate = '0000-00-00' AND (field1Or > '00000000' OR field2Or > '00000000') ";
- private static final int PAGE_SIZE = 100;
++
++ private static final int PAGE_SIZE = 300; // limit applied to every page request in testUnionPaging
+
+
+ @Test
+ public void testUnionPagingCollection() throws Exception {
+
+
+ final CollectionIoHelper collectionIoHelper = new CollectionIoHelper( app );
+
+ Set<String> created = performSetup( collectionIoHelper );
+
+
- testUnionPaging( collectionIoHelper, unionScan, created );
++// testUnionPaging( collectionIoHelper, unionScan, created );
+ testUnionPaging( collectionIoHelper, scanUnion, created );
+ }
+
+
+ @Test
+ public void testUnionPagingConnection() throws Exception {
+
+ final ConnectionHelper connectionHelper = new ConnectionHelper( app );
+
+ Set<String> created = performSetup( connectionHelper );
+
+
+ testUnionPaging( connectionHelper, unionScan, created );
+ testUnionPaging( connectionHelper, scanUnion, created );
+ }
+
+
+ private Set<String> performSetup( final IoHelper io ) throws Exception { // writes the test entities and returns their names as an unmodifiable set
+ io.doSetup();
+
- int size = 500;
++ int size = ( int ) ( QueryProcessor.PAGE_SIZE*2.5);
+
+ long start = System.currentTimeMillis();
+
+ LOG.info( "Writing {} entities.", size );
+
+ final String zeros = String.format( "%08d", 0 );
+
+ Set<String> names = new HashSet<String>( size );
+
+ for ( int i = 0; i < size; i++ ) {
+ Map<String, Object> entity = new HashMap<String, Object>();
+ final String name = String.valueOf( i );
+ entity.put( "name", name );
+ entity.put( "fieldDate", "0000-00-00" );
+
- String field1;
++ String field1 = String.format( "%08d", i + 1 );
+ String field2;
+
+ //every entity gets a non-zero field1Or; only the last 10 get a non-zero field2Or, the rest get all zeros,
+ //since we want to simulate the cursor issues with union queries
- if ( i < PAGE_SIZE - 10 ) {
- field1 = String.format( "%08d", i + 1 );
++
++ if ( i < size - 10 ) {
+ field2 = zeros;
+ }
+ else {
- field1 = zeros;
+ field2 = String.format( "%08d", i + 1 );
+ }
+
+ names.add( name );
+
+ entity.put( "field1Or", field1 );
+ entity.put( "field2Or", field2 );
+
- io.writeEntity( entity );
++ Entity saved = io.writeEntity( entity );
++
++ LOG.info("Writing entity with id '{}'", saved.getUuid());
+ }
+
+ long stop = System.currentTimeMillis();
+
+ LOG.info( "Writes took {} ms", stop - start );
+
+ return Collections.unmodifiableSet( names );
+ }
+
+
+ private void testUnionPaging( final IoHelper io, final String queryString, final Set<String> expectedResults ) // pages until the cursor is null, failing on a repeated or missing name
+ throws Exception {
+
+
+ Set<String> newSets = new HashSet<String>( expectedResults );
+
+ //only 10 entities carry a non-zero field2Or, fewer than our page size, so it shouldn't blow up when the cursor is getting created
+ //the leaf iterator should insert its own "no value left" into the cursor
+ Query query = Query.fromQL( queryString );
+ query.setLimit( PAGE_SIZE );
+
+ Results results;
+
+ long start = System.currentTimeMillis();
+
+ do {
+
+ // fetch the next page; each name is removed from the expected set as it is seen
+ results = io.getResults( query );
+
+ for ( int i = 0; i < results.size(); i++ ) {
+ final String name = results.getEntities().get( i ).getName();
+
+ assertTrue( "Value should not be returned twice", newSets.contains( name ) );
+
+ newSets.remove( name );
+ }
+
+ query.setCursor( results.getCursor() );
+ }
+ while ( results.getCursor() != null );
+
+ long stop = System.currentTimeMillis();
+
+ LOG.info( "Query took {} ms to return {} entities", stop - start, expectedResults.size() );
+
+ assertEquals( "All names returned", 0, newSets.size() );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9fec2baa/stack/core/src/test/java/org/apache/usergrid/persistence/query/ir/result/IntersectionIteratorTest.java
----------------------------------------------------------------------
diff --cc stack/core/src/test/java/org/apache/usergrid/persistence/query/ir/result/IntersectionIteratorTest.java
index 2d8cc86,0000000..9da87ab
mode 100644,000000..100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/query/ir/result/IntersectionIteratorTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/query/ir/result/IntersectionIteratorTest.java
@@@ -1,306 -1,0 +1,308 @@@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.persistence.query.ir.result;
+
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.query.ir.result.IntersectionIterator;
+import org.apache.usergrid.persistence.query.ir.result.ScanColumn;
+import org.apache.usergrid.utils.UUIDUtils;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+
+/** @author tnine */
+public class IntersectionIteratorTest {
+
+ @Test
+ public void mutipleIterators() {
+ // Intersects four iterators and checks that only the ids common to all of them come back, in the order asserted at the end.
+ UUID id1 = UUIDUtils.minTimeUUID( 1 );
+ UUID id2 = UUIDUtils.minTimeUUID( 2 );
+ UUID id3 = UUIDUtils.minTimeUUID( 3 );
+ UUID id4 = UUIDUtils.minTimeUUID( 4 );
+ UUID id5 = UUIDUtils.minTimeUUID( 5 );
+ UUID id6 = UUIDUtils.minTimeUUID( 6 );
+ UUID id7 = UUIDUtils.minTimeUUID( 7 );
+ UUID id8 = UUIDUtils.minTimeUUID( 8 );
+ UUID id9 = UUIDUtils.minTimeUUID( 9 );
+ UUID id10 = UUIDUtils.minTimeUUID( 10 );
+
+ // 1, 3 and 8 are the only ids present in all four iterators; first now supplies them in the order 8, 1, 3
+ InOrderIterator first = new InOrderIterator( 100 );
++ first.add( id9 );
++ first.add( id8 );
+ first.add( id1 );
+ first.add( id2 );
+ first.add( id3 );
- first.add( id8 );
- first.add( id9 );
++
++
+
+ InOrderIterator second = new InOrderIterator( 100 );
+ second.add( id1 );
+ second.add( id2 );
+ second.add( id3 );
+ second.add( id4 );
+ second.add( id8 );
+ second.add( id10 );
+
+ InOrderIterator third = new InOrderIterator( 100 );
+ third.add( id1 );
+ third.add( id3 );
+ third.add( id5 );
+ third.add( id6 );
+ third.add( id7 );
+ third.add( id8 );
+
+ InOrderIterator fourth = new InOrderIterator( 100 );
+ fourth.add( id1 );
+ fourth.add( id2 );
+ fourth.add( id3 );
+ fourth.add( id6 );
+ fourth.add( id8 );
+ fourth.add( id10 );
+
+ IntersectionIterator intersection = new IntersectionIterator( 100 );
+ intersection.addIterator( first );
+ intersection.addIterator( second );
+ intersection.addIterator( third );
+ intersection.addIterator( fourth );
+
+ Iterator<ScanColumn> union = intersection.next().iterator();
+
+ // only 1, 3 and 8 intersect; the new assertions expect them as 8, 1, 3, the order in which they were added to first
+ assertTrue( union.hasNext() );
- assertEquals( id1, union.next().getUUID() );
++ assertEquals( id8, union.next().getUUID() );
+
+ assertTrue( union.hasNext() );
- assertEquals( id3, union.next().getUUID() );
++ assertEquals( id1, union.next().getUUID() );
+
+ assertTrue( union.hasNext() );
- assertEquals( id8, union.next().getUUID() );
++ assertEquals( id3, union.next().getUUID() );
+
+ assertFalse( union.hasNext() );
+ }
+
+
+ @Test
+ public void oneIterator() {
+ // With a single iterator added, the first page of the intersection is expected to hold that iterator's ids, in the order added.
+ UUID id1 = UUIDUtils.minTimeUUID( 1 );
+ UUID id2 = UUIDUtils.minTimeUUID( 2 );
+ UUID id3 = UUIDUtils.minTimeUUID( 3 );
+ UUID id4 = UUIDUtils.minTimeUUID( 4 );
+
+ // the only input iterator: ids 1 through 4
+ InOrderIterator first = new InOrderIterator( 100 );
+ first.add( id1 );
+ first.add( id2 );
+ first.add( id3 );
+ first.add( id4 );
+
+ IntersectionIterator intersection = new IntersectionIterator( 100 );
+ intersection.addIterator( first );
+
+ // there is nothing to intersect against, so all four ids are expected back
+ assertTrue( intersection.hasNext() );
+
+ Set<ScanColumn> page = intersection.next();
+
+ Iterator<ScanColumn> union = page.iterator();
+
+ assertEquals( id1, union.next().getUUID() );
+
+ assertTrue( union.hasNext() );
+ assertEquals( id2, union.next().getUUID() );
+
+ assertTrue( union.hasNext() );
+ assertEquals( id3, union.next().getUUID() );
+
+ assertTrue( union.hasNext() );
+ assertEquals( id4, union.next().getUUID() );
+
+ assertFalse( union.hasNext() );
+ }
+
+
+ @Test
+ public void noIterator() {
+ IntersectionIterator union = new IntersectionIterator( 100 );
+
+ // no source iterators were added, so hasNext() must report that there is nothing to return
+ assertFalse( union.hasNext() );
+ }
+
+
+ @Test
+ public void largeIntersection() {
+ // Builds three sets of 10000 ids that all agree at every 200th index and checks that the intersection returns exactly those ids, in first-set order, over two pages.
+ int size = 10000;
+ int firstIntersection = 100;
+ int secondIntersection = 200;
+
+ UUID[] firstSet = new UUID[size];
+ UUID[] secondSet = new UUID[size];
+ UUID[] thirdSet = new UUID[size];
+
+ InOrderIterator first = new InOrderIterator( 100 );
+ InOrderIterator second = new InOrderIterator( 100 );
+ InOrderIterator third = new InOrderIterator( 100 );
+
+ List<UUID> results = new ArrayList<UUID>( size / secondIntersection );
+
+ for ( int i = 0; i < size; i++ ) {
+ firstSet[i] = UUIDUtils.newTimeUUID();
+ // every 100 elements, set the element equal to the first set. This way we
+ // have intersection
+
+ if ( i % firstIntersection == 0 ) {
+ secondSet[i] = firstSet[i];
+ }
+ else {
+ secondSet[i] = UUIDUtils.newTimeUUID();
+ }
+ // every 200th element is shared by all three sets; these are the expected results
+ if ( i % secondIntersection == 0 ) {
+ thirdSet[i] = firstSet[i];
+ results.add( firstSet[i] );
+ }
+
+ else {
+ thirdSet[i] = UUIDUtils.newTimeUUID();
+ }
+ }
+
+ first.add( firstSet );
+
+ reverse( secondSet );
+ //the second set has just been reversed, so it is added in the opposite order to the first
+ second.add( secondSet );
+ third.add( thirdSet );
+
+ //now intersect them and make sure we get all results in a small set
+ // 10000 / 200 = 50 expected ids, split over 2 pages of 25
+ int numPages = 2;
+ int pageSize = results.size() / numPages;
+
+ IntersectionIterator intersection = new IntersectionIterator( pageSize );
+ intersection.addIterator( first );
+ intersection.addIterator( second );
+ intersection.addIterator( third );
+
+ assertTrue( intersection.hasNext() );
+
+
+ Iterator<UUID> expected = results.iterator();
+ Set<ScanColumn> resultSet = intersection.next();
+ Iterator<ScanColumn> union = resultSet.iterator();
+
+ // first page: every returned id must match the expected list, in order
+ while ( union.hasNext() ) {
+ assertTrue( expected.hasNext() );
+ assertEquals( expected.next(), union.next().getUUID() );
+ }
+
+
+ //now get the 2nd page
+ resultSet = intersection.next();
+ union = resultSet.iterator();
+
+
+ while ( union.hasNext() ) {
+ assertTrue( expected.hasNext() );
+ assertEquals( expected.next(), union.next().getUUID() );
+ }
+
+ //no more elements
+ assertFalse( intersection.hasNext() );
+ assertFalse( expected.hasNext() );
+ }
+
+
+ /**
+ * Tests that when there are multiple iterators, and one in the "middle" of the list returns no results, the
+ * intersection is empty, since no results are possible (the implementation is expected to short circuit)
+ */
+ @Test
+ public void mutipleIteratorsNoIntersection() {
+
+ UUID id1 = UUIDUtils.minTimeUUID( 1 );
+ UUID id2 = UUIDUtils.minTimeUUID( 2 );
+ UUID id3 = UUIDUtils.minTimeUUID( 3 );
+ UUID id4 = UUIDUtils.minTimeUUID( 4 );
+ UUID id6 = UUIDUtils.minTimeUUID( 6 );
+ UUID id8 = UUIDUtils.minTimeUUID( 8 );
+ UUID id9 = UUIDUtils.minTimeUUID( 9 );
+ UUID id10 = UUIDUtils.minTimeUUID( 10 );
+
+ // first, second and fourth share ids 1, 2, 3 and 8; third is left empty
+ InOrderIterator first = new InOrderIterator( 100 );
+ first.add( id1 );
+ first.add( id2 );
+ first.add( id3 );
+ first.add( id8 );
+ first.add( id9 );
+
+ InOrderIterator second = new InOrderIterator( 100 );
+ second.add( id1 );
+ second.add( id2 );
+ second.add( id3 );
+ second.add( id4 );
+ second.add( id8 );
+ second.add( id10 );
+
+ InOrderIterator third = new InOrderIterator( 100 ); // deliberately empty: nothing is added to it
+
+ InOrderIterator fourth = new InOrderIterator( 100 );
+ fourth.add( id1 );
+ fourth.add( id2 );
+ fourth.add( id3 );
+ fourth.add( id6 );
+ fourth.add( id8 );
+ fourth.add( id10 );
+
+ IntersectionIterator intersection = new IntersectionIterator( 100 );
+ intersection.addIterator( first );
+ intersection.addIterator( second );
+ intersection.addIterator( third );
+ intersection.addIterator( fourth );
+
+ Iterator<ScanColumn> union = intersection.next().iterator();
+
+ // third is empty, so the intersection must be empty as well
+ assertFalse( union.hasNext() );
+ }
+
+
+ private void reverse( UUID[] array ) {
+ // Reverses the array in place by swapping each element of the first half with its mirror in the second half.
+ UUID temp = null;
+
+ for ( int i = 0; i < array.length / 2; i++ ) {
+ temp = array[i];
+ array[i] = array[array.length - i - 1];
+ array[array.length - i - 1] = temp;
+ }
+ }
+}