You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/03/30 20:03:23 UTC

[1/5] incubator-usergrid git commit: Fixes issue with throwable not being caught in onSubscribe function

Repository: incubator-usergrid
Updated Branches:
  refs/heads/USERGRID-509 abbdd6b48 -> fbce62df8


Fixes issue with throwable not being caught in onSubscribe function


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/2d1c8b8a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/2d1c8b8a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/2d1c8b8a

Branch: refs/heads/USERGRID-509
Commit: 2d1c8b8ac7b20b63a11d83adca56839d8b409cca
Parents: beb2a2a
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Mar 26 09:47:58 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Mar 26 09:47:58 2015 -0600

----------------------------------------------------------------------
 .../index/impl/EsIndexBufferConsumerImpl.java   | 39 +++++++-------------
 1 file changed, 14 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2d1c8b8a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index 7e64de3..d064b97 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -153,12 +153,16 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
                         //name our thread so it's easy to see
                         Thread.currentThread().setName( "QueueConsumer_" + counter.incrementAndGet() );
 
-                        List<IndexOperationMessage> drainList;
+
+                        List<IndexOperationMessage> drainList = null;
+
                         do {
-                            try {
 
+                            Timer.Context timer = produceTimer.time();
+
+
+                            try {
 
-                                Timer.Context timer = produceTimer.time();
 
                                 drainList = bufferQueue
                                     .take( config.getIndexBufferSize(), config.getIndexBufferTimeout(),
@@ -174,10 +178,15 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
                                 timer.stop();
                             }
 
-                            catch ( Exception e ) {
+                            catch ( Throwable t ) {
                                 final long sleepTime = config.getFailureRetryTime();
 
-                                log.error( "Failed to dequeue.  Sleeping for {} milliseconds", sleepTime, e );
+                                log.error( "Failed to dequeue.  Sleeping for {} milliseconds", sleepTime, t );
+
+                                if ( drainList != null ) {
+                                    inFlight.addAndGet( -1 * drainList.size() );
+                                }
+
 
                                 try {
                                     Thread.sleep( sleepTime );
@@ -216,26 +225,6 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
                         inFlight.addAndGet( -1 * indexOperationMessages.size() );
                     }
                 } )
-                //catch an unexpected error, then emit an empty list to ensure our subscriber doesn't die
-                .onErrorReturn( new Func1<Throwable, List<IndexOperationMessage>>() {
-                    @Override
-                    public List<IndexOperationMessage> call( final Throwable throwable ) {
-                        final long sleepTime = config.getFailureRetryTime();
-
-                        log.error( "Failed to dequeue.  Sleeping for {} milliseconds", sleepTime, throwable );
-
-                        try {
-                            Thread.sleep( sleepTime );
-                        }
-                        catch ( InterruptedException ie ) {
-                            //swallow
-                        }
-
-                        indexErrorCounter.inc();
-
-                        return Collections.EMPTY_LIST;
-                    }
-                } )
 
                 .subscribeOn( Schedulers.newThread() );
 


[3/5] incubator-usergrid git commit: Merged from 2.0

Posted by to...@apache.org.
Merged from 2.0


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/7975e4f3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/7975e4f3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/7975e4f3

Branch: refs/heads/USERGRID-509
Commit: 7975e4f3b204253a52a81c9e398f0e9e9ffb79c2
Parents: 07c5de0
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Mar 27 09:47:29 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Mar 27 09:47:29 2015 -0600

----------------------------------------------------------------------
 .../persistence/index/impl/EsIndexBufferConsumerImpl.java        | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7975e4f3/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index d064b97..6d5a4d8 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -177,7 +177,8 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
 
                                 timer.stop();
                             }
-
+                            //DO NOT add any doOnError* functions to this subscription.  We want the producer
+                            //to receive these exceptions and sleep before a retry
                             catch ( Throwable t ) {
                                 final long sleepTime = config.getFailureRetryTime();
 
@@ -187,7 +188,6 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
                                     inFlight.addAndGet( -1 * drainList.size() );
                                 }
 
-
                                 try {
                                     Thread.sleep( sleepTime );
                                 }


[2/5] incubator-usergrid git commit: Merge branch 'two-dot-o' into USERGRID-509

Posted by to...@apache.org.
Merge branch 'two-dot-o' into USERGRID-509


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/07c5de06
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/07c5de06
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/07c5de06

Branch: refs/heads/USERGRID-509
Commit: 07c5de066f44cef1228e830bf162b128f353c456
Parents: abbdd6b 2d1c8b8
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Mar 27 09:46:27 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Mar 27 09:46:27 2015 -0600

----------------------------------------------------------------------
 .../index/impl/EsIndexBufferConsumerImpl.java   | 39 +++++++-------------
 1 file changed, 14 insertions(+), 25 deletions(-)
----------------------------------------------------------------------



[4/5] incubator-usergrid git commit: Fixes hystrix thread pool size issue

Posted by to...@apache.org.
Fixes hystrix thread pool size issue


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/cf3f7abe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/cf3f7abe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/cf3f7abe

Branch: refs/heads/USERGRID-509
Commit: cf3f7abee29d4bee17313d00fe3abf863260e685
Parents: 7975e4f
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Mar 27 16:08:32 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Mar 27 16:08:32 2015 -0600

----------------------------------------------------------------------
 .../collection/mvcc/stage/write/WriteUniqueVerify.java   |  4 ++--
 .../impl/shard/count/NodeShardApproximationImpl.java     | 11 ++++++++++-
 2 files changed, 12 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cf3f7abe/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
index b984ad8..564d036 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
@@ -218,6 +218,6 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>>
      */
     public static final HystrixCommand.Setter
         REPLAY_GROUP = HystrixCommand.Setter.withGroupKey(
-            HystrixCommandGroupKey.Factory.asKey( "user" ) ).andThreadPoolPropertiesDefaults(
-                HystrixThreadPoolProperties.Setter().withCoreSize( 1000 ) );
+            HystrixCommandGroupKey.Factory.asKey( "uniqueVerify" ) ).andThreadPoolPropertiesDefaults(
+                HystrixThreadPoolProperties.Setter().withCoreSize( 100 ) );
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cf3f7abe/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java
index a47d528..fceb32c 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java
@@ -39,6 +39,7 @@ import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
 import com.netflix.astyanax.MutationBatch;
 import com.netflix.hystrix.HystrixCommand;
 import com.netflix.hystrix.HystrixCommandGroupKey;
+import com.netflix.hystrix.HystrixThreadPoolProperties;
 
 import rx.functions.Action0;
 import rx.schedulers.Schedulers;
@@ -75,6 +76,14 @@ public class NodeShardApproximationImpl implements NodeShardApproximation {
 
     private final FlushWorker worker;
 
+    /**
+        * Command group used for realtime user commands
+        */
+       public static final HystrixCommand.Setter
+           COUNT_GROUP = HystrixCommand.Setter.withGroupKey(
+               HystrixCommandGroupKey.Factory.asKey( "BatchCounterRollup" ) ).andThreadPoolPropertiesDefaults(
+                   HystrixThreadPoolProperties.Setter().withCoreSize( 100 ) );
+
 
     /**
      * Create a time shard approximation with the correct configuration.
@@ -229,7 +238,7 @@ public class NodeShardApproximationImpl implements NodeShardApproximation {
                 /**
                  * Execute the command in hystrix to avoid slamming cassandra
                  */
-                new HystrixCommand( HystrixCommandGroupKey.Factory.asKey("BatchCounterRollup") ) {
+                new HystrixCommand( COUNT_GROUP ) {
 
                     @Override
                     protected Void run() throws Exception {


[5/5] incubator-usergrid git commit: Refactored index scope generation to be more consistent and clean

Posted by to...@apache.org.
Refactored index scope generation to be more consistent and clean

Moved some newly private methods to test utils

Added onStart event to the observable iterator


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/fbce62df
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/fbce62df
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/fbce62df

Branch: refs/heads/USERGRID-509
Commit: fbce62df84323ea19dce8a00fb6e52aa245e7bb5
Parents: cf3f7ab
Author: Todd Nine <tn...@apigee.com>
Authored: Mon Mar 30 09:45:46 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon Mar 30 12:03:10 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/CpEntityManager.java        |  29 +---
 .../corepersistence/CpRelationManager.java      |  61 +++------
 .../usergrid/corepersistence/CpWalker.java      |   4 +-
 .../events/EntityDeletedHandler.java            |  31 ++---
 .../events/EntityVersionCreatedHandler.java     |   2 +-
 .../events/EntityVersionDeletedHandler.java     |  87 +++++++++---
 .../results/FilteringLoader.java                |   3 +-
 .../corepersistence/util/CpNamingUtils.java     | 134 ++++++++++++++++---
 .../corepersistence/StaleIndexCleanupTest.java  |  10 +-
 .../rx/EdgesFromSourceObservableIT.java         |   9 +-
 .../rx/EdgesToTargetObservableIT.java           |  53 ++++----
 .../apache/usergrid/utils/EdgeTestUtils.java    |  50 +++++++
 .../persistence/core/rx/ObservableIterator.java |   2 +
 13 files changed, 307 insertions(+), 168 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fbce62df/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index c45c390..a7dda13 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -69,7 +69,6 @@ import org.apache.usergrid.persistence.collection.exception.WriteOptimisticVerif
 import org.apache.usergrid.persistence.collection.exception.WriteUniqueVerifyException;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.util.Health;
 import org.apache.usergrid.persistence.entities.Application;
 import org.apache.usergrid.persistence.entities.Event;
 import org.apache.usergrid.persistence.entities.Group;
@@ -80,10 +79,8 @@ import org.apache.usergrid.persistence.exceptions.EntityNotFoundException;
 import org.apache.usergrid.persistence.exceptions.RequiredPropertyNotFoundException;
 import org.apache.usergrid.persistence.exceptions.UnexpectedEntityTypeException;
 import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
-import org.apache.usergrid.persistence.index.EntityIndex;
 import org.apache.usergrid.persistence.index.EntityIndexBatch;
 import org.apache.usergrid.persistence.index.IndexScope;
-import org.apache.usergrid.persistence.index.impl.IndexScopeImpl;
 import org.apache.usergrid.persistence.index.query.CounterResolution;
 import org.apache.usergrid.persistence.index.query.Identifier;
 import org.apache.usergrid.persistence.index.query.Query;
@@ -104,7 +101,6 @@ import org.apache.usergrid.utils.UUIDUtils;
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.Timer;
 import com.google.common.base.Preconditions;
-import com.netflix.hystrix.exception.HystrixRuntimeException;
 
 import me.prettyprint.hector.api.Keyspace;
 import me.prettyprint.hector.api.beans.ColumnSlice;
@@ -129,7 +125,7 @@ import static me.prettyprint.hector.api.factory.HFactory.createMutator;
 import static org.apache.commons.lang.StringUtils.capitalize;
 import static org.apache.commons.lang.StringUtils.isBlank;
 import static org.apache.usergrid.corepersistence.util.CpEntityMapUtils.entityToCpEntity;
-import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getCollectionScopeNameFromEntityType;
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromCollection;
 import static org.apache.usergrid.persistence.Schema.COLLECTION_ROLES;
 import static org.apache.usergrid.persistence.Schema.COLLECTION_USERS;
 import static org.apache.usergrid.persistence.Schema.DICTIONARY_PERMISSIONS;
@@ -602,15 +598,6 @@ public class CpEntityManager implements EntityManager {
         catch ( WriteUniqueVerifyException wuve ) {
             handleWriteUniqueVerifyException( entity, wuve );
         }
-        catch ( HystrixRuntimeException hre ) {
-
-            if ( hre.getCause() instanceof WriteUniqueVerifyException ) {
-                WriteUniqueVerifyException wuve = ( WriteUniqueVerifyException ) hre.getCause();
-                handleWriteUniqueVerifyException( entity, wuve );
-            }
-
-            throw hre;
-        }
 
         // update in all containing collections and connection indexes
         CpRelationManager rm = ( CpRelationManager ) getRelationManager( entity );
@@ -1040,10 +1027,6 @@ public class CpEntityManager implements EntityManager {
     @Override
     public void deleteProperty( EntityRef entityRef, String propertyName ) throws Exception {
 
-        IndexScope defaultIndexScope = new IndexScopeImpl( getApplicationScope().getApplication(),
-                getCollectionScopeNameFromEntityType( entityRef.getType() ) );
-
-
         Id entityId = new SimpleId( entityRef.getUuid(), entityRef.getType() );
 
         //        if ( !UUIDUtils.isTimeBased( entityId.getUuid() ) ) {
@@ -2614,13 +2597,7 @@ public class CpEntityManager implements EntityManager {
         catch ( WriteUniqueVerifyException wuve ) {
             handleWriteUniqueVerifyException( entity, wuve );
         }
-        catch ( HystrixRuntimeException hre ) {
 
-            if ( hre.getCause() instanceof WriteUniqueVerifyException ) {
-                WriteUniqueVerifyException wuve = ( WriteUniqueVerifyException ) hre.getCause();
-                handleWriteUniqueVerifyException( entity, wuve );
-            }
-        }
 
         // Index CP entity into default collection scope
         //        IndexScope defaultIndexScope = new IndexScopeImpl(
@@ -2915,9 +2892,7 @@ public class CpEntityManager implements EntityManager {
         final EntityIndexBatch batch = aie.createBatch();
 
         // index member into entity collection | type scope
-        IndexScope collectionIndexScope = new IndexScopeImpl( collectionEntity.getId(),
-                CpNamingUtils.getCollectionScopeNameFromCollectionName( collName ) );
-
+        IndexScope collectionIndexScope = generateScopeFromCollection( collectionEntity.getId(), collName );
         batch.index( collectionIndexScope, memberEntity );
 
         //TODO REMOVE INDEX CODE

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fbce62df/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 3427684..b76f38f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -79,7 +79,6 @@ import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
 import org.apache.usergrid.persistence.index.EntityIndexBatch;
 import org.apache.usergrid.persistence.index.IndexScope;
 import org.apache.usergrid.persistence.index.SearchTypes;
-import org.apache.usergrid.persistence.index.impl.IndexScopeImpl;
 import org.apache.usergrid.persistence.index.query.Identifier;
 import org.apache.usergrid.persistence.index.query.Query;
 import org.apache.usergrid.persistence.index.query.Query.Level;
@@ -118,6 +117,11 @@ import rx.functions.Func1;
 import static java.util.Arrays.asList;
 
 import static me.prettyprint.hector.api.factory.HFactory.createMutator;
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createId;
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromSource;
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromCollection;
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromConnection;
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getNameFromEdgeType;
 import static org.apache.usergrid.persistence.Schema.COLLECTION_ROLES;
 import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTED_ENTITIES;
 import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTED_TYPES;
@@ -342,13 +346,7 @@ public class CpRelationManager implements RelationManager {
                 final EntityRef eref =
                     new SimpleEntityRef( edge.getSourceNode().getType(), edge.getSourceNode().getUuid() );
 
-                String name;
-                if ( CpNamingUtils.isConnectionEdgeType( edge.getType() ) ) {
-                    name = CpNamingUtils.getConnectionType( edge.getType() );
-                }
-                else {
-                    name = CpNamingUtils.getCollectionName( edge.getType() );
-                }
+                String name = getNameFromEdgeType(edge.getType());
                 addMapSet( entityRefSetMap, eref, name );
             }
          ).toBlocking().last();
@@ -398,26 +396,11 @@ public class CpRelationManager implements RelationManager {
                     @Override
                     public void call( final Edge edge ) {
 
-                        EntityRef sourceEntity =
-                                new SimpleEntityRef( edge.getSourceNode().getType(), edge.getSourceNode().getUuid() );
 
                         // reindex the entity in the source entity's collection or connection index
 
-                        IndexScope indexScope;
-                        if ( CpNamingUtils.isCollectionEdgeType( edge.getType() ) ) {
-
-                            String collName = CpNamingUtils.getCollectionName( edge.getType() );
-                            indexScope = new IndexScopeImpl(
-                                new SimpleId( sourceEntity.getUuid(), sourceEntity.getType()),
-                                CpNamingUtils.getCollectionScopeNameFromCollectionName( collName ));
-                        }
-                        else {
+                        IndexScope indexScope = generateScopeFromSource(edge);
 
-                            String connName = CpNamingUtils.getConnectionType( edge.getType() );
-                            indexScope = new IndexScopeImpl(
-                                new SimpleId( sourceEntity.getUuid(), sourceEntity.getType() ),
-                                CpNamingUtils.getConnectionScopeName( connName ) );
-                        }
 
                         entityIndexBatch.index( indexScope, cpEntity );
 
@@ -551,7 +534,7 @@ public class CpRelationManager implements RelationManager {
         Iterator<String> iter = str.toBlocking().getIterator();
         while ( iter.hasNext() ) {
             String edgeType = iter.next();
-            indexes.add( CpNamingUtils.getCollectionName( edgeType ) );
+            indexes.add( getNameFromEdgeType( edgeType ) );
         }
 
         return indexes;
@@ -796,22 +779,18 @@ public class CpRelationManager implements RelationManager {
         final EntityIndexBatch batch = ei.createBatch();
 
         // remove item from collection index
-        IndexScope indexScope = new IndexScopeImpl(
-            cpHeadEntity.getId(),
-            CpNamingUtils.getCollectionScopeNameFromCollectionName( collName ) );
+        IndexScope indexScope = generateScopeFromCollection( cpHeadEntity.getId(), collName );
 
         batch.deindex( indexScope, memberEntity );
 
         // remove collection from item index
-        IndexScope itemScope = new IndexScopeImpl(
-            memberEntity.getId(),
-            CpNamingUtils.getCollectionScopeNameFromCollectionName(
-                    Schema.defaultCollectionName( cpHeadEntity.getId().getType() ) ) );
+        IndexScope itemScope = generateScopeFromCollection( memberEntity.getId(),
+            Schema.defaultCollectionName( cpHeadEntity.getId().getType() ) );
 
 
         batch.deindex( itemScope, cpHeadEntity );
 
-        BetterFuture future = batch.execute();
+        batch.execute();
 
         // remove edge from collection to item
         GraphManager gm = managerCache.getGraphManager( applicationScope );
@@ -905,9 +884,7 @@ public class CpRelationManager implements RelationManager {
                     + "' of " + headEntity.getType() + ":" + headEntity .getUuid() );
         }
 
-        final IndexScope indexScope = new IndexScopeImpl(
-            cpHeadEntity.getId(),
-            CpNamingUtils.getCollectionScopeNameFromCollectionName( collName ) );
+        final IndexScope indexScope = generateScopeFromCollection( cpHeadEntity.getId(), collName );
 
         final ApplicationEntityIndex ei = managerCache.getEntityIndex( applicationScope );
 
@@ -978,8 +955,7 @@ public class CpRelationManager implements RelationManager {
         EntityIndexBatch batch = ei.createBatch();
 
         // Index the new connection in app|source|type context
-        IndexScope indexScope = new IndexScopeImpl( cpHeadEntity.getId(),
-                CpNamingUtils.getConnectionScopeName( connectionType ) );
+        IndexScope indexScope = generateScopeFromConnection( cpHeadEntity.getId(), connectionType );
 
         batch.index( indexScope, targetEntity );
 
@@ -1208,10 +1184,8 @@ public class CpRelationManager implements RelationManager {
         final EntityIndexBatch batch = ei.createBatch();
 
         // Deindex the connection in app|source|type context
-        IndexScope indexScope = new IndexScopeImpl(
-            new SimpleId( connectingEntityRef.getUuid(),
-                connectingEntityRef.getType() ),
-                CpNamingUtils.getConnectionScopeName( connectionType ) );
+        final Id cpId =  createId( connectingEntityRef );
+        IndexScope indexScope = generateScopeFromConnection( cpId, connectionType );
         batch.deindex( indexScope, targetEntity );
 
         // Deindex the connection in app|source|type context
@@ -1334,8 +1308,7 @@ public class CpRelationManager implements RelationManager {
 
         headEntity = em.validate( headEntity );
 
-        final IndexScope indexScope = new IndexScopeImpl( cpHeadEntity.getId(),
-                CpNamingUtils.getConnectionScopeName( connection ) );
+        final IndexScope indexScope = generateScopeFromConnection( cpHeadEntity.getId(), connection );
 
         final SearchTypes searchTypes = SearchTypes.fromNullableTypes( query.getEntityType() );
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fbce62df/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
index c14447d..3f2c9d6 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
@@ -36,6 +36,8 @@ import rx.functions.Action1;
 import rx.functions.Func1;
 import rx.schedulers.Schedulers;
 
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getNameFromEdgeType;
+
 
 /**
  * Takes a visitor to all collections and entities.
@@ -128,7 +130,7 @@ public class CpWalker {
                 if ( entity == null ) {
                     return;
                 }
-                String collName = CpNamingUtils.getCollectionName( edgeValue.getType() );
+                String collName = getNameFromEdgeType( edgeValue.getType() );
                 visitor.visitCollectionEntry( em, collName, entity );
             } ).subscribeOn( Schedulers.io() );
         }, 100 );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fbce62df/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java
index 78c1ca7..57d69bc 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java
@@ -65,21 +65,22 @@ public class EntityDeletedHandler implements EntityDeleted {
             return;
         }
 
-
-
-        if(logger.isDebugEnabled()) {
-            logger.debug(
-                "Handling deleted event for entity {}:{} v {} " + " app: {}",
-                new Object[] {
-                    entityId.getType(), entityId.getUuid(), version,
-                    scope.getApplication()
-                } );
-        }
-
-        CpEntityManagerFactory cpemf = ( CpEntityManagerFactory ) emf;
-        final ApplicationEntityIndex ei = cpemf.getManagerCache().getEntityIndex( scope );
-
-        throw new NotImplementedException( "Fix this" );
+//        This is a NO-OP now, it's handled by the EntityVersionDeletedHandler
+
+//
+//        if(logger.isDebugEnabled()) {
+//            logger.debug(
+//                "Handling deleted event for entity {}:{} v {} " + " app: {}",
+//                new Object[] {
+//                    entityId.getType(), entityId.getUuid(), version,
+//                    scope.getApplication()
+//                } );
+//        }
+//
+//        CpEntityManagerFactory cpemf = ( CpEntityManagerFactory ) emf;
+//        final ApplicationEntityIndex ei = cpemf.getManagerCache().getEntityIndex( scope );
+
+//        throw new NotImplementedException( "Fix this" );
 
         //read all edges to this node and de-index them
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fbce62df/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java
index c000500..0163fc2 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java
@@ -55,7 +55,7 @@ public class EntityVersionCreatedHandler implements EntityVersionCreated {
 
     @Override
     public void versionCreated( final ApplicationScope scope, final Entity entity ) {
-        //not op, we're not migrating properly to this.  Make this an event
+        //no-op, we're not migrating properly to this.  Make this an event.  At the moment this is happening on write
 
 //        // This check is for testing purposes and for a test that to be able to dynamically turn
 //        // off and on delete previous versions so that it can test clean-up on read.

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fbce62df/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
index 4fa5ce1..a2e9b30 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
@@ -24,18 +24,30 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.corepersistence.CpEntityManagerFactory;
-import org.apache.usergrid.exception.NotImplementedException;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.persistence.EntityManagerFactory;
 import org.apache.usergrid.persistence.collection.MvccLogEntry;
 import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
+import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.serialization.EdgesObservable;
 import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
+import org.apache.usergrid.persistence.index.EntityIndexBatch;
+import org.apache.usergrid.persistence.index.IndexBatchBuffer;
+import org.apache.usergrid.persistence.index.IndexScope;
+import org.apache.usergrid.persistence.index.impl.IndexScopeImpl;
 import org.apache.usergrid.persistence.model.entity.Id;
 
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 
+import rx.Observable;
+
 import static org.apache.usergrid.corepersistence.CoreModule.EVENTS_DISABLED;
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromSource;
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeToTarget;
 
 
 /**
@@ -49,10 +61,17 @@ public class EntityVersionDeletedHandler implements EntityVersionDeleted {
 
 
     private final EntityManagerFactory emf;
+    private final EdgesObservable edgesObservable;
+    private final SerializationFig serializationFig;
 
 
     @Inject
-    public EntityVersionDeletedHandler( final EntityManagerFactory emf ) {this.emf = emf;}
+    public EntityVersionDeletedHandler( final EntityManagerFactory emf, final EdgesObservable edgesObservable,
+                                        final SerializationFig serializationFig ) {
+        this.emf = emf;
+        this.edgesObservable = edgesObservable;
+        this.serializationFig = serializationFig;
+    }
 
 
     @Override
@@ -67,32 +86,60 @@ public class EntityVersionDeletedHandler implements EntityVersionDeleted {
         }
 
         if ( logger.isDebugEnabled() ) {
-            logger.debug( "Handling versionDeleted count={} event for entity {}:{} v {} "
-                    + "  app: {}", new Object[] {
-                    entityVersions.size(), entityId.getType(), entityId.getUuid(),
-                    scope.getApplication()
-                } );
+            logger.debug( "Handling versionDeleted count={} event for entity {}:{} v {} " + "  app: {}", new Object[] {
+                entityVersions.size(), entityId.getType(), entityId.getUuid(), scope.getApplication()
+            } );
         }
 
         CpEntityManagerFactory cpemf = ( CpEntityManagerFactory ) emf;
 
         final ApplicationEntityIndex ei = cpemf.getManagerCache().getEntityIndex( scope );
+        final GraphManager gm = cpemf.getManagerCache().getGraphManager( scope );
+
+
+        //create an observable of all scopes to deIndex
+        //remove all indexes pointing to this
+        final Observable<IndexScope> targetScopes =  edgesObservable.edgesToTarget( gm, entityId ).map(
+            edge -> generateScopeFromSource( edge) );
+
+
+        //Remove all double indexes
+        final Observable<IndexScope> sourceScopes = edgesObservable.edgesFromSource( gm, entityId ).map(
+                    edge -> generateScopeToTarget( edge ) );
+
+
+        //create a stream of scopes
+        final Observable<IndexScopeVersion> versions = Observable.merge( targetScopes, sourceScopes ).flatMap(
+            indexScope -> Observable.from( entityVersions )
+                                    .map( version -> new IndexScopeVersion( indexScope, version ) ) );
+
+        //create a set of batches
+        final Observable<EntityIndexBatch> batches = versions.buffer( serializationFig.getBufferSize() ).flatMap(
+            bufferedVersions -> Observable.from( bufferedVersions ).collect( () -> ei.createBatch(),
+                ( EntityIndexBatch batch, IndexScopeVersion version ) -> {
+                    //deindex in this batch
+                    batch.deindex( version.scope, version.version.getEntityId(), version.version.getVersion() );
+                } ) );
 
 
-        throw new NotImplementedException( "Fix this" );
 
+        //execute the batches
+        batches.doOnNext( batch -> batch.execute() ).toBlocking().last();
 
-//        final IndexScope indexScope =
-//            new IndexScopeImpl( new SimpleId( scope.getOwner().getUuid(), scope.getOwner().getType() ),
-//                scope.getName() );
-//
-//        //create our batch, and then collect all of them into a single batch
-//        Observable.from( entityVersions ).collect( () -> ei.createBatch(), ( entityIndexBatch, mvccLogEntry ) -> {
-//            entityIndexBatch.deindex( indexScope, mvccLogEntry.getEntityId(), mvccLogEntry.getVersion() );
-//        } )
-//            //after our batch is collected, execute it
-//            .doOnNext( entityIndexBatch -> {
-//                entityIndexBatch.execute();
-//            } ).toBlocking().last();
+    }
+
+
+
+
+
+    private static final class IndexScopeVersion{
+        private final IndexScope scope;
+        private final MvccLogEntry version;
+
+        //pairs one index scope with one log entry; both final fields are assigned exactly once here
+        private IndexScopeVersion( final IndexScope scope, final MvccLogEntry version ) {
+            this.scope = scope;
+            this.version = version;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fbce62df/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java
index 2eb6675..566656b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java
@@ -119,8 +119,7 @@ public class FilteringLoader implements ResultsLoader {
 
             final CandidateResult currentCandidate = iter.next();
 
-            final String collectionType = CpNamingUtils.getCollectionScopeNameFromEntityType(
-                    currentCandidate.getId().getType() );
+            final String collectionType = currentCandidate.getId().getType() ;
 
             final Id entityId = currentCandidate.getId();
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fbce62df/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
index 2e9fb55..652742b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
@@ -21,15 +21,21 @@ package org.apache.usergrid.corepersistence.util;
 
 import java.util.UUID;
 
+import org.apache.usergrid.persistence.EntityRef;
 import org.apache.usergrid.persistence.Schema;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
 import org.apache.usergrid.persistence.entities.Application;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.index.IndexScope;
+import org.apache.usergrid.persistence.index.impl.IndexScopeImpl;
 import org.apache.usergrid.persistence.map.MapScope;
 import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
 
+import com.clearspring.analytics.util.Preconditions;
+
 
 /**
  * Utilises for constructing standard naming conventions for collections and connections
@@ -80,66 +86,150 @@ public class CpNamingUtils {
      * @param type
      * @return
      */
-    public static String getCollectionScopeNameFromEntityType( String type ) {
+    private static String getCollectionScopeNameFromEntityType( String type ) {
         String csn = EDGE_COLL_SUFFIX + Schema.defaultCollectionName( type );
         return csn.toLowerCase();
     }
 
 
-    public static String getCollectionScopeNameFromCollectionName( String name ) {
+    private static String getCollectionScopeNameFromCollectionName( String name ) {
         String csn = EDGE_COLL_SUFFIX + name;
         return csn.toLowerCase();
     }
 
 
-    public static String getConnectionScopeName( String connectionType ) {
+    private static String getConnectionScopeName( String connectionType ) {
         String csn = EDGE_CONN_SUFFIX + connectionType ;
         return csn.toLowerCase();
     }
 
 
-    public static boolean isCollectionEdgeType( String type ) {
+    /**
+     * Get the index scope built from the source node of the given edge
+     * @param edge The edge whose source node and type are used to build the scope
+     * @return A new IndexScopeImpl for the source node, named via getNameFromEdgeType
+     */
+    public static IndexScope generateScopeFromSource(final Edge edge ){
+
+
+        final Id nodeId = edge.getSourceNode();
+        final String scopeName = getNameFromEdgeType( edge.getType() );
+
+
+        return new IndexScopeImpl( nodeId, scopeName );
+
+    }
+
+
+
+
+
+    /**
+     * Get the index scope built from the target node of the given edge
+     * @param edge The edge whose target node and type are used to build the scope
+     * @return A new IndexScopeImpl for the target node, named via getNameFromEdgeType
+     */
+    public static IndexScope generateScopeToTarget(final Edge edge ){
+
+
+
+        final Id nodeId = edge.getTargetNode();
+        final String scopeName = getNameFromEdgeType( edge.getType() );
+
+
+        return new IndexScopeImpl( nodeId, scopeName );
+
+    }
+
+
+    /**
+     * Generate either the collection scope name or the connection scope name from the edge type
+     * @param edgeName The edge type; anything that is not a collection edge type is handled as a connection
+     * @return The scope name produced by the collection or connection helper, whichever branch applied
+     */
+    public static String getNameFromEdgeType(final String edgeName){
+
+
+        if(isCollectionEdgeType( edgeName )){
+           return getCollectionScopeNameFromCollectionName(getCollectionName(edgeName) );
+        }
+
+        return getConnectionScopeName(getConnectionType( edgeName )  );
+
+    }
+
+
+    /**
+     * Get the index scope from the collection name
+     * @param nodeId The source or target node id
+     * @param collectionName The name of the collection.  Ex "users"
+     * @return A new IndexScopeImpl for nodeId, named via getCollectionScopeNameFromCollectionName
+     */
+    public static IndexScope generateScopeFromCollection( final Id nodeId, final String collectionName ){
+        return new IndexScopeImpl( nodeId, getCollectionScopeNameFromCollectionName( collectionName ) );
+    }
+
+
+    /**
+     * Get the index scope from the connection name
+     * @param nodeId The node id the scope is created for
+     * @param connectionName The connection name, passed to getConnectionScopeName to build the scope name
+     * @return A new IndexScopeImpl for nodeId, named via getConnectionScopeName
+     */
+    public static IndexScope generateScopeFromConnection( final Id nodeId, final String connectionName ){
+        return new IndexScopeImpl( nodeId, getConnectionScopeName( connectionName ) );
+    }
+
+
+    /**
+     * Create an Id object from the entity ref
+     * @param entityRef The entity ref supplying the uuid and type
+     * @return A new SimpleId built from entityRef.getUuid() and entityRef.getType()
+     */
+    public static Id createId(final EntityRef entityRef){
+      return new SimpleId( entityRef.getUuid(), entityRef.getType() );
+    }
+
+    private static boolean isCollectionEdgeType( String type ) {
         return type.startsWith( EDGE_COLL_SUFFIX );
     }
 
 
-    public static boolean isConnectionEdgeType( String type ) {
+    private static boolean isConnectionEdgeType( String type ) {
         return type.startsWith( EDGE_CONN_SUFFIX );
     }
 
 
-    static public String  getConnectionType( String edgeType ) {
+
+    private static  String  getConnectionType( String edgeType ) {
         String[] parts = edgeType.split( "\\|" );
         return parts[1];
     }
 
 
-    static public String getCollectionName( String edgeType ) {
+    private static String getCollectionName( String edgeType ) {
         String[] parts = edgeType.split( "\\|" );
         return parts[1];
     }
 
 
+    /**
+     * Generate a standard edge name for our graph using the connection name
+     * @param connectionType The type of connection made
+     * @return
+     */
     public static String getEdgeTypeFromConnectionType( String connectionType ) {
-
-        if ( connectionType != null ) {
-            String csn = EDGE_CONN_SUFFIX + "|" + connectionType;
-            return csn;
-        }
-
-        return null;
+        return  (EDGE_CONN_SUFFIX  + "|" + connectionType).toLowerCase();
     }
 
 
+    /**
+     * Generate a standard edge name for our graph using the collection name
+     * @param collectionName
+     * @return
+     */
     public static String getEdgeTypeFromCollectionName( String collectionName ) {
-
-        if ( collectionName != null ) {
-            String csn = EDGE_COLL_SUFFIX + "|" + collectionName;
-            return csn;
-        }
-
-
-        return null;
+        return (EDGE_COLL_SUFFIX + "|" + collectionName).toLowerCase();
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fbce62df/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
index 5c166c5..f743f0b 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
@@ -52,6 +52,7 @@ import org.apache.usergrid.persistence.index.SearchTypes;
 import org.apache.usergrid.persistence.index.impl.IndexScopeImpl;
 import org.apache.usergrid.persistence.index.query.CandidateResults;
 import org.apache.usergrid.persistence.index.query.Query;
+import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
 
 import com.fasterxml.uuid.UUIDComparator;
@@ -60,7 +61,9 @@ import com.google.inject.Injector;
 import net.jcip.annotations.NotThreadSafe;
 
 import static org.apache.usergrid.corepersistence.CoreModule.EVENTS_DISABLED;
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromCollection;
 import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION;
+import static org.apache.usergrid.persistence.core.util.IdGenerator.createId;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -483,14 +486,15 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
 
         EntityManager em = app.getEntityManager();
 
-        EntityIndexFactory eif =  SpringResource.getInstance().getBean( Injector.class ).getInstance( EntityIndexFactory.class );
+        EntityIndexFactory eif =  SpringResource.getInstance().getBean( Injector.class ).getInstance(
+            EntityIndexFactory.class );
 
         ApplicationScope as = new ApplicationScopeImpl(
             new SimpleId( em.getApplicationId(), TYPE_APPLICATION ) );
         ApplicationEntityIndex ei = eif.createApplicationEntityIndex(as);
 
-        IndexScope is = new IndexScopeImpl( new SimpleId( em.getApplicationId(), TYPE_APPLICATION ),
-                CpNamingUtils.getCollectionScopeNameFromCollectionName( collName ) );
+        final Id rootId = createId(em.getApplicationId(), TYPE_APPLICATION);
+        IndexScope is = generateScopeFromCollection(rootId, collName );
         Query rcq = Query.fromQL( query );
 
         // TODO: why does this have no effect; max we ever get is 1000 entities

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fbce62df/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java
index 50c2cd9..3bfe460 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java
@@ -41,6 +41,7 @@ import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.utils.EdgeTestUtils;
 
 import com.google.inject.Injector;
 
@@ -98,8 +99,8 @@ public class EdgesFromSourceObservableIT extends AbstractCoreIT {
                 final Id source = edge.getSourceNode();
 
                 //test if we're a collection, if so
-                if ( CpNamingUtils.isCollectionEdgeType( edgeType ) ) {
-                    final String collectionName = CpNamingUtils.getCollectionName( edgeType );
+                if ( EdgeTestUtils.isCollectionEdgeType( edgeType ) ) {
+                    final String collectionName = EdgeTestUtils.getNameForEdge( edgeType );
 
                     assertEquals("application source returned", createdApplication.getUuid(), source.getUuid());
 
@@ -112,11 +113,11 @@ public class EdgesFromSourceObservableIT extends AbstractCoreIT {
 
 
 
-                if ( !CpNamingUtils.isConnectionEdgeType( edgeType ) ) {
+                if ( !EdgeTestUtils.isConnectionEdgeType( edgeType ) ) {
                     fail( "Only connection edges should be encountered" );
                 }
 
-                final String connectionType = CpNamingUtils.getConnectionType( edgeType );
+                final String connectionType = EdgeTestUtils.getNameForEdge( edgeType );
 
                 assertEquals( "Same connection type expected", "likes", connectionType );
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fbce62df/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
index 9f1bb17..6d228b2 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
@@ -38,6 +38,7 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.utils.EdgeTestUtils;
 
 import com.google.inject.Injector;
 
@@ -92,29 +93,26 @@ public class EdgesToTargetObservableIT extends AbstractCoreIT {
 
         final GraphManager gm = managerCache.getGraphManager( scope );
 
-        edgesFromSourceObservable.edgesFromSource( gm, applicationId ).doOnNext( new Action1<Edge>() {
-            @Override
-            public void call( final Edge edge ) {
-                final String edgeType = edge.getType();
-                final Id target = edge.getTargetNode();
+        edgesFromSourceObservable.edgesFromSource( gm, applicationId ).doOnNext( edge -> {
+            final String edgeType = edge.getType();
+            final Id target = edge.getTargetNode();
 
-                //test if we're a collection, if so remove ourselves fro the types
-                if ( !CpNamingUtils.isCollectionEdgeType( edgeType ) ) {
-                    fail( "Connections should be the only type encountered" );
-                }
+            //test if we're a collection, if so remove ourselves from the types
+            if ( !EdgeTestUtils.isCollectionEdgeType( edgeType ) ) {
+                fail( "Connections should be the only type encountered" );
+            }
 
 
-                final String collectionType = CpNamingUtils.getCollectionName( edgeType );
+            final String collectionType = EdgeTestUtils.getNameForEdge( edgeType );
 
-                if ( collectionType.equals( type1 ) ) {
-                    assertTrue( "Element should be present on removal", type1Identities.remove( target ) );
-                }
-                else if ( collectionType.equals( type2 ) ) {
-                    assertTrue( "Element should be present on removal", type2Identities.remove( target ) );
-                }
+            if ( collectionType.equals( type1 ) ) {
+                assertTrue( "Element should be present on removal", type1Identities.remove( target ) );
+            }
+            else if ( collectionType.equals( type2 ) ) {
+                assertTrue( "Element should be present on removal", type2Identities.remove( target ) );
+            }
 
 
-            }
         } ).toBlocking().lastOrDefault( null );
 
 
@@ -124,23 +122,20 @@ public class EdgesToTargetObservableIT extends AbstractCoreIT {
 
         //test connections
 
-        edgesFromSourceObservable.edgesFromSource( gm, source).doOnNext( new Action1<Edge>() {
-            @Override
-            public void call( final Edge edge ) {
-                final String edgeType = edge.getType();
-                final Id target = edge.getTargetNode();
+        edgesFromSourceObservable.edgesFromSource( gm, source).doOnNext( edge -> {
+            final String edgeType = edge.getType();
+            final Id target = edge.getTargetNode();
 
-                if ( !CpNamingUtils.isConnectionEdgeType( edgeType ) ) {
-                    fail( "Only connection edges should be encountered" );
-                }
+            if ( !EdgeTestUtils.isConnectionEdgeType( edgeType ) ) {
+                fail( "Only connection edges should be encountered" );
+            }
 
-                final String connectionType = CpNamingUtils.getConnectionType( edgeType );
+            final String connectionType = EdgeTestUtils.getNameForEdge( edgeType );
 
-                assertEquals( "Same connection type expected", "likes", connectionType );
+            assertEquals( "Same connection type expected", "likes", connectionType );
 
 
-                assertTrue( "Element should be present on removal", connections.remove( target ) );
-            }
+            assertTrue( "Element should be present on removal", connections.remove( target ) );
         } ).toBlocking().lastOrDefault( null );
 
         assertEquals( "Every connection should have been encountered", 0, connections.size() );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fbce62df/stack/core/src/test/java/org/apache/usergrid/utils/EdgeTestUtils.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/utils/EdgeTestUtils.java b/stack/core/src/test/java/org/apache/usergrid/utils/EdgeTestUtils.java
new file mode 100644
index 0000000..f217338
--- /dev/null
+++ b/stack/core/src/test/java/org/apache/usergrid/utils/EdgeTestUtils.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.utils;
+
+
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+
+import static org.junit.Assert.assertEquals;
+
+
+public class EdgeTestUtils {
+
+    /**
+     * Get the name for an edge: the second of exactly two '|'-separated parts of the edge type
+     */
+    public static String getNameForEdge( final String edgeName ) {
+        final String[] parts = edgeName.split( "\\|" );
+        //expected value goes first so a failure reports the part counts the right way round
+        assertEquals( "there should be 2 parts", 2, parts.length );
+
+        return parts[1];
+    }
+
+    /** True when the edge type starts with CpNamingUtils.EDGE_COLL_SUFFIX */
+    public static boolean isCollectionEdgeType( String type ) {
+        return type.startsWith( CpNamingUtils.EDGE_COLL_SUFFIX );
+    }
+
+    /** True when the edge type starts with CpNamingUtils.EDGE_CONN_SUFFIX */
+    public static boolean isConnectionEdgeType( String type ) {
+        return type.startsWith( CpNamingUtils.EDGE_CONN_SUFFIX );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fbce62df/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableIterator.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableIterator.java
index 2bd1edb..84a7fc3 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableIterator.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableIterator.java
@@ -53,6 +53,8 @@ public abstract class ObservableIterator<T> implements Observable.OnSubscribe<T>
 
 
         try {
+            subscriber.onStart();
+
             //get our iterator and push data to the observer
             final Iterator<T> itr = getIterator();