You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/03/30 20:07:18 UTC

incubator-usergrid git commit: Refactored index scope generation to be more consistent and clean

Repository: incubator-usergrid
Updated Branches:
  refs/heads/USERGRID-509 fbce62df8 -> d886e55d3 (forced update)


Refactored index scope generation to be more consistent and clean

Moved some newly private methods to test utils

Added onStart event to the observable iterator

Removes group by collection type from filtering loader


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/d886e55d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/d886e55d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/d886e55d

Branch: refs/heads/USERGRID-509
Commit: d886e55d33dee28ce60f98abbdb3fc35d78b6cab
Parents: cf3f7ab
Author: Todd Nine <tn...@apigee.com>
Authored: Mon Mar 30 09:45:46 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon Mar 30 12:07:09 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/CpEntityManager.java        |  29 +---
 .../corepersistence/CpRelationManager.java      |  61 +++------
 .../usergrid/corepersistence/CpWalker.java      |   4 +-
 .../events/EntityDeletedHandler.java            |  31 ++---
 .../events/EntityVersionCreatedHandler.java     |   2 +-
 .../events/EntityVersionDeletedHandler.java     |  87 +++++++++---
 .../results/FilteringLoader.java                |   3 +-
 .../corepersistence/util/CpNamingUtils.java     | 134 ++++++++++++++++---
 .../corepersistence/StaleIndexCleanupTest.java  |  10 +-
 .../rx/EdgesFromSourceObservableIT.java         |   9 +-
 .../rx/EdgesToTargetObservableIT.java           |  53 ++++----
 .../apache/usergrid/utils/EdgeTestUtils.java    |  50 +++++++
 .../persistence/core/rx/ObservableIterator.java |   2 +
 13 files changed, 307 insertions(+), 168 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d886e55d/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index c45c390..a7dda13 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -69,7 +69,6 @@ import org.apache.usergrid.persistence.collection.exception.WriteOptimisticVerif
 import org.apache.usergrid.persistence.collection.exception.WriteUniqueVerifyException;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.util.Health;
 import org.apache.usergrid.persistence.entities.Application;
 import org.apache.usergrid.persistence.entities.Event;
 import org.apache.usergrid.persistence.entities.Group;
@@ -80,10 +79,8 @@ import org.apache.usergrid.persistence.exceptions.EntityNotFoundException;
 import org.apache.usergrid.persistence.exceptions.RequiredPropertyNotFoundException;
 import org.apache.usergrid.persistence.exceptions.UnexpectedEntityTypeException;
 import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
-import org.apache.usergrid.persistence.index.EntityIndex;
 import org.apache.usergrid.persistence.index.EntityIndexBatch;
 import org.apache.usergrid.persistence.index.IndexScope;
-import org.apache.usergrid.persistence.index.impl.IndexScopeImpl;
 import org.apache.usergrid.persistence.index.query.CounterResolution;
 import org.apache.usergrid.persistence.index.query.Identifier;
 import org.apache.usergrid.persistence.index.query.Query;
@@ -104,7 +101,6 @@ import org.apache.usergrid.utils.UUIDUtils;
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.Timer;
 import com.google.common.base.Preconditions;
-import com.netflix.hystrix.exception.HystrixRuntimeException;
 
 import me.prettyprint.hector.api.Keyspace;
 import me.prettyprint.hector.api.beans.ColumnSlice;
@@ -129,7 +125,7 @@ import static me.prettyprint.hector.api.factory.HFactory.createMutator;
 import static org.apache.commons.lang.StringUtils.capitalize;
 import static org.apache.commons.lang.StringUtils.isBlank;
 import static org.apache.usergrid.corepersistence.util.CpEntityMapUtils.entityToCpEntity;
-import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getCollectionScopeNameFromEntityType;
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromCollection;
 import static org.apache.usergrid.persistence.Schema.COLLECTION_ROLES;
 import static org.apache.usergrid.persistence.Schema.COLLECTION_USERS;
 import static org.apache.usergrid.persistence.Schema.DICTIONARY_PERMISSIONS;
