You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/03/24 00:23:48 UTC

incubator-usergrid git commit: Refactor to clean up. Still a WIP

Repository: incubator-usergrid
Updated Branches:
  refs/heads/USERGRID-494 c4f654847 -> 61aa97945


Refactor to clean up.  Still a WIP


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/61aa9794
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/61aa9794
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/61aa9794

Branch: refs/heads/USERGRID-494
Commit: 61aa97945313533f6ef676f3f9dab5a22b9cb63b
Parents: c4f6548
Author: Todd Nine <tn...@apigee.com>
Authored: Mon Mar 23 09:52:44 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon Mar 23 17:23:46 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/io/read/Command.java        |   8 +-
 .../corepersistence/io/read/CommandBuilder.java |   8 +-
 .../io/read/EntityIndexCommand.java             | 384 +++++++++++++++++++
 .../io/read/EntityIndexCommands.java            | 119 ------
 .../corepersistence/rx/impl/CollectUntil.java   |  11 +-
 .../java/org/apache/usergrid/TempExample.java   | 217 ++++++++++-
 .../rx/impl/CollectUntilTest.java               |   6 +-
 7 files changed, 605 insertions(+), 148 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/61aa9794/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/Command.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/Command.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/Command.java
index 4e3b0fc..34b8d1e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/Command.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/Command.java
@@ -25,14 +25,15 @@ import org.apache.usergrid.persistence.model.entity.Id;
 import rx.Observable;


