You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/03/24 00:23:48 UTC
incubator-usergrid git commit: Refactor to clean up. Still a WIP
Repository: incubator-usergrid
Updated Branches:
refs/heads/USERGRID-494 c4f654847 -> 61aa97945
Refactor to clean up. Still a WIP
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/61aa9794
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/61aa9794
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/61aa9794
Branch: refs/heads/USERGRID-494
Commit: 61aa97945313533f6ef676f3f9dab5a22b9cb63b
Parents: c4f6548
Author: Todd Nine <tn...@apigee.com>
Authored: Mon Mar 23 09:52:44 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon Mar 23 17:23:46 2015 -0600
----------------------------------------------------------------------
.../corepersistence/io/read/Command.java | 8 +-
.../corepersistence/io/read/CommandBuilder.java | 8 +-
.../io/read/EntityIndexCommand.java | 384 +++++++++++++++++++
.../io/read/EntityIndexCommands.java | 119 ------
.../corepersistence/rx/impl/CollectUntil.java | 11 +-
.../java/org/apache/usergrid/TempExample.java | 217 ++++++++++-
.../rx/impl/CollectUntilTest.java | 6 +-
7 files changed, 605 insertions(+), 148 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/61aa9794/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/Command.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/Command.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/Command.java
index 4e3b0fc..34b8d1e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/Command.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/Command.java
@@ -25,14 +25,8 @@ import org.apache.usergrid.persistence.model.entity.Id;
import rx.Observable;
-public interface Command<T> {
+public interface Command<T, R> extends Observable.Transformer<T, R> {
- /**
- * Process our input stream
- * @param input
- * @return
- */
- public Observable<T> process( Observable<T> input );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/61aa9794/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/CommandBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/CommandBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/CommandBuilder.java
index 828cbc9..bbf74b6 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/CommandBuilder.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/CommandBuilder.java
@@ -38,7 +38,7 @@ public class CommandBuilder {
private CursorCache cache;
- private final Observable<Id> pathObservable;
+ private Observable<Id> pathObservable;
public CommandBuilder( final Id root ) {
@@ -46,6 +46,12 @@ public class CommandBuilder {
}
+
+    /**
+     * Append an intermediate command to the path. The command is composed onto the
+     * current path observable, and the composed observable becomes the new path.
+     *
+     * @param intermediateCommand the id-to-id command to chain onto the path
+     */
+    public void addCommand( final Command<Id, Id> intermediateCommand ) {
+        final Observable<Id> composed = pathObservable.compose( intermediateCommand );
+        pathObservable = composed;
+    }
+
+
/**
* Set our cache
* @param cache
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/61aa9794/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/EntityIndexCommand.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/EntityIndexCommand.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/EntityIndexCommand.java
new file mode 100644
index 0000000..a8fa221
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/EntityIndexCommand.java
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.io.read;
+
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.collection.EntitySet;
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
+import org.apache.usergrid.persistence.index.SearchTypes;
+import org.apache.usergrid.persistence.index.impl.IndexScopeImpl;
+import org.apache.usergrid.persistence.index.query.CandidateResult;
+import org.apache.usergrid.persistence.index.query.CandidateResults;
+import org.apache.usergrid.persistence.index.query.Query;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.fasterxml.uuid.UUIDComparator;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+import rx.Observable;
+import rx.Subscriber;
+import rx.functions.Func0;
+import rx.functions.Func1;
+
+
+@Singleton
+public class EntityIndexCommand implements Command<Id, EntityIndexCommand.SearchResults> {
+
+ private final Id applicationId;
+ private final ApplicationScope applicationScope;
+ private final ApplicationEntityIndex index;
+ private final SearchTypes types;
+ private final String query;
+ private final int resultSetSize;
+ private final String scopeType;
+ private final EntityCollectionManagerFactory entityCollectionManagerFactory;
+
+
+    /**
+     * Create a command that searches the index for every id it receives.
+     *
+     * @param applicationId the application id, used as owner when building collection scopes
+     * @param applicationScope the application scope this command was created for
+     * @param index the index that is searched
+     * @param types the search types passed to every index search
+     * @param query the query language string parsed for every search
+     * @param resultSetSize the number of candidate groups buffered into one result set
+     * @param scopeType the name used when building the index scope for a source id
+     * @param entityCollectionManagerFactory factory for the managers that load candidate entities
+     */
+    @Inject
+    public EntityIndexCommand( final Id applicationId, final ApplicationScope applicationScope,
+                               final ApplicationEntityIndex index, final SearchTypes types, final String query,
+                               final int resultSetSize, final String scopeType,
+                               final EntityCollectionManagerFactory entityCollectionManagerFactory ) {
+        //collaborators
+        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+        this.index = index;
+
+        //scoping
+        this.applicationScope = applicationScope;
+        this.applicationId = applicationId;
+        this.scopeType = scopeType;
+
+        //search parameters
+        this.resultSetSize = resultSetSize;
+        this.query = query;
+        this.types = types;
+    }
+
+
+    /**
+     * Transform a stream of source ids into a stream of search results.
+     *
+     * For every source id the index is paged through, the candidates are grouped by entity id so that
+     * only the newest version of each entity is kept, the groups are buffered up to the result set size,
+     * and each buffer is then loaded and verified via {@link #loadEntities(int)}.
+     *
+     * @param idObservable the source ids to search from
+     * @return an observable of search results, one per buffered set of candidates
+     */
+    @Override
+    public Observable<EntityIndexCommand.SearchResults> call( final Observable<Id> idObservable ) {
+
+        //create our observable of candidate search results, paging through the index for every source id
+        final Observable<CandidateResults> candidateResults = idObservable.flatMap(
+            id -> Observable.create( new ElasticSearchObservable( initialSearch( id ), nextPage( id ) ) ) );
+
+        //flatten every page into its individual candidates
+        final Observable<CandidateResult> candidateObservable =
+            candidateResults.flatMap( candidates -> Observable.from( candidates ) );
+
+        //roll up our candidates per entity id.  This must not block inside the groupBy callback: a group only
+        //completes when the upstream completes, so we flatMap over the group's list instead of waiting on it.
+        final Observable<CandidateGroup> candidateGroup =
+            candidateObservable.groupBy( candidate -> candidate.getId() ).flatMap(
+                observableGroup -> observableGroup.toList().map( EntityIndexCommand::toCandidateGroup ) );
+
+
+        //buffer our candidate groups up to our resultset size, and collect each buffer into a collector
+        final Observable<CandidateCollector> collectedCandidates =
+            candidateGroup.buffer( resultSetSize ).flatMap( candidates -> Observable.from( candidates ).collect(
+                () -> new CandidateCollector( resultSetSize ), ( candidateCollector, candidate ) -> {
+                    //add our candidate to keep to our collector
+                    candidateCollector.addCandidate( candidate.toKeep );
+                    //add the older duplicates so they can be removed
+                    candidateCollector.addEmptyResults( candidate.toRemove );
+                } ) );
+
+        //now we have our collected candidates, load them
+        return collectedCandidates.map( loadEntities( resultSetSize ) );
+    }
+
+
+    /**
+     * Build a candidate group from every candidate that shares an entity id.  The candidate with the highest
+     * version is kept, all other candidates are marked for removal.
+     *
+     * @param groupList the non-empty, mutable list of candidates for one entity id; it is sorted in place
+     * @return the group holding the newest candidate and the remaining candidates
+     */
+    private static CandidateGroup toCandidateGroup( final List<CandidateResult> groupList ) {
+        //the comparator orders ascending by version, so swap the arguments to get the highest version first
+        Collections.sort( groupList, ( left, right ) -> CandidateVersionComparator.compare( right, left ) );
+
+        return new CandidateGroup( groupList.get( 0 ), groupList.subList( 1, groupList.size() ) );
+    }
+
+
+ /**
+ * Perform the initial search with the sourceId
+ */
+ private Func0<CandidateResults> initialSearch( final Id sourceId ) {
+ return () -> index.search( new IndexScopeImpl( sourceId, scopeType ), types, Query.fromQL( query ) );
+ }
+
+
+ /**
+ * Search the next page for the specified source
+ */
+ private Func1<String, CandidateResults> nextPage( final Id sourceId ) {
+ return cursor -> index
+ .search( new IndexScopeImpl( sourceId, scopeType ), types, Query.fromQL( query ).withCursor( cursor ) );
+ }
+
+
+ /**
+ * Function that will load our entities from our candidates, filter stale or missing candidates and return results
+ * @param expectedSize
+ * @return
+ */
+ private Func1<CandidateCollector, SearchResults> loadEntities(final int expectedSize) {
+ return candidateCollector -> {
+
+ //from our candidates, group them by id type so we can create scopes
+ Observable.from( candidateCollector.getCandidates() ).groupBy( candidate -> candidate.getId().getType() )
+ .flatMap( groups -> {
+
+
+ final List<CandidateResult> candidates = groups.toList().toBlocking().last();
+
+ //we can get no results, so quit aggregating if there are none
+ if ( candidates.size() == 0 ) {
+ return Observable.just( new SearchResults( 0 ) );
+ }
+
+
+ final String typeName = candidates.get( 0 ).getId().getType();
+
+ final String collectionType = CpNamingUtils.getCollectionScopeNameFromEntityType( typeName );
+
+
+ //create our scope to get our entity manager
+
+ final CollectionScope scope =
+ new CollectionScopeImpl( applicationId, applicationId, collectionType );
+
+ final EntityCollectionManager ecm =
+ entityCollectionManagerFactory.createCollectionManager( scope );
+
+
+ //get our entity ids
+ final List<Id> entityIds =
+ Observable.from( candidates ).map( c -> c.getId() ).toList().toBlocking().last();
+
+ //TODO, change this out
+
+ //an observable of our entity set
+
+
+
+ //now go through all our candidates and verify
+
+ return Observable.from( candidates ).collect( () -> new SearchResults( expectedSize ), (searchResults, candidate) ->{
+
+ final EntitySet entitySet = ecm.load( entityIds ).toBlocking().last();
+
+ final MvccEntity entity = entitySet.getEntity( candidate.getId() );
+
+
+ //our candidate it stale, or target entity was deleted add it to the remove of our collector
+ if(UUIDComparator.staticCompare( entity.getVersion(), candidate.getVersion()) > 0 || !entity.getEntity().isPresent()){
+ searchResults.addToRemove( candidate );
+ return;
+ }
+
+
+ searchResults.addEntity( entity.getEntity().get() );
+
+
+ } )
+ //add the existing set to remove to this set
+ .doOnNext( results -> results.addToRemove( candidateCollector.getToRemove() ) );
+
+ } );
+
+
+ return null;
+ };
+ }
+
+
+
+
+    /**
+     * Accumulates the candidates that should be loaded (in the order they were added), along with the
+     * candidates that should be removed.
+     */
+    public static class CandidateCollector {
+        private final List<CandidateResult> toRemove;
+        private final List<CandidateResult> candidates;
+
+
+        /**
+         * @param maxSize the initial capacity of both internal lists
+         */
+        public CandidateCollector( final int maxSize ) {
+            this.toRemove = new ArrayList<>( maxSize );
+            this.candidates = new ArrayList<>( maxSize );
+        }
+
+
+        /**
+         * Append a candidate that should be loaded
+         */
+        public void addCandidate( final CandidateResult candidate ) {
+            candidates.add( candidate );
+        }
+
+
+        /**
+         * Append all the given candidates to the candidates to remove
+         */
+        public void addEmptyResults( final Collection<CandidateResult> stale ) {
+            toRemove.addAll( stale );
+        }
+
+
+        /**
+         * @return the internal list of candidates to load
+         */
+        public List<CandidateResult> getCandidates() {
+            return this.candidates;
+        }
+
+
+        /**
+         * @return the internal list of candidates to remove
+         */
+        public List<CandidateResult> getToRemove() {
+            return this.toRemove;
+        }
+    }
+
+
+    /**
+     * The outcome of a search: the loaded entities, the candidates that should be removed, and an optional
+     * cursor.
+     */
+    public static class SearchResults {
+        private final List<Entity> entities;
+        private final List<CandidateResult> toRemove;
+
+        private String cursor;
+
+
+        /**
+         * @param maxSize the initial capacity of the internal lists
+         */
+        public SearchResults( final int maxSize ) {
+            entities = new ArrayList<>( maxSize );
+            this.toRemove = new ArrayList<>( maxSize );
+        }
+
+
+        /**
+         * Append a loaded entity to the results
+         */
+        public void addEntity( final Entity entity ) {
+            this.entities.add( entity );
+        }
+
+
+        /**
+         * Append all the given candidates to the candidates to remove
+         */
+        public void addToRemove( final Collection<CandidateResult> stale ) {
+            this.toRemove.addAll( stale );
+        }
+
+
+        /**
+         * Append a single candidate to the candidates to remove
+         */
+        public void addToRemove( final CandidateResult candidateResult ) {
+            this.toRemove.add( candidateResult );
+        }
+
+
+        /**
+         * @return the internal list of entities, in the order they were added
+         */
+        public List<Entity> getEntities() {
+            return entities;
+        }
+
+
+        /**
+         * @return the internal list of candidates to remove
+         */
+        public List<CandidateResult> getToRemove() {
+            return toRemove;
+        }
+
+
+        /**
+         * @return the cursor, or null if none has been set
+         */
+        public String getCursor() {
+            return cursor;
+        }
+
+
+        public void setCursor( final String cursor ) {
+            this.cursor = cursor;
+        }
+    }
+
+
+ /**
+ * An observable that will perform a search and continually emit results while they exist.
+ */
+ public static class ElasticSearchObservable implements Observable.OnSubscribe<CandidateResults> {
+
+ private final Func1<String, CandidateResults> fetchNextPage;
+ private final Func0<CandidateResults> fetchInitialResults;
+
+
+ public ElasticSearchObservable( final Func0<CandidateResults> fetchInitialResults,
+ final Func1<String, CandidateResults> fetchNextPage ) {
+ this.fetchInitialResults = fetchInitialResults;
+ this.fetchNextPage = fetchNextPage;
+ }
+
+
+ @Override
+ public void call( final Subscriber<? super CandidateResults> subscriber ) {
+
+ subscriber.onStart();
+
+ try {
+ CandidateResults results = fetchInitialResults.call();
+
+
+ //emit our next page
+ while ( true ) {
+ subscriber.onNext( results );
+
+ //if we have no cursor, we're done
+ if ( !results.hasCursor() ) {
+ break;
+ }
+
+
+ //we have a cursor, get our results to emit for the next page
+ results = fetchNextPage.call( results.getCursor() );
+ }
+
+ subscriber.onCompleted();
+ }
+ catch ( Throwable t ) {
+ subscriber.onError( t );
+ }
+ }
+ }
+
+
+    /**
+     * A message for a single entity id: the one candidate that is kept, and every candidate that is to be
+     * removed.
+     */
+    public static class CandidateGroup {
+        private final Collection<CandidateResult> toRemove;
+        private final CandidateResult toKeep;
+
+
+        /**
+         * @param toKeep the candidate to keep
+         * @param toRemove the candidates to remove
+         */
+        public CandidateGroup( final CandidateResult toKeep, final Collection<CandidateResult> toRemove ) {
+            this.toRemove = toRemove;
+            this.toKeep = toKeep;
+        }
+    }
+
+
+    /**
+     * Orders 2 candidates by their version.  The candidate with the max version is considered greater.
+     */
+    private static final class CandidateVersionComparator {
+
+        /**
+         * @return the result of comparing the version of o1 with the version of o2
+         */
+        public static int compare( final CandidateResult o1, final CandidateResult o2 ) {
+            final int versionOrder = UUIDComparator.staticCompare( o1.getVersion(), o2.getVersion() );
+
+            return versionOrder;
+        }
+    }
+
+ /***********************
+ * FROM HERE DOWN IS EXPERIMENTAL
+ *************************
+ */
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/61aa9794/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/EntityIndexCommands.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/EntityIndexCommands.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/EntityIndexCommands.java
deleted file mode 100644
index d6f4e93..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/EntityIndexCommands.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.io.read;
-
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
-import org.apache.usergrid.persistence.index.IndexScope;
-import org.apache.usergrid.persistence.index.SearchTypes;
-import org.apache.usergrid.persistence.index.impl.IndexScopeImpl;
-import org.apache.usergrid.persistence.index.query.CandidateResult;
-import org.apache.usergrid.persistence.index.query.CandidateResults;
-import org.apache.usergrid.persistence.index.query.Query;
-import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import rx.Observable;
-import rx.functions.Action2;
-import rx.functions.Func1;
-
-
-public class EntityIndexCommands {
-
-
-// /**
-// * Perform a search of all the entities, and then return the observable of search results
-// * @param index
-// * @param edgeType
-// * @param types
-// * @param query
-// * @return
-// */
-// public static Func1<Id, SearchResults> searchEntities(final ApplicationEntityIndex index,final String edgeType, final SearchTypes types, final String query ){
-//
-// return nodeId -> {
-//
-// }
-// }
- /**
- * Construct an indexScope from the input id type
- * @param type
- * @return
- */
- public static Func1<Id, IndexScope> createSearchScope(final String type){
- return id -> new IndexScopeImpl( id, type );
- }
- /**
- * Get our candidate results
- * @param index
- * @param types The types to return
- * @param query
- * @return
- */
- public static Func1<IndexScope, CandidateResults> getCandidates(final ApplicationEntityIndex index, final SearchTypes types, final String query){
- return indexScope -> index.search( indexScope, types, Query.fromQLNullSafe( query ) );
- }
-
-
- /**
- * Flattens candidate results into a single stream of a result
- * @return
- */
- public static Func1<CandidateResults, Observable<CandidateResult>> flattenCandidates(){
- return (CandidateResults candidateResults) -> Observable.from( candidateResults );
- }
-
-
- public static Action2<SearchResults, EntitySet> collectSet(){
- return (searchResults, entitySet) -> {
- searchResults.addEntities( entitySet.entities );
- };
- }
-
-
-
- public static class SearchResults{
- private final List<Entity> entities;
- private String cursor;
-
-
- public SearchResults(final int maxSize) {entities = new ArrayList<>(maxSize);}
-
- public void addEntities(final Collection<Entity> entities){
- this.entities.addAll( entities );
-
- }
- public void setCursor(final String cursor){
- this.cursor = cursor;
- }
- }
-
- public static class EntitySet{
- private final List<Entity> entities;
-
-
- public EntitySet( final List<Entity> entities ) {this.entities = entities;}
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/61aa9794/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/CollectUntil.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/CollectUntil.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/CollectUntil.java
index a2cb754..8a0c014 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/CollectUntil.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/CollectUntil.java
@@ -29,20 +29,19 @@ import rx.internal.operators.OperatorScan;
/**
- * An operation for performing a collect until the predicate returns true
+ * An operation for performing a collect until the shortCircuitWhen returns true
*/
public class CollectUntil<T, R> implements Observable.Transformer<T, R> {
final Func0<R> stateFactory;
final Action2<R, ? super T> collector;
- final Func1<R, Boolean> predicate;
+ final Func1<R, Boolean> shortCircuitWhen;
- public CollectUntil( final Func0<R> stateFactory, final Action2<R, ? super T> collector,
- final Func1<R, Boolean> predicate ) {
+ public CollectUntil( final Func1<R, Boolean> shortCircuitWhen, final Func0<R> stateFactory, final Action2<R, ? super T> collector) {
this.stateFactory = stateFactory;
this.collector = collector;
- this.predicate = predicate;
+ this.shortCircuitWhen = shortCircuitWhen;
}
@@ -54,7 +53,7 @@ public class CollectUntil<T, R> implements Observable.Transformer<T, R> {
};
- return tObservable.lift( new OperatorScan<>( stateFactory, accumulator ) ).takeUntil( predicate );
+ return tObservable.lift( new OperatorScan<>( stateFactory, accumulator ) ).takeUntil( shortCircuitWhen );
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/61aa9794/stack/core/src/test/java/org/apache/usergrid/TempExample.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/TempExample.java b/stack/core/src/test/java/org/apache/usergrid/TempExample.java
index 1db450d..ed58111 100644
--- a/stack/core/src/test/java/org/apache/usergrid/TempExample.java
+++ b/stack/core/src/test/java/org/apache/usergrid/TempExample.java
@@ -20,16 +20,37 @@
package org.apache.usergrid;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.junit.Test;
+
+import org.apache.usergrid.corepersistence.rx.impl.CollectUntil;
import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
import org.apache.usergrid.persistence.index.SearchTypes;
+import org.apache.usergrid.persistence.index.query.CandidateResult;
import org.apache.usergrid.persistence.index.query.CandidateResults;
import org.apache.usergrid.persistence.model.entity.Id;
+import com.fasterxml.uuid.UUIDComparator;
+
import rx.Observable;
+import rx.Statement;
+import rx.Subscriber;
+import rx.functions.Action2;
+import rx.functions.Func0;
+import rx.functions.Func1;
+import rx.observables.GroupedObservable;
import static org.apache.usergrid.corepersistence.io.read.EntityIndexCommands.createSearchScope;
import static org.apache.usergrid.corepersistence.io.read.EntityIndexCommands.getCandidates;
import static org.apache.usergrid.persistence.core.util.IdGenerator.createId;
+import static org.junit.Assert.assertEquals;
public class TempExample {
@@ -37,38 +58,210 @@ public class TempExample {
//set our root observable
- public static void main(String[] args) {
- final Id rootId = createId( "thing" );
+//
+// final Id rootId = createId( "thing" );
+//
+// final ApplicationEntityIndex index = null;
+//
+//
+// final SearchTypes searchType = SearchTypes.fromTypes( "test" );
+//
+// final String query = "select * ";
+//
+// final int selectSize = 100;
+//
+// final Observable<CandidateResults> observable = Observable.just( rootId ).map( createSearchScope( "type" ) )
+// .map( getCandidates( index, searchType,
+// query ) ); //.compose();
+//
+//
+// final Observable<GroupedObservable<Id, CandidateResult>> next =
+// observable.flatMap( candidates -> Observable.from( candidates ) ).groupBy( result -> result.getId() );
+//
+//
+// final Observable<ResultsCollector> results = next.compose( new CollectUntil<>(
+// //stop when the collector has a full set
+// collector -> collector.hasFullSet(),
+// //create a new results collector, based on requested size
+// () -> new ResultsCollector( selectSize ),
+// //collect each candidate into our set of Candidate Results
+// ( collector, group ) -> {
+//
+// final List<CandidateResult> list = group.toList().toBlocking().last();
+//
+// collector.addCandidates( list );
+// } ) );
+//
+
+ //now reduce these results again via a cassandra collector
+
+
+
+ @Test
+ public void testLoops(){
+
+ final Observable<Integer> observable = Observable.create( new Observable.OnSubscribe<Integer>() {
+
+ int value = 0;
+
+
+ @Override
+ public void call( final Subscriber<? super Integer> subscriber ) {
+ subscriber.onNext( ++value );
+ }
+ } );
+
+
+
+
+ final CounterCollector collector = new CounterCollector();
+
+ final Action2<CounterCollector, Integer> collectorFunction = (col, value) -> {col.set( value ); };
+
+
- final ApplicationEntityIndex index = null;
+ final Func0<Boolean> complete = () -> collector.isFull();
- final SearchTypes searchType = SearchTypes.fromTypes( "test" );
+ final Observable<CounterCollector> collectorObservable = Statement.doWhile( observable, complete ).collect( () -> collector, collectorFunction );
- final String query = "select * ";
- final Observable<CandidateResults> observable = Observable.just( rootId ).map( createSearchScope( "type" ) ).map(getCandidates(index, searchType, query));
+ final int value = collectorObservable.toBlocking().last().lastValue;
- observable.doOnNext( a -> System.out.println( a) ).toBlocking().last();
+ assertEquals(100, value);
+
+
+
+
+
+ }
+
+
+    /**
+     * Remembers the last value it was given, and is full once that value has reached 100
+     */
+    private static class CounterCollector{
+        private int lastValue;
+
+        /**
+         * Overwrite the last seen value
+         */
+        public void set(final int newValue){
+            this.lastValue = newValue;
+        }
+
+        /**
+         * @return true once the last seen value is 100 or more
+         */
+        public boolean isFull(){
+            final boolean belowLimit = this.lastValue < 100;
+
+            return !belowLimit;
+        }
     }
- private static final class ResultsCollector{
+ private static final class ResultsCollector {
+
+ private Map<Id, IndexResult<CandidateResult>> toKeep = new HashMap<>();
+ private Set<CandidateResult> toRemove = new HashSet<>();
+
+ private final int resultSetSize;
+
+
+ private int currentIndex;
+
+
+ private ResultsCollector( final int resultSetSize ) {this.resultSetSize = resultSetSize;}
+
/**
* Add the candidates to our collection
- * @param results
*/
- public void addCandidates(final CandidateResults results ){
+ public void addCandidates( final List<CandidateResult> results ) {
+
+ Collections.sort( results, CandidateVersionComparator::compare );
+
+ final CandidateResult maxCandidate = results.get( 0 );
+
+ //add everything we don't use to our toRemoveSet
+ for ( int i = 1; i < results.size(); i++ ) {
+ toRemove.add( results.get( i ) );
+ }
+
+ //we have this id already, remove it and pick the max
+ final Id maxId = maxCandidate.getId();
+
+ //see if it exists in our set to keep
+ final IndexResult<CandidateResult> existingCandidate = toKeep.get( maxId );
+
+ if ( existingCandidate != null ) {
+
+ final CandidateResult existing = existingCandidate.value;
+
+ //our new value is greater than our existing, replace it
+ if ( CandidateVersionComparator.compare( maxCandidate, existing ) > 0 ) {
+ //add it to the keep
+ toKeep.put( maxId, new IndexResult( currentIndex, maxCandidate ) );
- //TODO, collect the results, removing groups
- Observable.from( results ).groupBy( candidate -> candidate.getId() ).collect( )
+ //remove the old value
+ toRemove.add( existingCandidate.value );
+ }
+
+ //what's in the map is already the max, add our candidate to the cleanup list
+ else {
+ toRemove.add( maxCandidate );
+ }
+ }
+
+ //add it to our list of items to keep
+ else {
+ toKeep.put( maxId, new IndexResult( currentIndex, maxCandidate ) );
+ }
+
+
+ //increment our index for the next invocation
+ currentIndex++;
+ }
+
+
+        /**
+         * We have a full set to evaluate for candidates.  The set is full once the number of candidates
+         * we keep has reached the requested result set size; an empty collector is never full.
+         *
+         * @return true if at least resultSetSize candidates are kept
+         */
+        public boolean hasFullSet() {
+            return toKeep.size() >= resultSetSize;
         }
+
+ /**
+ * Get all the candidates we've collected
+ */
+ public Collection<IndexResult<CandidateResult>> getCandidates() {
+ return toKeep.values();
+ }
+
+ public Collection<CandidateResult> getStaleCandidates(){ return toRemove;}
}
+
+    /**
+     * Orders 2 candidates by their version.  The candidate with the max version is considered greater.
+     */
+    private static final class CandidateVersionComparator {
+
+        /**
+         * @return the result of comparing the version of o1 with the version of o2
+         */
+        public static int compare( final CandidateResult o1, final CandidateResult o2 ) {
+            final int versionOrder = UUIDComparator.staticCompare( o1.getVersion(), o2.getVersion() );
+
+            return versionOrder;
+        }
+    }
+
+
+    /**
+     * A data message that pairs a value with an index.  Can be used in aggregations
+     *
+     * @param <T> the type of the value
+     */
+    private static final class IndexResult<T>{
+        private final T value;
+
+        private final int index;
+
+
+        /**
+         * @param index the index to pair with the value
+         * @param value the value itself
+         */
+        private IndexResult( final int index, final T value ) {
+            this.value = value;
+            this.index = index;
+        }
+    }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/61aa9794/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/impl/CollectUntilTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/impl/CollectUntilTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/impl/CollectUntilTest.java
index ce12429..b6ebcaa 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/impl/CollectUntilTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/impl/CollectUntilTest.java
@@ -33,10 +33,10 @@ public class CollectUntilTest {
public void testCollectUntil() {
final CollectUntil<Integer, CountCollector> collectUntil =
- new CollectUntil<>(
+ new CollectUntil<>( collector -> collector.isFull() ,
() -> new CountCollector(),
- ( collector, value ) -> collector.mark(),
- collector -> collector.isFull() );
+ ( collector, value ) -> collector.mark()
+ );
final CountCollector collector = Observable.range( 0, 200 ).compose( collectUntil ).toBlocking().last();