You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/03/19 17:20:10 UTC

[44/50] incubator-usergrid git commit: Moved task generation into single factory to clean up code

Moved task generation into single factory to clean up code

Fixes repair by version

Fixes incorrect task invocation during writes

Fixes swallowed exception bug

Adds on error processing to QueueConsumer


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/f20dfd3c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/f20dfd3c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/f20dfd3c

Branch: refs/heads/USERGRID-405
Commit: f20dfd3cbee1f3abb09ae04036b9e747d2485397
Parents: 205e0e0
Author: Todd Nine <tn...@apigee.com>
Authored: Sun Mar 15 13:57:24 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Sun Mar 15 13:57:24 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/CpEntityManager.java        |  2 +
 .../events/EntityDeletedHandler.java            | 58 ++++++++++-------
 .../events/EntityVersionCreatedHandler.java     | 58 +++++++++++------
 .../events/EntityVersionDeletedHandler.java     | 54 +++++++++-------
 .../corepersistence/StaleIndexCleanupTest.java  | 31 +++++++--
 .../PerformanceEntityRebuildIndexTest.java      |  2 +
 .../collection/guice/CollectionModule.java      | 22 ++-----
 .../EntityCollectionManagerFactoryImpl.java     | 54 +++++++---------
 .../impl/EntityCollectionManagerImpl.java       | 40 +++++-------
 .../collection/impl/EntityDeletedTask.java      | 21 +++---
 .../impl/EntityVersionCleanupTask.java          | 41 ++++++------
 .../impl/EntityVersionCleanupTaskTest.java      | 68 +++++++++++---------
 .../index/impl/EsEntityIndexBatchImpl.java      |  2 +-
 .../index/impl/EsEntityIndexImpl.java           | 13 ----
 .../index/impl/EsIndexBufferConsumerImpl.java   |  2 +-
 15 files changed, 252 insertions(+), 216 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f20dfd3c/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index d808ae5..e2b7bd1 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -620,6 +620,8 @@ public class CpEntityManager implements EntityManager {
                 WriteUniqueVerifyException wuve = ( WriteUniqueVerifyException ) hre.getCause();
                 handleWriteUniqueVerifyException( entity, wuve );
             }
+
+            throw hre;
         }
 
         // update in all containing collections and connection indexes

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f20dfd3c/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java
index 33cc988..94ef5e4 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java
@@ -17,52 +17,66 @@
  */
 package org.apache.usergrid.corepersistence.events;
 
-import com.google.inject.Inject;
-import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.collection.event.EntityDeleted;
-import org.apache.usergrid.persistence.model.entity.Id;
 
 import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.usergrid.corepersistence.CpEntityManagerFactory;
-import static org.apache.usergrid.corepersistence.CoreModule.EVENTS_DISABLED;
 import org.apache.usergrid.persistence.EntityManagerFactory;
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.event.EntityDeleted;
 import org.apache.usergrid.persistence.index.EntityIndex;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.usergrid.persistence.index.IndexScope;
+import org.apache.usergrid.persistence.index.impl.IndexScopeImpl;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+import static org.apache.usergrid.corepersistence.CoreModule.EVENTS_DISABLED;
 
 
 /**
  * Delete all Query Index indexes associated with an Entity that has just been deleted.
  */