-public interface Command<T> {
+/**
+ * A composable pipeline step that transforms a stream of {@code T} into a stream of {@code R}.
+ * Because it is an {@link Observable.Transformer}, a command is applied with {@link Observable#compose}.
+ *
+ * @param <T> the element type of the input stream
+ * @param <R> the element type of the output stream
+ */
+public interface Command<T, R> extends Observable.Transformer<T, R> {


-    /**
-     * Process our input stream
-     * @param input
-     * @return
-     */
-    public Observable<T> process( Observable<T> input );

 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/61aa9794/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/CommandBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/CommandBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/CommandBuilder.java
index 828cbc9..bbf74b6 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/CommandBuilder.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/CommandBuilder.java
@@ -38,7 +38,7 @@ public class CommandBuilder {
 
 
     private CursorCache cache;
-    private final Observable<Id> pathObservable;
+    private Observable<Id> pathObservable;
 
 
     public CommandBuilder( final Id root ) {
@@ -46,6 +46,12 @@ public class CommandBuilder {
     }
 
 
+    /** Appends {@code intermediateCommand} to the id path by composing it onto the current path observable. */
+    public void addCommand(final Command<Id, Id> intermediateCommand){
+       pathObservable =  pathObservable.compose( intermediateCommand );
+    }
+
+
     /**
      * Set our cache
      * @param cache

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/61aa9794/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/EntityIndexCommand.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/EntityIndexCommand.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/EntityIndexCommand.java
new file mode 100644
index 0000000..a8fa221
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/EntityIndexCommand.java
@@ -0,0 +1,417 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.io.read;
+
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.collection.EntitySet;
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
+import org.apache.usergrid.persistence.index.SearchTypes;
+import org.apache.usergrid.persistence.index.impl.IndexScopeImpl;
+import org.apache.usergrid.persistence.index.query.CandidateResult;
+import org.apache.usergrid.persistence.index.query.CandidateResults;
+import org.apache.usergrid.persistence.index.query.Query;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.fasterxml.uuid.UUIDComparator;
+import com.google.inject.Inject;
+
+import rx.Observable;
+import rx.Subscriber;
+import rx.functions.Func0;
+import rx.functions.Func1;
+
+
+/**
+ * Searches the entity index for every source {@link Id} it receives and loads the matching entities.
+ * <p>
+ * For each source id the query is run in the index scope {@code (sourceId, scopeType)} and every page of candidates
+ * is followed via its cursor.  Candidates that share an entity id are reduced to the highest version, the survivors
+ * are buffered into groups of at most {@code resultSetSize}, and each group is loaded from its collection.
+ * Superseded, stale and missing candidates are reported through {@link SearchResults#getToRemove()}.
+ * <p>
+ * Instances carry per-query state (query, scope type, result set size), so they must not be bound as singletons.
+ */
+public class EntityIndexCommand implements Command<Id, EntityIndexCommand.SearchResults> {
+
+    private final Id applicationId;
+    private final ApplicationScope applicationScope;
+    private final ApplicationEntityIndex index;
+    private final SearchTypes types;
+    private final String query;
+    private final int resultSetSize;
+    private final String scopeType;
+    private final EntityCollectionManagerFactory entityCollectionManagerFactory;
+
+
+    /**
+     * @param applicationId used as both application and owner of the collection scopes entities are loaded from
+     * @param applicationScope the application scope (not read by this command yet)
+     * @param index the index to search
+     * @param types the entity types to search for
+     * @param query the query string, parsed with {@link Query#fromQL(String)}
+     * @param resultSetSize the maximum number of candidates loaded per emitted {@link SearchResults}; must be positive
+     * @param scopeType the index scope name paired with each source id
+     * @param entityCollectionManagerFactory creates the collection managers used to load entities
+     *
+     * @throws IllegalArgumentException if {@code resultSetSize} is less than 1
+     */
+    @Inject
+    public EntityIndexCommand( final Id applicationId, final ApplicationScope applicationScope,
+                               final ApplicationEntityIndex index, final SearchTypes types, final String query,
+                               final int resultSetSize, final String scopeType,
+                               final EntityCollectionManagerFactory entityCollectionManagerFactory ) {
+        //fail fast; buffer( 0 ) would otherwise only fail once the pipeline is subscribed
+        if ( resultSetSize < 1 ) {
+            throw new IllegalArgumentException( "resultSetSize must be at least 1, but was " + resultSetSize );
+        }
+
+        this.applicationId = applicationId;
+        this.applicationScope = applicationScope;
+        this.index = index;
+        this.types = types;
+        this.query = query;
+        this.resultSetSize = resultSetSize;
+        this.scopeType = scopeType;
+        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+    }
+
+
+    /**
+     * Builds the search pipeline for the incoming source ids.
+     *
+     * @param idObservable the source ids to search from
+     *
+     * @return one {@link SearchResults} per buffer of up to {@code resultSetSize} de-duplicated candidates
+     */
+    @Override
+    public Observable<EntityIndexCommand.SearchResults> call( final Observable<Id> idObservable ) {
+
+        //create our observable of candidate search results, one element per page
+        final Observable<CandidateResults> candidateResults = idObservable
+            .flatMap( id -> Observable.create( new ElasticSearchObservable( initialSearch( id ), nextPage( id ) ) ) );
+
+        //de-duplicate each page in memory.  Using groupBy and then blocking on each group hangs: a group only
+        //completes when the source completes, and the source is stuck waiting for the blocked group.
+        final Observable<CandidateGroup> candidateGroup =
+            candidateResults.flatMap( candidates -> Observable.from( groupByEntityId( candidates ) ) );
+
+        //buffer our candidate groups up to our result set size and roll each buffer into a collector
+        final Observable<CandidateCollector> collectedCandidates =
+            candidateGroup.buffer( resultSetSize ).map( groups -> {
+                final CandidateCollector collector = new CandidateCollector( resultSetSize );
+
+                for ( final CandidateGroup group : groups ) {
+                    //keep the newest version, and remember the superseded ones so they can be removed
+                    collector.addCandidate( group.toKeep );
+                    collector.addEmptyResults( group.toRemove );
+                }
+
+                return collector;
+            } );
+
+        //now we have our collected candidates, load them
+        return collectedCandidates.map( loadEntities( resultSetSize ) );
+    }
+
+
+    /**
+     * Perform the initial search with the sourceId
+     */
+    private Func0<CandidateResults> initialSearch( final Id sourceId ) {
+        return () -> index.search( new IndexScopeImpl( sourceId, scopeType ), types, Query.fromQL( query ) );
+    }
+
+
+    /**
+     * Search the next page for the specified source
+     */
+    private Func1<String, CandidateResults> nextPage( final Id sourceId ) {
+        return cursor -> index
+            .search( new IndexScopeImpl( sourceId, scopeType ), types, Query.fromQL( query ).withCursor( cursor ) );
+    }
+
+
+    /**
+     * Groups one page of candidates by entity id, preserving the order in which ids first appear.  Within each
+     * group the highest version is kept and every other version is marked for removal.
+     *
+     * @param candidates one page of candidates
+     *
+     * @return one group per distinct entity id
+     */
+    private static List<CandidateGroup> groupByEntityId( final Iterable<? extends CandidateResult> candidates ) {
+        final Map<Id, List<CandidateResult>> versionsById = new LinkedHashMap<>();
+
+        for ( final CandidateResult candidate : candidates ) {
+            versionsById.computeIfAbsent( candidate.getId(), id -> new ArrayList<>() ).add( candidate );
+        }
+
+        final List<CandidateGroup> groups = new ArrayList<>( versionsById.size() );
+
+        for ( final List<CandidateResult> versions : versionsById.values() ) {
+            //sort highest version first, so index 0 is the candidate to keep
+            Collections.sort( versions, ( o1, o2 ) -> CandidateVersionComparator.compare( o2, o1 ) );
+
+            groups.add( new CandidateGroup( versions.get( 0 ), versions.subList( 1, versions.size() ) ) );
+        }
+
+        return groups;
+    }
+
+
+    /**
+     * Function that loads the entities for our candidates and filters out stale or missing candidates.
+     *
+     * @param expectedSize the initial capacity of the returned results
+     *
+     * @return a function that maps each collector to its loaded {@link SearchResults}
+     */
+    private Func1<CandidateCollector, SearchResults> loadEntities( final int expectedSize ) {
+        return candidateCollector -> {
+
+            final List<CandidateResult> candidates = candidateCollector.getCandidates();
+
+            //group our ids by type, since each type is loaded through its own collection scope
+            final Map<String, List<Id>> idsByType = new LinkedHashMap<>();
+
+            for ( final CandidateResult candidate : candidates ) {
+                final Id id = candidate.getId();
+                idsByType.computeIfAbsent( id.getType(), type -> new ArrayList<>() ).add( id );
+            }
+
+            //load each type once, rather than once per candidate
+            final Map<String, EntitySet> entitySetsByType = new HashMap<>();
+
+            for ( final Map.Entry<String, List<Id>> entry : idsByType.entrySet() ) {
+                final String collectionType = CpNamingUtils.getCollectionScopeNameFromEntityType( entry.getKey() );
+
+                final CollectionScope scope = new CollectionScopeImpl( applicationId, applicationId, collectionType );
+
+                final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( scope );
+
+                entitySetsByType.put( entry.getKey(), ecm.load( entry.getValue() ).toBlocking().last() );
+            }
+
+            //now go through all our candidates, in order, and verify them
+            final SearchResults searchResults = new SearchResults( expectedSize );
+
+            for ( final CandidateResult candidate : candidates ) {
+                final EntitySet entitySet = entitySetsByType.get( candidate.getId().getType() );
+                final MvccEntity entity = entitySet.getEntity( candidate.getId() );
+
+                //the target entity is missing or deleted, or our candidate is stale: remove it from the index
+                if ( entity == null || !entity.getEntity().isPresent()
+                    || UUIDComparator.staticCompare( entity.getVersion(), candidate.getVersion() ) > 0 ) {
+                    searchResults.addToRemove( candidate );
+                    continue;
+                }
+
+                searchResults.addEntity( entity.getEntity().get() );
+            }
+
+            //add the superseded versions found while grouping to this set
+            searchResults.addToRemove( candidateCollector.getToRemove() );
+
+            return searchResults;
+        };
+    }
+
+
+    /**
+     * Collects all valid results (in order) as well as candidates to be removed
+     */
+    public static class CandidateCollector {
+        private final List<CandidateResult> candidates;
+        private final List<CandidateResult> toRemove;
+
+
+        public CandidateCollector( final int maxSize ) {
+            candidates = new ArrayList<>( maxSize );
+            toRemove = new ArrayList<>( maxSize );
+        }
+
+
+        public void addCandidate( final CandidateResult candidate ) {
+            this.candidates.add( candidate );
+        }
+
+
+        public void addEmptyResults( final Collection<CandidateResult> stale ) {
+            this.toRemove.addAll( stale );
+        }
+
+
+        public List<CandidateResult> getCandidates() {
+            return candidates;
+        }
+
+
+        public List<CandidateResult> getToRemove() {
+            return toRemove;
+        }
+    }
+
+
+    /**
+     * The entities loaded for one buffer of candidates, plus every candidate that should be removed from the index.
+     */
+    public static class SearchResults {
+        private final List<Entity> entities;
+        private final List<CandidateResult> toRemove;
+
+        private String cursor;
+
+
+        public SearchResults( final int maxSize ) {
+            entities = new ArrayList<>( maxSize );
+            this.toRemove = new ArrayList<>( maxSize );
+        }
+
+
+        public void addEntity( final Entity entity ) {
+            this.entities.add( entity );
+        }
+
+
+        public void addToRemove( final Collection<CandidateResult> stale ) {
+            this.toRemove.addAll( stale );
+        }
+
+
+        public void addToRemove( final CandidateResult candidateResult ) {
+            this.toRemove.add( candidateResult );
+        }
+
+
+        public void setCursor( final String cursor ) {
+            this.cursor = cursor;
+        }
+
+
+        /**
+         * @return the loaded entities, in candidate order (read-only view)
+         */
+        public List<Entity> getEntities() {
+            return Collections.unmodifiableList( entities );
+        }
+
+
+        /**
+         * @return the superseded, stale or missing candidates to remove from the index (read-only view)
+         */
+        public List<CandidateResult> getToRemove() {
+            return Collections.unmodifiableList( toRemove );
+        }
+
+
+        public String getCursor() {
+            return cursor;
+        }
+    }
+
+
+    /**
+     * An observable that will perform a search and continually emit results while they exist.
+     */
+    public static class ElasticSearchObservable implements Observable.OnSubscribe<CandidateResults> {
+
+        private final Func1<String, CandidateResults> fetchNextPage;
+        private final Func0<CandidateResults> fetchInitialResults;
+
+
+        public ElasticSearchObservable( final Func0<CandidateResults> fetchInitialResults,
+                                        final Func1<String, CandidateResults> fetchNextPage ) {
+            this.fetchInitialResults = fetchInitialResults;
+            this.fetchNextPage = fetchNextPage;
+        }
+
+
+        @Override
+        public void call( final Subscriber<? super CandidateResults> subscriber ) {
+
+            //subscribe() has already invoked subscriber.onStart(); calling it again here would run it twice
+
+            try {
+                CandidateResults results = fetchInitialResults.call();
+
+                //emit our pages until we run out of cursors, or nobody is listening any more
+                while ( !subscriber.isUnsubscribed() ) {
+                    subscriber.onNext( results );
+
+                    //if we have no cursor, we're done
+                    if ( !results.hasCursor() ) {
+                        break;
+                    }
+
+                    //we have a cursor, get our results to emit for the next page
+                    results = fetchNextPage.call( results.getCursor() );
+                }
+
+                subscriber.onCompleted();
+            }
+            catch ( Throwable t ) {
+                subscriber.onError( t );
+            }
+        }
+    }
+
+
+    /**
+     * A message that contains the candidate to keep, and the candidate toRemove
+     */
+    public static class CandidateGroup {
+        private final CandidateResult toKeep;
+        private final Collection<CandidateResult> toRemove;
+
+
+        public CandidateGroup( final CandidateResult toKeep, final Collection<CandidateResult> toRemove ) {
+            this.toKeep = toKeep;
+            this.toRemove = toRemove;
+        }
+    }
+
+
+    /**
+     * Compares 2 candidates by version.  The max version is considered greater
+     */
+    private static final class CandidateVersionComparator {
+
+        public static int compare( final CandidateResult o1, final CandidateResult o2 ) {
+            return UUIDComparator.staticCompare( o1.getVersion(), o2.getVersion() );
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/61aa9794/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/EntityIndexCommands.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/EntityIndexCommands.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/EntityIndexCommands.java
deleted file mode 100644
index d6f4e93..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/EntityIndexCommands.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.io.read;
-
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
-import org.apache.usergrid.persistence.index.IndexScope;
-import org.apache.usergrid.persistence.index.SearchTypes;
-import org.apache.usergrid.persistence.index.impl.IndexScopeImpl;
-import org.apache.usergrid.persistence.index.query.CandidateResult;
-import org.apache.usergrid.persistence.index.query.CandidateResults;
-import org.apache.usergrid.persistence.index.query.Query;
-import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import rx.Observable;
-import rx.functions.Action2;
-import rx.functions.Func1;
-
-
-public class EntityIndexCommands {
-
-
-//    /**
-//     * Perform a search of all the entities, and then return the observable of search results
-//     * @param index
-//     * @param edgeType
-//     * @param types
-//     * @param query
-//     * @return
-//     */
-//    public static Func1<Id, SearchResults> searchEntities(final ApplicationEntityIndex index,final String edgeType, final SearchTypes types, final String query  ){
-//
-//        return nodeId -> {
-//
-//        }
-//    }
-    /**
-     * Construct an indexScope from the input id type
-     * @param type
-     * @return
-     */
-   public static Func1<Id, IndexScope> createSearchScope(final String type){
-       return id -> new IndexScopeImpl( id, type );
-   }
-    /**
-     * Get our candidate results
-     * @param index
-     * @param types The types to return
-     * @param query
-     * @return
-     */
-    public static Func1<IndexScope, CandidateResults> getCandidates(final ApplicationEntityIndex index, final SearchTypes types, final String query){
-        return indexScope -> index.search( indexScope, types, Query.fromQLNullSafe( query ) );
-    }
-
-
-    /**
-     * Flattens candidate results into a single stream of a result
-     * @return
-     */
-    public static Func1<CandidateResults, Observable<CandidateResult>> flattenCandidates(){
-        return (CandidateResults candidateResults) -> Observable.from( candidateResults );
-    }
-
-
-    public static Action2<SearchResults, EntitySet> collectSet(){
-        return (searchResults, entitySet) -> {
-          searchResults.addEntities( entitySet.entities );
-        };
-    }
-
-
-
-    public static class SearchResults{
-        private final List<Entity> entities;
-        private String cursor;
-
-
-        public SearchResults(final int maxSize) {entities = new ArrayList<>(maxSize);}
-
-        public void addEntities(final Collection<Entity> entities){
-            this.entities.addAll( entities );
-
-        }
-        public void setCursor(final String cursor){
-            this.cursor = cursor;
-        }
-    }
-
-    public static class EntitySet{
-        private final List<Entity> entities;
-
-
-        public EntitySet( final List<Entity> entities ) {this.entities = entities;}
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/61aa9794/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/CollectUntil.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/CollectUntil.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/CollectUntil.java
index a2cb754..8a0c014 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/CollectUntil.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/CollectUntil.java
@@ -29,20 +29,19 @@ import rx.internal.operators.OperatorScan;
 
 
 /**
- * An operation for performing a collect until the predicate returns true
+ * An operation for performing a collect until the shortCircuitWhen returns true
  */
 public class CollectUntil<T, R> implements Observable.Transformer<T, R> {
 
     final Func0<R> stateFactory;
     final Action2<R, ? super T> collector;
-    final Func1<R, Boolean> predicate;
+    final Func1<R, Boolean> shortCircuitWhen;
 
 
-    public CollectUntil( final Func0<R> stateFactory, final Action2<R, ? super T> collector,
-                          final Func1<R, Boolean> predicate ) {
+    public CollectUntil( final Func1<R, Boolean> shortCircuitWhen,  final Func0<R> stateFactory, final Action2<R, ? super T> collector) {
         this.stateFactory = stateFactory;
         this.collector = collector;
-        this.predicate = predicate;
+        this.shortCircuitWhen = shortCircuitWhen;
     }
 
 
@@ -54,7 +53,7 @@ public class CollectUntil<T, R> implements Observable.Transformer<T, R> {
         };
 
 
-        return tObservable.lift( new OperatorScan<>( stateFactory, accumulator ) ).takeUntil( predicate );
+        return tObservable.lift( new OperatorScan<>( stateFactory, accumulator ) ).takeUntil( shortCircuitWhen );
     }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/61aa9794/stack/core/src/test/java/org/apache/usergrid/TempExample.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/TempExample.java b/stack/core/src/test/java/org/apache/usergrid/TempExample.java
index 1db450d..ed58111 100644
--- a/stack/core/src/test/java/org/apache/usergrid/TempExample.java
+++ b/stack/core/src/test/java/org/apache/usergrid/TempExample.java
@@ -20,16 +20,37 @@
 package org.apache.usergrid;
 
 
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.junit.Test;
+
+import org.apache.usergrid.corepersistence.rx.impl.CollectUntil;
 import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
 import org.apache.usergrid.persistence.index.SearchTypes;
+import org.apache.usergrid.persistence.index.query.CandidateResult;
 import org.apache.usergrid.persistence.index.query.CandidateResults;
 import org.apache.usergrid.persistence.model.entity.Id;
 
+import com.fasterxml.uuid.UUIDComparator;
+
 import rx.Observable;
+import rx.Statement;
+import rx.Subscriber;
+import rx.functions.Action2;
+import rx.functions.Func0;
+import rx.functions.Func1;
+import rx.observables.GroupedObservable;
 
 import static org.apache.usergrid.corepersistence.io.read.EntityIndexCommands.createSearchScope;
 import static org.apache.usergrid.corepersistence.io.read.EntityIndexCommands.getCandidates;
 import static org.apache.usergrid.persistence.core.util.IdGenerator.createId;
+import static org.junit.Assert.assertEquals;
 
 
 public class TempExample {
@@ -37,38 +58,210 @@ public class TempExample {
     //set our root observable
 
 
-    public static void main(String[] args) {
 
-        final Id rootId = createId( "thing" );
+//
+//        final Id rootId = createId( "thing" );
+//
+//        final ApplicationEntityIndex index = null;
+//
+//
+//        final SearchTypes searchType = SearchTypes.fromTypes( "test" );
+//
+//        final String query = "select * ";
+//
+//        final int selectSize = 100;
+//
+//        final Observable<CandidateResults> observable = Observable.just( rootId ).map( createSearchScope( "type" ) )
+//                                                                  .map( getCandidates( index, searchType,
+//                                                                      query ) ); //.compose();
+//
+//
+//        final Observable<GroupedObservable<Id, CandidateResult>> next =
+//            observable.flatMap( candidates -> Observable.from( candidates ) ).groupBy( result -> result.getId() );
+//
+//
+//        final Observable<ResultsCollector> results = next.compose(  new CollectUntil<>(
+//            //stop when the collector has a full set
+//                    collector -> collector.hasFullSet(),
+//            //create a new results collector, based on requested size
+//            () -> new ResultsCollector( selectSize ),
+//            //collect each candidate into our set of Candidate Results
+//            ( collector, group ) -> {
+//
+//                    final List<CandidateResult> list = group.toList().toBlocking().last();
+//
+//                    collector.addCandidates( list );
+//                } ) );
+//
+
+        //now reduce these results again via a cassandra collector
+
+
+
+    /**
+     * Re-subscribes a one-shot source with Statement.doWhile until the collector has recorded 100.
+     */
+    @Test
+    public void testLoops(){
+
+        final Observable<Integer> observable = Observable.create( new Observable.OnSubscribe<Integer>() {
+
+            int value = 0;
+
+            //emit one incrementing value per subscription, then complete so doWhile can re-subscribe
+            @Override
+            public void call( final Subscriber<? super Integer> subscriber ) {
+                subscriber.onNext( ++value );
+                subscriber.onCompleted();
+            }
+        } );
+
+
+        final CounterCollector collector = new CounterCollector();
+
+        final Action2<CounterCollector, Integer> collectorFunction = ( col, value ) -> col.set( value );

-        final ApplicationEntityIndex index = null;
+        final Func0<Boolean> notFull = () -> !collector.isFull();


-        final SearchTypes searchType = SearchTypes.fromTypes( "test" );
+        final Observable<CounterCollector> collectorObservable = Statement.doWhile( observable, notFull ).collect( () -> collector, collectorFunction );

-        final String query = "select * ";

-        final Observable<CandidateResults> observable = Observable.just( rootId ).map( createSearchScope( "type" ) ).map(getCandidates(index, searchType, query));
+        final int value = collectorObservable.toBlocking().last().lastValue;


-        observable.doOnNext( a -> System.out.println( a) ).toBlocking().last();
+        //doWhile evaluates its condition after each pass and each pass emits the next integer,
+        //so collection stops exactly when the collector reaches 100
+        assertEquals( 100, value );
+
+
+
+    }
+
+    /** Test accumulator that records the last value it was given and reports full at 100. */
+    private static class CounterCollector{
+        private int lastValue;
+        //overwrite with the most recent value; earlier values are not kept
+        public void set(final int newValue){
+            lastValue = newValue;
+        }
+        //full once the recorded value has reached 100
+        public boolean isFull(){
+            return lastValue >= 100;
+        }
     }
 
 
-    private static final class ResultsCollector{
+    private static final class ResultsCollector {
+
+        private final Map<Id, IndexResult<CandidateResult>> toKeep = new HashMap<>();
+        private final Set<CandidateResult> toRemove = new HashSet<>();
+
+        private final int resultSetSize;
+
+
+        private int currentIndex;
+
+
+        private ResultsCollector( final int resultSetSize ) {this.resultSetSize = resultSetSize;}
+

         /**
          * Add the candidates to our collection
-         * @param results
          */
-        public void addCandidates(final CandidateResults results ){
+        public void addCandidates( final List<CandidateResult> results ) {
+            if ( results.isEmpty() ) { return; }
+            Collections.sort( results, ( a, b ) -> CandidateVersionComparator.compare( b, a ) );
+            //sorted highest version first, so index 0 is the candidate to keep
+            final CandidateResult maxCandidate = results.get( 0 );
+
+            //add everything we don't use to our toRemoveSet
+            for ( int i = 1; i < results.size(); i++ ) {
+                toRemove.add( results.get( i ) );
+            }
+
+            //the id of the highest version in this group
+            final Id maxId = maxCandidate.getId();
+
+            //see if it exists in our set to keep
+            final IndexResult<CandidateResult> existingCandidate = toKeep.get( maxId );
+
+            if ( existingCandidate != null ) {
+
+                final CandidateResult existing = existingCandidate.value;
+
+                //our new value is greater than our existing, replace it
+                if ( CandidateVersionComparator.compare( maxCandidate, existing ) > 0 ) {
+                    //add it to the keep
+                    toKeep.put( maxId, new IndexResult<>( currentIndex, maxCandidate ) );

-            //TODO, collect the results, removing groups
-            Observable.from( results ).groupBy( candidate -> candidate.getId() ).collect(  )
+                    //remove the old value
+                    toRemove.add( existing );
+                }
+
+                //what's in the map is already the max, add our candidate to the cleanup list
+                else {
+                    toRemove.add( maxCandidate );
+                }
+            }
+
+            //add it to our list of items to keep
+            else {
+                toKeep.put( maxId, new IndexResult<>( currentIndex, maxCandidate ) );
+            }
+
+
+            //increment our index for the next invocation
+            currentIndex++;
+        }
+
+
+        /**
+         * Returns true once we hold at least {@code resultSetSize} distinct ids to evaluate
+         */
+        public boolean hasFullSet() {
+            return toKeep.size() >= resultSetSize;
         }

+
+        /**
+         * Get all the candidates we've collected
+         */
+        public Collection<IndexResult<CandidateResult>> getCandidates() {
+            return toKeep.values();
+        }
+
+        public Collection<CandidateResult> getStaleCandidates(){ return toRemove;}
     }
 
 
 
+
+    /**
+     * Version ordering for candidates: a candidate with a higher version compares as greater.
+     */
+    private static final class CandidateVersionComparator {
+        public static int compare( final CandidateResult left, final CandidateResult right ) {
+            final int order = UUIDComparator.staticCompare( left.getVersion(), right.getVersion() );
+            return order;
+        }
+    }
+
+
+    /**
+     * Pairs a value with the index at which it was collected; intended for use in aggregations.
+     * @param <T> the type of the wrapped value
+     */
+    private static final class IndexResult<T>{
+        private final int index;
+        //NOTE(review): index is stored but never read in the visible code; confirm it is still needed
+        private final T value;
+
+
+        private IndexResult( final int index, final T value ) {
+            this.index = index;
+            this.value = value;
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/61aa9794/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/impl/CollectUntilTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/impl/CollectUntilTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/impl/CollectUntilTest.java
index ce12429..b6ebcaa 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/impl/CollectUntilTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/impl/CollectUntilTest.java
@@ -33,10 +33,10 @@ public class CollectUntilTest {
     public void testCollectUntil() {
 
         final CollectUntil<Integer, CountCollector> collectUntil =
-            new CollectUntil<>(
+            new CollectUntil<>(  collector -> collector.isFull() ,
                 () -> new CountCollector(),
-                ( collector, value ) -> collector.mark(),
-                collector -> collector.isFull() );
+                ( collector, value ) -> collector.mark()
+                );
 
 
         final CountCollector collector = Observable.range( 0, 200 ).compose( collectUntil ).toBlocking().last();