You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/07/09 22:37:39 UTC
incubator-usergrid git commit: add blocking iterator
Repository: incubator-usergrid
Updated Branches:
refs/heads/two-dot-o-dev 7a72c5b19 -> 8a316e88b
add blocking iterator
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/8a316e88
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/8a316e88
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/8a316e88
Branch: refs/heads/two-dot-o-dev
Commit: 8a316e88bf3e9c8f91adf9c7ada808aa98c4e557
Parents: 7a72c5b
Author: Shawn Feldman <sf...@apache.org>
Authored: Thu Jul 9 14:22:48 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Jul 9 14:22:48 2015 -0600
----------------------------------------------------------------------
.../corepersistence/pipeline/Pipeline.java | 2 +-
.../search/AbstractElasticSearchFilter.java | 2 +
.../results/ObservableQueryExecutor.java | 12 +-
.../usergrid/persistence/CollectionIT.java | 72 ++++++-----
.../rx/ObservableToBlockingIteratorFactory.java | 126 +++++++++++++++++++
.../persistence/core/rx/OrderedMergeTest.java | 103 ++++++++++-----
stack/corepersistence/pom.xml | 2 +-
7 files changed, 246 insertions(+), 73 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8a316e88/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
index 81f857f..13edb2c 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
@@ -77,7 +77,7 @@ public class Pipeline<InputType> {
//set our observable to start at the application
final FilterResult<Id> filter = new FilterResult<>( applicationScope.getApplication(), Optional.absent() );
- this.currentObservable = Observable.just( filter ).subscribeOn(Schedulers.io());
+ this.currentObservable = Observable.just( filter );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8a316e88/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/AbstractElasticSearchFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/AbstractElasticSearchFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/AbstractElasticSearchFilter.java
index 9f598ff..426fa47 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/AbstractElasticSearchFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/AbstractElasticSearchFilter.java
@@ -100,10 +100,12 @@ public abstract class AbstractElasticSearchFilter extends AbstractPathFilter<Id,
subscriber.onStart();
+ int count = 0;
//emit while we have values from ES and someone is subscribed
while ( !subscriber.isUnsubscribed() ) {
+ log.info("elastic search iteration loop {}",count++);
try {
final CandidateResults candidateResults =
applicationEntityIndex.search( searchEdge, searchTypes, query, limit, currentOffSet );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8a316e88/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
index a20b84f..548e584 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
@@ -28,6 +28,7 @@ import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
import org.apache.usergrid.persistence.EntityFactory;
import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.core.rx.ObservableToBlockingIteratorFactory;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
@@ -52,7 +53,8 @@ public abstract class ObservableQueryExecutor<T> implements QueryExecutor {
public ObservableQueryExecutor( final Observable<ResultsPage<T>> resultsObservable ) {
//map to our old results objects, return a default empty if required
this.resultsObservable = resultsObservable.map( resultsPage -> createResultsInternal( resultsPage ) )
- .defaultIfEmpty( new Results() );
+ .defaultIfEmpty(new Results())
+ .subscribeOn(Schedulers.io());
}
@@ -98,7 +100,7 @@ public abstract class ObservableQueryExecutor<T> implements QueryExecutor {
public boolean hasNext() {
if ( iterator == null ) {
- iterator = resultsObservable.toBlocking().getIterator();
+ iterator = ObservableToBlockingIteratorFactory.toIterator( resultsObservable );
}
boolean hasNext = iterator.hasNext();
@@ -118,4 +120,10 @@ public abstract class ObservableQueryExecutor<T> implements QueryExecutor {
return next;
}
+
+ @Override
+ protected void finalize() throws Throwable {
+ resultsObservable.unsubscribeOn(Schedulers.io());
+ super.finalize();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8a316e88/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
index eab04ce..b3337ab 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
@@ -17,12 +17,7 @@
package org.apache.usergrid.persistence;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
+import java.util.*;
import org.junit.Rule;
import org.junit.Test;
@@ -38,6 +33,8 @@ import org.apache.usergrid.persistence.exceptions.DuplicateUniquePropertyExistsE
import org.apache.usergrid.persistence.index.query.Identifier;
import org.apache.usergrid.utils.JsonUtils;
import org.apache.usergrid.utils.UUIDUtils;
+import rx.Observable;
+import rx.schedulers.Schedulers;
import static org.apache.usergrid.utils.MapUtils.hashMap;
import static org.junit.Assert.assertEquals;
@@ -466,18 +463,18 @@ public class CollectionIT extends AbstractCoreIT {
properties.put( "verb", "post" );
properties.put( "content", "I wrote a blog post" );
- em.addToCollection( user, "activities", em.create( "activity", properties ) );
+ em.addToCollection(user, "activities", em.create("activity", properties));
properties = new LinkedHashMap<String, Object>();
properties.put( "actor", hashMap( "displayName", "Ed Anuff" ).map( "objectType", "person" ) );
- properties.put( "verb", "tweet" );
+ properties.put("verb", "tweet");
properties.put( "content", "I ate another sammich" );
- em.addToCollection( user, "activities", em.create( "activity", properties ) );
+ em.addToCollection(user, "activities", em.create("activity", properties));
properties = new LinkedHashMap<String, Object>();
- properties.put( "actor", hashMap( "displayName", "Ed Anuff" ).map( "objectType", "person" ) );
- properties.put( "verb", "post" );
+ properties.put("actor", hashMap("displayName", "Ed Anuff").map("objectType", "person"));
+ properties.put("verb", "post");
properties.put( "content", "I wrote another blog post" );
em.addToCollection( user, "activities", em.create( "activity", properties ) );
@@ -486,7 +483,7 @@ public class CollectionIT extends AbstractCoreIT {
final Query query = Query.fromQL( "verb = 'post'" );
- Results r = em.searchCollection( user, "activities", query );
+ Results r = em.searchCollection(user, "activities", query);
LOG.info( JsonUtils.mapToFormattedJsonString( r.getEntities() ) );
assertEquals( 2, r.size() );
}
@@ -604,7 +601,7 @@ public class CollectionIT extends AbstractCoreIT {
// EntityRef
Query query = Query
- .fromQL( "select * where keywords contains 'Random' " + "OR keywords contains 'Game' order by title desc" );
+ .fromQL("select * where keywords contains 'Random' " + "OR keywords contains 'Game' order by title desc");
Results r = em.searchCollection( em.getApplicationRef(), "orquerygames", query );
@@ -625,11 +622,11 @@ public class CollectionIT extends AbstractCoreIT {
assertEquals( 2, r.size() );
- returned = r.getEntities().get( 0 );
+ returned = r.getEntities().get(0);
assertEquals( game1.getUuid(), returned.getUuid() );
- returned = r.getEntities().get( 1 );
+ returned = r.getEntities().get(1);
assertEquals( game2.getUuid(), returned.getUuid() );
@@ -646,13 +643,13 @@ public class CollectionIT extends AbstractCoreIT {
assertEquals( game1.getUuid(), returned.getUuid() );
query = Query
- .fromQL( "select * where title contains 'blah' " + "OR keywords contains 'blah' order by title desc" );
+ .fromQL("select * where title contains 'blah' " + "OR keywords contains 'blah' order by title desc");
r = em.searchCollection( em.getApplicationRef(), "orquerygames", query );
assertEquals( 1, r.size() );
- returned = r.getEntities().get( 0 );
+ returned = r.getEntities().get(0);
assertEquals( game1.getUuid(), returned.getUuid() );
}
@@ -821,32 +818,32 @@ public class CollectionIT extends AbstractCoreIT {
// search for games without sub-field Foo should returned zero entities
- Query query = Query.fromQL( "select * where NOT subObjectArray.subField = 'Foo'" ).withLimit( 1 );
+ Query query = Query.fromQL( "select * where NOT subObjectArray.subField = 'Foo'" ).withLimit(1);
Results r = em.searchCollection( em.getApplicationRef(), "games", query );
assertEquals( 0, r.size() );
- assertNull( r.getCursor() );
+ assertNull(r.getCursor());
// full negation in simple with lower limit
query = Query.fromQL( "select * where NOT subObjectArray.subField = 'Bar'" ).withLimit( 1 );
r = em.searchCollection( em.getApplicationRef(), "games", query );
assertEquals( 1, r.size() );
- assertNotNull( r.getCursor() );
- assertEquals( entity2, r.getEntities().get( 0 ) );
+ assertNotNull(r.getCursor());
+ assertEquals(entity2, r.getEntities().get(0));
query = Query.fromQL( "select * where NOT subObjectArray.subField = 'Bar'" ).withLimit( 1 )
.withCursor( r.getCursor() );
r = em.searchCollection( em.getApplicationRef(), "games", query );
- assertEquals( 1, r.size() );
- assertNotNull( r.getCursor() );
- assertEquals( entity1, r.getEntities().get( 0 ) );
+ assertEquals(1, r.size());
+ assertNotNull(r.getCursor());
+ assertEquals(entity1, r.getEntities().get(0));
query = Query.fromQL( "select * where NOT subObjectArray.subField = 'Bar'" ).withLimit( 1 )
.withCursor( r.getCursor() );
r = em.searchCollection( em.getApplicationRef(), "games", query );
- assertEquals( 0, r.size() );
- assertNull( r.getCursor() );
+ assertEquals(0, r.size());
+ assertNull(r.getCursor());
}
@@ -869,14 +866,14 @@ public class CollectionIT extends AbstractCoreIT {
properties = new LinkedHashMap<String, Object>();
properties.put( "title", "Hot Shots" );
- properties.put( "keywords", "Action, New" );
+ properties.put("keywords", "Action, New");
em.create( "game", properties );
app.refreshIndex();
Query query = Query.fromQL( "select * where keywords contains 'hot' or title contains 'hot'" );
Results r = em.searchCollection( em.getApplicationRef(), "games", query );
- LOG.info( JsonUtils.mapToFormattedJsonString( r.getEntities() ) );
+ LOG.info(JsonUtils.mapToFormattedJsonString(r.getEntities()));
assertEquals( 3, r.size() );
}
@@ -895,7 +892,7 @@ public class CollectionIT extends AbstractCoreIT {
properties = new LinkedHashMap<String, Object>();
properties.put( "title", "Bunnies Extreme" );
- properties.put( "keywords", "Hot, New" );
+ properties.put("keywords", "Hot, New");
Entity secondGame = em.create( "game", properties );
properties = new LinkedHashMap<String, Object>();
@@ -908,7 +905,7 @@ public class CollectionIT extends AbstractCoreIT {
Query query = Query.fromQL( "select * where keywords contains 'new' and title contains 'extreme'" );
Results r = em.searchCollection( em.getApplicationRef(), "games", query );
LOG.info( JsonUtils.mapToFormattedJsonString( r.getEntities() ) );
- assertEquals( 2, r.size() );
+ assertEquals(2, r.size());
assertEquals( thirdGame.getUuid(), r.getEntities().get( 0 ).getUuid() );
assertEquals( secondGame.getUuid(), r.getEntities().get( 1 ).getUuid() );
@@ -1144,7 +1141,7 @@ public class CollectionIT extends AbstractCoreIT {
for ( int j = 0; j < pageSize; j++ ) {
final int index = size - 1 - ( i * pageSize + j );
- final UUID expectedId = entityIds.get( index );
+ final UUID expectedId = entityIds.get(index);
assertEquals( expectedId, r.getEntities().get( j ).getUuid() );
}
@@ -1166,7 +1163,7 @@ public class CollectionIT extends AbstractCoreIT {
EntityManager em = app.getEntityManager();
assertNotNull( em );
- int size = 40;
+ int size = 60;
List<UUID> entityIds = new ArrayList<UUID>();
for ( int i = 0; i < size; i++ ) {
@@ -1179,15 +1176,15 @@ public class CollectionIT extends AbstractCoreIT {
app.refreshIndex();
- int pageSize = 10;
+ int pageSize = 5;
- Query query = Query.fromQL( "select * where index >= 10 and index <= 29 order by index asc" );
+ Query query = Query.fromQL("select * where index >= 5 and index <= 49 order by index asc");
query.setLimit( pageSize );
- Results r = em.searchCollection( em.getApplicationRef(), "pages", query );
+ Results r = em.searchCollection(em.getApplicationRef(), "pages", query);
// check they're all the same before deletion
- for ( int i = 1; i < 3; i++ ) {
+ for ( int i = 1; i < 10; i++ ) {
LOG.info( JsonUtils.mapToFormattedJsonString( r.getEntities() ) );
@@ -1201,12 +1198,13 @@ public class CollectionIT extends AbstractCoreIT {
assertEquals( entityId, returnedId );
}
+ LOG.info( "collection loop "+i );
r = r.getNextPageResults();
}
assertEquals( 0, r.size() );
- assertNull( r.getCursor() );
+ assertNull(r.getCursor());
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8a316e88/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableToBlockingIteratorFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableToBlockingIteratorFactory.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableToBlockingIteratorFactory.java
new file mode 100644
index 0000000..23c43ba
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableToBlockingIteratorFactory.java
@@ -0,0 +1,126 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. The ASF licenses this file to You
+ * * under the Apache License, Version 2.0 (the "License"); you may not
+ * * use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License. For additional information regarding
+ * * copyright in this work, please see the NOTICE file in the top level
+ * * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.core.rx;
+
+/**
+ * Classy class class.
+ */
+
+import rx.Notification;
+import rx.Observable;
+import rx.Subscriber;
+import rx.Subscription;
+import rx.exceptions.Exceptions;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Returns an Iterator that iterates over all items emitted by a specified Observable.
+ * <p>
+ * <img width="640" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.toIterator.png" alt="">
+ * <p>
+ *
+ * @see <a href="https://github.com/ReactiveX/RxJava/issues/50">Issue #50</a>
+ */
+public final class ObservableToBlockingIteratorFactory {
+ private ObservableToBlockingIteratorFactory() {
+ throw new IllegalStateException("No instances!");
+ }
+
+ /**
+ * Returns an iterator that iterates all values of the observable.
+ *
+ * @param <T>
+ * the type of source.
+ * @return the iterator that could be used to iterate over the elements of the observable.
+ */
+ public static <T> Iterator<T> toIterator(Observable<? extends T> source) {
+ final BlockingQueue<Notification<? extends T>> notifications = new ArrayBlockingQueue<>(1);
+
+ // using subscribe instead of unsafeSubscribe since this is a BlockingObservable "final subscribe"
+ final Subscription subscription = source.materialize().subscribe(new Subscriber<Notification<? extends T>>() {
+ @Override
+ public void onCompleted() {
+ // ignore
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ try{
+ notifications.put(Notification.<T>createOnError(e));
+ }catch (Exception t){
+
+ }
+ }
+
+ @Override
+ public void onNext(Notification<? extends T> args) {
+ try{
+ notifications.put(args);
+ }catch (Exception t){
+
+ }
+ }
+ });
+
+ return new Iterator<T>() {
+ private Notification<? extends T> buf;
+
+ @Override
+ public boolean hasNext() {
+ if (buf == null) {
+ buf = take();
+ }
+ if (buf.isOnError()) {
+ throw Exceptions.propagate(buf.getThrowable());
+ }
+ return !buf.isOnCompleted();
+ }
+
+ @Override
+ public T next() {
+ if (hasNext()) {
+ T result = buf.getValue();
+ buf = null;
+ return result;
+ }
+ throw new NoSuchElementException();
+ }
+
+ private Notification<? extends T> take() {
+ try {
+ return notifications.take();
+ } catch (InterruptedException e) {
+ subscription.unsubscribe();
+ throw Exceptions.propagate(e);
+ }
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("Read-only iterator");
+ }
+ };
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8a316e88/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/OrderedMergeTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/OrderedMergeTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/OrderedMergeTest.java
index b5c8900..649ac7a 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/OrderedMergeTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/OrderedMergeTest.java
@@ -19,19 +19,23 @@
package org.apache.usergrid.persistence.core.rx;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.List;
+import java.util.*;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import rx.Notification;
import rx.Observable;
+import rx.Observable.Transformer;
import rx.Subscriber;
+import rx.Subscription;
+import rx.exceptions.Exceptions;
import rx.schedulers.Schedulers;
import static org.junit.Assert.assertEquals;
@@ -152,20 +156,20 @@ public class OrderedMergeTest {
List<Integer> expected1List = Arrays.asList( 5, 3, 2, 0 );
- Observable<Integer> expected1 = Observable.from( expected1List );
+ Observable<Integer> expected1 = Observable.from(expected1List);
- List<Integer> expected2List = Arrays.asList( 10, 7, 6, 4 );
+ List<Integer> expected2List = Arrays.asList(10, 7, 6, 4);
- Observable<Integer> expected2 = Observable.from( expected2List );
+ Observable<Integer> expected2 = Observable.from(expected2List);
- List<Integer> expected3List = Arrays.asList( 9, 8, 1 );
+ List<Integer> expected3List = Arrays.asList(9, 8, 1);
- Observable<Integer> expected3 = Observable.from( expected3List );
+ Observable<Integer> expected3 = Observable.from(expected3List);
//set our buffer size to 2. We should easily exceed this since every observable has more than 2 elements
Observable<Integer> ordered =
- OrderedMerge.orderedMerge( new ReverseIntegerComparator(), 2, expected1, expected2, expected3 );
+ OrderedMerge.orderedMerge(new ReverseIntegerComparator(), 2, expected1, expected2, expected3);
final CountDownLatch latch = new CountDownLatch( 1 );
final List<Integer> results = new ArrayList();
@@ -204,27 +208,27 @@ public class OrderedMergeTest {
/**
* Since we're on the same thread, we should blow up before we begin producing elements our size
*/
- assertEquals( 0, results.size() );
+ assertEquals(0, results.size());
- assertTrue( "An exception was thrown", errorThrown[0] );
+ assertTrue("An exception was thrown", errorThrown[0]);
}
@Test
public void multipleOperatorThreads() throws InterruptedException {
- List<Integer> expected1List = Arrays.asList( 5, 3, 2, 0 );
+ List<Integer> expected1List = Arrays.asList(5, 3, 2, 0);
- Observable<Integer> expected1 = Observable.from( expected1List ).subscribeOn( Schedulers.io() );
+ Observable<Integer> expected1 = Observable.from( expected1List ).subscribeOn(Schedulers.io());
- List<Integer> expected2List = Arrays.asList( 10, 7, 6, 4 );
+ List<Integer> expected2List = Arrays.asList(10, 7, 6, 4);
- Observable<Integer> expected2 = Observable.from( expected2List ).subscribeOn( Schedulers.io() );
+ Observable<Integer> expected2 = Observable.from( expected2List ).subscribeOn(Schedulers.io());
- List<Integer> expected3List = Arrays.asList( 9, 8, 1 );
+ List<Integer> expected3List = Arrays.asList(9, 8, 1);
- Observable<Integer> expected3 = Observable.from( expected3List ).subscribeOn( Schedulers.io() );
+ Observable<Integer> expected3 = Observable.from( expected3List ).subscribeOn(Schedulers.io());
Observable<Integer> ordered =
@@ -233,7 +237,7 @@ public class OrderedMergeTest {
final CountDownLatch latch = new CountDownLatch( 1 );
final List<Integer> results = new ArrayList();
- ordered.subscribe( new Subscriber<Integer>() {
+ ordered.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
latch.countDown();
@@ -241,24 +245,24 @@ public class OrderedMergeTest {
@Override
- public void onError( final Throwable e ) {
+ public void onError(final Throwable e) {
e.printStackTrace();
- fail( "An error was thrown " );
+ fail("An error was thrown ");
}
@Override
- public void onNext( final Integer integer ) {
- log.info( "onNext invoked with {}", integer );
- results.add( integer );
+ public void onNext(final Integer integer) {
+ log.info("onNext invoked with {}", integer);
+ results.add(integer);
}
- } );
+ });
latch.await();
List<Integer> expected = Arrays.asList( 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0 );
- assertEquals( expected.size(), results.size() );
+ assertEquals(expected.size(), results.size());
for ( int i = 0; i < expected.size(); i++ ) {
@@ -306,7 +310,7 @@ public class OrderedMergeTest {
@Override
public void onError( final Throwable e ) {
- log.error( "Expected error thrown", e );
+ log.error("Expected error thrown", e);
if ( e.getMessage().contains( "The maximum queue size of 2 has been reached" ) ) {
errorThrown[0] = true;
@@ -318,14 +322,14 @@ public class OrderedMergeTest {
@Override
public void onNext( final Integer integer ) {
- log.info( "onNext invoked with {}", integer );
+ log.info("onNext invoked with {}", integer);
}
} );
latch.await();
- assertTrue( "An exception was thrown", errorThrown[0] );
+ assertTrue("An exception was thrown", errorThrown[0]);
}
@@ -374,14 +378,14 @@ public class OrderedMergeTest {
@Override
public void onError( final Throwable e ) {
e.printStackTrace();
- fail( "An error was thrown " );
+ fail("An error was thrown ");
}
@Override
public void onNext( final Integer integer ) {
log.info( "onNext invoked with {}", integer );
- results.add( integer );
+ results.add(integer);
}
} );
@@ -389,7 +393,7 @@ public class OrderedMergeTest {
List<Integer> expected = Arrays.asList( 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0 );
- assertEquals( expected.size(), results.size() );
+ assertEquals(expected.size(), results.size());
for ( int i = 0; i < expected.size(); i++ ) {
@@ -535,6 +539,36 @@ public class OrderedMergeTest {
}
}
+ @Test
+ public void obsIterator() {
+ Iterator<Object> iterator = ObservableToBlockingIteratorFactory.toIterator(Observable.create(subscriber -> {
+ int count = 0;
+ while (!subscriber.isUnsubscribed()) {
+ //pull from source
+ for (int i = 0; i < 10 && !subscriber.isUnsubscribed(); i++) {
+ //emit
+ log.info("loop " + count);
+ subscriber.onNext(count++);
+ }
+ }
+
+ subscriber.onCompleted();
+ })
+ .onBackpressureBlock(1)
+ .doOnNext(o -> {
+ log.info("iteration " + o);
+ }).subscribeOn(Schedulers.io()));
+ //never
+ Object it =iterator.next();
+ it = iterator.next();
+ log.info("iterate");
+ it = iterator.next();
+ log.info("iterate");
+
+ Object size = it;
+ }
+
+
private static class IntegerComparator implements Comparator<Integer> {
@@ -552,4 +586,9 @@ public class OrderedMergeTest {
return Integer.compare( o1, o2 ) * -1;
}
}
+
+
+
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8a316e88/stack/corepersistence/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/pom.xml b/stack/corepersistence/pom.xml
index df625c8..47d3748 100644
--- a/stack/corepersistence/pom.xml
+++ b/stack/corepersistence/pom.xml
@@ -71,7 +71,7 @@ limitations under the License.
<junit.version>4.11</junit.version>
<kryo-serializers.version>0.26</kryo-serializers.version>
<log4j.version>1.2.17</log4j.version>
- <rx.version>1.0.9</rx.version>
+ <rx.version>1.0.12</rx.version>
<slf4j.version>1.7.2</slf4j.version>
<surefire.version>2.16</surefire.version>
<aws.version>1.9.37</aws.version>