+@Singleton
 public class EntityDeletedHandler implements EntityDeleted {
-    private static final Logger logger = LoggerFactory.getLogger(EntityDeletedHandler.class );
+    private static final Logger logger = LoggerFactory.getLogger( EntityDeletedHandler.class );
+
+
+    private final EntityManagerFactory emf;
+
 
     @Inject
-    EntityManagerFactory emf;
+    public EntityDeletedHandler( final EntityManagerFactory emf ) {this.emf = emf;}
 
 
     @Override
-    public void deleted(CollectionScope scope, Id entityId, UUID version) {
+    public void deleted( CollectionScope scope, Id entityId, UUID version ) {
 
         // This check is for testing purposes and for a test that to be able to dynamically turn
         // off and on delete previous versions so that it can test clean-up on read.
-        if ( System.getProperty( EVENTS_DISABLED, "false" ).equals( "true" )) {
+        if ( System.getProperty( EVENTS_DISABLED, "false" ).equals( "true" ) ) {
             return;
         }
 
-        logger.debug("Handling deleted event for entity {}:{} v {} "
-                + "scope\n   name: {}\n   owner: {}\n   app: {}",
+        logger.debug( "Handling deleted event for entity {}:{} v {} " + "scope\n   name: {}\n   owner: {}\n   app: {}",
             new Object[] {
-                entityId.getType(),
-                entityId.getUuid(),
-                version,
-                scope.getName(),
-                scope.getOwner(),
-                scope.getApplication()});
+                entityId.getType(), entityId.getUuid(), version, scope.getName(), scope.getOwner(),
+                scope.getApplication()
+            } );
+
+        CpEntityManagerFactory cpemf = ( CpEntityManagerFactory ) emf;
+        final EntityIndex ei = cpemf.getManagerCache().getEntityIndex( scope );
+
+        final IndexScope indexScope =
+            new IndexScopeImpl( new SimpleId( scope.getOwner().getUuid(), scope.getOwner().getType() ),
+                scope.getName() );
 
-        CpEntityManagerFactory cpemf = (CpEntityManagerFactory)emf;
-        final EntityIndex ei = cpemf.getManagerCache().getEntityIndex(scope);
 
-//        ei.deleteAllVersionsOfEntity( entityId );
+        ei.createBatch().deindex( indexScope, entityId, version ).execute();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f20dfd3c/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java
index 590df61..6270501 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java
@@ -18,6 +18,8 @@
 package org.apache.usergrid.corepersistence.events;
 
 import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -25,6 +27,7 @@ import org.apache.usergrid.corepersistence.CpEntityManagerFactory;
 import static org.apache.usergrid.corepersistence.CoreModule.EVENTS_DISABLED;
 import org.apache.usergrid.persistence.EntityManagerFactory;
 import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
 import org.apache.usergrid.persistence.collection.event.EntityVersionCreated;
 import org.apache.usergrid.persistence.index.EntityIndex;
 import org.apache.usergrid.persistence.model.entity.Entity;
@@ -35,34 +38,51 @@ import org.apache.usergrid.persistence.model.entity.Entity;
  * updated by the Collections module and we react by calling the Query Index module and removing
  * any indexes that exist for previous versions of the the Entity.
  */
+@Singleton
 public class EntityVersionCreatedHandler implements EntityVersionCreated {
     private static final Logger logger = LoggerFactory.getLogger(EntityVersionCreatedHandler.class );
 
+
+    private final EntityManagerFactory emf;
+    private final EntityCollectionManagerFactory entityCollectionManagerFactory;
+
+
     @Inject
-    EntityManagerFactory emf;
+    public EntityVersionCreatedHandler( final EntityManagerFactory emf,
+                                        final EntityCollectionManagerFactory entityCollectionManagerFactory ) {
+        this.emf = emf;
+        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+    }
 
 
     @Override
     public void versionCreated( final CollectionScope scope, final Entity entity ) {
+        //not op, we're not migrating properly to this.  Make this an event
 
-        // This check is for testing purposes and for a test that to be able to dynamically turn
-        // off and on delete previous versions so that it can test clean-up on read.
-        if ( System.getProperty( EVENTS_DISABLED, "false" ).equals( "true" )) {
-            return;
-        }
-
-        logger.debug("Handling versionCreated for entity {}:{} v {} "
-            + "scope\n   name: {}\n   owner: {}\n   app: {}",
-            new Object[] {
-                entity.getId().getType(),
-                entity.getId().getUuid(),
-                entity.getVersion(),
-                scope.getName(),
-                scope.getOwner(),
-                scope.getApplication()});
-
-        CpEntityManagerFactory cpemf = (CpEntityManagerFactory)emf;
-        final EntityIndex ei = cpemf.getManagerCache().getEntityIndex(scope);
+//        // This check is for testing purposes and for a test that to be able to dynamically turn
+//        // off and on delete previous versions so that it can test clean-up on read.
+//        if ( System.getProperty( EVENTS_DISABLED, "false" ).equals( "true" )) {
+//            return;
+//        }
+//
+//        logger.debug("Handling versionCreated for entity {}:{} v {} "
+//            + "scope\n   name: {}\n   owner: {}\n   app: {}",
+//            new Object[] {
+//                entity.getId().getType(),
+//                entity.getId().getUuid(),
+//                entity.getVersion(),
+//                scope.getName(),
+//                scope.getOwner(),
+//                scope.getApplication()});
+//
+//        CpEntityManagerFactory cpemf = (CpEntityManagerFactory)emf;
+//        final EntityIndex ei = cpemf.getManagerCache().getEntityIndex(scope);
+//
+//
+//
+//
+//
+//
 
 //        ei.deletePreviousVersions( entity.getId(), entity.getVersion() );
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f20dfd3c/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
index 127ae46..01848cb 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
@@ -18,6 +18,8 @@
 package org.apache.usergrid.corepersistence.events;
 
 import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
 import java.util.List;
 import org.apache.usergrid.corepersistence.CpEntityManagerFactory;
 import static org.apache.usergrid.corepersistence.CoreModule.EVENTS_DISABLED;
@@ -34,6 +36,10 @@ import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import rx.Observable;
+import rx.functions.Action1;
+import rx.functions.Action2;
 import rx.functions.Func1;
 import rx.schedulers.Schedulers;
 
@@ -43,14 +49,17 @@ import rx.schedulers.Schedulers;
  * TODO: do we need this? Don't our version-created and entity-deleted handlers take care of this?
  * If we do need it then it should be wired in via GuiceModule in the corepersistence package.
  */
+@Singleton
 public class EntityVersionDeletedHandler implements EntityVersionDeleted {
     private static final Logger logger = LoggerFactory.getLogger(EntityVersionDeletedHandler.class );
 
-    @Inject
-    private SerializationFig serializationFig;
+
+
+
+    private final EntityManagerFactory emf;
 
     @Inject
-    private EntityManagerFactory emf;
+    public EntityVersionDeletedHandler( final EntityManagerFactory emf ) {this.emf = emf;}
 
 
     @Override
@@ -63,40 +72,35 @@ public class EntityVersionDeletedHandler implements EntityVersionDeleted {
             return;
         }
 
-        logger.debug("Handling versionDeleted count={} event for entity {}:{} v {} "
-                + "scope\n   name: {}\n   owner: {}\n   app: {}",
-            new Object[] {
-                entityVersions.size(),
-                entityId.getType(),
-                entityId.getUuid(),
-                scope.getName(),
-                scope.getOwner(),
-                scope.getApplication()});
+        if(logger.isDebugEnabled()) {
+            logger.debug( "Handling versionDeleted count={} event for entity {}:{} v {} " + "scope\n   name: {}\n   owner: {}\n   app: {}",
+                new Object[] {
+                    entityVersions.size(), entityId.getType(), entityId.getUuid(), scope.getName(), scope.getOwner(),
+                    scope.getApplication()
+                } );
+        }
 
         CpEntityManagerFactory cpemf = (CpEntityManagerFactory)emf;
 
         final EntityIndex ei = cpemf.getManagerCache().getEntityIndex(scope);
 
-        final EntityIndexBatch eibatch = ei.createBatch();
-
         final IndexScope indexScope = new IndexScopeImpl(
                 new SimpleId(scope.getOwner().getUuid(), scope.getOwner().getType()),
                 scope.getName()
         );
 
-        rx.Observable.from(entityVersions)
-            .subscribeOn(Schedulers.io())
-            .buffer(serializationFig.getBufferSize())
-            .map(new Func1<List<MvccEntity>, List<MvccEntity>>() {
+        Observable.from( entityVersions )
+            .collect( ei.createBatch(), new Action2<EntityIndexBatch, MvccEntity>() {
                 @Override
-                public List<MvccEntity> call(List<MvccEntity> entityList) {
-                    for (MvccEntity entity : entityList) {
-                        eibatch.deindex(indexScope, entityId, entity.getVersion());
-                    }
-                    eibatch.execute();
-                    return entityList;
+                public void call( final EntityIndexBatch entityIndexBatch, final MvccEntity mvccEntity ) {
+                    entityIndexBatch.deindex( indexScope, mvccEntity.getId(), mvccEntity.getVersion() );
                 }
-            }).toBlocking().last();
+            } ).doOnNext( new Action1<EntityIndexBatch>() {
+            @Override
+            public void call( final EntityIndexBatch entityIndexBatch ) {
+                entityIndexBatch.execute();
+            }
+        } ).toBlocking().last();
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f20dfd3c/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
index 8d31b69..ac04dcc 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
@@ -246,7 +246,8 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
      * Test that the EntityDeleteImpl cleans up stale indexes on delete. Ensures that when an
      * entity is deleted its old indexes are cleared from ElasticSearch.
      */
-    @Test(timeout=30000)
+//    @Test(timeout=30000)
+    @Test
     public void testCleanupOnDelete() throws Exception {
 
         logger.info("Started testStaleIndexCleanup()");
@@ -316,11 +317,19 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
 
         // wait for indexes to be cleared for the deleted entities
         count = 0;
+
+
+        //we can't use our candidate result sets here.  The repair won't happen since we now have orphaned documents in our index
+        //us the EM so the repair process happens
+
+        Results results = null;
         do {
-            Thread.sleep(100);
+            //trigger the repair
+            results = queryCollectionEm("things", "select *");
             crs = queryCollectionCp("things", "thing", "select *");
-            em.refreshIndex();
-        } while ( crs.size() > 0 && count++ < 15 );
+            Thread.sleep(100);
+
+        } while ((results.hasCursor() || crs.size() > 0) && count++ < 2000 );
 
         Assert.assertEquals( "Expect no candidates", 0, crs.size() );
     }
@@ -436,4 +445,18 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
 
         return ei.search( is, SearchTypes.fromTypes( type ), rcq );
     }
+
+    /**
+        * Go around EntityManager and execute query directly against Core Persistence.
+        * Results may include stale index entries.
+        */
+       private Results queryCollectionEm( final String collName,  final String query ) throws Exception {
+
+           EntityManager em = app.getEntityManager();
+
+
+           final Results results = em.searchCollection( em.getApplicationRef(), collName, Query.fromQL( query ).withLimit( 10000 ) );
+
+           return results;
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f20dfd3c/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
index def9ed5..f1f165d 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
@@ -362,6 +362,8 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
             registry.remove( meterName );
             logger.info("Rebuilt index");
 
+            setup.getEmf().refreshIndex();
+
         } catch (Exception ex) {
             logger.error("Error rebuilding index", ex);
             fail();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f20dfd3c/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
index 7cd383a..a73d7a7 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
@@ -18,30 +18,24 @@
 package org.apache.usergrid.persistence.collection.guice;
 
 
-
 import org.safehaus.guicyfig.GuicyFigModule;
 
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
 import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
-import org.apache.usergrid.persistence.collection.EntityCollectionManagerSync;
-import org.apache.usergrid.persistence.collection.EntityDeletedFactory;
-import org.apache.usergrid.persistence.collection.EntityVersionCleanupFactory;
-import org.apache.usergrid.persistence.collection.EntityVersionCreatedFactory;
+import org.apache.usergrid.persistence.collection.impl.EntityVersionTaskFactory;
+import org.apache.usergrid.persistence.collection.MvccEntity;
 import org.apache.usergrid.persistence.collection.cache.EntityCacheFig;
 import org.apache.usergrid.persistence.collection.event.EntityDeleted;
+import org.apache.usergrid.persistence.collection.event.EntityVersionCreated;
 import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
 import org.apache.usergrid.persistence.collection.impl.EntityCollectionManagerFactoryImpl;
-import org.apache.usergrid.persistence.collection.impl.EntityCollectionManagerImpl;
-import org.apache.usergrid.persistence.collection.impl.EntityCollectionManagerSyncImpl;
 import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
 import org.apache.usergrid.persistence.collection.mvcc.changelog.ChangeLogGenerator;
 import org.apache.usergrid.persistence.collection.mvcc.changelog.ChangeLogGeneratorImpl;
-import org.apache.usergrid.persistence.collection.MvccEntity;
-import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
-import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueSerializationStrategyImpl;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteStart;
 import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.impl.SerializationModule;
+import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueSerializationStrategyImpl;
 import org.apache.usergrid.persistence.collection.service.impl.ServiceModule;
 import org.apache.usergrid.persistence.core.task.NamedTaskExecutorImpl;
 import org.apache.usergrid.persistence.core.task.TaskExecutor;
@@ -52,8 +46,6 @@ import com.google.inject.Provides;
 import com.google.inject.Singleton;
 import com.google.inject.assistedinject.FactoryModuleBuilder;
 import com.google.inject.multibindings.Multibinder;
-import org.apache.usergrid.persistence.collection.event.EntityVersionCreated;
-import org.apache.usergrid.persistence.model.entity.Entity;
 
 
 /**
@@ -72,9 +64,7 @@ public class CollectionModule extends AbstractModule {
         install( new SerializationModule() );
         install( new ServiceModule() );
 
-        install ( new FactoryModuleBuilder().build( EntityVersionCleanupFactory.class ));
-        install ( new FactoryModuleBuilder().build( EntityDeletedFactory.class));
-        install ( new FactoryModuleBuilder().build( EntityVersionCreatedFactory.class ));
+        install ( new FactoryModuleBuilder().build( EntityVersionTaskFactory.class ));
 
         // users of this module can add their own implemementations
         // for more information: https://github.com/google/guice/wiki/Multibindings

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f20dfd3c/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
index bb1b56a..23e375d 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
@@ -19,23 +19,6 @@
 
 package org.apache.usergrid.persistence.collection.impl;
 
-import com.google.common.base.Preconditions;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import com.netflix.astyanax.Keyspace;
-import org.apache.usergrid.persistence.collection.guice.CollectionTaskExecutor;
-import org.apache.usergrid.persistence.collection.guice.Write;
-import org.apache.usergrid.persistence.collection.guice.WriteUpdate;
-import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
-import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkCommit;
-import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkStart;
-import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
-import org.apache.usergrid.persistence.core.guice.ProxyImpl;
-import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
-import org.apache.usergrid.persistence.core.task.TaskExecutor;
 
 import java.util.concurrent.ExecutionException;
 
@@ -43,17 +26,32 @@ import org.apache.usergrid.persistence.collection.CollectionScope;
 import org.apache.usergrid.persistence.collection.EntityCollectionManager;
 import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
 import org.apache.usergrid.persistence.collection.EntityCollectionManagerSync;
-import org.apache.usergrid.persistence.collection.EntityDeletedFactory;
-import org.apache.usergrid.persistence.collection.EntityVersionCleanupFactory;
-import org.apache.usergrid.persistence.collection.EntityVersionCreatedFactory;
 import org.apache.usergrid.persistence.collection.cache.CachedEntityCollectionManager;
 import org.apache.usergrid.persistence.collection.cache.EntityCacheFig;
+import org.apache.usergrid.persistence.collection.guice.CollectionTaskExecutor;
+import org.apache.usergrid.persistence.collection.guice.Write;
+import org.apache.usergrid.persistence.collection.guice.WriteUpdate;
 import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkCommit;
+import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkStart;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.RollbackAction;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteCommit;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteOptimisticVerify;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteStart;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteUniqueVerify;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
+import org.apache.usergrid.persistence.core.guice.ProxyImpl;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.core.task.TaskExecutor;
+
+import com.google.common.base.Preconditions;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.netflix.astyanax.Keyspace;
 
 
 
@@ -76,9 +74,7 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
     private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
     private final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy;
     private final Keyspace keyspace;
-    private final EntityVersionCleanupFactory entityVersionCleanupFactory;
-    private final EntityVersionCreatedFactory entityVersionCreatedFactory;
-    private final EntityDeletedFactory entityDeletedFactory;
+    private final EntityVersionTaskFactory entityVersionTaskFactory;
     private final TaskExecutor taskExecutor;
     private final EntityCacheFig entityCacheFig;
     private final MetricsFactory metricsFactory;
@@ -92,8 +88,8 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
                             final EntityCollectionManager target = new EntityCollectionManagerImpl( writeStart, writeUpdate, writeVerifyUnique,
                                 writeOptimisticVerify, writeCommit, rollback, markStart, markCommit,
                                 entitySerializationStrategy, uniqueValueSerializationStrategy,
-                                mvccLogEntrySerializationStrategy, keyspace, entityVersionCleanupFactory,
-                                entityVersionCreatedFactory, entityDeletedFactory, taskExecutor, scope, metricsFactory );
+                                mvccLogEntrySerializationStrategy, keyspace, entityVersionTaskFactory,
+                                taskExecutor, scope, metricsFactory );
 
 
                             final EntityCollectionManager proxy = new CachedEntityCollectionManager(entityCacheFig, target  );
@@ -115,9 +111,7 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
                                                final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
                                                final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy,
                                                final Keyspace keyspace,
-                                               final EntityVersionCleanupFactory entityVersionCleanupFactory,
-                                               final EntityVersionCreatedFactory entityVersionCreatedFactory,
-                                               final EntityDeletedFactory entityDeletedFactory,
+                                               final EntityVersionTaskFactory entityVersionTaskFactory,
                                                @CollectionTaskExecutor final TaskExecutor taskExecutor,
                                               final EntityCacheFig entityCacheFig,
                                                MetricsFactory metricsFactory) {
@@ -134,9 +128,7 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
         this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
         this.mvccLogEntrySerializationStrategy = mvccLogEntrySerializationStrategy;
         this.keyspace = keyspace;
-        this.entityVersionCleanupFactory = entityVersionCleanupFactory;
-        this.entityVersionCreatedFactory = entityVersionCreatedFactory;
-        this.entityDeletedFactory = entityDeletedFactory;
+        this.entityVersionTaskFactory = entityVersionTaskFactory;
         this.taskExecutor = taskExecutor;
         this.entityCacheFig = entityCacheFig;
         this.metricsFactory = metricsFactory;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f20dfd3c/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index 50b581a..92b07f0 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@ -23,9 +23,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 
-import com.codahale.metrics.Meter;
-import com.codahale.metrics.Timer;
-import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,6 +31,7 @@ import org.apache.usergrid.persistence.collection.EntityCollectionManager;
 import org.apache.usergrid.persistence.collection.EntitySet;
 import org.apache.usergrid.persistence.collection.MvccEntity;
 import org.apache.usergrid.persistence.collection.VersionSet;
+import org.apache.usergrid.persistence.collection.guice.CollectionTaskExecutor;
 import org.apache.usergrid.persistence.collection.guice.Write;
 import org.apache.usergrid.persistence.collection.guice.WriteUpdate;
 import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
@@ -47,11 +45,13 @@ import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteCommit;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteOptimisticVerify;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteStart;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteUniqueVerify;
-import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet;
 import org.apache.usergrid.persistence.core.guice.ProxyImpl;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.core.task.Task;
+import org.apache.usergrid.persistence.core.task.TaskExecutor;
 import org.apache.usergrid.persistence.core.util.Health;
 import org.apache.usergrid.persistence.core.util.ValidationUtils;
 import org.apache.usergrid.persistence.model.entity.Entity;
@@ -59,6 +59,8 @@ import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.field.Field;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Timer;
 import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
@@ -68,12 +70,6 @@ import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 import com.netflix.astyanax.model.ColumnFamily;
 import com.netflix.astyanax.model.CqlResult;
 import com.netflix.astyanax.serializers.StringSerializer;
-import org.apache.usergrid.persistence.collection.EntityDeletedFactory;
-import org.apache.usergrid.persistence.collection.EntityVersionCleanupFactory;
-import org.apache.usergrid.persistence.collection.EntityVersionCreatedFactory;
-import org.apache.usergrid.persistence.collection.guice.CollectionTaskExecutor;
-import org.apache.usergrid.persistence.core.task.Task;
-import org.apache.usergrid.persistence.core.task.TaskExecutor;
 
 import rx.Notification;
 import rx.Observable;
@@ -112,9 +108,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
     private final MvccEntitySerializationStrategy entitySerializationStrategy;
     private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
 
-    private final EntityVersionCleanupFactory entityVersionCleanupFactory;
-    private final EntityVersionCreatedFactory entityVersionCreatedFactory;
-    private final EntityDeletedFactory entityDeletedFactory;
+    private final EntityVersionTaskFactory entityVersionTaskFactory;
     private final TaskExecutor taskExecutor;
 
     private final Keyspace keyspace;
@@ -144,9 +138,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
         final UniqueValueSerializationStrategy     uniqueValueSerializationStrategy,
         final MvccLogEntrySerializationStrategy    mvccLogEntrySerializationStrategy,
         final Keyspace                             keyspace,
-        final EntityVersionCleanupFactory          entityVersionCleanupFactory,
-        final EntityVersionCreatedFactory          entityVersionCreatedFactory,
-        final EntityDeletedFactory                 entityDeletedFactory,
+        final EntityVersionTaskFactory entityVersionTaskFactory,
         @CollectionTaskExecutor final TaskExecutor taskExecutor,
         @Assisted final CollectionScope            collectionScope,
         final MetricsFactory metricsFactory
@@ -170,9 +162,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
 
         this.keyspace = keyspace;
 
-        this.entityVersionCleanupFactory = entityVersionCleanupFactory;
-        this.entityVersionCreatedFactory = entityVersionCreatedFactory;
-        this.entityDeletedFactory = entityDeletedFactory;
+        this.entityVersionTaskFactory = entityVersionTaskFactory;
         this.taskExecutor = taskExecutor;
 
         this.collectionScope = collectionScope;
@@ -216,8 +206,9 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
             @Override
             public void call(final Entity entity) {
                 //TODO fire the created task first then the entityVersioncleanup
-                taskExecutor.submit(entityVersionCreatedFactory.getTask(collectionScope,entity));
-                taskExecutor.submit(entityVersionCleanupFactory.getTask(collectionScope, entityId,entity.getVersion()));
+                taskExecutor.submit( entityVersionTaskFactory.getCreatedTask( collectionScope, entity ));
+                taskExecutor.submit( entityVersionTaskFactory.getCleanupTask( collectionScope, entityId,
+                    entity.getVersion(), false ));
                 //post-processing to come later. leave it empty for now.
             }
         }).doOnError(rollback)
@@ -252,8 +243,8 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
                      @Override
                      public Id call(final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent) {
                          MvccEntity entity = mvccEntityCollectionIoEvent.getEvent();
-                         Task<Void> task = entityDeletedFactory
-                             .getTask(collectionScope, entity.getId(), entity.getVersion());
+                         Task<Void> task = entityVersionTaskFactory
+                             .getDeleteTask( collectionScope, entity.getId(), entity.getVersion() );
                          taskExecutor.submit(task);
                          return entity.getId();
                      }
@@ -400,8 +391,9 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
                 logger.debug("sending entity to the queue");
 
                 //we an update, signal the fix
-                taskExecutor.submit(entityVersionCreatedFactory.getTask(collectionScope, entity));
+                taskExecutor.submit( entityVersionTaskFactory.getCreatedTask( collectionScope, entity ));
 
+                taskExecutor.submit( entityVersionTaskFactory.getCleanupTask( collectionScope, entity.getId(), entity.getVersion(), false) );
                 //TODO T.N Change this to fire a task
                 //                Observable.from( new CollectionIoEvent<Id>(collectionScope,
                 // entityId ) ).map( load ).subscribeOn( Schedulers.io() ).subscribe();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f20dfd3c/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
index 9ff4f56..83f165d 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
@@ -22,7 +22,6 @@ import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
 import com.netflix.astyanax.MutationBatch;
 import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.collection.EntityVersionCleanupFactory;
 import org.apache.usergrid.persistence.collection.event.EntityDeleted;
 import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
@@ -46,7 +45,7 @@ import org.apache.usergrid.persistence.core.guice.ProxyImpl;
 public class EntityDeletedTask implements Task<Void> {
     private static final Logger LOG =  LoggerFactory.getLogger(EntityDeletedTask.class);
 
-    private final EntityVersionCleanupFactory entityVersionCleanupFactory;
+    private final EntityVersionTaskFactory entityVersionTaskFactory;
     private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
     private final MvccEntitySerializationStrategy entitySerializationStrategy;
     private final Set<EntityDeleted> listeners;
@@ -56,16 +55,16 @@ public class EntityDeletedTask implements Task<Void> {
 
 
     @Inject
-    public EntityDeletedTask( 
-        EntityVersionCleanupFactory             entityVersionCleanupFactory,
+    public EntityDeletedTask(
+        EntityVersionTaskFactory entityVersionTaskFactory,
         final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
         @ProxyImpl final MvccEntitySerializationStrategy entitySerializationStrategy,
         final Set<EntityDeleted>                listeners, // MUST be a set or Guice will not inject
-        @Assisted final CollectionScope         collectionScope, 
-        @Assisted final Id                      entityId, 
+        @Assisted final CollectionScope         collectionScope,
+        @Assisted final Id                      entityId,
         @Assisted final UUID                    version) {
 
-        this.entityVersionCleanupFactory = entityVersionCleanupFactory;
+        this.entityVersionTaskFactory = entityVersionTaskFactory;
         this.logEntrySerializationStrategy = logEntrySerializationStrategy;
         this.entitySerializationStrategy = entitySerializationStrategy;
         this.listeners = listeners;
@@ -81,7 +80,7 @@ public class EntityDeletedTask implements Task<Void> {
                 new Object[] { collectionScope, entityId, version }, throwable );
     }
 
-    
+
     @Override
     public Void rejected() {
         try {
@@ -94,11 +93,11 @@ public class EntityDeletedTask implements Task<Void> {
         return null;
     }
 
-    
+
     @Override
-    public Void call() throws Exception { 
+    public Void call() throws Exception {
 
-        entityVersionCleanupFactory.getTask( collectionScope, entityId, version ).call();
+        entityVersionTaskFactory.getCleanupTask( collectionScope, entityId, version, true ).call();
 
         fireEvents();
         final MutationBatch entityDelete = entitySerializationStrategy.delete(collectionScope, entityId, version);

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f20dfd3c/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
index efecdeb..03a5ff6 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
@@ -55,7 +55,7 @@ import rx.schedulers.Schedulers;
 
 
 /**
- * Cleans up previous versions from the specified version. Note that this means the version 
+ * Cleans up previous versions from the specified version. Note that this means the version
  * passed in the io event is retained, the range is exclusive.
  */
 public class EntityVersionCleanupTask implements Task<Void> {
@@ -74,10 +74,11 @@ public class EntityVersionCleanupTask implements Task<Void> {
     private final CollectionScope scope;
     private final Id entityId;
     private final UUID version;
+    private final int numToSkip;
 
 
     @Inject
-    public EntityVersionCleanupTask( 
+    public EntityVersionCleanupTask(
         final SerializationFig serializationFig,
         final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
         @ProxyImpl final MvccEntitySerializationStrategy   entitySerializationStrategy,
@@ -86,7 +87,8 @@ public class EntityVersionCleanupTask implements Task<Void> {
         final Set<EntityVersionDeleted>         listeners, // MUST be a set or Guice will not inject
         @Assisted final CollectionScope         scope,
         @Assisted final Id                      entityId,
-        @Assisted final UUID                    version ) {
+        @Assisted final UUID                    version,
+        @Assisted final boolean includeVersion) {
 
         this.serializationFig = serializationFig;
         this.logEntrySerializationStrategy = logEntrySerializationStrategy;
@@ -97,6 +99,8 @@ public class EntityVersionCleanupTask implements Task<Void> {
         this.scope = scope;
         this.entityId = entityId;
         this.version = version;
+
+        numToSkip = includeVersion? 0: 1;
     }
 
 
@@ -136,7 +140,7 @@ public class EntityVersionCleanupTask implements Task<Void> {
                 }
             })
             //buffer them for efficiency
-            .skip(1)
+            .skip(numToSkip)
             .buffer(serializationFig.getBufferSize()).doOnNext(
             new Action1<List<MvccEntity>>() {
                 @Override
@@ -146,27 +150,28 @@ public class EntityVersionCleanupTask implements Task<Void> {
                     final MutationBatch logBatch = keyspace.prepareMutationBatch();
 
                     for (MvccEntity mvccEntity : mvccEntities) {
-                        if (!mvccEntity.getEntity().isPresent()) {
-                            continue;
-                        }
-
                         final UUID entityVersion = mvccEntity.getVersion();
-                        final Entity entity = mvccEntity.getEntity().get();
 
-                        //remove all unique fields from the index
-                        for (final Field field : entity.getFields()) {
-                            if (!field.isUnique()) {
-                                continue;
+
+                        //if the entity is present process the fields
+                        if(mvccEntity.getEntity().isPresent()) {
+                            final Entity entity = mvccEntity.getEntity().get();
+
+                            //remove all unique fields from the index
+                            for ( final Field field : entity.getFields() ) {
+                                if ( !field.isUnique() ) {
+                                    continue;
+                                }
+                                final UniqueValue unique = new UniqueValueImpl( field, entityId, entityVersion );
+                                final MutationBatch deleteMutation =
+                                    uniqueValueSerializationStrategy.delete( scope, unique );
+                                batch.mergeShallow( deleteMutation );
                             }
-                            final UniqueValue unique = new UniqueValueImpl( field, entityId, entityVersion);
-                            final MutationBatch deleteMutation = 
-                                    uniqueValueSerializationStrategy.delete(scope,unique);
-                            batch.mergeShallow(deleteMutation);
                         }
 
                         final MutationBatch entityDelete = entitySerializationStrategy
                                 .delete(scope, entityId, mvccEntity.getVersion());
-                        entityBatch.mergeShallow(entityDelete);
+                        entityBatch.mergeShallow( entityDelete );
                         final MutationBatch logDelete = logEntrySerializationStrategy
                                 .delete(scope, entityId, version);
                         logBatch.mergeShallow(logDelete);

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f20dfd3c/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
index d0a87c3..f2fb95b 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
@@ -79,7 +79,7 @@ public class EntityVersionCleanupTaskTest {
 
 
     @Test(timeout=10000)
-    public void noListenerOneVersion() 
+    public void noListenerOneVersion()
             throws ExecutionException, InterruptedException, ConnectionException {
 
 
@@ -109,7 +109,7 @@ public class EntityVersionCleanupTaskTest {
 
         final Id applicationId = new SimpleId( "application" );
 
-        final CollectionScope appScope = new CollectionScopeImpl( 
+        final CollectionScope appScope = new CollectionScopeImpl(
                 applicationId, applicationId, "users" );
 
         final Id entityId = new SimpleId( "user" );
@@ -133,7 +133,8 @@ public class EntityVersionCleanupTaskTest {
                         listeners,
                         appScope,
                         entityId,
-                        version
+                        version,
+                        false
                 );
 
         final MutationBatch newBatch = mock( MutationBatch.class );
@@ -148,10 +149,10 @@ public class EntityVersionCleanupTaskTest {
 
         final List<MvccEntity> mel = new ArrayList<MvccEntity>();
 
-        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(), 
+        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(),
                 MvccEntity.Status.DELETED, Optional.fromNullable((Entity)null)) );
 
-        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(), 
+        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(),
                 MvccEntity.Status.DELETED, Optional.fromNullable((Entity)null)) );
 
         when( ess.loadDescendingHistory(
@@ -175,7 +176,7 @@ public class EntityVersionCleanupTaskTest {
      * Tests the cleanup task on the first version created
      */
     @Test(timeout=10000)
-    public void noListenerNoVersions() 
+    public void noListenerNoVersions()
             throws ExecutionException, InterruptedException, ConnectionException {
 
 
@@ -208,14 +209,14 @@ public class EntityVersionCleanupTaskTest {
         final Id applicationId = new SimpleId( "application" );
 
 
-        final CollectionScope appScope = new CollectionScopeImpl( 
+        final CollectionScope appScope = new CollectionScopeImpl(
                 applicationId, applicationId, "users" );
 
         final Id entityId = new SimpleId( "user" );
 
 
         //mock up a single log entry for our first test
-        final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock( 
+        final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock(
                 mvccLogEntrySerializationStrategy, appScope, entityId, 1 );
 
 
@@ -233,7 +234,8 @@ public class EntityVersionCleanupTaskTest {
                         listeners,
                         appScope,
                         entityId,
-                        version
+                        version,
+                        false
                 );
 
         final MutationBatch batch = mock( MutationBatch.class );
@@ -249,10 +251,10 @@ public class EntityVersionCleanupTaskTest {
 
         final List<MvccEntity> mel = new ArrayList<MvccEntity>();
 
-        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(), 
+        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(),
                 MvccEntity.Status.DELETED, Optional.fromNullable((Entity)null)) );
 
-        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(), 
+        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(),
                 MvccEntity.Status.DELETED, Optional.fromNullable((Entity)null)) );
 
         when( ess.loadDescendingHistory( same( appScope ), same( entityId ), any(UUID.class), any(Integer.class) ) )
@@ -267,7 +269,7 @@ public class EntityVersionCleanupTaskTest {
 
 
         // These last two verify statements do not make sense. We cannot assert that the entity
-        // and log batches are never called. Even if there are no listeners the entity delete 
+        // and log batches are never called. Even if there are no listeners the entity delete
         // cleanup task will still run to do the normal cleanup.
         //
         // verify( entityBatch, never() ).execute();
@@ -276,7 +278,7 @@ public class EntityVersionCleanupTaskTest {
 
 
     @Test(timeout=10000)
-    public void singleListenerSingleVersion() 
+    public void singleListenerSingleVersion()
             throws ExecutionException, InterruptedException, ConnectionException {
 
 
@@ -317,14 +319,14 @@ public class EntityVersionCleanupTaskTest {
         final Id applicationId = new SimpleId( "application" );
 
 
-        final CollectionScope appScope = new CollectionScopeImpl( 
+        final CollectionScope appScope = new CollectionScopeImpl(
                 applicationId, applicationId, "users" );
 
         final Id entityId = new SimpleId( "user" );
 
 
         //mock up a single log entry for our first test
-        final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock( 
+        final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock(
                 mvccLogEntrySerializationStrategy, appScope, entityId, sizeToReturn + 1 );
 
 
@@ -343,7 +345,8 @@ public class EntityVersionCleanupTaskTest {
                         listeners,
                         appScope,
                         entityId,
-                        version
+                        version,
+                        false
                 );
 
         final MutationBatch batch = mock( MutationBatch.class );
@@ -361,10 +364,10 @@ public class EntityVersionCleanupTaskTest {
 
         final List<MvccEntity> mel = new ArrayList<MvccEntity>();
 
-        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(), 
+        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(),
                 MvccEntity.Status.DELETED, Optional.fromNullable((Entity)null)) );
 
-        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(), 
+        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(),
                 MvccEntity.Status.DELETED, Optional.fromNullable((Entity)null)) );
 
         when( ess.loadDescendingHistory( same( appScope ), same( entityId ), any(UUID.class), any(Integer.class) ) )
@@ -421,7 +424,7 @@ public class EntityVersionCleanupTaskTest {
         final int sizeToReturn = 10;
 
 
-        final CountDownLatch latch = new CountDownLatch( 
+        final CountDownLatch latch = new CountDownLatch(
                 sizeToReturn/serializationFig.getBufferSize() * 3 );
 
         final EntityVersionDeletedTest listener1 = new EntityVersionDeletedTest( latch );
@@ -436,13 +439,13 @@ public class EntityVersionCleanupTaskTest {
 
         final Id applicationId = new SimpleId( "application" );
 
-        final CollectionScope appScope = new CollectionScopeImpl( 
+        final CollectionScope appScope = new CollectionScopeImpl(
                 applicationId, applicationId, "users" );
 
         final Id entityId = new SimpleId( "user" );
 
         // mock up a single log entry for our first test
-        final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock( 
+        final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock(
                 mvccLogEntrySerializationStrategy, appScope, entityId, sizeToReturn + 1 );
 
         final UUID version = logEntryMock.getEntries().iterator().next().getVersion();
@@ -456,7 +459,8 @@ public class EntityVersionCleanupTaskTest {
                         listeners,
                         appScope,
                         entityId,
-                        version
+                        version,
+                        false
                 );
 
         final MutationBatch batch = mock( MutationBatch.class );
@@ -474,10 +478,10 @@ public class EntityVersionCleanupTaskTest {
 
         Entity entity = new Entity( entityId );
 
-        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(), 
+        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(),
                 MvccEntity.Status.DELETED, Optional.of(entity)) );
 
-        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(), 
+        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(),
                 MvccEntity.Status.DELETED, Optional.of(entity)) );
 
         when( ess.loadDescendingHistory( same( appScope ), same( entityId ), any(UUID.class), any(Integer.class) ) )
@@ -543,7 +547,7 @@ public class EntityVersionCleanupTaskTest {
 
         final int listenerCount = 5;
 
-        final CountDownLatch latch = new CountDownLatch( 
+        final CountDownLatch latch = new CountDownLatch(
                 sizeToReturn/serializationFig.getBufferSize() * listenerCount );
         final Semaphore waitSemaphore = new Semaphore( 0 );
 
@@ -565,14 +569,14 @@ public class EntityVersionCleanupTaskTest {
         final Id applicationId = new SimpleId( "application" );
 
 
-        final CollectionScope appScope = new CollectionScopeImpl( 
+        final CollectionScope appScope = new CollectionScopeImpl(
                 applicationId, applicationId, "users" );
 
         final Id entityId = new SimpleId( "user" );
 
 
         //mock up a single log entry for our first test
-        final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock( 
+        final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock(
                 mvccLogEntrySerializationStrategy, appScope, entityId, sizeToReturn + 1 );
 
 
@@ -591,7 +595,8 @@ public class EntityVersionCleanupTaskTest {
                         listeners,
                         appScope,
                         entityId,
-                        version
+                        version,
+                    false
                 );
 
         final MutationBatch batch = mock( MutationBatch.class );
@@ -714,7 +719,8 @@ public class EntityVersionCleanupTaskTest {
                         listeners,
                         appScope,
                         entityId,
-                        version
+                        version,
+                        false
                 );
 
         final MutationBatch batch = mock( MutationBatch.class );
@@ -768,7 +774,7 @@ public class EntityVersionCleanupTaskTest {
 
 
         @Override
-        public void versionDeleted( final CollectionScope scope, final Id entityId, 
+        public void versionDeleted( final CollectionScope scope, final Id entityId,
                 final List<MvccEntity> entityVersion ) {
             invocationLatch.countDown();
         }
@@ -786,7 +792,7 @@ public class EntityVersionCleanupTaskTest {
 
 
         @Override
-        public void versionDeleted( final CollectionScope scope, final Id entityId, 
+        public void versionDeleted( final CollectionScope scope, final Id entityId,
                 final List<MvccEntity> entityVersion ) {
 
             //wait for unblock to happen before counting down invocation latches

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f20dfd3c/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index 16b3ff9..92312d2 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -101,6 +101,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
     public EntityIndexBatch index( final IndexScope indexScope, final Entity entity ) {
         IndexValidationUtils.validateIndexScope( indexScope );
         ValidationUtils.verifyEntityWrite( entity );
+        ValidationUtils.verifyVersion( entity.getVersion() );
 
         final String context = createContextName(indexScope);
 
@@ -161,7 +162,6 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
         }
 
 
-        log.debug( "De-indexing type {} with documentId '{}'" , entityType, indexId);
         String[] indexes = entityIndex.getIndexes(AliasedEntityIndex.AliasType.Read);
         //get the default index if no alias exists yet
         if(indexes == null ||indexes.length == 0){

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f20dfd3c/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index 3a11ec7..16f4b3f 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -113,9 +113,6 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
     private final Timer addWriteAliasTimer;
     private final Timer addReadAliasTimer;
     private final Timer searchTimer;
-    private final Timer allVersionsTimerFuture;
-    private final Timer deletePreviousTimerFuture;
-    private final Meter errorMeter;
 
     /**
      * We purposefully make this per instance. Some indexes may work, while others may fail
@@ -190,16 +187,6 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
             .getTimer( EsEntityIndexImpl.class, "search.cursor.timer" );
         this.getVersionsTimer =metricsFactory
             .getTimer( EsEntityIndexImpl.class, "get.versions.timer" );
-        this.allVersionsTimer =  metricsFactory
-            .getTimer( EsEntityIndexImpl.class, "delete.all.versions.timer" );
-        this.deletePreviousTimer = metricsFactory
-            .getTimer( EsEntityIndexImpl.class, "delete.previous.versions.timer" );
-        this.allVersionsTimerFuture =  metricsFactory
-            .getTimer( EsEntityIndexImpl.class, "delete.all.versions.timer.future" );
-        this.deletePreviousTimerFuture = metricsFactory
-            .getTimer( EsEntityIndexImpl.class, "delete.previous.versions.timer.future" );
-
-        this.errorMeter = metricsFactory.getMeter(EsEntityIndexImpl.class,"errors");
 
 
         final MapScope mapScope = new MapScopeImpl( appScope.getApplication(), "cursorcache" );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f20dfd3c/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index 5259a26..d55073a 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -273,7 +273,7 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
             public void call( final BulkRequestBuilder bulkRequestBuilder ) {
                 sendRequest( bulkRequestBuilder );
             }
-        } ).toBlocking().last();
+        } ).toBlocking().lastOrDefault(null);
 
         //call back all futures
         Observable.from(operationMessages)