You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/07/09 01:29:17 UTC
incubator-usergrid git commit: add scheduler to search observables
Repository: incubator-usergrid
Updated Branches:
refs/heads/two-dot-o-dev 28bc4ca8e -> 7a72c5b19
add scheduler to search observables
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/7a72c5b1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/7a72c5b1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/7a72c5b1
Branch: refs/heads/two-dot-o-dev
Commit: 7a72c5b19ded2ba19f27b8370971a78f8b229fc0
Parents: 28bc4ca
Author: Shawn Feldman <sf...@apache.org>
Authored: Wed Jul 8 17:29:01 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Wed Jul 8 17:29:01 2015 -0600
----------------------------------------------------------------------
.../usergrid/corepersistence/pipeline/Pipeline.java | 4 +++-
.../pipeline/read/collect/ResultsPageCollector.java | 16 +++++++++++++---
.../results/ObservableQueryExecutor.java | 6 ++++--
3 files changed, 20 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7a72c5b1/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
index dc95178..81f857f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
@@ -30,6 +30,7 @@ import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import rx.Observable;
+import rx.schedulers.Schedulers;
/**
@@ -75,7 +76,8 @@ public class Pipeline<InputType> {
//set our observable to start at the application
final FilterResult<Id> filter = new FilterResult<>( applicationScope.getApplication(), Optional.absent() );
- this.currentObservable = Observable.just( filter );
+
+ this.currentObservable = Observable.just( filter ).subscribeOn(Schedulers.io());
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7a72c5b1/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ResultsPageCollector.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ResultsPageCollector.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ResultsPageCollector.java
index 91773c4..29a7134 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ResultsPageCollector.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ResultsPageCollector.java
@@ -61,9 +61,19 @@ public class ResultsPageCollector<T> extends AbstractFilter<FilterResult<T>, Res
final int limit = pipelineContext.getLimit();
- return filterResultObservable.buffer( limit ).flatMap( buffer -> Observable.from( buffer ).collect(
- () -> new ResultsPageWithCursorCollector( limit ), ( collector, element ) -> collector.add( element ) ) ).map( resultsPageCollector -> new ResultsPage( resultsPageCollector.results,
- new ResponseCursor( resultsPageCollector.lastPath ), pipelineContext.getLimit() ) );
+ return filterResultObservable
+ .buffer( limit )
+ .flatMap( buffer
+ -> Observable
+ .from( buffer )
+ .collect(() -> new ResultsPageWithCursorCollector( limit ), ( collector, element ) -> collector.add( element ) )
+ )
+ .map( resultsPageCollector ->
+ new ResultsPage(
+ resultsPageCollector.results,
+ new ResponseCursor( resultsPageCollector.lastPath ), pipelineContext.getLimit()
+ )
+ );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7a72c5b1/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
index fce1fb2..a20b84f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
@@ -34,6 +34,7 @@ import org.apache.usergrid.persistence.model.entity.Id;
import com.google.common.base.Optional;
import rx.Observable;
+import rx.schedulers.Schedulers;
/**
@@ -97,11 +98,12 @@ public abstract class ObservableQueryExecutor<T> implements QueryExecutor {
public boolean hasNext() {
if ( iterator == null ) {
- iterator = resultsObservable.toBlocking().getIterator();
+ iterator = resultsObservable.toBlocking().getIterator();
}
+ boolean hasNext = iterator.hasNext();
- return iterator.hasNext();
+ return hasNext;
}