You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/07/09 22:37:39 UTC

incubator-usergrid git commit: add blocking iterator

Repository: incubator-usergrid
Updated Branches:
  refs/heads/two-dot-o-dev 7a72c5b19 -> 8a316e88b


add blocking iterator


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/8a316e88
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/8a316e88
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/8a316e88

Branch: refs/heads/two-dot-o-dev
Commit: 8a316e88bf3e9c8f91adf9c7ada808aa98c4e557
Parents: 7a72c5b
Author: Shawn Feldman <sf...@apache.org>
Authored: Thu Jul 9 14:22:48 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Jul 9 14:22:48 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/pipeline/Pipeline.java      |   2 +-
 .../search/AbstractElasticSearchFilter.java     |   2 +
 .../results/ObservableQueryExecutor.java        |  12 +-
 .../usergrid/persistence/CollectionIT.java      |  72 ++++++-----
 .../rx/ObservableToBlockingIteratorFactory.java | 126 +++++++++++++++++++
 .../persistence/core/rx/OrderedMergeTest.java   | 103 ++++++++++-----
 stack/corepersistence/pom.xml                   |   2 +-
 7 files changed, 246 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8a316e88/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
index 81f857f..13edb2c 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
@@ -77,7 +77,7 @@ public class Pipeline<InputType> {
         //set our observable to start at the application
         final FilterResult<Id> filter = new FilterResult<>( applicationScope.getApplication(), Optional.absent() );
 
-        this.currentObservable = Observable.just( filter ).subscribeOn(Schedulers.io());
+        this.currentObservable = Observable.just( filter );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8a316e88/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/AbstractElasticSearchFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/AbstractElasticSearchFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/AbstractElasticSearchFilter.java
index 9f598ff..426fa47 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/AbstractElasticSearchFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/AbstractElasticSearchFilter.java
@@ -100,10 +100,12 @@ public abstract class AbstractElasticSearchFilter extends AbstractPathFilter<Id,
 
                 subscriber.onStart();
 
+                int count = 0;
                 //emit while we have values from ES and someone is subscribed
                 while ( !subscriber.isUnsubscribed() ) {
 
 
+                    log.info("elastic search iteration loop {}",count++);
                     try {
                         final CandidateResults candidateResults =
                             applicationEntityIndex.search( searchEdge, searchTypes, query, limit, currentOffSet );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8a316e88/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
index a20b84f..548e584 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
@@ -28,6 +28,7 @@ import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
 import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
 import org.apache.usergrid.persistence.EntityFactory;
 import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.core.rx.ObservableToBlockingIteratorFactory;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
 
@@ -52,7 +53,8 @@ public abstract class ObservableQueryExecutor<T> implements QueryExecutor {
     public ObservableQueryExecutor( final Observable<ResultsPage<T>> resultsObservable ) {
         //map to our old results objects, return a default empty if required
         this.resultsObservable = resultsObservable.map( resultsPage -> createResultsInternal( resultsPage ) )
-                                                  .defaultIfEmpty( new Results() );
+                                                  .defaultIfEmpty(new Results())
+            .subscribeOn(Schedulers.io());
     }
 
 
@@ -98,7 +100,7 @@ public abstract class ObservableQueryExecutor<T> implements QueryExecutor {
     public boolean hasNext() {
 
         if ( iterator == null ) {
-            iterator =  resultsObservable.toBlocking().getIterator();
+            iterator = ObservableToBlockingIteratorFactory.toIterator( resultsObservable );
         }
 
         boolean hasNext = iterator.hasNext();
@@ -118,4 +120,10 @@ public abstract class ObservableQueryExecutor<T> implements QueryExecutor {
 
         return next;
     }
+
+    @Override
+    protected void finalize() throws Throwable { // finalizer hook; apparently intended to release the results stream when this executor is garbage collected
+        resultsObservable.unsubscribeOn(Schedulers.io()); // NOTE(review): the return value is discarded; unsubscribeOn() presumably only returns a new Observable configured with an unsubscribe scheduler, so this line likely cancels nothing - confirm, and consider holding the Subscription instead
+        super.finalize();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8a316e88/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
index eab04ce..b3337ab 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
@@ -17,12 +17,7 @@
 package org.apache.usergrid.persistence;
 
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
+import java.util.*;
 
 import org.junit.Rule;
 import org.junit.Test;
@@ -38,6 +33,8 @@ import org.apache.usergrid.persistence.exceptions.DuplicateUniquePropertyExistsE
 import org.apache.usergrid.persistence.index.query.Identifier;
 import org.apache.usergrid.utils.JsonUtils;
 import org.apache.usergrid.utils.UUIDUtils;
+import rx.Observable;
+import rx.schedulers.Schedulers;
 
 import static org.apache.usergrid.utils.MapUtils.hashMap;
 import static org.junit.Assert.assertEquals;
@@ -466,18 +463,18 @@ public class CollectionIT extends AbstractCoreIT {
         properties.put( "verb", "post" );
         properties.put( "content", "I wrote a blog post" );
 
-        em.addToCollection( user, "activities", em.create( "activity", properties ) );
+        em.addToCollection(user, "activities", em.create("activity", properties));
 
         properties = new LinkedHashMap<String, Object>();
         properties.put( "actor", hashMap( "displayName", "Ed Anuff" ).map( "objectType", "person" ) );
-        properties.put( "verb", "tweet" );
+        properties.put("verb", "tweet");
         properties.put( "content", "I ate another sammich" );
 
-        em.addToCollection( user, "activities", em.create( "activity", properties ) );
+        em.addToCollection(user, "activities", em.create("activity", properties));
 
         properties = new LinkedHashMap<String, Object>();
-        properties.put( "actor", hashMap( "displayName", "Ed Anuff" ).map( "objectType", "person" ) );
-        properties.put( "verb", "post" );
+        properties.put("actor", hashMap("displayName", "Ed Anuff").map("objectType", "person"));
+        properties.put("verb", "post");
         properties.put( "content", "I wrote another blog post" );
 
         em.addToCollection( user, "activities", em.create( "activity", properties ) );
@@ -486,7 +483,7 @@ public class CollectionIT extends AbstractCoreIT {
 
         final Query query = Query.fromQL( "verb = 'post'" );
 
-        Results r = em.searchCollection( user, "activities", query );
+        Results r = em.searchCollection(user, "activities", query);
         LOG.info( JsonUtils.mapToFormattedJsonString( r.getEntities() ) );
         assertEquals( 2, r.size() );
     }
@@ -604,7 +601,7 @@ public class CollectionIT extends AbstractCoreIT {
 
         // EntityRef
         Query query = Query
-            .fromQL( "select * where keywords contains 'Random' " + "OR keywords contains 'Game' order by title desc" );
+            .fromQL("select * where keywords contains 'Random' " + "OR keywords contains 'Game' order by title desc");
 
         Results r = em.searchCollection( em.getApplicationRef(), "orquerygames", query );
 
@@ -625,11 +622,11 @@ public class CollectionIT extends AbstractCoreIT {
 
         assertEquals( 2, r.size() );
 
-        returned = r.getEntities().get( 0 );
+        returned = r.getEntities().get(0);
 
         assertEquals( game1.getUuid(), returned.getUuid() );
 
-        returned = r.getEntities().get( 1 );
+        returned = r.getEntities().get(1);
 
         assertEquals( game2.getUuid(), returned.getUuid() );
 
@@ -646,13 +643,13 @@ public class CollectionIT extends AbstractCoreIT {
         assertEquals( game1.getUuid(), returned.getUuid() );
 
         query = Query
-            .fromQL( "select * where  title contains 'blah' " + "OR keywords contains 'blah' order by title desc" );
+            .fromQL("select * where  title contains 'blah' " + "OR keywords contains 'blah' order by title desc");
 
         r = em.searchCollection( em.getApplicationRef(), "orquerygames", query );
 
         assertEquals( 1, r.size() );
 
-        returned = r.getEntities().get( 0 );
+        returned = r.getEntities().get(0);
 
         assertEquals( game1.getUuid(), returned.getUuid() );
     }
@@ -821,32 +818,32 @@ public class CollectionIT extends AbstractCoreIT {
 
         // search for games without sub-field Foo should returned zero entities
 
-        Query query = Query.fromQL( "select * where NOT subObjectArray.subField = 'Foo'" ).withLimit( 1 );
+        Query query = Query.fromQL( "select * where NOT subObjectArray.subField = 'Foo'" ).withLimit(1);
         Results r = em.searchCollection( em.getApplicationRef(), "games", query );
         assertEquals( 0, r.size() );
-        assertNull( r.getCursor() );
+        assertNull(r.getCursor());
 
 
         // full negation in simple with lower limit
         query = Query.fromQL( "select * where NOT subObjectArray.subField = 'Bar'" ).withLimit( 1 );
         r = em.searchCollection( em.getApplicationRef(), "games", query );
         assertEquals( 1, r.size() );
-        assertNotNull( r.getCursor() );
-        assertEquals( entity2, r.getEntities().get( 0 ) );
+        assertNotNull(r.getCursor());
+        assertEquals(entity2, r.getEntities().get(0));
 
 
         query = Query.fromQL( "select * where NOT subObjectArray.subField = 'Bar'" ).withLimit( 1 )
                      .withCursor( r.getCursor() );
         r = em.searchCollection( em.getApplicationRef(), "games", query );
-        assertEquals( 1, r.size() );
-        assertNotNull( r.getCursor() );
-        assertEquals( entity1, r.getEntities().get( 0 ) );
+        assertEquals(1, r.size());
+        assertNotNull(r.getCursor());
+        assertEquals(entity1, r.getEntities().get(0));
 
         query = Query.fromQL( "select * where NOT subObjectArray.subField = 'Bar'" ).withLimit( 1 )
                      .withCursor( r.getCursor() );
         r = em.searchCollection( em.getApplicationRef(), "games", query );
-        assertEquals( 0, r.size() );
-        assertNull( r.getCursor() );
+        assertEquals(0, r.size());
+        assertNull(r.getCursor());
     }
 
 
@@ -869,14 +866,14 @@ public class CollectionIT extends AbstractCoreIT {
 
         properties = new LinkedHashMap<String, Object>();
         properties.put( "title", "Hot Shots" );
-        properties.put( "keywords", "Action, New" );
+        properties.put("keywords", "Action, New");
         em.create( "game", properties );
 
         app.refreshIndex();
 
         Query query = Query.fromQL( "select * where keywords contains 'hot' or title contains 'hot'" );
         Results r = em.searchCollection( em.getApplicationRef(), "games", query );
-        LOG.info( JsonUtils.mapToFormattedJsonString( r.getEntities() ) );
+        LOG.info(JsonUtils.mapToFormattedJsonString(r.getEntities()));
         assertEquals( 3, r.size() );
     }
 
@@ -895,7 +892,7 @@ public class CollectionIT extends AbstractCoreIT {
 
         properties = new LinkedHashMap<String, Object>();
         properties.put( "title", "Bunnies Extreme" );
-        properties.put( "keywords", "Hot, New" );
+        properties.put("keywords", "Hot, New");
         Entity secondGame = em.create( "game", properties );
 
         properties = new LinkedHashMap<String, Object>();
@@ -908,7 +905,7 @@ public class CollectionIT extends AbstractCoreIT {
         Query query = Query.fromQL( "select * where keywords contains 'new' and title contains 'extreme'" );
         Results r = em.searchCollection( em.getApplicationRef(), "games", query );
         LOG.info( JsonUtils.mapToFormattedJsonString( r.getEntities() ) );
-        assertEquals( 2, r.size() );
+        assertEquals(2, r.size());
 
         assertEquals( thirdGame.getUuid(), r.getEntities().get( 0 ).getUuid() );
         assertEquals( secondGame.getUuid(), r.getEntities().get( 1 ).getUuid() );
@@ -1144,7 +1141,7 @@ public class CollectionIT extends AbstractCoreIT {
 
             for ( int j = 0; j < pageSize; j++ ) {
                 final int index = size - 1 - ( i * pageSize + j );
-                final UUID expectedId = entityIds.get( index );
+                final UUID expectedId = entityIds.get(index);
                 assertEquals( expectedId, r.getEntities().get( j ).getUuid() );
             }
 
@@ -1166,7 +1163,7 @@ public class CollectionIT extends AbstractCoreIT {
         EntityManager em = app.getEntityManager();
         assertNotNull( em );
 
-        int size = 40;
+        int size = 60;
         List<UUID> entityIds = new ArrayList<UUID>();
 
         for ( int i = 0; i < size; i++ ) {
@@ -1179,15 +1176,15 @@ public class CollectionIT extends AbstractCoreIT {
 
         app.refreshIndex();
 
-        int pageSize = 10;
+        int pageSize = 5;
 
-        Query query = Query.fromQL( "select * where index >= 10 and index <= 29 order by index asc" );
+        Query query = Query.fromQL("select * where index >= 5 and index <= 49 order by index asc");
         query.setLimit( pageSize );
 
-        Results r = em.searchCollection( em.getApplicationRef(), "pages", query );
+        Results r = em.searchCollection(em.getApplicationRef(), "pages", query);
 
         // check they're all the same before deletion
-        for ( int i = 1; i < 3; i++ ) {
+        for ( int i = 1; i < 10; i++ ) {
 
             LOG.info( JsonUtils.mapToFormattedJsonString( r.getEntities() ) );
 
@@ -1201,12 +1198,13 @@ public class CollectionIT extends AbstractCoreIT {
 
                 assertEquals( entityId, returnedId );
             }
+            LOG.info( "collection loop "+i );
 
             r = r.getNextPageResults();
         }
 
         assertEquals( 0, r.size() );
-        assertNull( r.getCursor() );
+        assertNull(r.getCursor());
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8a316e88/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableToBlockingIteratorFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableToBlockingIteratorFactory.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableToBlockingIteratorFactory.java
new file mode 100644
index 0000000..23c43ba
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableToBlockingIteratorFactory.java
@@ -0,0 +1,126 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.core.rx;
+
+/**
+ * Adapter that exposes an RxJava Observable as a blocking java.util.Iterator.
+ */
+
+import rx.Notification;
+import rx.Observable;
+import rx.Subscriber;
+import rx.Subscription;
+import rx.exceptions.Exceptions;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Static factory that adapts an {@link Observable} into a blocking {@link Iterator} over the items it emits.
+ * <p>
+ * <img width="640" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.toIterator.png" alt="">
+ * <p>
+ * Notifications travel from the subscriber to the iterator through a bounded queue of capacity 1, so the emitting thread blocks in {@code put} until the iterator has taken the previous notification. The class cannot be instantiated.
+ * @see <a href="https://github.com/ReactiveX/RxJava/issues/50">Issue #50</a>
+ */
+public final class ObservableToBlockingIteratorFactory {
+    private ObservableToBlockingIteratorFactory() {
+        throw new IllegalStateException("No instances!"); // utility class: construction always fails
+    }
+
+    /**
+     * Subscribes to {@code source} immediately and returns a read-only iterator whose {@code hasNext()}/{@code next()} block until the next notification arrives; an error notification is rethrown from {@code hasNext()} and a completion notification ends the iteration.
+     * @param source the observable to drain; the subscription is created inside this call.
+     * @param <T>
+     *            the element type of the returned iterator.
+     * @return the iterator that could be used to iterate over the elements of the observable; {@code remove()} is not supported.
+     */
+    public static <T> Iterator<T> toIterator(Observable<? extends T> source) {
+        final BlockingQueue<Notification<? extends T>> notifications = new ArrayBlockingQueue<>(1); // capacity 1: put() blocks the emitter until the iterator takes the previous notification
+
+        // materialize() wraps the source's signals in Notification values; using subscribe instead of unsafeSubscribe since this is a BlockingObservable "final subscribe"
+        final Subscription subscription = source.materialize().subscribe(new Subscriber<Notification<? extends T>>() {
+            @Override
+            public void onCompleted() {
+                // nothing to enqueue here: completion is expected to reach the queue through onNext as a materialized Notification
+            }
+
+            @Override
+            public void onError(Throwable e) {
+                try{
+                    notifications.put(Notification.<T>createOnError(e));
+                }catch (Exception t){
+                    // NOTE(review): any failure of put(), including InterruptedException, is silently dropped and the interrupt flag is not restored
+                }
+            }
+
+            @Override
+            public void onNext(Notification<? extends T> args) {
+                try{
+                    notifications.put(args);
+                }catch (Exception t){
+                    // NOTE(review): swallowed as in onError; if put() fails the notification is lost without any signal to the iterator
+                }
+            }
+        });
+
+        return new Iterator<T>() {
+            private Notification<? extends T> buf; // notification fetched by hasNext() but not yet consumed by next(); null when nothing is buffered
+
+            @Override
+            public boolean hasNext() {
+                if (buf == null) {
+                    buf = take(); // blocks until the subscriber enqueues a notification
+                }
+                if (buf.isOnError()) {
+                    throw Exceptions.propagate(buf.getThrowable()); // surface the observable's error to the caller
+                }
+                return !buf.isOnCompleted(); // the completion notification stays buffered, so later calls keep returning false
+            }
+
+            @Override
+            public T next() {
+                if (hasNext()) {
+                    T result = buf.getValue();
+                    buf = null; // force the following hasNext() to fetch a fresh notification
+                    return result;
+                }
+                throw new NoSuchElementException();
+            }
+
+            private Notification<? extends T> take() {
+                try {
+                    return notifications.take();
+                } catch (InterruptedException e) {
+                    subscription.unsubscribe(); // stop the source when the waiting thread is interrupted
+                    throw Exceptions.propagate(e); // NOTE(review): the thread's interrupt status is not re-set before propagating
+                }
+            }
+
+            @Override
+            public void remove() {
+                throw new UnsupportedOperationException("Read-only iterator");
+            }
+        };
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8a316e88/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/OrderedMergeTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/OrderedMergeTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/OrderedMergeTest.java
index b5c8900..649ac7a 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/OrderedMergeTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/OrderedMergeTest.java
@@ -19,19 +19,23 @@
 package org.apache.usergrid.persistence.core.rx;
 
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.List;
+import java.util.*;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import rx.Notification;
 import rx.Observable;
+import rx.Observable.Transformer;
 import rx.Subscriber;
+import rx.Subscription;
+import rx.exceptions.Exceptions;
 import rx.schedulers.Schedulers;
 
 import static org.junit.Assert.assertEquals;
@@ -152,20 +156,20 @@ public class OrderedMergeTest {
 
         List<Integer> expected1List = Arrays.asList( 5, 3, 2, 0 );
 
-        Observable<Integer> expected1 = Observable.from( expected1List );
+        Observable<Integer> expected1 = Observable.from(expected1List);
 
-        List<Integer> expected2List = Arrays.asList( 10, 7, 6, 4 );
+        List<Integer> expected2List = Arrays.asList(10, 7, 6, 4);
 
-        Observable<Integer> expected2 = Observable.from( expected2List );
+        Observable<Integer> expected2 = Observable.from(expected2List);
 
-        List<Integer> expected3List = Arrays.asList( 9, 8, 1 );
+        List<Integer> expected3List = Arrays.asList(9, 8, 1);
 
-        Observable<Integer> expected3 = Observable.from( expected3List );
+        Observable<Integer> expected3 = Observable.from(expected3List);
 
         //set our buffer size to 2.  We should easily exceed this since every observable has more than 2 elements
 
         Observable<Integer> ordered =
-                OrderedMerge.orderedMerge( new ReverseIntegerComparator(), 2, expected1, expected2, expected3 );
+                OrderedMerge.orderedMerge(new ReverseIntegerComparator(), 2, expected1, expected2, expected3);
 
         final CountDownLatch latch = new CountDownLatch( 1 );
         final List<Integer> results = new ArrayList();
@@ -204,27 +208,27 @@ public class OrderedMergeTest {
         /**
          * Since we're on the same thread, we should blow up before we begin producing elements our size
          */
-        assertEquals( 0, results.size() );
+        assertEquals(0, results.size());
 
-        assertTrue( "An exception was thrown", errorThrown[0] );
+        assertTrue("An exception was thrown", errorThrown[0]);
     }
 
 
     @Test
     public void multipleOperatorThreads() throws InterruptedException {
 
-        List<Integer> expected1List = Arrays.asList( 5, 3, 2, 0 );
+        List<Integer> expected1List = Arrays.asList(5, 3, 2, 0);
 
-        Observable<Integer> expected1 = Observable.from( expected1List ).subscribeOn( Schedulers.io() );
+        Observable<Integer> expected1 = Observable.from( expected1List ).subscribeOn(Schedulers.io());
 
-        List<Integer> expected2List = Arrays.asList( 10, 7, 6, 4 );
+        List<Integer> expected2List = Arrays.asList(10, 7, 6, 4);
 
-        Observable<Integer> expected2 = Observable.from( expected2List ).subscribeOn( Schedulers.io() );
+        Observable<Integer> expected2 = Observable.from( expected2List ).subscribeOn(Schedulers.io());
 
 
-        List<Integer> expected3List = Arrays.asList( 9, 8, 1 );
+        List<Integer> expected3List = Arrays.asList(9, 8, 1);
 
-        Observable<Integer> expected3 = Observable.from( expected3List ).subscribeOn( Schedulers.io() );
+        Observable<Integer> expected3 = Observable.from( expected3List ).subscribeOn(Schedulers.io());
 
 
         Observable<Integer> ordered =
@@ -233,7 +237,7 @@ public class OrderedMergeTest {
         final CountDownLatch latch = new CountDownLatch( 1 );
         final List<Integer> results = new ArrayList();
 
-        ordered.subscribe( new Subscriber<Integer>() {
+        ordered.subscribe(new Subscriber<Integer>() {
             @Override
             public void onCompleted() {
                 latch.countDown();
@@ -241,24 +245,24 @@ public class OrderedMergeTest {
 
 
             @Override
-            public void onError( final Throwable e ) {
+            public void onError(final Throwable e) {
                 e.printStackTrace();
-                fail( "An error was thrown " );
+                fail("An error was thrown ");
             }
 
 
             @Override
-            public void onNext( final Integer integer ) {
-                log.info( "onNext invoked with {}", integer );
-                results.add( integer );
+            public void onNext(final Integer integer) {
+                log.info("onNext invoked with {}", integer);
+                results.add(integer);
             }
-        } );
+        });
 
         latch.await();
 
         List<Integer> expected = Arrays.asList( 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0 );
 
-        assertEquals( expected.size(), results.size() );
+        assertEquals(expected.size(), results.size());
 
 
         for ( int i = 0; i < expected.size(); i++ ) {
@@ -306,7 +310,7 @@ public class OrderedMergeTest {
 
             @Override
             public void onError( final Throwable e ) {
-                log.error( "Expected error thrown", e );
+                log.error("Expected error thrown", e);
 
                 if ( e.getMessage().contains( "The maximum queue size of 2 has been reached" ) ) {
                     errorThrown[0] = true;
@@ -318,14 +322,14 @@ public class OrderedMergeTest {
 
             @Override
             public void onNext( final Integer integer ) {
-                log.info( "onNext invoked with {}", integer );
+                log.info("onNext invoked with {}", integer);
             }
         } );
 
         latch.await();
 
 
-        assertTrue( "An exception was thrown", errorThrown[0] );
+        assertTrue("An exception was thrown", errorThrown[0]);
     }
 
 
@@ -374,14 +378,14 @@ public class OrderedMergeTest {
             @Override
             public void onError( final Throwable e ) {
                 e.printStackTrace();
-                fail( "An error was thrown " );
+                fail("An error was thrown ");
             }
 
 
             @Override
             public void onNext( final Integer integer ) {
                 log.info( "onNext invoked with {}", integer );
-                results.add( integer );
+                results.add(integer);
             }
         } );
 
@@ -389,7 +393,7 @@ public class OrderedMergeTest {
 
         List<Integer> expected = Arrays.asList( 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0 );
 
-        assertEquals( expected.size(), results.size() );
+        assertEquals(expected.size(), results.size());
 
 
         for ( int i = 0; i < expected.size(); i++ ) {
@@ -535,6 +539,36 @@ public class OrderedMergeTest {
           }
       }
 
+    @Test
+    public void obsIterator() { // smoke test: pulls three items from an endlessly emitting observable through the blocking iterator
+        Iterator<Object> iterator = ObservableToBlockingIteratorFactory.toIterator(Observable.create(subscriber -> {
+            int count = 0;
+            while (!subscriber.isUnsubscribed()) {
+                // simulates an unbounded paged source: keeps producing batches of 10 until the subscriber unsubscribes
+                for (int i = 0; i < 10 && !subscriber.isUnsubscribed(); i++) {
+                    // emit the running counter; it is logged first so producer progress can be compared with consumer progress
+                    log.info("loop " + count);
+                    subscriber.onNext(count++);
+                }
+            }
+
+            subscriber.onCompleted();
+        })
+            .onBackpressureBlock(1) // presumably blocks the producer once one item is buffered - confirm against the RxJava version in use
+            .doOnNext(o -> {
+                log.info("iteration " + o);
+            }).subscribeOn(Schedulers.io()));
+        // consume three items; NOTE(review): nothing unsubscribes afterwards, so the producer loop above never observes isUnsubscribed() == true within this test - confirm whether that is what the original "never" remark meant
+        Object it =iterator.next();
+        it = iterator.next();
+        log.info("iterate");
+        it = iterator.next();
+        log.info("iterate");
+
+        Object size = it; // NOTE(review): unused local and no assertions - the test only shows that three next() calls return
+    }
+
+
 
     private static class IntegerComparator implements Comparator<Integer> {
 
@@ -552,4 +586,9 @@ public class OrderedMergeTest {
             return Integer.compare( o1, o2 ) * -1;
         }
     }
+
+
+
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8a316e88/stack/corepersistence/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/pom.xml b/stack/corepersistence/pom.xml
index df625c8..47d3748 100644
--- a/stack/corepersistence/pom.xml
+++ b/stack/corepersistence/pom.xml
@@ -71,7 +71,7 @@ limitations under the License.
         <junit.version>4.11</junit.version>
         <kryo-serializers.version>0.26</kryo-serializers.version>
         <log4j.version>1.2.17</log4j.version>
-        <rx.version>1.0.9</rx.version>
+        <rx.version>1.0.12</rx.version>
         <slf4j.version>1.7.2</slf4j.version>
         <surefire.version>2.16</surefire.version>
         <aws.version>1.9.37</aws.version>