You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/07/09 01:29:17 UTC

incubator-usergrid git commit: add scheduler to search observables

Repository: incubator-usergrid
Updated Branches:
  refs/heads/two-dot-o-dev 28bc4ca8e -> 7a72c5b19


add scheduler to search observables


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/7a72c5b1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/7a72c5b1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/7a72c5b1

Branch: refs/heads/two-dot-o-dev
Commit: 7a72c5b19ded2ba19f27b8370971a78f8b229fc0
Parents: 28bc4ca
Author: Shawn Feldman <sf...@apache.org>
Authored: Wed Jul 8 17:29:01 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Wed Jul 8 17:29:01 2015 -0600

----------------------------------------------------------------------
 .../usergrid/corepersistence/pipeline/Pipeline.java |  4 +++-
 .../pipeline/read/collect/ResultsPageCollector.java | 16 +++++++++++++---
 .../results/ObservableQueryExecutor.java            |  6 ++++--
 3 files changed, 20 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7a72c5b1/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
index dc95178..81f857f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
@@ -30,6 +30,7 @@ import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 
 import rx.Observable;
+import rx.schedulers.Schedulers;
 
 
 /**
@@ -75,7 +76,8 @@ public class Pipeline<InputType> {
 
         //set our observable to start at the application
         final FilterResult<Id> filter = new FilterResult<>( applicationScope.getApplication(), Optional.absent() );
-        this.currentObservable = Observable.just( filter );
+
+        this.currentObservable = Observable.just( filter ).subscribeOn(Schedulers.io());
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7a72c5b1/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ResultsPageCollector.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ResultsPageCollector.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ResultsPageCollector.java
index 91773c4..29a7134 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ResultsPageCollector.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ResultsPageCollector.java
@@ -61,9 +61,19 @@ public class ResultsPageCollector<T> extends AbstractFilter<FilterResult<T>, Res
 
         final int limit = pipelineContext.getLimit();
 
-        return filterResultObservable.buffer( limit ).flatMap( buffer -> Observable.from( buffer ).collect(
-            () -> new ResultsPageWithCursorCollector( limit ), ( collector, element ) -> collector.add( element ) ) ).map( resultsPageCollector -> new ResultsPage( resultsPageCollector.results,
-            new ResponseCursor( resultsPageCollector.lastPath ), pipelineContext.getLimit() ) );
+        return filterResultObservable
+            .buffer( limit )
+            .flatMap( buffer
+                -> Observable
+                    .from( buffer )
+                    .collect(() -> new ResultsPageWithCursorCollector( limit ), ( collector, element ) -> collector.add( element ) )
+            )
+            .map( resultsPageCollector ->
+                new ResultsPage(
+                    resultsPageCollector.results,
+                    new ResponseCursor( resultsPageCollector.lastPath ), pipelineContext.getLimit()
+                )
+            );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7a72c5b1/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
index fce1fb2..a20b84f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
@@ -34,6 +34,7 @@ import org.apache.usergrid.persistence.model.entity.Id;
 import com.google.common.base.Optional;
 
 import rx.Observable;
+import rx.schedulers.Schedulers;
 
 
 /**
@@ -97,11 +98,12 @@ public abstract class ObservableQueryExecutor<T> implements QueryExecutor {
     public boolean hasNext() {
 
         if ( iterator == null ) {
-            iterator = resultsObservable.toBlocking().getIterator();
+            iterator =  resultsObservable.toBlocking().getIterator();
         }
 
+        boolean hasNext = iterator.hasNext();
 
-        return iterator.hasNext();
+        return hasNext;
     }