@@ -602,15 +598,6 @@ public class CpEntityManager implements EntityManager {
         catch ( WriteUniqueVerifyException wuve ) {
             handleWriteUniqueVerifyException( entity, wuve );
         }
-        catch ( HystrixRuntimeException hre ) {
-
-            if ( hre.getCause() instanceof WriteUniqueVerifyException ) {
-                WriteUniqueVerifyException wuve = ( WriteUniqueVerifyException ) hre.getCause();
-                handleWriteUniqueVerifyException( entity, wuve );
-            }
-
-            throw hre;
-        }
 
         // update in all containing collections and connection indexes
         CpRelationManager rm = ( CpRelationManager ) getRelationManager( entity );
@@ -1040,10 +1027,6 @@ public class CpEntityManager implements EntityManager {
     @Override
     public void deleteProperty( EntityRef entityRef, String propertyName ) throws Exception {
 
-        IndexScope defaultIndexScope = new IndexScopeImpl( getApplicationScope().getApplication(),
-                getCollectionScopeNameFromEntityType( entityRef.getType() ) );
-
-
         Id entityId = new SimpleId( entityRef.getUuid(), entityRef.getType() );
 
         //        if ( !UUIDUtils.isTimeBased( entityId.getUuid() ) ) {
@@ -2614,13 +2597,7 @@ public class CpEntityManager implements EntityManager {
         catch ( WriteUniqueVerifyException wuve ) {
             handleWriteUniqueVerifyException( entity, wuve );
         }
-        catch ( HystrixRuntimeException hre ) {
 
-            if ( hre.getCause() instanceof WriteUniqueVerifyException ) {
-                WriteUniqueVerifyException wuve = ( WriteUniqueVerifyException ) hre.getCause();
-                handleWriteUniqueVerifyException( entity, wuve );
-            }
-        }
 
         // Index CP entity into default collection scope
         //        IndexScope defaultIndexScope = new IndexScopeImpl(
@@ -2915,9 +2892,7 @@ public class CpEntityManager implements EntityManager {
         final EntityIndexBatch batch = aie.createBatch();
 
         // index member into entity collection | type scope
-        IndexScope collectionIndexScope = new IndexScopeImpl( collectionEntity.getId(),
-                CpNamingUtils.getCollectionScopeNameFromCollectionName( collName ) );
-
+        IndexScope collectionIndexScope = generateScopeFromCollection( collectionEntity.getId(), collName );
         batch.index( collectionIndexScope, memberEntity );
 
         //TODO REMOVE INDEX CODE

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d886e55d/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 3427684..b76f38f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -79,7 +79,6 @@ import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
 import org.apache.usergrid.persistence.index.EntityIndexBatch;
 import org.apache.usergrid.persistence.index.IndexScope;
 import org.apache.usergrid.persistence.index.SearchTypes;
-import org.apache.usergrid.persistence.index.impl.IndexScopeImpl;
 import org.apache.usergrid.persistence.index.query.Identifier;
 import org.apache.usergrid.persistence.index.query.Query;
 import org.apache.usergrid.persistence.index.query.Query.Level;
@@ -118,6 +117,11 @@ import rx.functions.Func1;
 import static java.util.Arrays.asList;
 
 import static me.prettyprint.hector.api.factory.HFactory.createMutator;
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createId;
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromSource;
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromCollection;
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromConnection;
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getNameFromEdgeType;
 import static org.apache.usergrid.persistence.Schema.COLLECTION_ROLES;
 import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTED_ENTITIES;
 import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTED_TYPES;
@@ -342,13 +346,7 @@ public class CpRelationManager implements RelationManager {
                 final EntityRef eref =
                     new SimpleEntityRef( edge.getSourceNode().getType(), edge.getSourceNode().getUuid() );
 
-                String name;
-                if ( CpNamingUtils.isConnectionEdgeType( edge.getType() ) ) {
-                    name = CpNamingUtils.getConnectionType( edge.getType() );
-                }
-                else {
-                    name = CpNamingUtils.getCollectionName( edge.getType() );
-                }
+                String name = getNameFromEdgeType(edge.getType());
                 addMapSet( entityRefSetMap, eref, name );
             }
          ).toBlocking().last();
@@ -398,26 +396,11 @@ public class CpRelationManager implements RelationManager {
                     @Override
                     public void call( final Edge edge ) {
 
-                        EntityRef sourceEntity =
-                                new SimpleEntityRef( edge.getSourceNode().getType(), edge.getSourceNode().getUuid() );
 
                         // reindex the entity in the source entity's collection or connection index
 
-                        IndexScope indexScope;
-                        if ( CpNamingUtils.isCollectionEdgeType( edge.getType() ) ) {
-
-                            String collName = CpNamingUtils.getCollectionName( edge.getType() );
-                            indexScope = new IndexScopeImpl(
-                                new SimpleId( sourceEntity.getUuid(), sourceEntity.getType()),
-                                CpNamingUtils.getCollectionScopeNameFromCollectionName( collName ));
-                        }
-                        else {
+                        IndexScope indexScope = generateScopeFromSource(edge);
 
-                            String connName = CpNamingUtils.getConnectionType( edge.getType() );
-                            indexScope = new IndexScopeImpl(
-                                new SimpleId( sourceEntity.getUuid(), sourceEntity.getType() ),
-                                CpNamingUtils.getConnectionScopeName( connName ) );
-                        }
 
                         entityIndexBatch.index( indexScope, cpEntity );
 
@@ -551,7 +534,7 @@ public class CpRelationManager implements RelationManager {
         Iterator<String> iter = str.toBlocking().getIterator();
         while ( iter.hasNext() ) {
             String edgeType = iter.next();
-            indexes.add( CpNamingUtils.getCollectionName( edgeType ) );
+            indexes.add( getNameFromEdgeType( edgeType ) );
         }
 
         return indexes;
@@ -796,22 +779,18 @@ public class CpRelationManager implements RelationManager {
         final EntityIndexBatch batch = ei.createBatch();
 
         // remove item from collection index
-        IndexScope indexScope = new IndexScopeImpl(
-            cpHeadEntity.getId(),
-            CpNamingUtils.getCollectionScopeNameFromCollectionName( collName ) );
+        IndexScope indexScope = generateScopeFromCollection( cpHeadEntity.getId(), collName );
 
         batch.deindex( indexScope, memberEntity );
 
         // remove collection from item index
-        IndexScope itemScope = new IndexScopeImpl(
-            memberEntity.getId(),
-            CpNamingUtils.getCollectionScopeNameFromCollectionName(
-                    Schema.defaultCollectionName( cpHeadEntity.getId().getType() ) ) );
+        IndexScope itemScope = generateScopeFromCollection( memberEntity.getId(),
+            Schema.defaultCollectionName( cpHeadEntity.getId().getType() ) );
 
 
         batch.deindex( itemScope, cpHeadEntity );
 
-        BetterFuture future = batch.execute();
+        batch.execute();
 
         // remove edge from collection to item
         GraphManager gm = managerCache.getGraphManager( applicationScope );
@@ -905,9 +884,7 @@ public class CpRelationManager implements RelationManager {
                     + "' of " + headEntity.getType() + ":" + headEntity .getUuid() );
         }
 
-        final IndexScope indexScope = new IndexScopeImpl(
-            cpHeadEntity.getId(),
-            CpNamingUtils.getCollectionScopeNameFromCollectionName( collName ) );
+        final IndexScope indexScope = generateScopeFromCollection( cpHeadEntity.getId(), collName );
 
         final ApplicationEntityIndex ei = managerCache.getEntityIndex( applicationScope );
 
@@ -978,8 +955,7 @@ public class CpRelationManager implements RelationManager {
         EntityIndexBatch batch = ei.createBatch();
 
         // Index the new connection in app|source|type context
-        IndexScope indexScope = new IndexScopeImpl( cpHeadEntity.getId(),
-                CpNamingUtils.getConnectionScopeName( connectionType ) );
+        IndexScope indexScope = generateScopeFromConnection( cpHeadEntity.getId(), connectionType );
 
         batch.index( indexScope, targetEntity );
 
@@ -1208,10 +1184,8 @@ public class CpRelationManager implements RelationManager {
         final EntityIndexBatch batch = ei.createBatch();
 
         // Deindex the connection in app|source|type context
-        IndexScope indexScope = new IndexScopeImpl(
-            new SimpleId( connectingEntityRef.getUuid(),
-                connectingEntityRef.getType() ),
-                CpNamingUtils.getConnectionScopeName( connectionType ) );
+        final Id cpId =  createId( connectingEntityRef );
+        IndexScope indexScope = generateScopeFromConnection( cpId, connectionType );
         batch.deindex( indexScope, targetEntity );
 
         // Deindex the connection in app|source|type context
@@ -1334,8 +1308,7 @@ public class CpRelationManager implements RelationManager {
 
         headEntity = em.validate( headEntity );
 
-        final IndexScope indexScope = new IndexScopeImpl( cpHeadEntity.getId(),
-                CpNamingUtils.getConnectionScopeName( connection ) );
+        final IndexScope indexScope = generateScopeFromConnection( cpHeadEntity.getId(), connection );
 
         final SearchTypes searchTypes = SearchTypes.fromNullableTypes( query.getEntityType() );
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d886e55d/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
index c14447d..3f2c9d6 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
@@ -36,6 +36,8 @@ import rx.functions.Action1;
 import rx.functions.Func1;
 import rx.schedulers.Schedulers;
 
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getNameFromEdgeType;
+
 
 /**
  * Takes a visitor to all collections and entities.
@@ -128,7 +130,7 @@ public class CpWalker {
                 if ( entity == null ) {
                     return;
                 }
-                String collName = CpNamingUtils.getCollectionName( edgeValue.getType() );
+                String collName = getNameFromEdgeType( edgeValue.getType() );
                 visitor.visitCollectionEntry( em, collName, entity );
             } ).subscribeOn( Schedulers.io() );
         }, 100 );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d886e55d/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java
index 78c1ca7..57d69bc 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java
@@ -65,21 +65,22 @@ public class EntityDeletedHandler implements EntityDeleted {
             return;
         }
 
-
-
-        if(logger.isDebugEnabled()) {
-            logger.debug(
-                "Handling deleted event for entity {}:{} v {} " + " app: {}",
-                new Object[] {
-                    entityId.getType(), entityId.getUuid(), version,
-                    scope.getApplication()
-                } );
-        }
-
-        CpEntityManagerFactory cpemf = ( CpEntityManagerFactory ) emf;
-        final ApplicationEntityIndex ei = cpemf.getManagerCache().getEntityIndex( scope );
-
-        throw new NotImplementedException( "Fix this" );
+//        This is a NO-OP now, it's handled by the EntityVersionDeletedHandler
+
+//
+//        if(logger.isDebugEnabled()) {
+//            logger.debug(
+//                "Handling deleted event for entity {}:{} v {} " + " app: {}",
+//                new Object[] {
+//                    entityId.getType(), entityId.getUuid(), version,
+//                    scope.getApplication()
+//                } );
+//        }
+//
+//        CpEntityManagerFactory cpemf = ( CpEntityManagerFactory ) emf;
+//        final ApplicationEntityIndex ei = cpemf.getManagerCache().getEntityIndex( scope );
+
+//        throw new NotImplementedException( "Fix this" );
 
         //read all edges to this node and de-index them
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d886e55d/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java
index c000500..0163fc2 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java
@@ -55,7 +55,7 @@ public class EntityVersionCreatedHandler implements EntityVersionCreated {
 
     @Override
     public void versionCreated( final ApplicationScope scope, final Entity entity ) {
-        //not op, we're not migrating properly to this.  Make this an event
+        //not op, we're not migrating properly to this.  Make this an event At the moment this is happening on write
 
 //        // This check is for testing purposes and for a test that to be able to dynamically turn
 //        // off and on delete previous versions so that it can test clean-up on read.

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d886e55d/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
index 4fa5ce1..a2e9b30 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
@@ -24,18 +24,30 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.corepersistence.CpEntityManagerFactory;
-import org.apache.usergrid.exception.NotImplementedException;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.persistence.EntityManagerFactory;
 import org.apache.usergrid.persistence.collection.MvccLogEntry;
 import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
+import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.serialization.EdgesObservable;
 import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
+import org.apache.usergrid.persistence.index.EntityIndexBatch;
+import org.apache.usergrid.persistence.index.IndexBatchBuffer;
+import org.apache.usergrid.persistence.index.IndexScope;
+import org.apache.usergrid.persistence.index.impl.IndexScopeImpl;
 import org.apache.usergrid.persistence.model.entity.Id;
 
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 
+import rx.Observable;
+
 import static org.apache.usergrid.corepersistence.CoreModule.EVENTS_DISABLED;
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromSource;
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeToTarget;
 
 
 /**
@@ -49,10 +61,17 @@ public class EntityVersionDeletedHandler implements EntityVersionDeleted {
 
 
     private final EntityManagerFactory emf;
+    private final EdgesObservable edgesObservable;
+    private final SerializationFig serializationFig;
 
 
     @Inject
-    public EntityVersionDeletedHandler( final EntityManagerFactory emf ) {this.emf = emf;}
+    public EntityVersionDeletedHandler( final EntityManagerFactory emf, final EdgesObservable edgesObservable,
+                                        final SerializationFig serializationFig ) {
+        this.emf = emf;
+        this.edgesObservable = edgesObservable;
+        this.serializationFig = serializationFig;
+    }
 
 
     @Override
@@ -67,32 +86,60 @@ public class EntityVersionDeletedHandler implements EntityVersionDeleted {
         }
 
         if ( logger.isDebugEnabled() ) {
-            logger.debug( "Handling versionDeleted count={} event for entity {}:{} v {} "
-                    + "  app: {}", new Object[] {
-                    entityVersions.size(), entityId.getType(), entityId.getUuid(),
-                    scope.getApplication()
-                } );
+            logger.debug( "Handling versionDeleted count={} event for entity {}:{} v {} " + "  app: {}", new Object[] {
+                entityVersions.size(), entityId.getType(), entityId.getUuid(), scope.getApplication()
+            } );
         }
 
         CpEntityManagerFactory cpemf = ( CpEntityManagerFactory ) emf;
 
         final ApplicationEntityIndex ei = cpemf.getManagerCache().getEntityIndex( scope );
+        final GraphManager gm = cpemf.getManagerCache().getGraphManager( scope );
+
+
+        //create an observable of all scopes to deIndex
+        //remove all indexes pointing to this
+        final Observable<IndexScope> targetScopes =  edgesObservable.edgesToTarget( gm, entityId ).map(
+            edge -> generateScopeFromSource( edge) );
+
+
+        //Remove all double indexes
+        final Observable<IndexScope> sourceScopes = edgesObservable.edgesFromSource( gm, entityId ).map(
+                    edge -> generateScopeToTarget( edge ) );
+
+
+        //create a stream of scopes
+        final Observable<IndexScopeVersion> versions = Observable.merge( targetScopes, sourceScopes ).flatMap(
+            indexScope -> Observable.from( entityVersions )
+                                    .map( version -> new IndexScopeVersion( indexScope, version ) ) );
+
+        //create a set of batches
+        final Observable<EntityIndexBatch> batches = versions.buffer( serializationFig.getBufferSize() ).flatMap(
+            bufferedVersions -> Observable.from( bufferedVersions ).collect( () -> ei.createBatch(),
+                ( EntityIndexBatch batch, IndexScopeVersion version ) -> {
+                    //deindex in this batch
+                    batch.deindex( version.scope, version.version.getEntityId(), version.version.getVersion() );
+                } ) );
 
 
-        throw new NotImplementedException( "Fix this" );
 
+        //execute the batches
+        batches.doOnNext( batch -> batch.execute() ).toBlocking().last();
 
-//        final IndexScope indexScope =
-//            new IndexScopeImpl( new SimpleId( scope.getOwner().getUuid(), scope.getOwner().getType() ),
-//                scope.getName() );
-//
-//        //create our batch, and then collect all of them into a single batch
-//        Observable.from( entityVersions ).collect( () -> ei.createBatch(), ( entityIndexBatch, mvccLogEntry ) -> {
-//            entityIndexBatch.deindex( indexScope, mvccLogEntry.getEntityId(), mvccLogEntry.getVersion() );
-//        } )
-//            //after our batch is collected, execute it
-//            .doOnNext( entityIndexBatch -> {
-//                entityIndexBatch.execute();
-//            } ).toBlocking().last();
+    }
+
+
+
+
+
+    private static final class IndexScopeVersion{
+        private final IndexScope scope;
+        private final MvccLogEntry version;
+
+
+        private IndexScopeVersion( final IndexScope scope, final MvccLogEntry version ) {
+            this.scope = scope;
+            this.version = version;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d886e55d/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java
index 2eb6675..566656b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java
@@ -119,8 +119,7 @@ public class FilteringLoader implements ResultsLoader {
 
             final CandidateResult currentCandidate = iter.next();
 
-            final String collectionType = CpNamingUtils.getCollectionScopeNameFromEntityType(
-                    currentCandidate.getId().getType() );
+            final String collectionType = currentCandidate.getId().getType() ;
 
             final Id entityId = currentCandidate.getId();
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d886e55d/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
index 2e9fb55..652742b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
@@ -21,15 +21,21 @@ package org.apache.usergrid.corepersistence.util;
 
 import java.util.UUID;
 
+import org.apache.usergrid.persistence.EntityRef;
 import org.apache.usergrid.persistence.Schema;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
 import org.apache.usergrid.persistence.entities.Application;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.index.IndexScope;
+import org.apache.usergrid.persistence.index.impl.IndexScopeImpl;
 import org.apache.usergrid.persistence.map.MapScope;
 import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
 
+import com.clearspring.analytics.util.Preconditions;
+
 
 /**
  * Utilises for constructing standard naming conventions for collections and connections
@@ -80,66 +86,150 @@ public class CpNamingUtils {
      * @param type
      * @return
      */
-    public static String getCollectionScopeNameFromEntityType( String type ) {
+    private static String getCollectionScopeNameFromEntityType( String type ) {
         String csn = EDGE_COLL_SUFFIX + Schema.defaultCollectionName( type );
         return csn.toLowerCase();
     }
 
 
-    public static String getCollectionScopeNameFromCollectionName( String name ) {
+    private static String getCollectionScopeNameFromCollectionName( String name ) {
         String csn = EDGE_COLL_SUFFIX + name;
         return csn.toLowerCase();
     }
 
 
-    public static String getConnectionScopeName( String connectionType ) {
+    private static String getConnectionScopeName( String connectionType ) {
         String csn = EDGE_CONN_SUFFIX + connectionType ;
         return csn.toLowerCase();
     }
 
 
-    public static boolean isCollectionEdgeType( String type ) {
+    /**
+     * Get the index scope for the edge from the source
+     * @param edge
+     * @return
+     */
+    public static IndexScope generateScopeFromSource(final Edge edge ){
+
+
+        final Id nodeId = edge.getSourceNode();
+        final String scopeName = getNameFromEdgeType( edge.getType() );
+
+
+        return new IndexScopeImpl( nodeId, scopeName );
+
+    }
+
+
+
+
+
+    /**
+     * Get the index scope for the edge from the source
+     * @param edge
+     * @return
+     */
+    public static IndexScope generateScopeToTarget(final Edge edge ){
+
+
+
+        final Id nodeId = edge.getTargetNode();
+        final String scopeName = getNameFromEdgeType( edge.getType() );
+
+
+        return new IndexScopeImpl( nodeId, scopeName );
+
+    }
+
+
+    /**
+     * Generate either the collection name or connection name from the edgeName
+     * @param edgeName
+     * @return
+     */
+    public static String getNameFromEdgeType(final String edgeName){
+
+
+        if(isCollectionEdgeType( edgeName )){
+           return getCollectionScopeNameFromCollectionName(getCollectionName(edgeName) );
+        }
+
+        return getConnectionScopeName(getConnectionType( edgeName )  );
+
+    }
+
+
+    /**
+     * Get the index scope from the colleciton name
+     * @param nodeId The source or target node id
+     * @param collectionName The name of the collection.  Ex "users"
+     * @return
+     */
+    public static IndexScope generateScopeFromCollection( final Id nodeId, final String collectionName ){
+        return new IndexScopeImpl( nodeId, getCollectionScopeNameFromCollectionName( collectionName ) );
+    }
+
+
+    /**
+     * Get the scope from the connection
+     * @param nodeId
+     * @param connectionName
+     * @return
+     */
+    public static IndexScope generateScopeFromConnection( final Id nodeId, final String connectionName ){
+        return new IndexScopeImpl( nodeId, getConnectionScopeName( connectionName ) );
+    }
+
+
+    /**
+     * Create an Id object from the entity ref
+     * @param entityRef
+     * @return
+     */
+    public static Id createId(final EntityRef entityRef){
+      return new SimpleId( entityRef.getUuid(), entityRef.getType() );
+    }
+
+    private static boolean isCollectionEdgeType( String type ) {
         return type.startsWith( EDGE_COLL_SUFFIX );
     }
 
 
-    public static boolean isConnectionEdgeType( String type ) {
+    private static boolean isConnectionEdgeType( String type ) {
         return type.startsWith( EDGE_CONN_SUFFIX );
     }
 
 
-    static public String  getConnectionType( String edgeType ) {
+
+    private static  String  getConnectionType( String edgeType ) {
         String[] parts = edgeType.split( "\\|" );
         return parts[1];
     }
 
 
-    static public String getCollectionName( String edgeType ) {
+    private static String getCollectionName( String edgeType ) {
         String[] parts = edgeType.split( "\\|" );
         return parts[1];
     }
 
 
+    /**
+     * Generate a standard edge name for our graph using the connection name
+     * @param connectionType The type of connection made
+     * @return
+     */
     public static String getEdgeTypeFromConnectionType( String connectionType ) {
-
-        if ( connectionType != null ) {
-            String csn = EDGE_CONN_SUFFIX + "|" + connectionType;
-            return csn;
-        }
-
-        return null;
+        return  (EDGE_CONN_SUFFIX  + "|" + connectionType).toLowerCase();
     }
 
 
+    /**
+     * Generate a standard edges from for a collection
+     * @param collectionName
+     * @return
+     */
     public static String getEdgeTypeFromCollectionName( String collectionName ) {
-
-        if ( collectionName != null ) {
-            String csn = EDGE_COLL_SUFFIX + "|" + collectionName;
-            return csn;
-        }
-
-
-        return null;
+        return (EDGE_COLL_SUFFIX + "|" + collectionName).toLowerCase();
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d886e55d/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
index 5c166c5..f743f0b 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
@@ -52,6 +52,7 @@ import org.apache.usergrid.persistence.index.SearchTypes;
 import org.apache.usergrid.persistence.index.impl.IndexScopeImpl;
 import org.apache.usergrid.persistence.index.query.CandidateResults;
 import org.apache.usergrid.persistence.index.query.Query;
+import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
 
 import com.fasterxml.uuid.UUIDComparator;
@@ -60,7 +61,9 @@ import com.google.inject.Injector;
 import net.jcip.annotations.NotThreadSafe;
 
 import static org.apache.usergrid.corepersistence.CoreModule.EVENTS_DISABLED;
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromCollection;
 import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION;
+import static org.apache.usergrid.persistence.core.util.IdGenerator.createId;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -483,14 +486,15 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
 
         EntityManager em = app.getEntityManager();
 
-        EntityIndexFactory eif =  SpringResource.getInstance().getBean( Injector.class ).getInstance( EntityIndexFactory.class );
+        EntityIndexFactory eif =  SpringResource.getInstance().getBean( Injector.class ).getInstance(
+            EntityIndexFactory.class );
 
         ApplicationScope as = new ApplicationScopeImpl(
             new SimpleId( em.getApplicationId(), TYPE_APPLICATION ) );
         ApplicationEntityIndex ei = eif.createApplicationEntityIndex(as);
 
-        IndexScope is = new IndexScopeImpl( new SimpleId( em.getApplicationId(), TYPE_APPLICATION ),
-                CpNamingUtils.getCollectionScopeNameFromCollectionName( collName ) );
+        final Id rootId = createId(em.getApplicationId(), TYPE_APPLICATION);
+        IndexScope is = generateScopeFromCollection(rootId, collName );
         Query rcq = Query.fromQL( query );
 
         // TODO: why does this have no effect; max we ever get is 1000 entities

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d886e55d/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java
index 50c2cd9..3bfe460 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java
@@ -41,6 +41,7 @@ import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.utils.EdgeTestUtils;
 
 import com.google.inject.Injector;
 
@@ -98,8 +99,8 @@ public class EdgesFromSourceObservableIT extends AbstractCoreIT {
                 final Id source = edge.getSourceNode();
 
                 //test if we're a collection, if so
-                if ( CpNamingUtils.isCollectionEdgeType( edgeType ) ) {
-                    final String collectionName = CpNamingUtils.getCollectionName( edgeType );
+                if ( EdgeTestUtils.isCollectionEdgeType( edgeType ) ) {
+                    final String collectionName = EdgeTestUtils.getNameForEdge( edgeType );
 
                     assertEquals("application source returned", createdApplication.getUuid(), source.getUuid());
 
@@ -112,11 +113,11 @@ public class EdgesFromSourceObservableIT extends AbstractCoreIT {
 
 
 
-                if ( !CpNamingUtils.isConnectionEdgeType( edgeType ) ) {
+                if ( !EdgeTestUtils.isConnectionEdgeType( edgeType ) ) {
                     fail( "Only connection edges should be encountered" );
                 }
 
-                final String connectionType = CpNamingUtils.getConnectionType( edgeType );
+                final String connectionType = EdgeTestUtils.getNameForEdge( edgeType );
 
                 assertEquals( "Same connection type expected", "likes", connectionType );
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d886e55d/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
index 9f1bb17..6d228b2 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
@@ -38,6 +38,7 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.utils.EdgeTestUtils;
 
 import com.google.inject.Injector;
 
@@ -92,29 +93,26 @@ public class EdgesToTargetObservableIT extends AbstractCoreIT {
 
         final GraphManager gm = managerCache.getGraphManager( scope );
 
-        edgesFromSourceObservable.edgesFromSource( gm, applicationId ).doOnNext( new Action1<Edge>() {
-            @Override
-            public void call( final Edge edge ) {
-                final String edgeType = edge.getType();
-                final Id target = edge.getTargetNode();
+        edgesFromSourceObservable.edgesFromSource( gm, applicationId ).doOnNext( edge -> {
+            final String edgeType = edge.getType();
+            final Id target = edge.getTargetNode();
 
-                //test if we're a collection, if so remove ourselves fro the types
-                if ( !CpNamingUtils.isCollectionEdgeType( edgeType ) ) {
-                    fail( "Connections should be the only type encountered" );
-                }
+            //test if we're a collection, if so remove ourselves fro the types
+            if ( !EdgeTestUtils.isCollectionEdgeType( edgeType ) ) {
+                fail( "Connections should be the only type encountered" );
+            }
 
 
-                final String collectionType = CpNamingUtils.getCollectionName( edgeType );
+            final String collectionType = EdgeTestUtils.getNameForEdge( edgeType );
 
-                if ( collectionType.equals( type1 ) ) {
-                    assertTrue( "Element should be present on removal", type1Identities.remove( target ) );
-                }
-                else if ( collectionType.equals( type2 ) ) {
-                    assertTrue( "Element should be present on removal", type2Identities.remove( target ) );
-                }
+            if ( collectionType.equals( type1 ) ) {
+                assertTrue( "Element should be present on removal", type1Identities.remove( target ) );
+            }
+            else if ( collectionType.equals( type2 ) ) {
+                assertTrue( "Element should be present on removal", type2Identities.remove( target ) );
+            }
 
 
-            }
         } ).toBlocking().lastOrDefault( null );
 
 
@@ -124,23 +122,20 @@ public class EdgesToTargetObservableIT extends AbstractCoreIT {
 
         //test connections
 
-        edgesFromSourceObservable.edgesFromSource( gm, source).doOnNext( new Action1<Edge>() {
-            @Override
-            public void call( final Edge edge ) {
-                final String edgeType = edge.getType();
-                final Id target = edge.getTargetNode();
+        edgesFromSourceObservable.edgesFromSource( gm, source).doOnNext( edge -> {
+            final String edgeType = edge.getType();
+            final Id target = edge.getTargetNode();
 
-                if ( !CpNamingUtils.isConnectionEdgeType( edgeType ) ) {
-                    fail( "Only connection edges should be encountered" );
-                }
+            if ( !EdgeTestUtils.isConnectionEdgeType( edgeType ) ) {
+                fail( "Only connection edges should be encountered" );
+            }
 
-                final String connectionType = CpNamingUtils.getConnectionType( edgeType );
+            final String connectionType = EdgeTestUtils.getNameForEdge( edgeType );
 
-                assertEquals( "Same connection type expected", "likes", connectionType );
+            assertEquals( "Same connection type expected", "likes", connectionType );
 
 
-                assertTrue( "Element should be present on removal", connections.remove( target ) );
-            }
+            assertTrue( "Element should be present on removal", connections.remove( target ) );
         } ).toBlocking().lastOrDefault( null );
 
         assertEquals( "Every connection should have been encountered", 0, connections.size() );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d886e55d/stack/core/src/test/java/org/apache/usergrid/utils/EdgeTestUtils.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/utils/EdgeTestUtils.java b/stack/core/src/test/java/org/apache/usergrid/utils/EdgeTestUtils.java
new file mode 100644
index 0000000..f217338
--- /dev/null
+++ b/stack/core/src/test/java/org/apache/usergrid/utils/EdgeTestUtils.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.utils;
+
+
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+
+import static org.junit.Assert.assertEquals;
+
+
+public class EdgeTestUtils {
+
+    /**
+     * Get the name for an edge
+     */
+    public static String getNameForEdge( final String edgeName ) {
+        final String[] parts = edgeName.split( "\\|" );
+
+        assertEquals( "there should be 2 parts", parts.length, 2 );
+
+        return parts[1];
+    }
+
+
+    public static boolean isCollectionEdgeType( String type ) {
+        return type.startsWith( CpNamingUtils.EDGE_COLL_SUFFIX );
+    }
+
+
+    public static boolean isConnectionEdgeType( String type ) {
+        return type.startsWith( CpNamingUtils.EDGE_CONN_SUFFIX );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d886e55d/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableIterator.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableIterator.java
index 2bd1edb..84a7fc3 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableIterator.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableIterator.java
@@ -53,6 +53,8 @@ public abstract class ObservableIterator<T> implements Observable.OnSubscribe<T>
 
 
         try {
+            subscriber.onStart();
+
             //get our iterator and push data to the observer
             final Iterator<T> itr = getIterator();