You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2014/12/15 19:32:22 UTC

[38/50] incubator-usergrid git commit: Merge branch 'two-dot-o' into two-dot-o-events

Merge branch 'two-dot-o' into two-dot-o-events

Conflicts:
	stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityDeleteListener.java
	stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
	stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
	stack/core/src/test/java/org/apache/usergrid/corepersistence/CpEntityDeleteListenerTest.java
	stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
	stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
	stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/c3261795
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/c3261795
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/c3261795

Branch: refs/heads/two-dot-o
Commit: c3261795af2e0992ae9aecd8332e733dac42e0a8
Parents: 1884cce bd0a1e9
Author: Dave Johnson <dm...@apigee.com>
Authored: Tue Nov 25 07:31:05 2014 -0500
Committer: Dave Johnson <dm...@apigee.com>
Committed: Tue Nov 25 07:31:05 2014 -0500

----------------------------------------------------------------------
 stack/core/pom.xml                              |   32 +-
 .../corepersistence/CpEntityManager.java        |  134 +-
 .../corepersistence/CpEntityManagerFactory.java |    2 +-
 .../corepersistence/CpRelationManager.java      |   35 +-
 .../usergrid/corepersistence/GuiceModule.java   |    1 +
 .../migration/EntityDataMigration.java          |  147 ++
 .../migration/EntityTypeMappingMigration.java   |   28 +-
 .../migration/GraphShardVersionMigration.java   |  104 +-
 .../corepersistence/migration/Versions.java     |    3 +
 .../rx/AllEntitiesInSystemObservable.java       |   27 +-
 .../rx/ApplicationObservable.java               |   65 +-
 .../corepersistence/util/CpEntityMapUtils.java  |   62 +-
 .../corepersistence/util/CpNamingUtils.java     |   17 +
 .../org/apache/usergrid/CoreITSetupImpl.java    |   13 +
 .../corepersistence/StaleIndexCleanupTest.java  |    6 +-
 .../migration/EntityDataMigrationIT.java        |  252 ++++
 .../migration/EntityTypeMappingMigrationIT.java |   74 +-
 .../migration/GraphShardVersionMigrationIT.java |  153 +-
 .../rx/AllEntitiesInSystemObservableIT.java     |   23 +-
 .../usergrid/persistence/LargeEntityIT.java     |  112 ++
 stack/core/src/test/resources/largeentity.json  | 1329 ++++++++++++++++++
 .../collection/EntityCollectionManager.java     |    2 +-
 .../exception/EntityTooLargeException.java      |   67 +
 .../collection/guice/CollectionModule.java      |    2 -
 .../impl/EntityCollectionManagerImpl.java       |  294 ++--
 .../collection/impl/EntityDeletedTask.java      |   11 +-
 .../impl/EntityVersionCleanupTask.java          |   13 +-
 .../mvcc/MvccEntitySerializationStrategy.java   |   23 +-
 .../mvcc/MvccLogEntrySerializationStrategy.java |    3 +-
 .../mvcc/stage/delete/MarkCommit.java           |   19 +-
 .../mvcc/stage/write/WriteCommit.java           |    3 +-
 .../serialization/SerializationFig.java         |   15 +-
 .../UniqueValueSerializationStrategy.java       |    3 +-
 .../serialization/impl/EntityRepairImpl.java    |    2 +-
 .../MvccEntitySerializationStrategyImpl.java    |  343 ++---
 ...vccEntitySerializationStrategyProxyImpl.java |  162 +++
 .../MvccEntitySerializationStrategyV1Impl.java  |  219 +++
 .../MvccEntitySerializationStrategyV2Impl.java  |  238 ++++
 .../MvccLogEntrySerializationStrategyImpl.java  |    2 +-
 .../serialization/impl/SerializationModule.java |   25 +-
 .../serialization/impl/SettingsValidation.java  |   58 +
 .../UniqueValueSerializationStrategyImpl.java   |    2 +-
 .../EntityCollectionManagerFactoryTest.java     |    2 +-
 .../collection/EntityCollectionManagerIT.java   |   64 +-
 .../EntityCollectionManagerStressTest.java      |    2 +-
 .../EntityCollectionManagerSyncIT.java          |    2 +-
 .../collection/guice/MigrationManagerRule.java  |   38 -
 .../collection/guice/TestCollectionModule.java  |   10 +
 .../impl/EntityVersionCleanupTaskTest.java      |   11 +-
 ...niqueValueSerializationStrategyImplTest.java |    2 +-
 .../mvcc/stage/write/WriteUniqueVerifyIT.java   |    2 +-
 .../mvcc/stage/write/WriteUniqueVerifyTest.java |    2 +-
 .../serialization/EntityRepairImplTest.java     |    2 +-
 ...MvccEntitySerializationStrategyImplTest.java |   87 +-
 ...cEntitySerializationStrategyProxyV1Test.java |   85 ++
 ...cEntitySerializationStrategyProxyV2Test.java |   83 ++
 ...ccEntitySerializationStrategyV1ImplTest.java |   55 +
 ...ccEntitySerializationStrategyV2ImplTest.java |   55 +
 .../MvccEntitySerializationStrategyV2Test.java  |  229 +++
 .../impl/MvccLESSTransientTest.java             |   55 +-
 ...ccLogEntrySerializationStrategyImplTest.java |    2 +-
 .../impl/SettingsValidationTest.java            |  131 ++
 .../collection/util/EntityHelper.java           |  105 ++
 .../src/test/resources/log4j.properties         |   21 +-
 .../persistence/core/astyanax/CassandraFig.java |   12 +
 .../persistence/core/astyanax/FieldBuffer.java  |   70 +
 .../core/astyanax/FieldBufferBuilder.java       |   87 ++
 .../core/astyanax/FieldBufferParser.java        |   81 ++
 .../core/astyanax/FieldBufferSerializer.java    |  134 ++
 .../core/guicyfig/SetConfigTestBypass.java      |  108 ++
 .../astyanax/FieldBufferSerializerTest.java     |  126 ++
 .../core/guice/MaxMigrationModule.java          |   39 +
 .../core/guice/MaxMigrationVersion.java         |   40 +
 .../core/guice/MigrationManagerRule.java        |   19 +-
 .../persistence/graph/GraphManagerIT.java       |    2 +-
 .../persistence/graph/GraphManagerLoadTest.java |    2 +-
 .../graph/GraphManagerShardingIT.java           |    2 +-
 .../graph/GraphManagerStressTest.java           |    2 +-
 .../usergrid/persistence/graph/SimpleTest.java  |    2 +-
 .../graph/guice/TestGraphModule.java            |   11 +
 .../graph/impl/EdgeDeleteListenerTest.java      |    2 +-
 .../graph/impl/NodeDeleteListenerTest.java      |    2 +-
 .../graph/impl/stage/EdgeDeleteRepairTest.java  |    2 +-
 .../graph/impl/stage/EdgeMetaRepairTest.java    |    2 +-
 .../EdgeMetadataSerializationTest.java          |    2 +-
 .../EdgeSerializationChopTest.java              |    2 +-
 .../serialization/EdgeSerializationTest.java    |    2 +-
 .../serialization/NodeSerializationTest.java    |    2 +-
 .../impl/shard/EdgeShardSerializationTest.java  |    2 +-
 .../NodeShardCounterSerializationTest.java      |    2 +-
 .../persistence/map/MapManagerTest.java         |    2 +-
 .../persistence/model/field/LocationField.java  |    3 -
 .../index/impl/EntityIndexMapUtils.java         |    4 +-
 .../persistence/queue/QueueManagerTest.java     |   26 +-
 .../collection/groups/GroupResourceIT.java      |  182 +--
 .../applications/queries/GeoPagingTest.java     |   41 +-
 .../applications/queries/basicGeoTests.java     |  184 +++
 97 files changed, 5219 insertions(+), 1150 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3261795/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --cc stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 5a193e5,b2d854b..c3e5bbe
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@@ -69,7 -69,7 +69,6 @@@ import org.apache.usergrid.persistence.
  import org.apache.usergrid.persistence.collection.EntityCollectionManager;
  import org.apache.usergrid.persistence.collection.exception.WriteOptimisticVerifyException;
  import org.apache.usergrid.persistence.collection.exception.WriteUniqueVerifyException;
--import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
  import org.apache.usergrid.persistence.core.scope.ApplicationScope;
  import org.apache.usergrid.persistence.core.util.Health;
  import org.apache.usergrid.persistence.entities.Application;
@@@ -214,23 -215,23 +214,22 @@@ public class CpEntityManager implement
          // set to false for now
          this.skipAggregateCounters = false;
  
--        int entityCacheSize =
--                Integer.parseInt( cass.getProperties().getProperty( "usergrid.entity_cache_size", "100" ) );
++        int entityCacheSize = Integer.parseInt( 
++                cass.getProperties().getProperty( "usergrid.entity_cache_size", "100" ) );
  
--        int entityCacheTimeout =
--                Integer.parseInt( cass.getProperties().getProperty( "usergrid.entity_cache_timeout_ms", "500" ) );
++        int entityCacheTimeout = Integer.parseInt( 
++                cass.getProperties().getProperty( "usergrid.entity_cache_timeout_ms", "500" ) );
  
          this.entityCache = CacheBuilder.newBuilder().maximumSize( entityCacheSize )
--                                       .expireAfterWrite( entityCacheTimeout, TimeUnit.MILLISECONDS )
--                                       .build( new CacheLoader<EntityScope, org.apache.usergrid.persistence.model
--                                               .entity.Entity>() {
--                                                   public org.apache.usergrid.persistence.model.entity.Entity load(
--                                                           EntityScope es ) {
--                                                       return managerCache.getEntityCollectionManager( es.scope )
--                                                                          .load( es.entityId ).toBlocking()
--                                                                          .lastOrDefault( null );
--                                                   }
--                                               } );
++            .expireAfterWrite(entityCacheTimeout, TimeUnit.MILLISECONDS)
++            .build(new CacheLoader<EntityScope, org.apache.usergrid.persistence.model.entity.Entity>() {
++                public org.apache.usergrid.persistence.model.entity.Entity load(
++                    EntityScope es) {
++                        return managerCache.getEntityCollectionManager(es.scope)
++                        .load(es.entityId).toBlocking()
++                        .lastOrDefault(null);
++                    }
++            });
      }
  
  
@@@ -347,8 -348,8 +346,8 @@@
       */
      @Metered( group = "core", name = "EntityManager_create" )
      @TraceParticipant
--    public <A extends Entity> A create( String entityType, Class<A> entityClass, Map<String, Object> properties,
--                                        UUID importId ) throws Exception {
++    public <A extends Entity> A create( String entityType, Class<A> entityClass, 
++            Map<String, Object> properties, UUID importId ) throws Exception {
  
          UUID timestampUuid = importId != null ? importId : UUIDUtils.newTimeUUID();
  
@@@ -372,13 -373,9 +371,10 @@@
          }
  
          Id id = new SimpleId( entityRef.getUuid(), entityRef.getType() );
-         String collectionName = CpNamingUtils.getCollectionScopeNameFromEntityType( entityRef.getType() );
  
-         CollectionScope collectionScope =
-                 new CollectionScopeImpl( getApplicationScope().getApplication(), getApplicationScope().getApplication(),
-                         collectionName );
 -        CollectionScope collectionScope = getCollectionScopeNameFromEntityType(applicationScope.getApplication(),  entityRef.getType());
++        CollectionScope collectionScope = getCollectionScopeNameFromEntityType(
++                applicationScope.getApplication(),  entityRef.getType());
  
-         EntityCollectionManager ecm = managerCache.getEntityCollectionManager( collectionScope );
  
          //        if ( !UUIDUtils.isTimeBased( id.getUuid() ) ) {
          //            throw new IllegalArgumentException(
@@@ -390,10 -387,10 +386,10 @@@
          if ( cpEntity == null ) {
              if ( logger.isDebugEnabled() ) {
                  logger.debug( "FAILED to load entity {}:{} from scope\n   app {}\n   owner {}\n   name {}",
--                        new Object[] {
--                                id.getType(), id.getUuid(), collectionScope.getApplication(),
--                                collectionScope.getOwner(), collectionScope.getName()
--                        } );
++                    new Object[] {
++                            id.getType(), id.getUuid(), collectionScope.getApplication(),
++                            collectionScope.getOwner(), collectionScope.getName()
++                    } );
              }
              return null;
          }
@@@ -457,13 -454,10 +453,11 @@@
          String type = Schema.getDefaultSchema().getEntityType( entityClass );
  
          Id id = new SimpleId( entityId, type );
-         String collectionName = CpNamingUtils.getCollectionScopeNameFromEntityType( type );
  
-         CollectionScope collectionScope =
-                 new CollectionScopeImpl( getApplicationScope().getApplication(), getApplicationScope().getApplication(),
-                         collectionName );
  
-         EntityCollectionManager ecm = managerCache.getEntityCollectionManager( collectionScope );
 -        CollectionScope collectionScope = getCollectionScopeNameFromEntityType(applicationScope.getApplication(),  type);
++        CollectionScope collectionScope = getCollectionScopeNameFromEntityType(
++                applicationScope.getApplication(),  type);
+ 
  
          //        if ( !UUIDUtils.isTimeBased( id.getUuid() ) ) {
          //            throw new IllegalArgumentException(
@@@ -522,18 -516,17 +516,22 @@@
      public void update( Entity entity ) throws Exception {
  
          // first, update entity index in its own collection scope
-         CollectionScope collectionScope =
-                 new CollectionScopeImpl( getApplicationScope().getApplication(), getApplicationScope().getApplication(),
-                         CpNamingUtils.getCollectionScopeNameFromEntityType( entity.getType() ) );
+ 
 -        CollectionScope collectionScope = getCollectionScopeNameFromEntityType(applicationScope.getApplication(),  entity.getType());
++        CollectionScope collectionScope = getCollectionScopeNameFromEntityType(
++                applicationScope.getApplication(),  entity.getType());
          EntityCollectionManager ecm = managerCache.getEntityCollectionManager( collectionScope );
  
          Id entityId = new SimpleId( entity.getUuid(), entity.getType() );
  
          if ( logger.isDebugEnabled() ) {
--            logger.debug( "Updating entity {}:{} from scope\n   app {}\n   owner {}\n   name {}", new Object[] {
--                            entityId.getType(), entityId.getUuid(), collectionScope.getApplication(),
--                            collectionScope.getOwner(), collectionScope.getName()
--                    } );
++            logger.debug( "Updating entity {}:{} from scope\n   app {}\n   owner {}\n   name {}", 
++                new Object[] {
++                    entityId.getType(), 
++                    entityId.getUuid(), 
++                    collectionScope.getApplication(),
++                    collectionScope.getOwner(), 
++                    collectionScope.getName()
++                } );
          }
  
          //        if ( !UUIDUtils.isTimeBased( entityId.getUuid() ) ) {
@@@ -590,9 -583,7 +588,8 @@@
  
      private Observable deleteAsync( EntityRef entityRef ) throws Exception {
  
-         CollectionScope collectionScope =
-                 new CollectionScopeImpl( getApplicationScope().getApplication(), getApplicationScope().getApplication(),
-                         CpNamingUtils.getCollectionScopeNameFromEntityType( entityRef.getType() ) );
 -        CollectionScope collectionScope = getCollectionScopeNameFromEntityType(applicationScope.getApplication(), entityRef.getType()  );
++        CollectionScope collectionScope = getCollectionScopeNameFromEntityType(
++                applicationScope.getApplication(), entityRef.getType()  );
  
          EntityCollectionManager ecm = managerCache.getEntityCollectionManager( collectionScope );
  
@@@ -866,9 -857,9 +863,8 @@@
      }
  
  
--    private Iterable<EntityRef> getEntityRefsForUniqueProperty( String collName, String propName, String alias )
--            throws Exception {
--
++    private Iterable<EntityRef> getEntityRefsForUniqueProperty( 
++            String collName, String propName, String alias ) throws Exception {
  
          final Id id = getIdForUniqueEntityField( collName, propName, alias );
  
@@@ -900,15 -891,15 +896,17 @@@
                  get( entityRef ).getType();
              }
              catch ( Exception e ) {
--                logger.error( "Unable to load entity " + entityRef.getType() + ":" + entityRef.getUuid(), e );
++                logger.error( "Unable to load entity " + entityRef.getType() 
++                        + ":" + entityRef.getUuid(), e );
              }
              if ( entityRef == null ) {
--                throw new EntityNotFoundException( "Entity " + entityId.toString() + " cannot be verified" );
++                throw new EntityNotFoundException( 
++                        "Entity " + entityId.toString() + " cannot be verified" );
              }
              if ( ( entityType != null ) && !entityType.equalsIgnoreCase( entityRef.getType() ) ) {
                  throw new UnexpectedEntityTypeException(
--                        "Entity " + entityId + " is not the expected type, expected " + entityType + ", found "
--                                + entityRef.getType() );
++                        "Entity " + entityId + " is not the expected type, expected " 
++                                + entityType + ", found " + entityRef.getType() );
              }
          }
          return entityRef;
@@@ -924,7 -915,7 +922,8 @@@
  
  
      @Override
--    public List<Entity> getPartialEntities( Collection<UUID> ids, Collection<String> properties ) throws Exception {
++    public List<Entity> getPartialEntities( 
++            Collection<UUID> ids, Collection<String> properties ) throws Exception {
          throw new UnsupportedOperationException( "Not supported yet." );
      }
  
@@@ -938,15 -929,15 +937,16 @@@
  
  
      @Override
--    public void setProperty( EntityRef entityRef, String propertyName, Object propertyValue ) throws Exception {
++    public void setProperty( 
++            EntityRef entityRef, String propertyName, Object propertyValue ) throws Exception {
  
          setProperty( entityRef, propertyName, propertyValue, false );
      }
  
  
      @Override
--    public void setProperty( EntityRef entityRef, String propertyName, Object propertyValue, boolean override )
--            throws Exception {
++    public void setProperty( EntityRef entityRef, String propertyName, Object propertyValue, 
++            boolean override ) throws Exception { 
  
          if ( ( propertyValue instanceof String ) && ( ( String ) propertyValue ).equals( "" ) ) {
              propertyValue = null;
@@@ -954,8 -945,8 +954,8 @@@
  
          Entity entity = get( entityRef );
  
--        propertyValue =
--                Schema.getDefaultSchema().validateEntityPropertyValue( entity.getType(), propertyName, propertyValue );
++        propertyValue = Schema.getDefaultSchema().validateEntityPropertyValue( 
++                entity.getType(), propertyName, propertyValue );
  
          entity.setProperty( propertyName, propertyValue );
          entity.setProperty( PROPERTY_MODIFIED, UUIDUtils.getTimestampInMillis( UUIDUtils.newTimeUUID() ) );
@@@ -1010,15 -1001,10 +1010,12 @@@
  
      @Override
      public void deleteProperty( EntityRef entityRef, String propertyName ) throws Exception {
 -        CollectionScope collectionScope =  getCollectionScopeNameFromEntityType(getApplicationScope().getApplication(), entityRef.getType());
 +
-         String collectionName = CpNamingUtils.getCollectionScopeNameFromEntityType( entityRef.getType() );
- 
-         CollectionScope collectionScope =
-                 new CollectionScopeImpl( getApplicationScope().getApplication(), getApplicationScope().getApplication(),
-                         collectionName );
++        CollectionScope collectionScope =  getCollectionScopeNameFromEntityType(
++                getApplicationScope().getApplication(), entityRef.getType());
  
          IndexScope defaultIndexScope = new IndexScopeImpl( getApplicationScope().getApplication(),
-                 CpNamingUtils.getCollectionScopeNameFromEntityType( entityRef.getType() ) );
+                 getCollectionScopeNameFromEntityType( entityRef.getType() ) );
  
          EntityCollectionManager ecm = managerCache.getEntityCollectionManager( collectionScope );
          EntityIndex ei = managerCache.getEntityIndex( getApplicationScope() );
@@@ -1062,15 -1048,15 +1059,16 @@@
  
  
      @Override
--    public void addToDictionary( EntityRef entityRef, String dictionaryName, Object elementValue ) throws Exception {
++    public void addToDictionary( EntityRef entityRef, String dictionaryName, 
++            Object elementValue ) throws Exception {
  
          addToDictionary( entityRef, dictionaryName, elementValue, null );
      }
  
  
      @Override
--    public void addToDictionary( EntityRef entityRef, String dictionaryName, Object elementName, Object elementValue )
--            throws Exception {
++    public void addToDictionary( EntityRef entityRef, String dictionaryName, Object elementName, 
++            Object elementValue ) throws Exception {
  
          if ( elementName == null ) {
              return;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3261795/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
----------------------------------------------------------------------
diff --cc stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
index a57158f,018a9b7..2ff6c4f
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
@@@ -18,15 -19,10 +18,16 @@@ package org.apache.usergrid.corepersist
  import com.google.inject.AbstractModule;
  import com.google.inject.multibindings.Multibinder;
  
+ import org.apache.usergrid.corepersistence.migration.EntityDataMigration;
  import org.apache.usergrid.corepersistence.migration.EntityTypeMappingMigration;
  import org.apache.usergrid.corepersistence.migration.GraphShardVersionMigration;
 +import org.apache.usergrid.corepersistence.events.EntityDeletedHandler;
 +import org.apache.usergrid.corepersistence.events.EntityVersionCreatedHandler;
 +import org.apache.usergrid.corepersistence.events.EntityVersionDeletedHandler;
 +import org.apache.usergrid.persistence.EntityManagerFactory;
 +import org.apache.usergrid.persistence.collection.event.EntityDeleted;
 +import org.apache.usergrid.persistence.collection.event.EntityVersionCreated;
 +import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
  import org.apache.usergrid.persistence.collection.guice.CollectionModule;
  import org.apache.usergrid.persistence.core.guice.CommonModule;
  import org.apache.usergrid.persistence.core.migration.data.DataMigration;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3261795/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
----------------------------------------------------------------------
diff --cc stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
index e1c0214,9d0c9e6..f159146
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
@@@ -51,17 -51,13 +51,18 @@@ import org.apache.usergrid.persistence.
  import org.apache.usergrid.persistence.model.entity.SimpleId;
  
  import com.fasterxml.uuid.UUIDComparator;
 +import java.util.concurrent.locks.Lock;
 +import java.util.concurrent.locks.ReentrantLock;
  
+ import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getCollectionScopeNameFromEntityType;
  import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION;
 +import org.junit.After;
  import static org.junit.Assert.assertEquals;
  import static org.junit.Assert.assertTrue;
 +import org.junit.Before;
  
  
 +//need to create system properties in test that can get applied
  /**
   * Test on read style clean-up of stale ElasticSearch indexes.
   */
@@@ -382,13 -221,9 +383,10 @@@ public class StaleIndexCleanupTest exte
  
          EntityManager em = app.getEntityManager();
  
-         CollectionScope cs = new CollectionScopeImpl( 
-             new SimpleId( em.getApplicationId(), TYPE_APPLICATION ),
-             new SimpleId( em.getApplicationId(), TYPE_APPLICATION ),
-             CpNamingUtils.getCollectionScopeNameFromEntityType( eref.getType() ) );
+         CollectionScope cs = getCollectionScopeNameFromEntityType(  new SimpleId( em.getApplicationId(), TYPE_APPLICATION ), eref.getType() );
  
 -        EntityCollectionManagerFactory ecmf = CpSetup.getInjector().getInstance( EntityCollectionManagerFactory.class );
 +        EntityCollectionManagerFactory ecmf = 
 +                CpSetup.getInjector().getInstance( EntityCollectionManagerFactory.class );
  
          EntityCollectionManager ecm = ecmf.createCollectionManager( cs );
  

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3261795/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
index e47d460,534d7a6..4de18fe
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
@@@ -43,7 -45,7 +43,7 @@@ public interface EntityCollectionManage
      /**
       * MarkCommit the entity and remove it's indexes with the given entity id
       */
--    public Observable<Void> delete( Id entityId );
++    public Observable<Id> delete( Id entityId );
  
      /**
       * Load the entity with the given entity Id

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3261795/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
index 4c96303,8015ca9..1c3e258
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
@@@ -18,8 -18,6 +18,7 @@@
  package org.apache.usergrid.persistence.collection.guice;
  
  
- import java.util.Collections;
 +
  import org.safehaus.guicyfig.GuicyFigModule;
  
  import org.apache.usergrid.persistence.collection.EntityCollectionManager;
@@@ -50,9 -43,7 +49,8 @@@ import com.google.inject.Inject
  import com.google.inject.Provides;
  import com.google.inject.Singleton;
  import com.google.inject.assistedinject.FactoryModuleBuilder;
 -
 +import com.google.inject.multibindings.Multibinder;
- import java.util.List;
 +import org.apache.usergrid.persistence.collection.event.EntityVersionCreated;
  
  
  /**

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3261795/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index 437c0d3,d54c5f7..a89924a
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@@ -60,10 -65,6 +65,12 @@@ import com.netflix.astyanax.connectionp
  import com.netflix.astyanax.model.ColumnFamily;
  import com.netflix.astyanax.model.CqlResult;
  import com.netflix.astyanax.serializers.StringSerializer;
++import org.apache.usergrid.persistence.collection.EntityDeletedFactory;
++import org.apache.usergrid.persistence.collection.EntityVersionCleanupFactory;
++import org.apache.usergrid.persistence.collection.EntityVersionCreatedFactory;
 +import org.apache.usergrid.persistence.collection.guice.CollectionTaskExecutor;
- import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
++import org.apache.usergrid.persistence.core.task.Task;
 +import org.apache.usergrid.persistence.core.task.TaskExecutor;
- import org.apache.usergrid.persistence.core.util.Health;
  
  import rx.Observable;
  import rx.Subscriber;
@@@ -109,24 -106,16 +117,26 @@@ public class EntityCollectionManagerImp
  
  
      @Inject
-     public EntityCollectionManagerImpl(
-         @Write final WriteStart writeStart,
-         @WriteUpdate final WriteStart writeUpdate,
-         final WriteUniqueVerify writeVerifyUnique,
-         final WriteOptimisticVerify writeOptimisticVerify,
-         final WriteCommit writeCommit, final RollbackAction rollback,
-         final MarkStart markStart, final MarkCommit markCommit,
-         final MvccEntitySerializationStrategy entitySerializationStrategy,
-         final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
-         final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy,
-         final Keyspace keyspace,
-         final SerializationFig config,
-         final EntityVersionCleanupFactory entityVersionCleanupFactory,
-         final EntityVersionCreatedFactory entityVersionCreatedFactory,
-         final EntityDeletedFactory        entityDeletedFactory,
 -    public EntityCollectionManagerImpl( @Write final WriteStart writeStart, @WriteUpdate final WriteStart writeUpdate,
 -                                        final WriteUniqueVerify writeVerifyUnique,
 -                                        final WriteOptimisticVerify writeOptimisticVerify,
 -                                        final WriteCommit writeCommit, final RollbackAction rollback,
 -                                        final MarkStart markStart, final MarkCommit markCommit,
 -                                        @ProxyImpl final MvccEntitySerializationStrategy entitySerializationStrategy,
 -                                        final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
 -                                        final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy,
 -                                        final Keyspace keyspace, final SerializationFig config,
 -                                        @Assisted final CollectionScope collectionScope ) {
++    public EntityCollectionManagerImpl( 
++        @Write final WriteStart                    writeStart, 
++        @WriteUpdate final WriteStart              writeUpdate,
++        final WriteUniqueVerify                    writeVerifyUnique,
++        final WriteOptimisticVerify                writeOptimisticVerify,
++        final WriteCommit                          writeCommit, 
++        final RollbackAction                       rollback,
++        final MarkStart                            markStart, 
++        final MarkCommit                           markCommit,
++        @ProxyImpl final MvccEntitySerializationStrategy entitySerializationStrategy,
++        final UniqueValueSerializationStrategy     uniqueValueSerializationStrategy,
++        final MvccLogEntrySerializationStrategy    mvccLogEntrySerializationStrategy,
++        final Keyspace                             keyspace, 
++        final SerializationFig                     config,
++        final EntityVersionCleanupFactory          entityVersionCleanupFactory,
++        final EntityVersionCreatedFactory          entityVersionCreatedFactory,
++        final EntityDeletedFactory                 entityDeletedFactory,
 +        @CollectionTaskExecutor final TaskExecutor taskExecutor,
-         @Assisted final CollectionScope collectionScope
++        @Assisted final CollectionScope            collectionScope
 +    ) {
          this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
          this.entitySerializationStrategy = entitySerializationStrategy;
  
@@@ -176,42 -161,25 +187,42 @@@
          // observable = Concurrent.concurrent( observable, Schedulers.io(), new WaitZip(), 
          //                  writeVerifyUnique, writeOptimisticVerify );
  
-         observable.map(writeCommit).doOnNext(new Action1<Entity>() {
 -        // return the commit result.
 -        return observable.map( writeCommit ).doOnError( rollback );
++        return observable.map(writeCommit).doOnNext(new Action1<Entity>() {
 +            @Override
 +            public void call(final Entity entity) {
 +                //TODO fire the created task first then the entityVersioncleanup
 +                taskExecutor.submit(entityVersionCreatedFactory.getTask(collectionScope,entity));
 +                taskExecutor.submit(entityVersionCleanupFactory.getTask(collectionScope, entityId,entity.getVersion()));
 +                //post-processing to come later. leave it empty for now.
 +            }
 +        }).doOnError(rollback);
- 
- 
-         // return the commit result.
-         return observable.map(writeCommit).doOnError(rollback);
      }
  
  
      @Override
-     public Observable<Void> delete(final Id entityId) {
- 
-         Preconditions.checkNotNull(entityId, "Entity id is required in this stage");
-         Preconditions.checkNotNull(entityId.getUuid(), "Entity id is required in this stage");
-         Preconditions.checkNotNull(entityId.getType(), "Entity type is required in this stage");
- 
 -    public Observable<Void> delete( final Id entityId ) {
++    public Observable<Id> delete( final Id entityId ) {
+ 
+         Preconditions.checkNotNull( entityId, "Entity id is required in this stage" );
+         Preconditions.checkNotNull( entityId.getUuid(), "Entity id is required in this stage" );
+         Preconditions.checkNotNull( entityId.getType(), "Entity type is required in this stage" );
+ 
 -        return Observable.from( new CollectionIoEvent<Id>( collectionScope, entityId ) ).map( markStart )
 -                         .doOnNext( markCommit ).map( new Func1<CollectionIoEvent<MvccEntity>, Void>() {
 -                    @Override
 -                    public Void call( final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent ) {
 -                        return null;
 -                    }
 -                } );
++        Observable<Id> o = Observable.from(new CollectionIoEvent<Id>(collectionScope, entityId))
++            .map( markStart)
++            .doOnNext( markCommit)
++            .map( new Func1<CollectionIoEvent<MvccEntity>, Id>() {
++
++                @Override
++                public Id call(final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent) {
++                    MvccEntity entity = mvccEntityCollectionIoEvent.getEvent();
++                    Task<Void> task = entityDeletedFactory
++                        .getTask( collectionScope, entity.getId(), entity.getVersion());
++                    taskExecutor.submit(task);
++                    return entity.getId();
++                }
++            }
++        );
 +
-         Observable<Void> o = Observable.from(new CollectionIoEvent<Id>(collectionScope, entityId)).map(markStart)
-                 .doOnNext(markCommit).map(new Func1<CollectionIoEvent<MvccEntity>, Void>() {
-                     @Override
-                     public Void call(final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent) {
-                         MvccEntity entity = mvccEntityCollectionIoEvent.getEvent();
-                         Task<Void> task = entityDeletedFactory.getTask(
-                                 collectionScope,entity.getId(),entity.getVersion());
-                         taskExecutor.submit(task);
-                         return null;
-                     }
-                 });
 +        return o;
      }
  
  
@@@ -289,23 -261,30 +304,31 @@@
  
          final Id entityId = entity.getId();
  
-         ValidationUtils.verifyIdentity(entityId);
+ 
+         ValidationUtils.verifyIdentity( entityId );
  
          // create our observable and start the write
-         CollectionIoEvent<Entity> writeData = new CollectionIoEvent<Entity>(collectionScope, entity);
+         CollectionIoEvent<Entity> writeData = new CollectionIoEvent<Entity>( collectionScope, entity );
  
-         Observable<CollectionIoEvent<MvccEntity>> observable = stageRunner(writeData, writeUpdate);
  
-         return observable.map(writeCommit).doOnNext(new Action1<Entity>() {
+         Observable<CollectionIoEvent<MvccEntity>> observable = stageRunner( writeData, writeUpdate );
+ 
+ 
+         return observable.map( writeCommit ).doOnNext( new Action1<Entity>() {
              @Override
-             public void call(final Entity entity) {
-                 logger.debug("sending entity to the queue");
+             public void call( final Entity entity ) {
+                 logger.debug( "sending entity to the queue" );
  
                  //we have an update, signal the fix
 +                taskExecutor.submit(entityVersionCreatedFactory.getTask(collectionScope,entity));
  
+                 //TODO T.N Change this to fire a task
+                 //                Observable.from( new CollectionIoEvent<Id>(collectionScope,
+                 // entityId ) ).map( load ).subscribeOn( Schedulers.io() ).subscribe();
+ 
+ 
              }
-         }).doOnError(rollback);
+         } ).doOnError( rollback );
      }
  
  
@@@ -381,5 -358,4 +402,4 @@@
  
          return Health.RED;
      }
- 
--}
++}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3261795/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
index 284cf5b,0000000..9ff4f56
mode 100644,000000..100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
@@@ -1,147 -1,0 +1,148 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *    http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.usergrid.persistence.collection.impl;
 +
 +import com.google.inject.Inject;
 +import com.google.inject.assistedinject.Assisted;
 +import com.netflix.astyanax.MutationBatch;
 +import org.apache.usergrid.persistence.collection.CollectionScope;
 +import org.apache.usergrid.persistence.collection.EntityVersionCleanupFactory;
 +import org.apache.usergrid.persistence.collection.event.EntityDeleted;
 +import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
 +import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
 +import org.apache.usergrid.persistence.core.task.Task;
 +import org.apache.usergrid.persistence.model.entity.Id;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +import rx.Observable;
 +import rx.functions.Action1;
 +import rx.functions.Func1;
 +import rx.schedulers.Schedulers;
 +
 +import java.util.Set;
 +import java.util.UUID;
++import org.apache.usergrid.persistence.core.guice.ProxyImpl;
 +
 +
 +/**
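 + * Task that runs the version cleanup, notifies the registered EntityDeleted listeners, and then deletes the entity and log entry for the given version.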
 + */
 +public class EntityDeletedTask implements Task<Void> {
 +    private static final Logger LOG =  LoggerFactory.getLogger(EntityDeletedTask.class);
 +
-     private EntityVersionCleanupFactory entityVersionCleanupFactory;
-     private MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
-     private MvccEntitySerializationStrategy entitySerializationStrategy;
-     private Set<EntityDeleted> listeners;
++    private final EntityVersionCleanupFactory entityVersionCleanupFactory;
++    private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
++    private final MvccEntitySerializationStrategy entitySerializationStrategy;
++    private final Set<EntityDeleted> listeners;
 +    private final CollectionScope collectionScope;
 +    private final Id entityId;
 +    private final UUID version;
 +
 +
 +    @Inject
 +    public EntityDeletedTask( 
 +        EntityVersionCleanupFactory             entityVersionCleanupFactory,
 +        final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
-         final MvccEntitySerializationStrategy   entitySerializationStrategy,
++        @ProxyImpl final MvccEntitySerializationStrategy entitySerializationStrategy,
 +        final Set<EntityDeleted>                listeners, // MUST be a set or Guice will not inject
 +        @Assisted final CollectionScope         collectionScope, 
 +        @Assisted final Id                      entityId, 
 +        @Assisted final UUID                    version) {
 +
 +        this.entityVersionCleanupFactory = entityVersionCleanupFactory;
 +        this.logEntrySerializationStrategy = logEntrySerializationStrategy;
 +        this.entitySerializationStrategy = entitySerializationStrategy;
 +        this.listeners = listeners;
 +        this.collectionScope = collectionScope;
 +        this.entityId = entityId;
 +        this.version = version;
 +    }
 +
 +
 +    @Override
 +    public void exceptionThrown(Throwable throwable) {
 +        LOG.error( "Unable to run update task for collection {} with entity {} and version {}",
 +                new Object[] { collectionScope, entityId, version }, throwable );
 +    }
 +
 +    
 +    @Override
 +    public Void rejected() {
 +        try {
 +            call();
 +        }
 +        catch ( Exception e ) {
 +            throw new RuntimeException( "Exception thrown in call task", e );
 +        }
 +
 +        return null;
 +    }
 +
 +    
 +    @Override
 +    public Void call() throws Exception { 
 +
 +        entityVersionCleanupFactory.getTask( collectionScope, entityId, version ).call();
 +
 +        fireEvents();
 +        final MutationBatch entityDelete = entitySerializationStrategy.delete(collectionScope, entityId, version);
 +        final MutationBatch logDelete = logEntrySerializationStrategy.delete(collectionScope, entityId, version);
 +        entityDelete.execute();
 +        logDelete.execute();
 +
 +        return null;
 +    }
 +
 +
 +    private void fireEvents() {
 +        final int listenerSize = listeners.size();
 +
 +        if ( listenerSize == 0 ) {
 +            return;
 +        }
 +
 +        if ( listenerSize == 1 ) {
 +            listeners.iterator().next().deleted( collectionScope, entityId,version );
 +            return;
 +        }
 +
 +        LOG.debug( "Started firing {} listeners", listenerSize );
 +
 +        //if we have more than 1, run them on the rx scheduler for a max of 8 operations at a time
 +        Observable.from(listeners)
 +                .parallel( new Func1<Observable<EntityDeleted>, Observable<EntityDeleted>>() {
 +
 +                    @Override
 +                    public Observable<EntityDeleted> call(
 +                            final Observable<EntityDeleted> entityVersionDeletedObservable ) {
 +
 +                        return entityVersionDeletedObservable.doOnNext( new Action1<EntityDeleted>() {
 +                            @Override
 +                            public void call( final EntityDeleted listener ) {
 +                                listener.deleted(collectionScope, entityId, version);
 +                            }
 +                        } );
 +                    }
 +                }, Schedulers.io() ).toBlocking().last();
 +
 +        LOG.debug( "Finished firing {} listeners", listenerSize );
 +    }
 +
 +
 +}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3261795/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
index d4bf6e6,422a155..efecdeb
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
@@@ -45,7 -22,6 +45,8 @@@ import org.apache.usergrid.persistence.
  import com.netflix.astyanax.Keyspace;
  import com.netflix.astyanax.MutationBatch;
  import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 +import java.util.Set;
++import org.apache.usergrid.persistence.core.guice.ProxyImpl;
  
  import rx.Observable;
  import rx.functions.Action1;
@@@ -75,17 -51,11 +76,17 @@@ public class EntityVersionCleanupTask i
      private final UUID version;
  
  
 -    public EntityVersionCleanupTask( final SerializationFig serializationFig,
 -                                     final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
 -                                     final MvccEntitySerializationStrategy entitySerializationStrategy,
 -                                     final Keyspace keyspace, final List<EntityVersionDeleted> listeners,
 -                                     final CollectionScope scope, final Id entityId, final UUID version ) {
 +    @Inject
 +    public EntityVersionCleanupTask( 
 +        final SerializationFig serializationFig,
 +        final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
-         final MvccEntitySerializationStrategy   entitySerializationStrategy,
++        @ProxyImpl final MvccEntitySerializationStrategy   entitySerializationStrategy,
 +        final UniqueValueSerializationStrategy  uniqueValueSerializationStrategy,
 +        final Keyspace                          keyspace,
 +        final Set<EntityVersionDeleted>         listeners, // MUST be a set or Guice will not inject
 +        @Assisted final CollectionScope         scope,
 +        @Assisted final Id                      entityId,
 +        @Assisted final UUID                    version ) {
  
          this.serializationFig = serializationFig;
          this.logEntrySerializationStrategy = logEntrySerializationStrategy;
@@@ -123,79 -93,66 +124,81 @@@
  
      @Override
      public Void call() throws Exception {
 -
 -
 -        final UUID maxVersion = version;
 -
 -
 -        Observable<MvccLogEntry> versions = Observable.create( new ObservableIterator( "versionIterators" ) {
 -            @Override
 -            protected Iterator getIterator() {
 -                return new LogEntryIterator( logEntrySerializationStrategy, scope, entityId, maxVersion,
 -                        serializationFig.getBufferSize() );
 -            }
 -        } );
 -
 -
 -        //get the uuid from the version
 -        versions.map( new Func1<MvccLogEntry, UUID>() {
 -            @Override
 -            public UUID call( final MvccLogEntry mvccLogEntry ) {
 -                return mvccLogEntry.getVersion();
 -            }
 -        } )
 -                //buffer our versions
 -         .buffer( serializationFig.getBufferSize() )
 -         //for each buffer set, delete all of them
 -         .doOnNext( new Action1<List<UUID>>() {
 -            @Override
 -            public void call( final List<UUID> versions ) {
 -
 -                //Fire all the listeners
 -                fireEvents( versions );
 -
 -                MutationBatch entityBatch = keyspace.prepareMutationBatch();
 -                MutationBatch logBatch = keyspace.prepareMutationBatch();
 -
 -                for ( UUID version : versions ) {
 -                    final MutationBatch entityDelete = entitySerializationStrategy.delete( scope, entityId, version );
 -
 -                    entityBatch.mergeShallow( entityDelete );
 -
 -                    final MutationBatch logDelete = logEntrySerializationStrategy.delete( scope, entityId, version );
 -
 -                    logBatch.mergeShallow( logDelete );
 +        //TODO Refactor this logic into a class that can be invoked from anywhere
 +        //load every entity we have history of
 +        Observable<List<MvccEntity>> deleteFieldsObservable =
 +            Observable.create(new ObservableIterator<MvccEntity>("deleteColumns") {
 +                @Override
 +                protected Iterator<MvccEntity> getIterator() {
-                     Iterator<MvccEntity> entities =  entitySerializationStrategy
-                             .load(scope, entityId, version, serializationFig.getBufferSize());
++                    Iterator<MvccEntity> entities =  entitySerializationStrategy.loadDescendingHistory(
++                        scope, entityId, version, 1000); // TODO: what fetchsize should we use here?
 +                    return entities;
                  }
-             })       //buffer them for efficiency
++            })
++            //buffer them for efficiency
 +            .skip(1)
 +            .buffer(serializationFig.getBufferSize()).doOnNext(
 +            new Action1<List<MvccEntity>>() {
 +                @Override
 +                public void call(final List<MvccEntity> mvccEntities) {
 +                    final MutationBatch batch = keyspace.prepareMutationBatch();
 +                    final MutationBatch entityBatch = keyspace.prepareMutationBatch();
 +                    final MutationBatch logBatch = keyspace.prepareMutationBatch();
 +
 +                    for (MvccEntity mvccEntity : mvccEntities) {
 +                        if (!mvccEntity.getEntity().isPresent()) {
 +                            continue;
 +                        }
 +
 +                        final UUID entityVersion = mvccEntity.getVersion();
 +                        final Entity entity = mvccEntity.getEntity().get();
 +
 +                        //remove all unique fields from the index
 +                        for (final Field field : entity.getFields()) {
 +                            if (!field.isUnique()) {
 +                                continue;
 +                            }
 +                            final UniqueValue unique = new UniqueValueImpl( field, entityId, entityVersion);
-                             final MutationBatch deleteMutation = uniqueValueSerializationStrategy.delete(scope,unique);
++                            final MutationBatch deleteMutation = 
++                                    uniqueValueSerializationStrategy.delete(scope,unique);
 +                            batch.mergeShallow(deleteMutation);
 +                        }
 +
 +                        final MutationBatch entityDelete = entitySerializationStrategy
 +                                .delete(scope, entityId, mvccEntity.getVersion());
 +                        entityBatch.mergeShallow(entityDelete);
 +                        final MutationBatch logDelete = logEntrySerializationStrategy
 +                                .delete(scope, entityId, version);
 +                        logBatch.mergeShallow(logDelete);
 +                    }
 +
 +                    try {
 +                        batch.execute();
 +                    } catch (ConnectionException e1) {
 +                        throw new RuntimeException("Unable to execute " +
 +                                "unique value " +
 +                                "delete", e1);
 +                    }
 +                    fireEvents(mvccEntities);
 +                    try {
 +                        entityBatch.execute();
 +                    } catch (ConnectionException e) {
 +                        throw new RuntimeException("Unable to delete entities in cleanup", e);
 +                    }
 +
 +                    try {
 +                        logBatch.execute();
 +                    } catch (ConnectionException e) {
 +                        throw new RuntimeException("Unable to delete entities from the log", e);
 +                    }
  
 -
 -                try {
 -                    entityBatch.execute();
 -                }
 -                catch ( ConnectionException e ) {
 -                    throw new RuntimeException( "Unable to delete entities in cleanup", e );
 -                }
 -
 -                try {
 -                    logBatch.execute();
 -                }
 -                catch ( ConnectionException e ) {
 -                    throw new RuntimeException( "Unable to delete entities from the log", e );
                  }
              }
 -        } ).count().toBlocking().last();
 +        );
 +
 +        final int removedCount = deleteFieldsObservable.count().toBlocking().last();
 +
 +        logger.debug("Removed unique values for {} entities of entity {}",removedCount,entityId);
  
          return null;
      }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3261795/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java
index 994465d,e31aba5..baf2ac3
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java
@@@ -19,8 -19,8 +19,6 @@@
  package org.apache.usergrid.persistence.collection.mvcc.stage.delete;
  
  
--import java.util.Iterator;
--import java.util.List;
  import java.util.UUID;
  
  import org.slf4j.Logger;
@@@ -36,13 -36,14 +34,9 @@@ import org.apache.usergrid.persistence.
  import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl;
  import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
  import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
--import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
  import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
--import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl;
- import org.apache.usergrid.persistence.core.rx.ObservableIterator;
- import org.apache.usergrid.persistence.model.entity.Entity;
+ import org.apache.usergrid.persistence.core.guice.ProxyImpl;
 -import org.apache.usergrid.persistence.core.rx.ObservableIterator;
 -import org.apache.usergrid.persistence.model.entity.Entity;
  import org.apache.usergrid.persistence.model.entity.Id;
--import org.apache.usergrid.persistence.model.field.Field;
  
  import com.google.common.base.Preconditions;
  import com.google.inject.Inject;
@@@ -51,7 -52,7 +45,6 @@@ import com.netflix.astyanax.Keyspace
  import com.netflix.astyanax.MutationBatch;
  import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
  
--import rx.Observable;
  import rx.functions.Action1;
  
  
@@@ -119,72 -120,69 +112,70 @@@ public class MarkCommit implements Acti
          catch ( ConnectionException e ) {
              throw new RuntimeException( "Unable to mark entry as deleted" );
          }
- //<<<<<<< HEAD
- //=======
 -
 -
 -        //TODO Refactor this logic into a a class that can be invoked from anywhere
 -        //load every entity we have history of
 -        Observable<List<MvccEntity>> deleteFieldsObservable =
 -                Observable.create( new ObservableIterator<MvccEntity>( "deleteColumns" ) {
 -                    @Override
 -                    protected Iterator<MvccEntity> getIterator() {
 -                        Iterator<MvccEntity> entities =
 -                                entityStrat.loadDescendingHistory( collectionScope, entityId, entity.getVersion(), 100 );
 -
 -                        return entities;
 -                    }
 -                } )       //buffer them for efficiency
 -                          .buffer( serializationFig.getBufferSize() ).doOnNext(
 -
 -                        new Action1<List<MvccEntity>>() {
 -                            @Override
 -                            public void call( final List<MvccEntity> mvccEntities ) {
 -
 -
 -                                final MutationBatch batch = keyspace.prepareMutationBatch();
 -
 -                                for ( MvccEntity mvccEntity : mvccEntities ) {
 -                                    if ( !mvccEntity.getEntity().isPresent() ) {
 -                                        continue;
 -                                    }
 -
 -                                    final UUID entityVersion = mvccEntity.getVersion();
 -
 -                                    final Entity entity = mvccEntity.getEntity().get();
 -
 -                                    //remove all unique fields from the index
 -                                    for ( final Field field : entity.getFields() ) {
 -
 -                                        if(!field.isUnique()){
 -                                            continue;
 -                                        }
 -
 -                                        final UniqueValue unique = new UniqueValueImpl( field, entityId, entityVersion );
 -
 -                                        final MutationBatch deleteMutation = uniqueValueStrat.delete(collectionScope,  unique );
 -
 -                                        batch.mergeShallow( deleteMutation );
 -                                    }
 -                                }
 -
 -                                try {
 -                                    batch.execute();
 -                                }
 -                                catch ( ConnectionException e1 ) {
 -                                    throw new RuntimeException( "Unable to execute " +
 -                                            "unique value " +
 -                                            "delete", e1 );
 -                                }
 -                            }
 -                        }
 -
 -
 -                                                                       );
 -
 -        final int removedCount = deleteFieldsObservable.count().toBlocking().last();
 -
 -        LOG.debug("Removed unique values for {} entities of entity {}", removedCount, entityId );
+     }
+ }
++
 +//
 +//
 +//        //TODO Refactor this logic into a class that can be invoked from anywhere
 +//        //load every entity we have history of
 +//        Observable<List<MvccEntity>> deleteFieldsObservable =
 +//                Observable.create( new ObservableIterator<MvccEntity>( "deleteColumns" ) {
 +//                    @Override
 +//                    protected Iterator<MvccEntity> getIterator() {
 +//                        Iterator<MvccEntity> entities =
 +//                                entityStrat.load( collectionScope, entityId, entity.getVersion(), 100 );
 +//
 +//                        return entities;
 +//                    }
 +//                } )       //buffer them for efficiency
 +//                          .buffer( serializationFig.getBufferSize() ).doOnNext(
 +//
 +//                        new Action1<List<MvccEntity>>() {
 +//                            @Override
 +//                            public void call( final List<MvccEntity> mvccEntities ) {
 +//
 +//
 +//                                final MutationBatch batch = keyspace.prepareMutationBatch();
 +//
 +//                                for ( MvccEntity mvccEntity : mvccEntities ) {
 +//                                    if ( !mvccEntity.getEntity().isPresent() ) {
 +//                                        continue;
 +//                                    }
 +//
 +//                                    final UUID entityVersion = mvccEntity.getVersion();
 +//
 +//                                    final Entity entity = mvccEntity.getEntity().get();
 +//
 +//                                    //remove all unique fields from the index
 +//                                    for ( final Field field : entity.getFields() ) {
 +//
 +//                                        if(!field.isUnique()){
 +//                                            continue;
 +//                                        }
 +//
 +//                                        final UniqueValue unique = new UniqueValueImpl( field, entityId, entityVersion );
 +//
 +//                                        final MutationBatch deleteMutation = uniqueValueStrat.delete(collectionScope,  unique );
 +//
 +//                                        batch.mergeShallow( deleteMutation );
 +//                                    }
 +//                                }
 +//
 +//                                try {
 +//                                    batch.execute();
 +//                                }
 +//                                catch ( ConnectionException e1 ) {
 +//                                    throw new RuntimeException( "Unable to execute " +
 +//                                            "unique value " +
 +//                                            "delete", e1 );
 +//                                }
 +//                            }
 +//                        }
 +//
 +//
 +//                                                                       );
 +//
 +//        final int removedCount = deleteFieldsObservable.count().toBlocking().last();
 +//
 +//        LOG.debug("Removed unique values for {} entities of entity {}", removedCount, entityId );
- //>>>>>>> befcdcab6f7f1c83dbcb2a24eddb055c9297d59f
-     }
- }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3261795/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/TestCollectionModule.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/TestCollectionModule.java
index 73f1b6d,e2e6876..dd3c013
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/TestCollectionModule.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/TestCollectionModule.java
@@@ -21,7 -21,12 +21,8 @@@ package org.apache.usergrid.persistence
  
  
  import org.apache.usergrid.persistence.core.guice.CommonModule;
+ import org.apache.usergrid.persistence.core.guice.MaxMigrationModule;
 -import org.apache.usergrid.persistence.core.guice.MaxMigrationVersion;
  import org.apache.usergrid.persistence.core.guice.TestModule;
 -import org.apache.usergrid.persistence.core.migration.data.DataMigration;
 -
 -import com.google.inject.multibindings.Multibinder;
  
  
  public class TestCollectionModule extends TestModule {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3261795/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
index eae1c54,1fce6e2..d0a87c3
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
@@@ -139,31 -119,22 +139,32 @@@ public class EntityVersionCleanupTaskTe
          final MutationBatch newBatch = mock( MutationBatch.class );
  
  
 -        //set up returning a mutator
 -        when( mvccEntitySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
 +        // set up returning a mutator
 +        when(ess.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
                  .thenReturn( newBatch );
  
 -
 -        when( mvccLogEntrySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
 +        when(less.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
                  .thenReturn( newBatch );
  
 +        final List<MvccEntity> mel = new ArrayList<MvccEntity>();
  
 -        //start the task
 -        ListenableFuture<Void> future = taskExecutor.submit( cleanupTask );
 +        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(), 
 +                MvccEntity.Status.DELETED, Optional.fromNullable((Entity)null)) );
  
 -        //wait for the task
 -        future.get();
 +        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(), 
 +                MvccEntity.Status.DELETED, Optional.fromNullable((Entity)null)) );
  
-         when( ess.load( same( appScope ), same( entityId ), any(UUID.class), any(Integer.class) ) )
 -        //verify it was run
++        when( ess.loadDescendingHistory(
++                same( appScope ), same( entityId ), any(UUID.class), any(Integer.class) ) )
 +                .thenReturn(mel.iterator() );
 +
 +        try {
 +            cleanupTask.call();
 +        }catch(Exception e){
 +            Assert.fail( e.getMessage() );
 +        }
 +
 +        // verify it was run
          verify( entityBatch ).execute();
  
          verify( logBatch ).execute();
@@@ -239,38 -195,24 +240,38 @@@
  
  
          //set up returning a mutator
 -        when( mvccEntitySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
 +        when(ess.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
                  .thenReturn( batch );
  
 -
 -        when( mvccLogEntrySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
 +        when( mvccLogEntrySerializationStrategy
 +                .delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
                  .thenReturn( batch );
  
 +        final List<MvccEntity> mel = new ArrayList<MvccEntity>();
  
 -        //start the task
 -        ListenableFuture<Void> future = taskExecutor.submit( cleanupTask );
 +        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(), 
 +                MvccEntity.Status.DELETED, Optional.fromNullable((Entity)null)) );
  
 -        //wait for the task
 -        future.get();
 +        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(), 
 +                MvccEntity.Status.DELETED, Optional.fromNullable((Entity)null)) );
  
-         when( ess.load( same( appScope ), same( entityId ), any(UUID.class), any(Integer.class) ) )
 -        //verify it was run
 -        verify( entityBatch, never() ).execute();
++        when( ess.loadDescendingHistory( same( appScope ), same( entityId ), any(UUID.class), any(Integer.class) ) )
 +                .thenReturn(mel.iterator() );
  
 -        verify( logBatch, never() ).execute();
 +        //start the task
 +        try {
 +            cleanupTask.call();
 +        }catch(Exception e){
 +            Assert.fail( e.getMessage() );
 +        }
 +
 +
 +        // These last two verify statements do not make sense. We cannot assert that the entity
 +        // and log batches are never called. Even if there are no listeners the entity delete 
 +        // cleanup task will still run to do the normal cleanup.
 +        //
 +        // verify( entityBatch, never() ).execute();
 +        // verify( logBatch, never() ).execute();
      }
  
  
@@@ -358,23 -283,11 +359,23 @@@
                  .thenReturn( batch );
  
  
 -        //start the task
 -        ListenableFuture<Void> future = taskExecutor.submit( cleanupTask );
 +        final List<MvccEntity> mel = new ArrayList<MvccEntity>();
  
 -        //wait for the task
 -        future.get();
 +        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(), 
 +                MvccEntity.Status.DELETED, Optional.fromNullable((Entity)null)) );
 +
 +        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(), 
 +                MvccEntity.Status.DELETED, Optional.fromNullable((Entity)null)) );
 +
-         when( ess.load( same( appScope ), same( entityId ), any(UUID.class), any(Integer.class) ) )
++        when( ess.loadDescendingHistory( same( appScope ), same( entityId ), any(UUID.class), any(Integer.class) ) )
 +                .thenReturn(mel.iterator() );
 +
 +
 +        try {
 +            cleanupTask.call();
 +        }catch(Exception e){
 +            Assert.fail( e.getMessage() );
 +        }
  
          //we deleted the version
          //verify it was run
@@@ -462,36 -366,27 +463,36 @@@
  
  
          //set up returning a mutator
 -        when( mvccEntitySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
 +        when( ess.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
                  .thenReturn( batch );
  
 -
 -        when( mvccLogEntrySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
 +        when( mvccLogEntrySerializationStrategy
 +                .delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
                  .thenReturn( batch );
  
 +        final List<MvccEntity> mel = new ArrayList<MvccEntity>();
  
 -        //start the task
 -        ListenableFuture<Void> future = taskExecutor.submit( cleanupTask );
 +        Entity entity = new Entity( entityId );
  
 -        //wait for the task
 -        future.get();
 +        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(), 
 +                MvccEntity.Status.DELETED, Optional.of(entity)) );
  
 -        //we deleted the version
 -        //verify we deleted everything
 -        verify( entityBatch, times( sizeToReturn ) ).mergeShallow( any( MutationBatch.class ) );
 +        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(), 
 +                MvccEntity.Status.DELETED, Optional.of(entity)) );
  
-         when( ess.load( same( appScope ), same( entityId ), any(UUID.class), any(Integer.class) ) )
 -        verify( logBatch, times( sizeToReturn ) ).mergeShallow( any( MutationBatch.class ) );
++        when( ess.loadDescendingHistory( same( appScope ), same( entityId ), any(UUID.class), any(Integer.class) ) )
 +                .thenReturn(mel.iterator() );
  
 +        try {
 +            cleanupTask.call();
 +        }catch(Exception e){
 +            Assert.fail( e.getMessage() );
 +        }
 +        //we deleted the version
 +        //verify we deleted everything
 +        verify( entityBatch, times( 1 ) ).mergeShallow( any( MutationBatch.class ) );
  
 +        verify( logBatch, times( 1 ) ).mergeShallow( any( MutationBatch.class ) );
  
          verify( logBatch ).execute();
  
@@@ -729,29 -608,41 +730,29 @@@
                  .thenReturn( batch );
  
  
 -        //start the task
 -        ListenableFuture<Void> future1 = taskExecutor.submit( firstTask );
 -
 -        //now start another task while the slow running task is running
 -        ListenableFuture<Void> future2 = taskExecutor.submit( secondTask );
 +        final List<MvccEntity> mel = new ArrayList<MvccEntity>();
  
 -        //get the second task, we shouldn't have been able to queue it, therefore it should just run in process
 -        future2.get();
 +        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(),
 +                MvccEntity.Status.DELETED, Optional.fromNullable((Entity)null)) );
  
 -        /**
 -         * While we're not done, release latches every 200 ms
 -         */
 -        while ( !future1.isDone() ) {
 -            Thread.sleep( 200 );
 -            waitSemaphore.release( listenerCount );
 -        }
 +        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(),
 +                MvccEntity.Status.DELETED, Optional.fromNullable((Entity)null)) );
  
-         when( ess.load( same( appScope ), same( entityId ), any(UUID.class), any(Integer.class) ) )
 -        //wait for the task
 -        future1.get();
++        when( ess.loadDescendingHistory( same( appScope ), same( entityId ), any(UUID.class), any(Integer.class) ) )
 +                .thenReturn(mel.iterator() );
  
 -        //we deleted the version
 -        //verify we deleted everything
  
 +        try {
 +            cleanupTask.rejected();
 +        }catch(Exception e){
 +            Assert.fail(e.getMessage());
 +        }
  
          //we deleted the version
 -        //verify we deleted everything
 -        verify( logBatch, times( sizeToReturn* 2 ) ).mergeShallow( any( MutationBatch.class ) );
 -
 -        verify( entityBatch, times( sizeToReturn * 2) ).mergeShallow( any( MutationBatch.class ) );
 -
 -
 -        verify( logBatch, times(2) ).execute();
 -
 -        verify( entityBatch, times(2) ).execute();
 +        //verify it was run
 +        verify( entityBatch ).execute();
  
 +        verify( logBatch ).execute();
  
          //the latch was executed
          latch.await();