You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/03/21 18:20:46 UTC

[1/3] git commit: Added POC for scheduled executor service. Does not work as expected.

Repository: incubator-usergrid
Updated Branches:
  refs/heads/hystrix-integration c1e7a33eb -> 457528f50


Added a proof-of-concept (POC) test, schedulerPoc, for a scheduled executor service (ScheduledThreadPoolExecutor).  The test does not work as expected.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/167f7a8c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/167f7a8c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/167f7a8c

Branch: refs/heads/hystrix-integration
Commit: 167f7a8ce9d9f72d76213274479624ac8342bc7b
Parents: 803096e
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Mar 20 10:02:38 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Mar 20 10:02:38 2014 -0700

----------------------------------------------------------------------
 .../rx/CassandraThreadSchedulerTest.java        | 86 ++++++++++++++++++++
 1 file changed, 86 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/167f7a8c/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadSchedulerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadSchedulerTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadSchedulerTest.java
index cf399c3..c700e06 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadSchedulerTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadSchedulerTest.java
@@ -1,10 +1,15 @@
 package org.apache.usergrid.persistence.collection.rx;
 
 
+import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.jukito.JukitoRunner;
 import org.jukito.UseModules;
@@ -19,6 +24,7 @@ import com.google.inject.Inject;
 
 import rx.Scheduler;
 import rx.functions.Action1;
+import rx.schedulers.Schedulers;
 import rx.util.functions.Action0;
 
 import static org.junit.Assert.assertTrue;
@@ -279,6 +285,82 @@ public class CassandraThreadSchedulerTest {
         assertTrue( "Completed executing actions", completed );
     }
 
+    @Test(expected = RejectedExecutionException.class)
+    public void schedulerPoc() throws InterruptedException {
+
+        final int size = 10;
+
+        //create our thread factory so we can label our threads in case we need to dump them
+        final ThreadFactory factory = new ThreadFactory() {
+
+            private final AtomicLong counter = new AtomicLong();
+
+            @Override
+            public Thread newThread( final Runnable r ) {
+
+               final String threadName = "RxCassandraIOThreadPool-" + counter.incrementAndGet();
+
+                LOG.debug( "Allocating new IO thread with name {}", threadName );
+
+                Thread t = new Thread( r, threadName );
+                t.setDaemon( true );
+                return t;
+            }
+        };
+
+
+        /**
+         * Create a threadpool that will reclaim unused threads after 60 seconds.
+         * It uses the max thread count set here. It intentionally uses the
+         * DynamicProperty, so that when it is updated, the listener updates the
+         * pool size. Additional allocation is trivial.  Shrinking the size
+         * will require all currently executing threads to run to completion,
+         * without allowing additional tasks to be queued.
+         */
+        final ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(10, factory, new ThreadPoolExecutor.AbortPolicy());
+        //set the max thread size
+        pool.setMaximumPoolSize( 10 );
+        pool.setKeepAliveTime( 60, TimeUnit.SECONDS );
+
+        final CountDownLatch latch = new CountDownLatch( size );
+        final Semaphore semaphore = new Semaphore( 0 );
+
+        for(int i = 0; i < size; i ++){
+            pool.schedule( new Callable<Object>() {
+                @Override
+                public Object call() throws Exception {
+
+                    latch.countDown();
+
+                    //block so this is still running in the scheduled execute
+                    semaphore.acquire();
+
+                    return null;  //To change body of implemented methods use File | Settings | File Templates.
+                }
+            }, 0, TimeUnit.MILLISECONDS );
+
+
+
+
+        }
+
+
+        //wait for all our threads to get to semaphore acquisition and block to ensure we're running at capacity
+        latch.await();
+
+        //now schedule 1 more, we should blow up
+
+        pool.schedule( new Callable<Object>() {
+            @Override
+            public Object call() throws Exception {
+                return null;  //To change body of implemented methods use File | Settings | File Templates.
+            }
+        }, 0, TimeUnit.MILLISECONDS );
+
+
+
+    }
+
 
     /**
      * Schedule actions into the semaphore.
@@ -315,12 +397,16 @@ public class CassandraThreadSchedulerTest {
                     }
                 }
             };
+
             rxScheduler.schedule( action );
         }
 
 
         return latch;
     }
+
+
+
 }
 
 


[3/3] git commit: Removed scheduler to instead use the default RX scheduler.

Posted by sn...@apache.org.
Removed the custom CassandraThreadScheduler (and its RxFig configuration) to instead use the default RX scheduler, Schedulers.io().


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/457528f5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/457528f5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/457528f5

Branch: refs/heads/hystrix-integration
Commit: 457528f50e26166a2c157897a128ef135d5a2991
Parents: df0091b
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Mar 21 10:15:22 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Mar 21 10:15:22 2014 -0700

----------------------------------------------------------------------
 .../collection/guice/CollectionModule.java      |   8 +-
 .../impl/EntityCollectionManagerImpl.java       |  19 +-
 .../mvcc/stage/write/WriteUniqueVerify.java     |  14 +-
 .../collection/rx/CassandraThreadScheduler.java | 107 -----
 .../persistence/collection/rx/RxFig.java        |  42 --
 .../stage/write/WriteUniqueVerifyStageTest.java |   3 +-
 .../mvcc/stage/write/WriteUniqueVerifyTest.java |   6 +-
 .../rx/CassandraThreadSchedulerTest.java        | 413 -------------------
 .../persistence/collection/rx/ParallelTest.java |   6 +-
 .../graph/consistency/AsyncProcessorImpl.java   |   6 +-
 .../graph/hystrix/HystrixGraphObservable.java   |  52 +++
 .../persistence/graph/impl/EdgeManagerImpl.java |  58 +--
 .../graph/impl/NodeDeleteListener.java          |  23 +-
 .../graph/impl/stage/AbstractEdgeRepair.java    |  11 +-
 .../graph/impl/stage/EdgeDeleteRepairImpl.java  |   4 +-
 .../graph/impl/stage/EdgeMetaRepairImpl.java    |  13 +-
 .../graph/impl/stage/EdgeWriteRepairImpl.java   |   4 +-
 .../persistence/graph/EdgeManagerTimeoutIT.java | 125 ++++--
 .../graph/consistency/AsyncProcessorTest.java   |   2 +-
 .../graph/test/util/EdgeTestUtils.java          |  17 +-
 20 files changed, 240 insertions(+), 693 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/457528f5/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
index e0744c3..8f41a2b 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
@@ -29,8 +29,6 @@ import org.apache.usergrid.persistence.collection.impl.EntityCollectionManagerSy
 import org.apache.usergrid.persistence.collection.migration.MigrationManagerFig;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.UniqueValueSerializationStrategy;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.UniqueValueSerializationStrategyImpl;
-import org.apache.usergrid.persistence.collection.rx.CassandraThreadScheduler;
-import org.apache.usergrid.persistence.collection.rx.RxFig;
 import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
 import org.apache.usergrid.persistence.collection.serialization.impl.SerializationModule;
 import org.apache.usergrid.persistence.collection.service.impl.ServiceModule;
@@ -38,8 +36,6 @@ import org.apache.usergrid.persistence.collection.service.impl.ServiceModule;
 import com.google.inject.AbstractModule;
 import com.google.inject.assistedinject.FactoryModuleBuilder;
 
-import rx.Scheduler;
-
 
 /**
  * Simple module for wiring our collection api
@@ -52,8 +48,7 @@ public class CollectionModule extends AbstractModule {
     @Override
     protected void configure() {
         //noinspection unchecked
-        install( new GuicyFigModule( 
-                RxFig.class, 
+        install( new GuicyFigModule(
                 MigrationManagerFig.class,
                 CassandraFig.class, 
                 SerializationFig.class ) );
@@ -67,7 +62,6 @@ public class CollectionModule extends AbstractModule {
                 .implement( EntityCollectionManagerSync.class, EntityCollectionManagerSyncImpl.class )
                 .build( EntityCollectionManagerFactory.class ) );
 
-        bind( Scheduler.class ).toProvider( CassandraThreadScheduler.class );
 
         bind( UniqueValueSerializationStrategy.class ).to( UniqueValueSerializationStrategyImpl.class );
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/457528f5/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index ef95a75..0a099e8 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@ -46,10 +46,10 @@ import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
 
 import rx.Observable;
-import rx.Scheduler;
 import rx.functions.Func1;
 import rx.functions.Func2;
 import rx.functions.FuncN;
+import rx.schedulers.Schedulers;
 
 
 /**
@@ -65,7 +65,6 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
 
     private final CollectionScope collectionScope;
     private final UUIDService uuidService;
-    private final Scheduler scheduler;
 
 
     //start stages
@@ -87,7 +86,6 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
     public EntityCollectionManagerImpl( 
         final UUIDService uuidService, 
         final WriteStart writeStart,
-        final Scheduler scheduler, 
         final WriteUniqueVerify writeVerifyUnique,
         final WriteOptimisticVerify writeOptimisticVerify,
         final WriteCommit writeCommit, 
@@ -108,7 +106,6 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
         this.markCommit = markCommit;
 
         this.uuidService = uuidService;
-        this.scheduler = scheduler;
         this.collectionScope = collectionScope;
     }
 
@@ -138,7 +135,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
 
 
         // fire the stages
-        // TODO use our own scheduler to help with multitenancy here.
+        // TODO use our own Schedulers.io() to help with multitenancy here.
         // TODO writeOptimisticVerify and writeVerifyUnique should be concurrent to reduce wait time
         // these 3 lines could be done in a single line, but they are on multiple lines for clarity
 
@@ -146,7 +143,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
         CollectionIoEvent<Entity> writeData = new CollectionIoEvent<Entity>( collectionScope, entity );
 
         Observable<CollectionIoEvent<MvccEntity>> observable =
-            Observable.from( writeData ).subscribeOn( scheduler ).map( writeStart ).flatMap(
+            Observable.from( writeData ).subscribeOn( Schedulers.io() ).map( writeStart ).flatMap(
                 new Func1<CollectionIoEvent<MvccEntity>, Observable<CollectionIoEvent<MvccEntity>>>() {
 
                     @Override
@@ -163,13 +160,13 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
 
 
                         Observable<CollectionIoEvent<MvccEntity>> unique =
-                            Observable.from( mvccEntityCollectionIoEvent ).subscribeOn( scheduler )
+                            Observable.from( mvccEntityCollectionIoEvent ).subscribeOn( Schedulers.io() )
                                 .flatMap( writeVerifyUnique);
 
 
                         // optimistic verification
                         Observable<CollectionIoEvent<MvccEntity>> optimistic =
-                            Observable.from( mvccEntityCollectionIoEvent ).subscribeOn( scheduler )
+                            Observable.from( mvccEntityCollectionIoEvent ).subscribeOn( Schedulers.io() )
                                 .map( writeOptimisticVerify );
 
 
@@ -193,7 +190,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
 
         // execute all validation stages concurrently.  Needs refactored when this is done.  
         // https://github.com/Netflix/RxJava/issues/627
-        // observable = Concurrent.concurrent( observable, scheduler, new WaitZip(), 
+        // observable = Concurrent.concurrent( observable, Schedulers.io(), new WaitZip(), 
         //                  writeVerifyUnique, writeOptimisticVerify );
 
         //return the commit result.
@@ -209,7 +206,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
         Preconditions.checkNotNull( entityId.getType(), "Entity type is required in this stage" );
 
         return Observable.from( new CollectionIoEvent<Id>( collectionScope, entityId ) )
-            .subscribeOn( scheduler ).map( markStart ).map( markCommit );
+            .subscribeOn( Schedulers.io() ).map( markStart ).map( markCommit );
     }
 
 
@@ -221,7 +218,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
         Preconditions.checkNotNull( entityId.getType(), "Entity id type required in load stage");
 
         return Observable.from( new CollectionIoEvent<Id>( collectionScope, entityId ) )
-            .subscribeOn( scheduler ).map( load );
+            .subscribeOn( Schedulers.io() ).map( load );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/457528f5/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
index fe53b13..d7733c2 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
@@ -39,9 +39,9 @@ import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
 import rx.Observable;
-import rx.Scheduler;
 import rx.functions.Func1;
 import rx.functions.FuncN;
+import rx.schedulers.Schedulers;
 
 
 /**
@@ -54,22 +54,16 @@ public class WriteUniqueVerify implements Func1<CollectionIoEvent<MvccEntity>, O
 
     private final UniqueValueSerializationStrategy uniqueValueStrat;
 
-
-    private final Scheduler scheduler;
-
     protected final SerializationFig serializationFig;
 
 
     @Inject
-    public WriteUniqueVerify( final UniqueValueSerializationStrategy uniqueValueSerializiationStrategy,
-                              final Scheduler scheduler, final SerializationFig serializationFig ) {
+    public WriteUniqueVerify( final UniqueValueSerializationStrategy uniqueValueSerializiationStrategy, final SerializationFig serializationFig ) {
 
         Preconditions.checkNotNull( uniqueValueSerializiationStrategy, "uniqueValueSerializationStrategy is required" );
-        Preconditions.checkNotNull( scheduler, "scheduler is required" );
         Preconditions.checkNotNull( serializationFig, "serializationFig is required" );
 
         this.uniqueValueStrat = uniqueValueSerializiationStrategy;
-        this.scheduler = scheduler;
         this.serializationFig = serializationFig;
     }
 
@@ -101,7 +95,7 @@ public class WriteUniqueVerify implements Func1<CollectionIoEvent<MvccEntity>, O
             //if it's unique, create a function to validate it and add it to the list of concurrent validations
             if ( field.isUnique() ) {
 
-                Observable<FieldUniquenessResult> result =  Observable.from( field ).subscribeOn( scheduler ).map(new Func1<Field,  FieldUniquenessResult>() {
+                Observable<FieldUniquenessResult> result =  Observable.from( field ).subscribeOn( Schedulers.io() ).map(new Func1<Field,  FieldUniquenessResult>() {
                     @Override
                     public FieldUniquenessResult call(Field field ) {
 
@@ -139,7 +133,7 @@ public class WriteUniqueVerify implements Func1<CollectionIoEvent<MvccEntity>, O
 
         //short circuit.  If we zip up nothing, we block forever.
         if(fields.size() == 0){
-            return Observable.from(ioevent ).subscribeOn( scheduler );
+            return Observable.from(ioevent ).subscribeOn( Schedulers.io() );
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/457528f5/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadScheduler.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadScheduler.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadScheduler.java
deleted file mode 100644
index b2f2584..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadScheduler.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.persistence.collection.rx;
-
-
-import java.beans.PropertyChangeEvent;
-import java.beans.PropertyChangeListener;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.inject.Inject;
-import com.google.inject.Provider;
-import com.google.inject.name.Named;
-
-import rx.Scheduler;
-import rx.schedulers.Schedulers;
-
-
-public class CassandraThreadScheduler implements Provider<Scheduler> {
-
-    private static final Logger LOG = LoggerFactory.getLogger(CassandraThreadScheduler.class);
-
-    private final RxFig rxFig;
-
-
-    @Inject
-    public CassandraThreadScheduler( final RxFig rxFig ) {
-        this.rxFig = rxFig;
-    }
-
-
-    @Override
-    @Named( "cassandraScheduler" )
-    public Scheduler get() {
-
-        //create our thread factory so we can label our threads in case we need to dump them
-        final ThreadFactory factory = new ThreadFactory() {
-
-            private final AtomicLong counter = new AtomicLong();
-
-            @Override
-            public Thread newThread( final Runnable r ) {
-
-               final String threadName = "RxCassandraIOThreadPool-" + counter.incrementAndGet();
-
-                LOG.debug( "Allocating new IO thread with name {}", threadName );
-
-                Thread t = new Thread( r, threadName );
-                t.setDaemon( true );
-                return t;
-            }
-        };
-
-
-        /**
-         * Create a threadpool that will reclaim unused threads after 60 seconds.  
-         * It uses the max thread count set here. It intentionally uses the 
-         * DynamicProperty, so that when it is updated, the listener updates the 
-         * pool size. Additional allocation is trivial.  Shrinking the size 
-         * will require all currently executing threads to run to completion, 
-         * without allowing additional tasks to be queued.
-         */
-        final ThreadPoolExecutor pool = new ThreadPoolExecutor( 
-                0, rxFig.getMaxThreadCount(), 60L, TimeUnit.SECONDS,
-                new SynchronousQueue<Runnable>(), factory, new ThreadPoolExecutor.AbortPolicy() );
-
-
-        // if our max thread count is updated, we want to immediately update the pool.  
-        // Per the javadoc if the size is smaller, existing threads will continue to run 
-        // until they become idle and time out
-        rxFig.addPropertyChangeListener( new PropertyChangeListener() {
-            @Override
-            public void propertyChange( final PropertyChangeEvent evt ) {
-                if ( evt.getPropertyName().equals( rxFig.getKeyByMethod( "getMaxThreadCount" ) ) ) {
-                    LOG.debug( "Getting update to property: rxFig.getMaxThreadCount() old = {}, new = {} ",
-                            evt.getOldValue(), evt.getNewValue() );
-                    pool.setMaximumPoolSize( ( Integer ) evt.getNewValue() );
-                }
-            }
-        } );
-
-        return Schedulers.executor( pool );
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/457528f5/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/RxFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/RxFig.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/RxFig.java
deleted file mode 100644
index 54a5211..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/RxFig.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.persistence.collection.rx;
-
-
-import org.safehaus.guicyfig.Default;
-import org.safehaus.guicyfig.FigSingleton;
-import org.safehaus.guicyfig.GuicyFig;
-import org.safehaus.guicyfig.Key;
-
-
-/**
- * Configuration interface for RxJava classes.
- */
-@FigSingleton
-public interface RxFig extends GuicyFig {
-
-    public static final String PROP_THREAD = "rx.cassandra.io.threads";
-
-    /**
-     * Max number of threads a pool can allocate.  Can be dynamically changed after starting
-     */
-    @Key( PROP_THREAD )
-    @Default( "100" )
-    int getMaxThreadCount();
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/457528f5/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyStageTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyStageTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyStageTest.java
index 78b7d13..863e3ce 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyStageTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyStageTest.java
@@ -40,9 +40,8 @@ public class WriteUniqueVerifyStageTest extends AbstractMvccEntityStageTest {
     @Override
     protected void validateStage( final CollectionIoEvent<MvccEntity> event ) {
         UniqueValueSerializationStrategy uvstrat = mock( UniqueValueSerializationStrategy.class );
-        Scheduler scheduler = mock( Scheduler.class );
         SerializationFig fig = mock( SerializationFig.class );
-        new WriteUniqueVerify( uvstrat, scheduler, fig ).call( event );
+        new WriteUniqueVerify( uvstrat, fig ).call( event );
     }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/457528f5/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java
index 20dfa91..1a12fef 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java
@@ -81,8 +81,6 @@ public class WriteUniqueVerifyTest {
     public MigrationManagerRule migrationManagerRule;
 
 
-    @Inject
-    private Scheduler scheduler;
 
     @Inject
     private SerializationFig fig;
@@ -103,7 +101,7 @@ public class WriteUniqueVerifyTest {
         final MvccEntity mvccEntity = fromEntity( entity );
 
         // run the stage
-        WriteUniqueVerify newStage = new WriteUniqueVerify( uvstrat, scheduler, fig );
+        WriteUniqueVerify newStage = new WriteUniqueVerify( uvstrat, fig );
 
         CollectionIoEvent<MvccEntity> result = newStage.call( 
             new CollectionIoEvent<MvccEntity>( collectionScope, mvccEntity ) )
@@ -134,7 +132,7 @@ public class WriteUniqueVerifyTest {
         final MvccEntity mvccEntity = fromEntity( entity );
 
         // run the stage
-        WriteUniqueVerify newStage = new WriteUniqueVerify( uvstrat, scheduler, fig );
+        WriteUniqueVerify newStage = new WriteUniqueVerify( uvstrat, fig );
 
         CollectionIoEvent<MvccEntity> result = newStage.call( 
             new CollectionIoEvent<MvccEntity>( collectionScope, mvccEntity ) )

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/457528f5/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadSchedulerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadSchedulerTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadSchedulerTest.java
deleted file mode 100644
index c700e06..0000000
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadSchedulerTest.java
+++ /dev/null
@@ -1,413 +0,0 @@
-package org.apache.usergrid.persistence.collection.rx;
-
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.jukito.JukitoRunner;
-import org.jukito.UseModules;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.collection.guice.TestCollectionModule;
-
-import com.google.inject.Inject;
-
-import rx.Scheduler;
-import rx.functions.Action1;
-import rx.schedulers.Schedulers;
-import rx.util.functions.Action0;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-
-/**
- * Test for our scheduler
- */
-@RunWith( JukitoRunner.class )
-@UseModules( TestCollectionModule.class )
-public class CassandraThreadSchedulerTest {
-
-
-    private static final Logger LOG = LoggerFactory.getLogger( CassandraThreadSchedulerTest.class );
-
-    /**
-     * Number of milliseconds to wait when trying to acquire the semaphore
-     */
-    private static final long TEST_TIMEOUT = 30000;
-
-    @Inject
-    private RxFig rxFig;
-
-
-    @Test
-    public void testMaxLimit() throws InterruptedException {
-
-        final int maxCount = 10;
-
-        rxFig.override( RxFig.PROP_THREAD, ""+ maxCount );
-
-        final CassandraThreadScheduler cassSchedulerSetup = new CassandraThreadScheduler( rxFig );
-
-        final Scheduler rxScheduler = cassSchedulerSetup.get();
-
-        //we want a fair semaphore so we can release in acquire order
-        final Semaphore semaphore = new Semaphore( 0, true );
-
-        //we should not have maxCount actions running in the scheduler
-        CountDownLatch result = schedule( rxScheduler, rxFig.getMaxThreadCount(), semaphore, TEST_TIMEOUT );
-
-        //schedule and we should fail
-
-        try {
-            rxScheduler.schedule( new Action1<Scheduler.Inner>() {
-                            @Override
-                            public void call( final Scheduler.Inner inner ) {
-                            }
-                            });
-
-            fail( "This should have thrown an exception" );
-        }
-        catch ( RejectedExecutionException ree ) {
-            //swallow, we just want to ensure we catch this to continue the test
-        }
-
-        //now release the semaphore so all 10 can run
-        semaphore.release( rxFig.getMaxThreadCount() );
-
-        //wait for completion
-        boolean completed = result.await( 20, TimeUnit.SECONDS );
-
-        assertTrue( "Completed executing actions", completed );
-
-        //verify we can schedule and execute a new operation
-        result = schedule( rxScheduler, 1, semaphore, TEST_TIMEOUT );
-
-        semaphore.release( 1 );
-
-        completed = result.await( 20, TimeUnit.SECONDS );
-
-        assertTrue( "Completed executing actions", completed );
-    }
-
-
-    /**
-     * Test running from a max limit to a lower limit and fails to schedule new threads
-     */
-    @Test
-    public void testMaxLimitShrink() throws InterruptedException {
-
-        final int maxCount = 10;
-
-        final int half = maxCount / 2;
-
-        //kind of a hack, but necessary with the way properties are singletons.  Otherwise you get side effects from
-        // other tests
-        rxFig.override( RxFig.PROP_THREAD, "" + maxCount );
-
-        final CassandraThreadScheduler cassSchedulerSetup = new CassandraThreadScheduler( rxFig );
-
-        final Scheduler rxScheduler = cassSchedulerSetup.get();
-
-        //we want a fair semaphore so we can release in acquire order
-        final Semaphore semaphore = new Semaphore( 0, true );
-
-        //we should not have maxCount actions running in the scheduler
-        CountDownLatch firstHalf = schedule( rxScheduler, half, semaphore, TEST_TIMEOUT );
-
-        //schedule the second half
-        CountDownLatch secondHalf = schedule( rxScheduler, half, semaphore, TEST_TIMEOUT );
-
-        //schedule and we should fail
-
-        try {
-
-            rxScheduler.schedule(  new Action1<Scheduler.Inner>() {
-                            @Override
-                            public void call( final Scheduler.Inner inner ) {
-                    //no op
-                }
-            } );
-
-            fail( "This should have thrown an exception" );
-        }
-        catch ( RejectedExecutionException ree ) {
-            //swallow, we just want to ensure we catch this to continue the test
-        }
-
-
-        //update the property to shrink the size
-        rxFig.override( RxFig.PROP_THREAD, "" + half );
-
-        //now release the first half of executors
-        semaphore.release( half );
-
-
-        //wait for completion
-        boolean completed = firstHalf.await( 20, TimeUnit.SECONDS );
-
-        assertTrue( "Completed executing actions", completed );
-
-        //verify we can't schedule b/c we're still at capacity
-
-
-        try {
-
-            rxScheduler.schedule(  new Action1<Scheduler.Inner>() {
-                            @Override
-                            public void call( final Scheduler.Inner inner ) {
-                    //no op
-                }
-            } );
-
-            fail( "This should have thrown an exception.  We still don't have capacity for new threads" );
-        }
-        catch ( RejectedExecutionException ree ) {
-            //swallow, we just want to ensure we catch this to continue the test
-        }
-
-
-        //now release the rest of the semaphores
-        semaphore.release( maxCount - half );
-
-        completed = secondHalf.await( 20, TimeUnit.SECONDS );
-
-        assertTrue( "Completed executing actions", completed );
-
-        //verify we can schedule and execute a new operation
-        CountDownLatch newJob = schedule( rxScheduler, 1, semaphore, TEST_TIMEOUT  );
-
-        semaphore.release( 1 );
-
-        completed = newJob.await( 20, TimeUnit.SECONDS );
-        assertTrue( "Completed executing actions", completed );
-    }
-
-
-    /**
-     * Test that when we're fully blocked, if we expand we have capacity
-     */
-    @Test
-    public void testExpandLimit() throws InterruptedException {
-
-        final int startCount = 10;
-
-        //kind of a hack, but necessary with the way properties are singletons.  Otherwise you get side effects from
-        // other tests
-        rxFig.override( RxFig.PROP_THREAD, "" + startCount );
-
-        final CassandraThreadScheduler cassSchedulerSetup = new CassandraThreadScheduler( rxFig );
-
-        final Scheduler rxScheduler = cassSchedulerSetup.get();
-
-        //we want a fair semaphore so we can release in acquire order
-        final Semaphore semaphore = new Semaphore( 0, true );
-
-        //we should not have maxCount actions running in the scheduler
-        CountDownLatch firstBatch = schedule( rxScheduler, rxFig.getMaxThreadCount(), semaphore, TEST_TIMEOUT  );
-
-        //schedule and we should fail
-
-        try {
-
-            rxScheduler.schedule(  new Action1<Scheduler.Inner>() {
-                            @Override
-                            public void call( final Scheduler.Inner inner ) {
-                    //no op
-                }
-            } );
-
-            fail( "This should have thrown an exception" );
-        }
-        catch ( RejectedExecutionException ree ) {
-            //swallow, we just want to ensure we catch this to continue the test
-        }
-
-
-        //now allocate more capacity
-        final int doubleMaxCount = startCount * 2;
-
-        //update the property to shrink the size
-        rxFig.override( RxFig.PROP_THREAD, "" + doubleMaxCount );
-
-
-        //now schedule 10 more
-
-        CountDownLatch secondBatch = schedule( rxScheduler, rxFig.getMaxThreadCount() - startCount, semaphore, TEST_TIMEOUT  );
-
-        //this should fail.  We're at capacity
-
-        try {
-
-            rxScheduler.schedule(  new Action1<Scheduler.Inner>() {
-                            @Override
-                            public void call( final Scheduler.Inner inner ) {
-                    //no op
-                }
-            } );
-
-            fail( "This should have thrown an exception" );
-        }
-        catch ( RejectedExecutionException ree ) {
-            //swallow, we just want to ensure we catch this to continue the test
-        }
-
-
-        //now release the semaphores so all
-        semaphore.release( rxFig.getMaxThreadCount() );
-
-        //wait for completion
-        boolean completed = firstBatch.await( 20, TimeUnit.SECONDS );
-
-        assertTrue( "Completed executing actions", completed );
-
-        completed = secondBatch.await( 20, TimeUnit.SECONDS );
-
-        assertTrue( "Completed executing actions", completed );
-
-        //verify we can schedule and execute a new operation
-        CountDownLatch result = schedule( rxScheduler, 1, semaphore, TEST_TIMEOUT  );
-
-        semaphore.release( 1 );
-
-        completed = result.await( 20, TimeUnit.SECONDS );
-
-        assertTrue( "Completed executing actions", completed );
-    }
-
-    @Test(expected = RejectedExecutionException.class)
-    public void schedulerPoc() throws InterruptedException {
-
-        final int size = 10;
-
-        //create our thread factory so we can label our threads in case we need to dump them
-        final ThreadFactory factory = new ThreadFactory() {
-
-            private final AtomicLong counter = new AtomicLong();
-
-            @Override
-            public Thread newThread( final Runnable r ) {
-
-               final String threadName = "RxCassandraIOThreadPool-" + counter.incrementAndGet();
-
-                LOG.debug( "Allocating new IO thread with name {}", threadName );
-
-                Thread t = new Thread( r, threadName );
-                t.setDaemon( true );
-                return t;
-            }
-        };
-
-
-        /**
-         * Create a threadpool that will reclaim unused threads after 60 seconds.
-         * It uses the max thread count set here. It intentionally uses the
-         * DynamicProperty, so that when it is updated, the listener updates the
-         * pool size. Additional allocation is trivial.  Shrinking the size
-         * will require all currently executing threads to run to completion,
-         * without allowing additional tasks to be queued.
-         */
-        final ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(10, factory, new ThreadPoolExecutor.AbortPolicy());
-        //set the max thread size
-        pool.setMaximumPoolSize( 10 );
-        pool.setKeepAliveTime( 60, TimeUnit.SECONDS );
-
-        final CountDownLatch latch = new CountDownLatch( size );
-        final Semaphore semaphore = new Semaphore( 0 );
-
-        for(int i = 0; i < size; i ++){
-            pool.schedule( new Callable<Object>() {
-                @Override
-                public Object call() throws Exception {
-
-                    latch.countDown();
-
-                    //block so this is still running in the scheduled execute
-                    semaphore.acquire();
-
-                    return null;  //To change body of implemented methods use File | Settings | File Templates.
-                }
-            }, 0, TimeUnit.MILLISECONDS );
-
-
-
-
-        }
-
-
-        //wait for all our threads to get to semaphore acquisition and block to ensure we're running at capacity
-        latch.await();
-
-        //now schedule 1 more, we should blow up
-
-        pool.schedule( new Callable<Object>() {
-            @Override
-            public Object call() throws Exception {
-                return null;  //To change body of implemented methods use File | Settings | File Templates.
-            }
-        }, 0, TimeUnit.MILLISECONDS );
-
-
-
-    }
-
-
-    /**
-     * Schedule actions into the semaphore.
-     *
-     * @param rxScheduler the Scheduler to use
-     * @param totalCount The total count of actions to invoke
-     * @param semaphore The semaphore to take on acquire
-     *
-     * @return The latch to block on.  When all jobs have been executed this will be tripped
-     */
-    private CountDownLatch schedule( Scheduler rxScheduler, final int totalCount, final Semaphore semaphore, final long timeout ) {
-
-        final CountDownLatch latch = new CountDownLatch( totalCount );
-
-        for ( int i = 0; i < totalCount; i++ ) {
-
-            final Action1<Scheduler.Inner> action = new  Action1<Scheduler.Inner>() {
-                                       @Override
-                                       public void call( final Scheduler.Inner inner ) {
-                    try {
-                        final String threadName = Thread.currentThread().getName();
-
-                        LOG.info( "{} trying to acquire semaphore", threadName );
-                        //get and release the lock
-                        semaphore.tryAcquire( timeout, TimeUnit.MILLISECONDS );
-
-                        LOG.info( "{} has acquired sempahore", threadName );
-
-                        //countdown the latch
-                        latch.countDown();
-                    }
-                    catch ( InterruptedException e ) {
-                        throw new RuntimeException( e );
-                    }
-                }
-            };
-
-            rxScheduler.schedule( action );
-        }
-
-
-        return latch;
-    }
-
-
-
-}
-
-
-

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/457528f5/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java
index 5d72ae6..471fe1f 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java
@@ -79,7 +79,7 @@ public class ParallelTest {
         //        final Scheduler scheduler = Schedulers.threadPoolForComputation();
 
         //use the I/O scheduler to allow enough thread, otherwise our pool will be the same size as the # of cores
-        final Scheduler scheduler = Schedulers.io();
+
 
         //set our size equal
 //        ConfigurationManager.getConfigInstance().setProperty( THREAD_POOL_SIZE, size );
@@ -102,7 +102,7 @@ public class ParallelTest {
          *  non blocking?
          */
 
-        final Observable<String> observable = Observable.from( input ).observeOn( scheduler );
+        final Observable<String> observable = Observable.from( input ).observeOn( Schedulers.io() );
 
 
         Observable<Integer> thing = observable.mapMany( new Func1<String, Observable<Integer>>() {
@@ -124,7 +124,7 @@ public class ParallelTest {
                     /**
                      * QUESTION: Should this again be the process thread, not the I/O
                      */
-                    Observable<String> newObservable = Observable.from( input ).subscribeOn( scheduler );
+                    Observable<String> newObservable = Observable.from( input ).subscribeOn( Schedulers.io() );
 
                     Observable<Integer> transformed = newObservable.map( new Func1<String, Integer>() {
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/457528f5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
index b949d55..2b00d1a 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
@@ -36,7 +36,6 @@ public class AsyncProcessorImpl<T> implements AsyncProcessor<T> {
     private static final Logger LOG = LoggerFactory.getLogger( AsyncProcessor.class );
 
     protected final TimeoutQueue<T> queue;
-    protected final Scheduler scheduler;
     protected final GraphFig graphFig;
     protected final List<MessageListener<T, T>> listeners = new ArrayList<MessageListener<T, T>>();
 
@@ -46,9 +45,8 @@ public class AsyncProcessorImpl<T> implements AsyncProcessor<T> {
 
 
     @Inject
-    public AsyncProcessorImpl( final TimeoutQueue<T> queue, final Scheduler scheduler, final GraphFig graphFig ) {
+    public AsyncProcessorImpl( final TimeoutQueue<T> queue, final GraphFig graphFig ) {
         this.queue = queue;
-        this.scheduler = scheduler;
         this.graphFig = graphFig;
 
         //we purposefully use a new thread.  We don't want to use one of the I/O threads to run this task
@@ -72,7 +70,7 @@ public class AsyncProcessorImpl<T> implements AsyncProcessor<T> {
         List<Observable<?>> observables = new ArrayList<Observable<?>>( listeners.size() );
 
         for ( MessageListener<T, T> listener : listeners ) {
-            observables.add( listener.receive( data ).subscribeOn( scheduler ) );
+            observables.add( listener.receive( data ).subscribeOn( Schedulers.io() ) );
         }
 
         //run everything in parallel and zip it up

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/457528f5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/hystrix/HystrixGraphObservable.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/hystrix/HystrixGraphObservable.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/hystrix/HystrixGraphObservable.java
new file mode 100644
index 0000000..8a44389
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/hystrix/HystrixGraphObservable.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.hystrix;
+
+
+import com.netflix.hystrix.HystrixCommand;
+import com.netflix.hystrix.HystrixCommandGroupKey;
+import com.netflix.hystrix.HystrixObservableCommand;
+
+import rx.Observable;
+import rx.schedulers.Schedulers;
+
+
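+/**
+ * Static helper that wraps an Observable in a HystrixObservableCommand
+ * registered under the "Graph" command group key.
+ */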
+public class HystrixGraphObservable {
+
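+    /**
+     * Wrap the observable in a Hystrix command (group "Graph"); presumably this supplies the timeout - TODO confirm
+     * @param observable the observable returned from the command's run()
+     * @param <T> the type of item emitted by the observable
+     * @return the observable produced by the command's toObservable( Schedulers.io() )
+     */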
+    public static <T> Observable<T> wrap(final Observable<T> observable){
+            return new HystrixObservableCommand<T>( HystrixCommandGroupKey.Factory.asKey( "Graph" ) ){
+
+                @Override
+                protected Observable<T> run() {
+                    return observable;
+                }
+            }.toObservable( Schedulers.io() );
+        }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/457528f5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
index f825767..4e5746a 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
@@ -57,6 +57,7 @@ import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 import rx.Observable;
 import rx.Scheduler;
 import rx.functions.Func1;
+import rx.schedulers.Schedulers;
 
 
 /**
@@ -68,8 +69,6 @@ public class EdgeManagerImpl implements EdgeManager {
 
     private final OrganizationScope scope;
 
-    private final Scheduler scheduler;
-
     private final EdgeMetadataSerialization edgeMetadataSerialization;
 
 
@@ -85,7 +84,7 @@ public class EdgeManagerImpl implements EdgeManager {
 
 
     @Inject
-    public EdgeManagerImpl( final Scheduler scheduler, final EdgeMetadataSerialization edgeMetadataSerialization,
+    public EdgeManagerImpl( final EdgeMetadataSerialization edgeMetadataSerialization,
                             final EdgeSerialization edgeSerialization, final NodeSerialization nodeSerialization,
                             final GraphFig graphFig, @EdgeWrite final AsyncProcessor edgeWrite,
                             @EdgeDelete final AsyncProcessor edgeDelete, @NodeDelete final AsyncProcessor nodeDelete,
@@ -95,7 +94,6 @@ public class EdgeManagerImpl implements EdgeManager {
 
 
         this.scope = scope;
-        this.scheduler = scheduler;
         this.edgeMetadataSerialization = edgeMetadataSerialization;
         this.edgeSerialization = edgeSerialization;
         this.nodeSerialization = nodeSerialization;
@@ -116,7 +114,7 @@ public class EdgeManagerImpl implements EdgeManager {
     public Observable<Edge> writeEdge( final Edge edge ) {
         EdgeUtils.validateEdge( edge );
 
-        return Observable.from( edge ).subscribeOn( scheduler ).map( new Func1<Edge, Edge>() {
+        return Observable.from( edge ).subscribeOn(  Schedulers.io() ).map( new Func1<Edge, Edge>() {
             @Override
             public Edge call( final Edge edge ) {
                 final MutationBatch mutation = edgeMetadataSerialization.writeEdge( scope, edge );
@@ -146,7 +144,7 @@ public class EdgeManagerImpl implements EdgeManager {
     public Observable<Edge> deleteEdge( final Edge edge ) {
         EdgeUtils.validateEdge( edge );
 
-        return Observable.from( edge ).subscribeOn( scheduler ).map( new Func1<Edge, Edge>() {
+        return Observable.from( edge ).subscribeOn(  Schedulers.io() ).map( new Func1<Edge, Edge>() {
             @Override
             public Edge call( final Edge edge ) {
                 final MutationBatch edgeMutation = edgeSerialization.markEdge( scope, edge );
@@ -172,7 +170,7 @@ public class EdgeManagerImpl implements EdgeManager {
 
     @Override
     public Observable<Id> deleteNode( final Id node ) {
-        return Observable.from( node ).subscribeOn( scheduler ).map( new Func1<Id, Id>() {
+        return Observable.from( node ).subscribeOn(  Schedulers.io() ).map( new Func1<Id, Id>() {
             @Override
             public Id call( final Id id ) {
 
@@ -201,33 +199,37 @@ public class EdgeManagerImpl implements EdgeManager {
 
     @Override
     public Observable<Edge> loadEdgesFromSource( final SearchByEdgeType search ) {
-        return Observable.create( new ObservableIterator<MarkedEdge>( ) {
-            @Override
-            protected Iterator<MarkedEdge> getIterator() {
-                return edgeSerialization.getEdgesFromSource( scope, search );
-            }
-        } )//we intentionally use distinct until changed.  This way we won't store all the keys since this
-                //would hog far too much ram.
-                .distinctUntilChanged( new Func1<Edge, Id>() {
+
+
+        return
+
+                Observable.create( new ObservableIterator<MarkedEdge>() {
                     @Override
-                    public Id call( final Edge edge ) {
-                        return edge.getTargetNode();
+                    protected Iterator<MarkedEdge> getIterator() {
+                        return edgeSerialization.getEdgesFromSource( scope, search );
                     }
-                } ).buffer( graphFig.getScanPageSize() ).flatMap( new EdgeBufferFilter( search.getMaxVersion() ) )
-                .cast( Edge.class );
+                } )//we intentionally use distinct until changed.  This way we won't store all the keys since this
+                        //would hog far too much memory.
+                        .distinctUntilChanged( new Func1<Edge, Id>() {
+                            @Override
+                            public Id call( final Edge edge ) {
+                                return edge.getTargetNode();
+                            }
+                        } ).buffer( graphFig.getScanPageSize() )
+                        .flatMap( new EdgeBufferFilter( search.getMaxVersion() ) ).cast( Edge.class );
     }
 
 
     @Override
     public Observable<Edge> loadEdgesToTarget( final SearchByEdgeType search ) {
-        return Observable.create( new ObservableIterator<MarkedEdge>( ) {
+        return Observable.create( new ObservableIterator<MarkedEdge>() {
             @Override
             protected Iterator<MarkedEdge> getIterator() {
                 return edgeSerialization.getEdgesToTarget( scope, search );
             }
         } )
                 //we intentionally use distinct until changed.  This way we won't store all the keys since this
-                //would hog far too much ram.
+                //would hog far too much memory.
                 .distinctUntilChanged( new Func1<Edge, Id>() {
                     @Override
                     public Id call( final Edge edge ) {
@@ -240,7 +242,7 @@ public class EdgeManagerImpl implements EdgeManager {
 
     @Override
     public Observable<Edge> loadEdgesFromSourceByType( final SearchByIdType search ) {
-        return Observable.create( new ObservableIterator<MarkedEdge>( ) {
+        return Observable.create( new ObservableIterator<MarkedEdge>() {
             @Override
             protected Iterator<MarkedEdge> getIterator() {
                 return edgeSerialization.getEdgesFromSourceByTargetType( scope, search );
@@ -258,7 +260,7 @@ public class EdgeManagerImpl implements EdgeManager {
 
     @Override
     public Observable<Edge> loadEdgesToTargetByType( final SearchByIdType search ) {
-        return Observable.create( new ObservableIterator<MarkedEdge>( ) {
+        return Observable.create( new ObservableIterator<MarkedEdge>() {
             @Override
             protected Iterator<MarkedEdge> getIterator() {
                 return edgeSerialization.getEdgesToTargetBySourceType( scope, search );
@@ -276,7 +278,7 @@ public class EdgeManagerImpl implements EdgeManager {
     @Override
     public Observable<String> getEdgeTypesFromSource( final SearchEdgeType search ) {
 
-        return Observable.create( new ObservableIterator<String>( ) {
+        return Observable.create( new ObservableIterator<String>() {
             @Override
             protected Iterator<String> getIterator() {
                 return edgeMetadataSerialization.getEdgeTypesFromSource( scope, search );
@@ -287,7 +289,7 @@ public class EdgeManagerImpl implements EdgeManager {
 
     @Override
     public Observable<String> getIdTypesFromSource( final SearchIdType search ) {
-        return Observable.create( new ObservableIterator<String>( ) {
+        return Observable.create( new ObservableIterator<String>() {
             @Override
             protected Iterator<String> getIterator() {
                 return edgeMetadataSerialization.getIdTypesFromSource( scope, search );
@@ -299,7 +301,7 @@ public class EdgeManagerImpl implements EdgeManager {
     @Override
     public Observable<String> getEdgeTypesToTarget( final SearchEdgeType search ) {
 
-        return Observable.create( new ObservableIterator<String>( ) {
+        return Observable.create( new ObservableIterator<String>() {
             @Override
             protected Iterator<String> getIterator() {
                 return edgeMetadataSerialization.getEdgeTypesToTarget( scope, search );
@@ -310,7 +312,7 @@ public class EdgeManagerImpl implements EdgeManager {
 
     @Override
     public Observable<String> getIdTypesToTarget( final SearchIdType search ) {
-        return Observable.create( new ObservableIterator<String>( ) {
+        return Observable.create( new ObservableIterator<String>() {
             @Override
             protected Iterator<String> getIterator() {
                 return edgeMetadataSerialization.getIdTypesToTarget( scope, search );
@@ -351,7 +353,7 @@ public class EdgeManagerImpl implements EdgeManager {
         public Observable<MarkedEdge> call( final List<MarkedEdge> markedEdges ) {
 
             final Map<Id, UUID> markedVersions = nodeSerialization.getMaxVersions( scope, markedEdges );
-            return Observable.from( markedEdges ).subscribeOn( scheduler )
+            return Observable.from( markedEdges ).subscribeOn(  Schedulers.io() )
                              .filter( new EdgeFilter( this.maxVersion, markedVersions ) );
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/457528f5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
index 0927ba8..190bbff 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
@@ -32,6 +32,7 @@ import rx.Observable;
 import rx.Scheduler;
 import rx.functions.Action0;
 import rx.functions.Func1;
+import rx.schedulers.Schedulers;
 
 
 /**
@@ -47,29 +48,23 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
     private final EdgeMetadataSerialization edgeMetadataSerialization;
     private final EdgeDeleteRepair edgeDeleteRepair;
     private final EdgeMetaRepair edgeMetaRepair;
-    private final GraphFig graphFig;
-
-    private final Scheduler scheduler;
 
 
     /**
      * Wire the serialization dependencies
      */
     @Inject
-    public NodeDeleteListener( final NodeSerialization nodeSerialization, final EdgeSerialization edgeSerialization,
-                               final Scheduler scheduler, @NodeDelete final AsyncProcessor nodeDelete,
+    public NodeDeleteListener( final NodeSerialization nodeSerialization, final EdgeSerialization edgeSerialization, @NodeDelete final AsyncProcessor nodeDelete,
                                final EdgeMetadataSerialization edgeMetadataSerialization,
-                               final EdgeDeleteRepair edgeDeleteRepair, final EdgeMetaRepair edgeMetaRepair,
-                               final GraphFig graphFig ) {
+                               final EdgeDeleteRepair edgeDeleteRepair, final EdgeMetaRepair edgeMetaRepair
+                                ) {
 
 
         this.nodeSerialization = nodeSerialization;
         this.edgeSerialization = edgeSerialization;
-        this.scheduler = scheduler;
         this.edgeMetadataSerialization = edgeMetadataSerialization;
         this.edgeDeleteRepair = edgeDeleteRepair;
         this.edgeMetaRepair = edgeMetaRepair;
-        this.graphFig = graphFig;
 
         nodeDelete.addListener( this );
     }
@@ -88,7 +83,7 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
         final UUID version = edgeEvent.getVersion();
 
 
-        return Observable.from( node ).subscribeOn( scheduler ).map( new Func1<Id, Optional<UUID>>() {
+        return Observable.from( node ).subscribeOn( Schedulers.io() ).map( new Func1<Id, Optional<UUID>>() {
             @Override
             public Optional<UUID> call( final Id id ) {
                 return nodeSerialization.getMaxVersion( scope, node );
@@ -215,7 +210,7 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
             protected Iterator<String> getIterator() {
                 return edgeMetadataSerialization.getEdgeTypesToTarget( scope, search );
             }
-        } ).subscribeOn( scheduler );
+        } ).subscribeOn( Schedulers.io() );
     }
 
 
@@ -229,7 +224,7 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
             protected Iterator<String> getIterator() {
                 return edgeMetadataSerialization.getEdgeTypesFromSource( scope, search );
             }
-        } ).subscribeOn( scheduler );
+        } ).subscribeOn( Schedulers.io() );
     }
 
 
@@ -243,7 +238,7 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
             protected Iterator<MarkedEdge> getIterator() {
                 return edgeSerialization.getEdgesToTarget( scope, search );
             }
-        } ).subscribeOn( scheduler );
+        } ).subscribeOn( Schedulers.io() );
     }
 
 
@@ -257,7 +252,7 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
             protected Iterator<MarkedEdge> getIterator() {
                 return edgeSerialization.getEdgesFromSource( scope, search );
             }
-        } ).subscribeOn( scheduler );
+        } ).subscribeOn( Schedulers.io() );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/457528f5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/AbstractEdgeRepair.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/AbstractEdgeRepair.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/AbstractEdgeRepair.java
index fb31a33..3ff7178 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/AbstractEdgeRepair.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/AbstractEdgeRepair.java
@@ -44,6 +44,7 @@ import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 import rx.Observable;
 import rx.Scheduler;
 import rx.functions.Func1;
+import rx.schedulers.Schedulers;
 
 
 /**
@@ -59,16 +60,14 @@ public abstract class AbstractEdgeRepair  {
     protected final EdgeSerialization edgeSerialization;
     protected final GraphFig graphFig;
     protected final Keyspace keyspace;
-    protected final Scheduler scheduler;
 
 
     @Inject
     public AbstractEdgeRepair( final EdgeSerialization edgeSerialization, final GraphFig graphFig,
-                               final Keyspace keyspace, final Scheduler scheduler ) {
+                               final Keyspace keyspace) {
         this.edgeSerialization = edgeSerialization;
         this.graphFig = graphFig;
         this.keyspace = keyspace;
-        this.scheduler = scheduler;
     }
 
 
@@ -106,7 +105,7 @@ public abstract class AbstractEdgeRepair  {
                                      throw new RuntimeException( "Unable to issue write to cassandra", e );
                                  }
 
-                                 return Observable.from( markedEdges ).subscribeOn( scheduler );
+                                 return Observable.from( markedEdges ).subscribeOn( Schedulers.io() );
                              }
              } );
     }
@@ -133,7 +132,7 @@ public abstract class AbstractEdgeRepair  {
 
                 return edgeSerialization.getEdgeFromSource( scope, search );
             }
-        } ).subscribeOn( scheduler );
+        } ).subscribeOn( Schedulers.io() );
     }
 
 
@@ -150,7 +149,7 @@ public abstract class AbstractEdgeRepair  {
 
                 return edgeSerialization.getEdgeToTarget( scope, search );
             }
-        } ).subscribeOn( scheduler );
+        } ).subscribeOn( Schedulers.io() );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/457528f5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
index bf5d3d2..e900a1d 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
@@ -47,8 +47,8 @@ public class EdgeDeleteRepairImpl extends AbstractEdgeRepair implements EdgeDele
 
     @Inject
     public EdgeDeleteRepairImpl( final EdgeSerialization edgeSerialization, final GraphFig graphFig,
-                                 final Keyspace keyspace, final Scheduler scheduler ) {
-        super( edgeSerialization, graphFig, keyspace, scheduler );
+                                 final Keyspace keyspace) {
+        super( edgeSerialization, graphFig, keyspace );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/457528f5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
index 0bbc91a..8b81d0a 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
@@ -51,6 +51,7 @@ import rx.Scheduler;
 import rx.functions.Action1;
 import rx.functions.Func1;
 import rx.observables.MathObservable;
+import rx.schedulers.Schedulers;
 
 
 /**
@@ -65,18 +66,16 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
     private final EdgeSerialization edgeSerialization;
     private final Keyspace keyspace;
     private final GraphFig graphFig;
-    private final Scheduler scheduler;
 
 
     @Inject
     public EdgeMetaRepairImpl( final EdgeMetadataSerialization edgeMetadataSerialization,
                                final EdgeSerialization edgeSerialization, final Keyspace keyspace,
-                               final GraphFig graphFig, final Scheduler scheduler ) {
+                               final GraphFig graphFig ) {
         this.edgeMetadataSerialization = edgeMetadataSerialization;
         this.edgeSerialization = edgeSerialization;
         this.keyspace = keyspace;
         this.graphFig = graphFig;
-        this.scheduler = scheduler;
     }
 
 
@@ -257,7 +256,7 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
                     return edgeMetadataSerialization
                             .getIdTypesToTarget( scope, new SimpleSearchIdType( nodeId, edgeType, null ) );
                 }
-            } ).subscribeOn( scheduler );
+            } ).subscribeOn( Schedulers.io() );
         }
 
 
@@ -270,7 +269,7 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
                     return edgeSerialization.getEdgesToTargetBySourceType( scope,
                             new SimpleSearchByIdType( nodeId, edgeType, version, subType, null ) );
                 }
-            } ).subscribeOn( scheduler );
+            } ).subscribeOn( Schedulers.io() );
         }
 
 
@@ -302,7 +301,7 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
                     return edgeMetadataSerialization
                             .getIdTypesFromSource( scope, new SimpleSearchIdType( nodeId, edgeType, null ) );
                 }
-            } ).subscribeOn( scheduler );
+            } ).subscribeOn( Schedulers.io() );
         }
 
 
@@ -315,7 +314,7 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
                     return edgeSerialization.getEdgesFromSourceByTargetType( scope,
                             new SimpleSearchByIdType( nodeId, edgeType, version, subType, null ) );
                 }
-            } ).subscribeOn( scheduler );
+            } ).subscribeOn( Schedulers.io() );
         }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/457528f5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairImpl.java
index e8420a8..f77ef95 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairImpl.java
@@ -47,8 +47,8 @@ public class EdgeWriteRepairImpl extends AbstractEdgeRepair implements EdgeWrite
 
     @Inject
     public EdgeWriteRepairImpl( final EdgeSerialization edgeSerialization, final GraphFig graphFig,
-                                final Keyspace keyspace, final Scheduler scheduler ) {
-        super( edgeSerialization, graphFig, keyspace, scheduler );
+                                final Keyspace keyspace) {
+        super( edgeSerialization, graphFig, keyspace );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/457528f5/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/EdgeManagerTimeoutIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/EdgeManagerTimeoutIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/EdgeManagerTimeoutIT.java
index e4591cd..6ee4183 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/EdgeManagerTimeoutIT.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/EdgeManagerTimeoutIT.java
@@ -20,10 +20,13 @@
 package org.apache.usergrid.persistence.graph;
 
 
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.jukito.All;
 import org.jukito.JukitoRunner;
@@ -33,7 +36,8 @@ import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.safehaus.guicyfig.GuicyFig;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import org.apache.usergrid.persistence.collection.OrganizationScope;
 import org.apache.usergrid.persistence.collection.cassandra.CassandraRule;
@@ -41,14 +45,16 @@ import org.apache.usergrid.persistence.collection.guice.MigrationManagerRule;
 import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchIdType;
+import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 
 import com.google.inject.Inject;
+import com.netflix.hystrix.exception.HystrixRuntimeException;
 
 import rx.Observable;
-import rx.functions.Action1;
+import rx.Subscriber;
 
 import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createEdge;
 import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createId;
@@ -57,13 +63,14 @@ import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.crea
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 
-@RunWith(JukitoRunner.class)
-@UseModules({ TestGraphModule.class })
+@RunWith( JukitoRunner.class )
+@UseModules( { TestGraphModule.class } )
 //@UseModules( { TestGraphModule.class, EdgeManagerIT.InvalidInput.class } )
 public class EdgeManagerTimeoutIT {
 
@@ -101,50 +108,117 @@ public class EdgeManagerTimeoutIT {
 
         when( scope.getOrganization() ).thenReturn( orgId );
 
-        if(graphFig.getReadTimeout() > TIMEOUT){
-            fail("Graph read timeout must be <= " + TIMEOUT + ".  Otherwise tests are invalid");
+        if ( graphFig.getReadTimeout() > TIMEOUT ) {
+            fail( "Graph read timeout must be <= " + TIMEOUT + ".  Otherwise tests are invalid" );
         }
     }
 
 
-    @Test(timeout = TIMEOUT, expected = TimeoutException.class)
-    public void testWriteReadEdgeTypeSource() throws InterruptedException {
+    //    @Test(timeout = TIMEOUT, expected = TimeoutException.class)
+    @Test
+    public void testWriteReadEdgeTypeSource( EdgeSerialization serialization ) throws InterruptedException {
 
-        EdgeManager em = emf.createEdgeManager( scope );
 
+        final EdgeManager em = emf.createEdgeManager( scope );
 
-        Edge edge = createEdge( "source", "test", "target" );
 
-        em.writeEdge( edge ).toBlockingObservable().last();
+        final MarkedEdge edge = createEdge( "source", "edge", "target" );
 
         //now test retrieving it
 
         SearchByEdgeType search = createSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getVersion(), null );
 
+
+        final MockingIterator<MarkedEdge> itr = new MockingIterator<>( Collections.singletonList( edge ) );
+
+
+        when( serialization.getEdgesFromSource( scope, search ) ).thenReturn( itr );
+
         Observable<Edge> edges = em.loadEdgesFromSource( search );
 
         //retrieve the edge, ensure that if we block indefinitely, it times out
 
-        final Semaphore blocker = new Semaphore(0);
+        final AtomicInteger onNextCounter = new AtomicInteger();
+        final CountDownLatch errorLatch = new CountDownLatch( 1 );
+
+        final Throwable[] thrown = new Throwable[1];
 
-        edges.subscribe( new Action1<Edge>() {
+
+
+        edges.subscribe( new Subscriber<Edge>() {
             @Override
-            public void call( final Edge edge ) {
-                //block indefinitely, we want to ensure we timeout
-                try {
-                    blocker.acquire();
-                }
-                catch ( InterruptedException e ) {
-                    throw new RuntimeException(e);
+            public void onCompleted() {
+
+            }
+
+
+            @Override
+            public void onError( final Throwable e ) {
+                thrown[0] = e;
+                errorLatch.countDown();
+            }
+
+
+            @Override
+            public void onNext( final Edge edge ) {
+                {
+                    onNextCounter.incrementAndGet();
                 }
             }
         } );
 
-        blocker.acquire();
+
+        errorLatch.await();
+
+
+        assertEquals( "One lement was produced", 1,onNextCounter.intValue() );
+        assertTrue(thrown[0] instanceof HystrixRuntimeException);
 
     }
 
 
+    private class MockingIterator<T> implements Iterator<T> {
+
+        private final Iterator<T> items;
+
+        private final Semaphore semaphore = new Semaphore( 0 );
+
+
+        private MockingIterator( final Collection<T> items ) {
+            this.items = items.iterator();
+        }
+
+
+        @Override
+        public boolean hasNext() {
+            return true;
+        }
+
+
+        @Override
+        public T next() {
+            if ( items.hasNext() ) {
+                return items.next();
+            }
+
+            //block indefinitely
+            try {
+                semaphore.acquire();
+            }
+            catch ( InterruptedException e ) {
+                throw new RuntimeException( e );
+            }
+
+            return null;
+        }
+
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException( "Cannot remove" );
+        }
+    }
+
 
     @Test
     public void testWriteReadEdgeTypeTarget() {
@@ -1336,8 +1410,6 @@ public class EdgeManagerTimeoutIT {
     }
 
 
-
-
     @Test
     public void markTargetNode() {
 
@@ -1420,8 +1492,7 @@ public class EdgeManagerTimeoutIT {
     }
 
 
-
-    @Test(expected = NullPointerException.class)
+    @Test( expected = NullPointerException.class )
     public void invalidEdgeTypesWrite( @All Edge edge ) {
         final EdgeManager em = emf.createEdgeManager( scope );
 
@@ -1429,7 +1500,7 @@ public class EdgeManagerTimeoutIT {
     }
 
 
-    @Test(expected = NullPointerException.class)
+    @Test( expected = NullPointerException.class )
     public void invalidEdgeTypesDelete( @All Edge edge ) {
         final EdgeManager em = emf.createEdgeManager( scope );
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/457528f5/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java
index 3652757..eaade41 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java
@@ -280,7 +280,7 @@ public class AsyncProcessorTest {
 
         when(fig.getScanPageSize()).thenReturn( 0 );
 
-        AsyncProcessorImpl<T> processor = new AsyncProcessorImpl( queue, Schedulers.io(), fig );
+        AsyncProcessorImpl<T> processor = new AsyncProcessorImpl( queue,  fig );
 
 
         return processor;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/457528f5/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/test/util/EdgeTestUtils.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/test/util/EdgeTestUtils.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/test/util/EdgeTestUtils.java
index e9f05e0..68a67d2 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/test/util/EdgeTestUtils.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/test/util/EdgeTestUtils.java
@@ -23,6 +23,7 @@ package org.apache.usergrid.persistence.graph.test.util;
 import java.util.UUID;
 
 import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
 import org.apache.usergrid.persistence.graph.SearchByEdge;
 import org.apache.usergrid.persistence.graph.SearchByEdgeType;
 import org.apache.usergrid.persistence.graph.SearchByIdType;
@@ -53,7 +54,7 @@ public class EdgeTestUtils {
      *
      * @return an Edge for testing
      */
-    public static Edge createEdge( final String sourceType, final String edgeType, final String targetType ) {
+    public static MarkedEdge createEdge( final String sourceType, final String edgeType, final String targetType ) {
         return createEdge( createId( sourceType ), edgeType, createId( targetType ), UUIDGenerator.newTimeUUID() );
     }
 
@@ -61,7 +62,7 @@ public class EdgeTestUtils {
     /**
      * Create an edge for testing
      */
-    public static Edge createEdge( final Id sourceId, final String edgeType, final Id targetId ) {
+    public static MarkedEdge createEdge( final Id sourceId, final String edgeType, final Id targetId ) {
         return createEdge( sourceId, edgeType, targetId, UUIDGenerator.newTimeUUID() );
     }
 
@@ -69,12 +70,22 @@ public class EdgeTestUtils {
     /**
      * Create an edge with the specified params
      */
-    public static Edge createEdge( final Id sourceId, final String edgeType, final Id targetId, final UUID version ) {
+    public static MarkedEdge createEdge( final Id sourceId, final String edgeType, final Id targetId,
+                                         final UUID version ) {
         return new SimpleMarkedEdge( sourceId, edgeType, targetId, version, false );
     }
 
 
     /**
+     * Create an edge with the specified params
+     */
+    public static MarkedEdge createEdge( final String sourceType, final String edgeType, final String targetType,
+                                         final UUID version ) {
+        return new SimpleMarkedEdge( createId( sourceType ), edgeType, createId( targetType ), version, false );
+    }
+
+
+    /**
      * Create the id
      */
     public static Id createId( String type ) {


[2/3] git commit: Merge branch 'asyncqueue' into hystrix-integration

Posted by sn...@apache.org.
Merge branch 'asyncqueue' into hystrix-integration


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/df0091b4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/df0091b4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/df0091b4

Branch: refs/heads/hystrix-integration
Commit: df0091b4858cf583bcd2b3b5fffab75577723c90
Parents: c1e7a33 167f7a8
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Mar 20 10:03:06 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Mar 20 10:03:06 2014 -0700

----------------------------------------------------------------------
 .../rx/CassandraThreadSchedulerTest.java        | 86 ++++++++++++++++++++
 1 file changed, 86 insertions(+)
----------------------------------------------------------------------