You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/02/26 01:54:21 UTC

[2/2] incubator-usergrid git commit: Updated tests for tasks. Also need to fix issues with impl tests testing the wrong versions.

Updated tests for tasks.  Also need to fix issues with impl tests testing the wrong versions.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/787bc091
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/787bc091
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/787bc091

Branch: refs/heads/USERGRID-405
Commit: 787bc09138bda5b793dd57cb7494111be569913a
Parents: 49fa812
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Feb 25 17:54:19 2015 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Feb 25 17:54:19 2015 -0700

----------------------------------------------------------------------
 .../collection/EntityCollectionManager.java     |    7 -
 .../cache/CachedEntityCollectionManager.java    |    7 -
 .../impl/EntityCollectionManagerImpl.java       |   39 -
 .../impl/EntityVersionCleanupTask.java          |  204 ++-
 .../UniqueValueSerializationStrategyImpl.java   |   11 +-
 .../collection/EntityCollectionManagerIT.java   |  100 +-
 .../impl/EntityVersionCleanupTaskTest.java      | 1389 +++++++++---------
 .../impl/LogEntryIteratorTest.java              |    3 +-
 .../collection/util/LogEntryMock.java           |   39 +-
 .../collection/util/UniqueValueEntryMock.java   |  161 ++
 .../collection/util/VersionGenerator.java       |   55 +
 11 files changed, 1006 insertions(+), 1009 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/787bc091/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
index 90cade0..a6331a9 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
@@ -69,13 +69,6 @@ public interface EntityCollectionManager extends CPManager {
     public Observable<EntitySet> load(Collection<Id> entityIds);
 
     /**
-     * Takes the change and reloads an entity with all changes applied in this entity applied.
-     * The resulting entity from calling load will be the previous version of this entity plus
-     * the entity in this object applied to it.
-     */
-    public Observable<Entity> update ( Entity entity );
-
-    /**
      * Returns health of entity data store.
      */
     public Health getHealth();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/787bc091/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/cache/CachedEntityCollectionManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/cache/CachedEntityCollectionManager.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/cache/CachedEntityCollectionManager.java
index 186aafa..c43e309 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/cache/CachedEntityCollectionManager.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/cache/CachedEntityCollectionManager.java
@@ -122,13 +122,6 @@ public class CachedEntityCollectionManager implements EntityCollectionManager {
         return targetEntityCollectionManager.load( entityIds );
     }
 
-
-    @Override
-    public Observable<Entity> update( final Entity entity ) {
-        return targetEntityCollectionManager.update( entity ).doOnNext( cacheAdd );
-    }
-
-
     @Override
     public Health getHealth() {
         return targetEntityCollectionManager.getHealth();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/787bc091/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index 6018e6f..e483cc1 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@ -290,45 +290,6 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
         } );
     }
 
-
-    @Override
-    public Observable<Entity> update( final Entity entity ) {
-
-        logger.debug( "Starting update process" );
-
-        //do our input validation
-        Preconditions.checkNotNull( entity, "Entity is required in the new stage of the mvcc write" );
-
-        final Id entityId = entity.getId();
-
-
-        ValidationUtils.verifyIdentity( entityId );
-
-        // create our observable and start the write
-        CollectionIoEvent<Entity> writeData = new CollectionIoEvent<Entity>( collectionScope, entity );
-
-
-        Observable<CollectionIoEvent<MvccEntity>> observable = stageRunner( writeData, writeUpdate );
-
-
-        return observable.map( writeCommit ).doOnNext( new Action1<Entity>() {
-            @Override
-            public void call( final Entity entity ) {
-                logger.debug( "sending entity to the queue" );
-
-                //we an update, signal the fix
-                taskExecutor.submit(entityVersionCreatedFactory.getTask(collectionScope,entity));
-
-                //TODO T.N Change this to fire a task
-                //                Observable.from( new CollectionIoEvent<Id>(collectionScope,
-                // entityId ) ).map( load ).subscribeOn( Schedulers.io() ).subscribe();
-
-
-            }
-        } ).doOnError( rollback );
-    }
-
-
     // fire the stages
     public Observable<CollectionIoEvent<MvccEntity>> stageRunner( CollectionIoEvent<Entity> writeData,
                                                                   WriteStart writeState ) {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/787bc091/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
index 8bfddf4..7c39ef4 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
@@ -20,48 +20,40 @@ package org.apache.usergrid.persistence.collection.impl;
 
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
 import java.util.UUID;
 
-import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
-import org.apache.usergrid.persistence.collection.MvccEntity;
-import org.apache.usergrid.persistence.collection.MvccLogEntry;
-import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
-import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
-import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl;
-import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.field.Field;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.MvccLogEntry;
 import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
-import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.impl.LogEntryIterator;
 import org.apache.usergrid.persistence.core.rx.ObservableIterator;
 import org.apache.usergrid.persistence.core.task.Task;
 import org.apache.usergrid.persistence.model.entity.Id;
 
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
 import com.netflix.astyanax.Keyspace;
 import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-import java.util.Set;
-
-import org.apache.commons.lang.NotImplementedException;
-
-import org.apache.usergrid.persistence.core.guice.ProxyImpl;
 
 import rx.Observable;
-import rx.Subscriber;
 import rx.functions.Action1;
 import rx.functions.Func1;
+import rx.observables.BlockingObservable;
 import rx.schedulers.Schedulers;
 
 
 /**
- * Cleans up previous versions from the specified version. Note that this means the version
- * passed in the io event is retained, the range is exclusive.
+ * Cleans up previous versions from the specified version. Note that this means the version passed in the io event is
+ * retained, the range is exclusive.
  */
 public class EntityVersionCleanupTask implements Task<Void> {
 
@@ -70,7 +62,6 @@ public class EntityVersionCleanupTask implements Task<Void> {
     private final Set<EntityVersionDeleted> listeners;
 
     private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
-    private final MvccEntitySerializationStrategy entitySerializationStrategy;
     private UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
     private final Keyspace keyspace;
 
@@ -82,20 +73,16 @@ public class EntityVersionCleanupTask implements Task<Void> {
 
 
     @Inject
-    public EntityVersionCleanupTask(
-        final SerializationFig serializationFig,
-        final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
-        @ProxyImpl final MvccEntitySerializationStrategy   entitySerializationStrategy,
-        final UniqueValueSerializationStrategy  uniqueValueSerializationStrategy,
-        final Keyspace                          keyspace,
-        final Set<EntityVersionDeleted>         listeners, // MUST be a set or Guice will not inject
-        @Assisted final CollectionScope         scope,
-        @Assisted final Id                      entityId,
-        @Assisted final UUID                    version ) {
+    public EntityVersionCleanupTask( final SerializationFig serializationFig,
+                                     final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
+                                     final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
+                                     final Keyspace keyspace, final Set<EntityVersionDeleted> listeners,
+                                     // MUST be a set or Guice will not inject
+                                     @Assisted final CollectionScope scope, @Assisted final Id entityId,
+                                     @Assisted final UUID version ) {
 
         this.serializationFig = serializationFig;
         this.logEntrySerializationStrategy = logEntrySerializationStrategy;
-        this.entitySerializationStrategy = entitySerializationStrategy;
         this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
         this.keyspace = keyspace;
         this.listeners = listeners;
@@ -130,91 +117,94 @@ public class EntityVersionCleanupTask implements Task<Void> {
     @Override
     public Void call() throws Exception {
         //TODO Refactor this logic into a a class that can be invoked from anywhere
-        //load every entity we have history of
-        Observable<List<MvccLogEntry>> deleteFieldsObservable =
 
-                Observable.create( new Observable.OnSubscribe<MvccLogEntry>() {
 
+        //iterate all unique values
+        final BlockingObservable<Long> uniqueValueCleanup =
+                Observable.create( new ObservableIterator<UniqueValue>( "Unique value load" ) {
                     @Override
-                    public void call( final Subscriber<? super MvccLogEntry> subscriber ) {
-
-                        throw new NotImplementedException( "Implement me" );
-
-                        //TODO, make this a collector so it iterates on itself (if possible)
-//                        List<MvccLogEntry> entities = logEntrySerializationStrategy.load( scope, entityId, version, 1000 );
-//                        do {
-//
-//                            List<MvccLogEntry> entities = logEntrySerializationStrategy.load( scope, entityId, version, 1000 );
-//                        }while()
-
-
+                    protected Iterator<UniqueValue> getIterator() {
+                        return uniqueValueSerializationStrategy.getAllUniqueFields( scope, entityId );
                     }
                 } )
-//            Observable.create(new ObservableIterator<MvccEntity>("deleteColumns") {
-//                @Override
-//                protected Iterator<MvccEntity> getIterator() {
-//
-//                }
-//            })
-            //buffer them for efficiency
-            .skip( 1 )
-            .buffer(serializationFig.getBufferSize()).doOnNext( new Action1<List<MvccLogEntry>>() {
+
+                        //skip current versions
+                        .skipWhile( new Func1<UniqueValue, Boolean>() {
+                            @Override
+                            public Boolean call( final UniqueValue uniqueValue ) {
+                                return version.equals( uniqueValue.getEntityVersion() );
+                            }
+                        } )
+                                //buffer our buffer size, then roll them all up in a single batch mutation
+                        .buffer( serializationFig.getBufferSize() ).doOnNext( new Action1<List<UniqueValue>>() {
                     @Override
-                    public void call( final List<MvccLogEntry> mvccEntities ) {
-                        final MutationBatch batch = keyspace.prepareMutationBatch();
-                        final MutationBatch logBatch = keyspace.prepareMutationBatch();
-
-                        for ( MvccLogEntry logEntry : mvccEntities ) {
-
-                            final UUID entityVersion = logEntry.getVersion();
-                            final Id entityId = logEntry.getEntityId();
-
-                            throw new UnsupportedOperationException(
-                                    "This needs to delete unique fields differently.  Think about this a bit "
-                                            + "more" );
-                            //remove all unique fields from the index
-                            //                        for (final Field field : entity.getFields()) {
-                            //                            if (!field.isUnique()) {
-                            //                                continue;
-                            //                            }
-                            //                            final UniqueValue unique = new UniqueValueImpl(
-                            // field, entityId, entityVersion);
-                            //                            final MutationBatch deleteMutation =
-                            //                                    uniqueValueSerializationStrategy.delete
-                            // (scope,unique);
-                            //                            batch.mergeShallow(deleteMutation);
-                            //                        }
-                            //
-                            //                                    final MutationBatch logDelete =
-                            //                                            logEntrySerializationStrategy.delete(
-                            // scope, entityId, version );
-                            //                                    logBatch.mergeShallow( logDelete );
+                    public void call( final List<UniqueValue> uniqueValues ) {
+                        final MutationBatch uniqueCleanupBatch = keyspace.prepareMutationBatch();
+
+
+                        for ( UniqueValue value : uniqueValues ) {
+                            uniqueCleanupBatch.mergeShallow( uniqueValueSerializationStrategy.delete( scope, value ) );
                         }
 
                         try {
-                            batch.execute();
+                            uniqueCleanupBatch.execute();
                         }
-                        catch ( ConnectionException e1 ) {
-                            throw new RuntimeException( "Unable to execute " +
-                                    "unique value " +
-                                    "delete", e1 );
+                        catch ( ConnectionException e ) {
+                            throw new RuntimeException( "Unable to execute batch mutation", e );
                         }
+                    }
+                } ).subscribeOn( Schedulers.io() ).longCount().toBlocking();
+
+
+        //start calling the listeners for remove log entries
+        BlockingObservable<Long> versionsDeletedObservable =
+
+                Observable.create( new ObservableIterator<MvccLogEntry>( "Log entry iterator" ) {
+                    @Override
+                    protected Iterator<MvccLogEntry> getIterator() {
+
+                        return new LogEntryIterator( logEntrySerializationStrategy, scope, entityId, version,
+                                serializationFig.getBufferSize() );
+                    }
+                } )
+                        //skip current version
+                        .skipWhile( new Func1<MvccLogEntry, Boolean>() {
+                            @Override
+                            public Boolean call( final MvccLogEntry mvccLogEntry ) {
+                                return version.equals( mvccLogEntry.getVersion() );
+                            }
+                        } )
+                                //buffer them for efficiency
+                        .buffer( serializationFig.getBufferSize() ).doOnNext( new Action1<List<MvccLogEntry>>() {
+                    @Override
+                    public void call( final List<MvccLogEntry> mvccEntities ) {
 
                         fireEvents( mvccEntities );
 
+                        final MutationBatch logCleanupBatch = keyspace.prepareMutationBatch();
+
+
+                        for ( MvccLogEntry entry : mvccEntities ) {
+                            logCleanupBatch.mergeShallow( logEntrySerializationStrategy.delete( scope, entityId, entry.getVersion() ));
+                        }
 
                         try {
-                            logBatch.execute();
+                            logCleanupBatch.execute();
                         }
                         catch ( ConnectionException e ) {
-                            throw new RuntimeException( "Unable to delete entities from the log", e );
+                            throw new RuntimeException( "Unable to execute batch mutation", e );
                         }
                     }
-                } );
+                } ).subscribeOn( Schedulers.io() ).longCount().toBlocking();
 
-        final int removedCount = deleteFieldsObservable.count().toBlocking().last();
+        //wait or this to complete
+        final Long removedCount = uniqueValueCleanup.last();
 
-        logger.debug("Removed unique values for {} entities of entity {}",removedCount,entityId);
+        logger.debug( "Removed unique values for {} entities of entity {}", removedCount, entityId );
+
+        final Long versionCleanupCount = versionsDeletedObservable.last();
+
+        logger.debug( "Removed {} previous entity versions of entity {}", versionCleanupCount, entityId );
 
         return null;
     }
@@ -237,20 +227,20 @@ public class EntityVersionCleanupTask implements Task<Void> {
 
         //if we have more than 1, run them on the rx scheduler for a max of 8 operations at a time
         Observable.from( listeners )
-            .parallel( new Func1<Observable<EntityVersionDeleted>, Observable<EntityVersionDeleted>>() {
-
-                @Override
-                public Observable<EntityVersionDeleted> call(
-                        final Observable<EntityVersionDeleted> entityVersionDeletedObservable ) {
-
-                    return entityVersionDeletedObservable.doOnNext( new Action1<EntityVersionDeleted>() {
-                        @Override
-                        public void call( final EntityVersionDeleted listener ) {
-                            listener.versionDeleted( scope, entityId, versions );
-                        }
-                    } );
-                }
-            }, Schedulers.io() ).toBlocking().last();
+                  .parallel( new Func1<Observable<EntityVersionDeleted>, Observable<EntityVersionDeleted>>() {
+
+                      @Override
+                      public Observable<EntityVersionDeleted> call(
+                              final Observable<EntityVersionDeleted> entityVersionDeletedObservable ) {
+
+                          return entityVersionDeletedObservable.doOnNext( new Action1<EntityVersionDeleted>() {
+                              @Override
+                              public void call( final EntityVersionDeleted listener ) {
+                                  listener.versionDeleted( scope, entityId, versions );
+                              }
+                          } );
+                      }
+                  }, Schedulers.io() ).toBlocking().last();
 
         logger.debug( "Finished firing {} listeners", listenerSize );
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/787bc091/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
index 24072d1..7aed8db 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
@@ -34,6 +34,7 @@ import org.apache.usergrid.persistence.collection.CollectionScope;
 import org.apache.usergrid.persistence.collection.MvccEntity;
 import org.apache.usergrid.persistence.collection.exception.DataCorruptionException;
 import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccEntityImpl;
+import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet;
@@ -96,17 +97,21 @@ public class UniqueValueSerializationStrategyImpl implements UniqueValueSerializ
     public static final int COL_VALUE = 0x0;
 
 
-    protected final Keyspace keyspace;
+    private final Keyspace keyspace;
+    private final SerializationFig serializationFig;
+
 
 
     /**
      * Construct serialization strategy for keyspace.
      *
      * @param keyspace Keyspace in which to store Unique Values.
+     * @param serializationFig
      */
     @Inject
-    public UniqueValueSerializationStrategyImpl( final Keyspace keyspace ) {
+    public UniqueValueSerializationStrategyImpl( final Keyspace keyspace, final SerializationFig serializationFig ) {
         this.keyspace = keyspace;
+        this.serializationFig = serializationFig;
     }
 
 
@@ -325,7 +330,7 @@ public class UniqueValueSerializationStrategyImpl implements UniqueValueSerializ
 
         RowQuery<ScopedRowKey<CollectionPrefixedKey<Id>>, UniqueFieldEntry> query =
                 keyspace.prepareQuery( CF_ENTITY_UNIQUE_VALUES ).getKey( rowKey ).withColumnRange(
-                    ( UniqueFieldEntry ) null, null, false, 1000 );
+                    ( UniqueFieldEntry ) null, null, false, serializationFig.getBufferSize() );
 
         return new ColumnNameIterator( query, new UniqueEntryParser( entityId ), false );
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/787bc091/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
index e86e218..b56b867 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
@@ -314,98 +314,6 @@ public class EntityCollectionManagerIT {
     }
 
 
-    @Test
-    public void partialUpdate() {
-        StringField testField1 = new StringField( "testField", "value" );
-        StringField addedField = new StringField( "testFud", "NEWPARTIALUPDATEZOMG" );
-
-        CollectionScope context =
-                new CollectionScopeImpl( new SimpleId( "organization" ), new SimpleId( "testUpdate" ), "testUpdate" );
-
-        Entity oldEntity = new Entity( new SimpleId( "testUpdate" ) );
-        oldEntity.setField( new StringField( "testField", "value" ) );
-
-        EntityCollectionManager manager = factory.createCollectionManager( context );
-
-        Observable<Entity> observable = manager.write( oldEntity );
-
-        Entity returned = observable.toBlocking().lastOrDefault( null );
-
-        assertNotNull( "Returned has a uuid", returned.getId() );
-
-        final UUID writeVersion = returned.getVersion();
-
-        assertNotNull( "Write version was set", writeVersion );
-
-        /**
-         * Modify the oldEntity
-         */
-        oldEntity.getFields().remove( testField1 );
-        oldEntity.setField( addedField );
-
-        observable = manager.update( oldEntity );
-
-        Entity updateReturned = observable.toBlocking().lastOrDefault( null );
-
-        assertNotNull( "Returned has a uuid", returned.getId() );
-        assertEquals( oldEntity.getField( "testFud" ), returned.getField( "testFud" ) );
-
-        final UUID updatedVersion = updateReturned.getVersion();
-
-        assertNotNull( "Updated version returned", updatedVersion );
-
-        assertTrue( "Updated version higher", UUIDComparator.staticCompare( updatedVersion, writeVersion ) > 0 );
-
-        Observable<Entity> newEntityObs = manager.load( updateReturned.getId() );
-        Entity newEntity = newEntityObs.toBlocking().last();
-
-        final UUID returnedVersion = newEntity.getVersion();
-
-        assertEquals( "Loaded version matches updated version", updatedVersion, returnedVersion );
-
-        assertNotNull( "Returned has a uuid", returned.getId() );
-        assertEquals( addedField, newEntity.getField( "testFud" ) );
-    }
-
-
-    @Test
-    public void partialUpdateDelete() {
-        StringField testField = new StringField( "testField", "value" );
-        StringField addedField = new StringField( "testFud", "NEWPARTIALUPDATEZOMG" );
-
-        CollectionScope context =
-                new CollectionScopeImpl( new SimpleId( "organization" ), new SimpleId( "testUpdate" ), "testUpdate" );
-
-        Entity oldEntity = new Entity( new SimpleId( "testUpdate" ) );
-        oldEntity.setField( new StringField( "testField", "value" ) );
-
-        EntityCollectionManager manager = factory.createCollectionManager( context );
-
-        Observable<Entity> observable = manager.write( oldEntity );
-
-        Entity returned = observable.toBlocking().lastOrDefault( null );
-
-        assertNotNull( "Returned has a uuid", returned.getId() );
-
-        oldEntity.getFields().remove( testField );
-        oldEntity.setField( addedField );
-
-        //Entity is deleted then updated right afterwards.
-        manager.delete( oldEntity.getId() );
-
-        observable = manager.update( oldEntity );
-
-        returned = observable.toBlocking().lastOrDefault( null );
-
-        assertNotNull( "Returned has a uuid", returned.getId() );
-        assertEquals( oldEntity.getField( "testFud" ), returned.getField( "testFud" ) );
-
-        Observable<Entity> newEntityObs = manager.load( oldEntity.getId() );
-        Entity newEntity = newEntityObs.toBlocking().last();
-
-        assertNotNull( "Returned has a uuid", returned.getId() );
-        assertEquals( addedField, newEntity.getField( addedField.getName() ) );
-    }
 
 
     @Test
@@ -429,7 +337,7 @@ public class EntityCollectionManagerIT {
         // partial update entity but we don't have version number
         Entity updateEntity = new Entity( origEntity.getId() );
         updateEntity.setField( new StringField( "addedField", "other value" ) );
-        manager.update( origEntity ).toBlocking().lastOrDefault( null );
+        manager.write( updateEntity ).toBlocking().lastOrDefault( null );
 
         // get entity now, it must have a new version
         returned = manager.load( origEntity.getId() ).toBlocking().lastOrDefault( null );
@@ -506,7 +414,7 @@ public class EntityCollectionManagerIT {
 
             written.setField( new BooleanField( "updated", true ) );
 
-            final Entity updated = manager.update( written ).toBlocking().last();
+            final Entity updated = manager.write( written ).toBlocking().last();
 
             writtenEntities.add( updated );
             entityIds.add( updated.getId() );
@@ -657,7 +565,7 @@ public class EntityCollectionManagerIT {
 
         final Entity newEntity = new Entity( new SimpleId( "test" ) );
 
-        final Entity v1Created = manager.update( newEntity ).toBlocking().lastOrDefault( null );
+        final Entity v1Created = manager.write( newEntity ).toBlocking().lastOrDefault( null );
 
         assertNotNull( "Id was assigned", v1Created.getId() );
         assertNotNull( "Version was assigned", v1Created.getVersion() );
@@ -674,7 +582,7 @@ public class EntityCollectionManagerIT {
         assertEquals( MvccLogEntry.State.COMPLETE, version1Log.getState() );
         assertEquals( Stage.COMMITTED, version1Log.getStage() );
 
-        final Entity v2Created = manager.update( v1Created ).toBlocking().last();
+        final Entity v2Created = manager.write( v1Created ).toBlocking().last();
 
         final UUID v2Version = v2Created.getVersion();
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/787bc091/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
index bc6f4a4..b4921a3 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
@@ -19,46 +19,50 @@
 package org.apache.usergrid.persistence.collection.impl;
 
 
-import com.google.common.base.Optional;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Semaphore;
 
-import org.apache.usergrid.persistence.collection.MvccEntity;
-import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
 import org.junit.AfterClass;
+import org.junit.Ignore;
 import org.junit.Test;
-import org.junit.Assert;
 
 import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.collection.MvccLogEntry;
 import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
-import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccEntityImpl;
+import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
 import org.apache.usergrid.persistence.collection.util.LogEntryMock;
+import org.apache.usergrid.persistence.collection.util.UniqueValueEntryMock;
+import org.apache.usergrid.persistence.collection.util.VersionGenerator;
 import org.apache.usergrid.persistence.core.task.NamedTaskExecutorImpl;
 import org.apache.usergrid.persistence.core.task.TaskExecutor;
+import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 
+import com.google.common.base.Optional;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.netflix.astyanax.Keyspace;
 import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-import java.util.HashSet;
-import java.util.Set;
-import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccEntityImpl;
-import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.util.UUIDGenerator;
-import org.junit.Ignore;
 
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.same;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -78,729 +82,644 @@ public class EntityVersionCleanupTaskTest {
     }
 
 
-    @Test
-    public void holder(){
-        fail("Fix this test");
+    @Test( timeout = 10000 )
+    public void noListenerOneVersion() throws Exception {
+
+
+        final SerializationFig serializationFig = mock( SerializationFig.class );
+
+        when( serializationFig.getBufferSize() ).thenReturn( 10 );
+
+        final MvccLogEntrySerializationStrategy less = mock( MvccLogEntrySerializationStrategy.class );
+
+
+        final UniqueValueSerializationStrategy uvss = mock( UniqueValueSerializationStrategy.class );
+
+        final Keyspace keyspace = mock( Keyspace.class );
+
+        final MutationBatch entityBatch = mock( MutationBatch.class );
+
+        when( keyspace.prepareMutationBatch() ).thenReturn(
+                mock( MutationBatch.class ) ) // don't care what happens to this one
+                .thenReturn( entityBatch );
+
+        // intentionally no events
+        final Set<EntityVersionDeleted> listeners = new HashSet<EntityVersionDeleted>();
+
+        final Id applicationId = new SimpleId( "application" );
+
+        final CollectionScope appScope = new CollectionScopeImpl( applicationId, applicationId, "users" );
+
+        final Id entityId = new SimpleId( "user" );
+
+        final List<UUID> versions = VersionGenerator.generateVersions( 2 );
+
+        // mock up a single log entry for our first test
+        final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock( less, appScope, entityId, versions );
+
+
+        //get the version we're keeping, it's first in our list
+        final UUID version = logEntryMock.getEntryAtIndex( 0 ).getVersion();
+
+        //mock up unique version output
+        final UniqueValueEntryMock uniqueValueEntryMock =
+                UniqueValueEntryMock.createUniqueMock( uvss, appScope, entityId, versions );
+
+
+        EntityVersionCleanupTask cleanupTask =
+                new EntityVersionCleanupTask( serializationFig, less, uvss, keyspace, listeners, appScope, entityId,
+                        version );
+
+        final MutationBatch newBatch = mock( MutationBatch.class );
+
+
+        // set up returning a mutator
+        when( uvss.delete( same( appScope ), any( UniqueValue.class ) ) ).thenReturn( newBatch );
+
+        //return a new batch when it's called
+        when( less.delete( same( appScope ), same( entityId ), any( UUID.class ) ) ).thenReturn( newBatch );
+
+
+        cleanupTask.call();
+
+
+        //get the second field, this should be deleted
+        final UniqueValue oldUniqueField = uniqueValueEntryMock.getEntryAtIndex( 1 );
+
+        final MvccLogEntry expectedDeletedEntry = logEntryMock.getEntryAtIndex( 1 );
+
+
+        //verify delete was invoked
+        verify( uvss ).delete( same( appScope ), same( oldUniqueField ) );
+
+        //verify the delete was invoked
+        verify( less ).delete( same( appScope ), same( entityId ), same( expectedDeletedEntry.getVersion() ) );
+
+        // verify it was run
+        verify( entityBatch ).execute();
+    }
+
+
+    /**
+     * Tests the cleanup task on the first version created
+     */
+    @Test( timeout = 10000 )
+    public void noListenerNoVersions() throws Exception {
+
+
+        final SerializationFig serializationFig = mock( SerializationFig.class );
+
+        when( serializationFig.getBufferSize() ).thenReturn( 10 );
+
+        final MvccLogEntrySerializationStrategy less = mock( MvccLogEntrySerializationStrategy.class );
+
+
+        final UniqueValueSerializationStrategy uvss = mock( UniqueValueSerializationStrategy.class );
+
+        final Keyspace keyspace = mock( Keyspace.class );
+
+        final MutationBatch entityBatch = mock( MutationBatch.class );
+
+        when( keyspace.prepareMutationBatch() ).thenReturn(
+                mock( MutationBatch.class ) ) // don't care what happens to this one
+                .thenReturn( entityBatch );
+
+        // intentionally no events
+        final Set<EntityVersionDeleted> listeners = new HashSet<>();
+
+        final Id applicationId = new SimpleId( "application" );
+
+        final CollectionScope appScope = new CollectionScopeImpl( applicationId, applicationId, "users" );
+
+        final Id entityId = new SimpleId( "user" );
+
+
+        final List<UUID> versions = VersionGenerator.generateVersions( 1 );
+
+        // mock up a single log entry, with no other entries
+        final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock( less, appScope, entityId, versions );
+
+
+        //get the version we're keeping, it's first in our list
+        final UUID version = logEntryMock.getEntryAtIndex( 0 ).getVersion();
+
+        //mock up unique version output
+        final UniqueValueEntryMock uniqueValueEntryMock =
+                UniqueValueEntryMock.createUniqueMock( uvss, appScope, entityId, versions );
+
+
+        EntityVersionCleanupTask cleanupTask =
+                new EntityVersionCleanupTask( serializationFig, less, uvss, keyspace, listeners, appScope, entityId,
+                        version );
+
+        final MutationBatch newBatch = mock( MutationBatch.class );
+
+
+        // set up returning a mutator
+        when( uvss.delete( same( appScope ), any( UniqueValue.class ) ) ).thenReturn( newBatch );
+
+        //return a new batch when it's called
+        when( less.delete( same( appScope ), same( entityId ), any( UUID.class ) ) ).thenReturn( newBatch );
+
+
+        cleanupTask.call();
+
+
+        //verify delete was never invoked
+        verify( uvss, never() ).delete( any( CollectionScope.class ), any( UniqueValue.class ) );
+
+        //verify the delete was never invoked
+        verify( less, never() ).delete( any( CollectionScope.class ), any( Id.class ), any( UUID.class ) );
+    }
+
+
+    @Test( timeout = 10000 )
+    public void singleListenerSingleVersion() throws Exception {
+
+
+        //create a latch for the event listener, and add it to the list of events
+        final int sizeToReturn = 1;
+
+        final CountDownLatch latch = new CountDownLatch( sizeToReturn );
+
+        final EntityVersionDeletedTest eventListener = new EntityVersionDeletedTest( latch );
+
+        final Set<EntityVersionDeleted> listeners = new HashSet<>();
+
+        listeners.add( eventListener );
+
+
+        final SerializationFig serializationFig = mock( SerializationFig.class );
+
+        when( serializationFig.getBufferSize() ).thenReturn( 10 );
+
+        final MvccLogEntrySerializationStrategy less = mock( MvccLogEntrySerializationStrategy.class );
+
+        final UniqueValueSerializationStrategy uvss = mock( UniqueValueSerializationStrategy.class );
+
+        final Keyspace keyspace = mock( Keyspace.class );
+
+        final MutationBatch entityBatch = mock( MutationBatch.class );
+
+        when( keyspace.prepareMutationBatch() ).thenReturn(
+                mock( MutationBatch.class ) ) // don't care what happens to this one
+                .thenReturn( entityBatch );
+
+
+        final Id applicationId = new SimpleId( "application" );
+
+        final CollectionScope appScope = new CollectionScopeImpl( applicationId, applicationId, "users" );
+
+        final Id entityId = new SimpleId( "user" );
+
+
+        final List<UUID> versions = VersionGenerator.generateVersions( 2 );
+
+
+        // mock up a single log entry for our first test
+        final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock( less, appScope, entityId, versions );
+
+
+        //get the version we're keeping, it's first in our list
+        final UUID version = logEntryMock.getEntryAtIndex( 0 ).getVersion();
+
+        //mock up unique version output
+        final UniqueValueEntryMock uniqueValueEntryMock =
+                UniqueValueEntryMock.createUniqueMock( uvss, appScope, entityId, versions );
+
+
+        EntityVersionCleanupTask cleanupTask =
+                new EntityVersionCleanupTask( serializationFig, less, uvss, keyspace, listeners, appScope, entityId,
+                        version );
+
+        final MutationBatch newBatch = mock( MutationBatch.class );
+
+
+        // set up returning a mutator
+        when( uvss.delete( same( appScope ), any( UniqueValue.class ) ) ).thenReturn( newBatch );
+
+        //return a new batch when it's called
+        when( less.delete( same( appScope ), same( entityId ), any( UUID.class ) ) ).thenReturn( newBatch );
+
+
+        cleanupTask.call();
+
+
+        //get the second field, this should be deleted
+        final UniqueValue oldUniqueField = uniqueValueEntryMock.getEntryAtIndex( 1 );
+
+        final MvccLogEntry expectedDeletedEntry = logEntryMock.getEntryAtIndex( 1 );
+
+
+        //verify delete was invoked
+        verify( uvss ).delete( same( appScope ), same( oldUniqueField ) );
+
+        //verify the delete was invoked
+        verify( less ).delete( same( appScope ), same( entityId ), same( expectedDeletedEntry.getVersion() ) );
+
+        // verify it was run
+        verify( entityBatch ).execute();
+
+
+        //the latch was executed
+        latch.await();
+    }
+
+
+    @Test//(timeout=10000)
+    public void multipleListenerMultipleVersions() throws Exception {
+
+        final SerializationFig serializationFig = mock( SerializationFig.class );
+
+        when( serializationFig.getBufferSize() ).thenReturn( 10 );
+
+
+        //create a latch for the event listener, and add it to the list of events
+        final int sizeToReturn = 10;
+
+        final CountDownLatch latch = new CountDownLatch( sizeToReturn / serializationFig.getBufferSize() * 3 );
+
+        final EntityVersionDeletedTest listener1 = new EntityVersionDeletedTest( latch );
+        final EntityVersionDeletedTest listener2 = new EntityVersionDeletedTest( latch );
+        final EntityVersionDeletedTest listener3 = new EntityVersionDeletedTest( latch );
+
+        final Set<EntityVersionDeleted> listeners = new HashSet<>();
+
+        listeners.add( listener1 );
+        listeners.add( listener2 );
+        listeners.add( listener3 );
+
+
+
+
+        final MvccLogEntrySerializationStrategy less = mock( MvccLogEntrySerializationStrategy.class );
+
+        final UniqueValueSerializationStrategy uvss = mock( UniqueValueSerializationStrategy.class );
+
+        final Keyspace keyspace = mock( Keyspace.class );
+
+        final MutationBatch entityBatch = mock( MutationBatch.class );
+
+        when( keyspace.prepareMutationBatch() ).thenReturn(
+                mock( MutationBatch.class ) ) // don't care what happens to this one
+                .thenReturn( entityBatch );
+
+
+
+
+        final Id applicationId = new SimpleId( "application" );
+
+        final CollectionScope appScope = new CollectionScopeImpl( applicationId, applicationId, "users" );
+
+        final Id entityId = new SimpleId( "user" );
+
+        final List<UUID> versions = VersionGenerator.generateVersions( 2 );
+
+
+        // mock up a single log entry for our first test
+        final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock( less, appScope, entityId, versions );
+
+
+        //get the version we're keeping, it's first in our list
+        final UUID version = logEntryMock.getEntryAtIndex( 0 ).getVersion();
+
+        //mock up unique version output
+        final UniqueValueEntryMock uniqueValueEntryMock =
+                UniqueValueEntryMock.createUniqueMock( uvss, appScope, entityId, versions );
+
+
+        EntityVersionCleanupTask cleanupTask =
+                new EntityVersionCleanupTask( serializationFig, less, uvss, keyspace, listeners, appScope, entityId,
+                        version );
+
+        final MutationBatch newBatch = mock( MutationBatch.class );
+
+
+        // set up returning a mutator
+        when( uvss.delete( same( appScope ), any( UniqueValue.class ) ) ).thenReturn( newBatch );
+
+        //return a new batch when it's called
+        when( less.delete( same( appScope ), same( entityId ), any( UUID.class ) ) ).thenReturn( newBatch );
+
+
+        cleanupTask.call();
+
+
+        //get the second field, this should be deleted
+        final UniqueValue oldUniqueField = uniqueValueEntryMock.getEntryAtIndex( 1 );
+
+        final MvccLogEntry expectedDeletedEntry = logEntryMock.getEntryAtIndex( 1 );
+
+
+        //verify delete was invoked
+        verify( uvss ).delete( same( appScope ), same( oldUniqueField ) );
+
+        //verify the delete was invoked
+        verify( less ).delete( same( appScope ), same( entityId ), same( expectedDeletedEntry.getVersion() ) );
+
+        // verify it was run
+        verify( entityBatch ).execute();
+
+
+        //the latch was executed
+        latch.await();
+
+        //we deleted the version
+        //verify we deleted everything
+          //verify delete was invoked
+        verify( uvss ).delete( same( appScope ), same( oldUniqueField ) );
+
+        //verify the delete was invoked
+        verify( less ).delete( same( appScope ), same( entityId ), same( expectedDeletedEntry.getVersion() ) );
+
+        // verify it was run
+        verify( entityBatch ).execute();
+
+        //the latch was executed
+        latch.await();
+    }
+
+
+    /**
+     * Tests what happens when our listeners are VERY slow
+     */
+//    @Ignore( "Test is a work in progress" )
+    @Test( timeout = 10000 )
+    public void multipleListenerMultipleVersionsNoThreadsToRun()
+            throws ExecutionException, InterruptedException, ConnectionException {
+
+
+        final SerializationFig serializationFig = mock( SerializationFig.class );
+
+        when( serializationFig.getBufferSize() ).thenReturn( 10 );
+
+
+        //create a latch for the event listener, and add it to the list of events
+        final int sizeToReturn = 10;
+
+
+        final int listenerCount = 5;
+
+        final CountDownLatch latch =
+                new CountDownLatch( sizeToReturn / serializationFig.getBufferSize() * listenerCount );
+        final Semaphore waitSemaphore = new Semaphore( 0 );
+
+
+        final SlowListener listener1 = new SlowListener( latch, waitSemaphore );
+        final SlowListener listener2 = new SlowListener( latch, waitSemaphore );
+        final SlowListener listener3 = new SlowListener( latch, waitSemaphore );
+        final SlowListener listener4 = new SlowListener( latch, waitSemaphore );
+        final SlowListener listener5 = new SlowListener( latch, waitSemaphore );
+
+        final Set<EntityVersionDeleted> listeners = new HashSet<EntityVersionDeleted>();
+
+        listeners.add( listener1 );
+        listeners.add( listener2 );
+        listeners.add( listener3 );
+        listeners.add( listener4 );
+        listeners.add( listener5 );
+
+
+        final MvccLogEntrySerializationStrategy less = mock( MvccLogEntrySerializationStrategy.class );
+
+        final UniqueValueSerializationStrategy uvss = mock( UniqueValueSerializationStrategy.class );
+
+        final Keyspace keyspace = mock( Keyspace.class );
+
+        final MutationBatch entityBatch = mock( MutationBatch.class );
+
+        when( keyspace.prepareMutationBatch() ).thenReturn(
+                mock( MutationBatch.class ) ) // don't care what happens to this one
+                .thenReturn( entityBatch );
+
+
+        final Id applicationId = new SimpleId( "application" );
+
+        final CollectionScope appScope = new CollectionScopeImpl( applicationId, applicationId, "users" );
+
+        final Id entityId = new SimpleId( "user" );
+
+
+        final List<UUID> versions = VersionGenerator.generateVersions( 2 );
+
+        // mock up a single log entry for our first test
+        final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock( less, appScope, entityId, versions );
+
+
+        //get the version we're keeping, it's first in our list
+        final UUID version = logEntryMock.getEntryAtIndex( 0 ).getVersion();
+
+
+        //mock up unique version output
+        final UniqueValueEntryMock uniqueValueEntryMock =
+                UniqueValueEntryMock.createUniqueMock( uvss, appScope, entityId, versions );
+
+
+        EntityVersionCleanupTask cleanupTask =
+                new EntityVersionCleanupTask( serializationFig, less, uvss, keyspace, listeners, appScope, entityId,
+                        version );
+
+        final MutationBatch newBatch = mock( MutationBatch.class );
+
+
+        // set up returning a mutator
+        when( uvss.delete( same( appScope ), any( UniqueValue.class ) ) ).thenReturn( newBatch );
+
+        //return a new batch when it's called
+        when( less.delete( same( appScope ), same( entityId ), any( UUID.class ) ) ).thenReturn( newBatch );
+
+
+        //start the task
+        ListenableFuture<Void> future = taskExecutor.submit( cleanupTask );
+
+        /**
+         * While we're not done, release latches every 200 ms
+         */
+        while ( !future.isDone() ) {
+            Thread.sleep( 200 );
+            waitSemaphore.release( listenerCount );
+        }
+
+        //wait for the task
+        future.get();
+
+
+        //get the second field, this should be deleted
+        final UniqueValue oldUniqueField = uniqueValueEntryMock.getEntryAtIndex( 1 );
+
+        final MvccLogEntry expectedDeletedEntry = logEntryMock.getEntryAtIndex( 1 );
+
+
+        //verify delete was invoked
+        verify( uvss ).delete( same( appScope ), same( oldUniqueField ) );
+
+        //verify the delete was invoked
+        verify( less ).delete( same( appScope ), same( entityId ), same( expectedDeletedEntry.getVersion() ) );
+
+        // verify it was run
+        verify( entityBatch ).execute();
+
+
+        //the latch was executed
+        latch.await();
+
+        //we deleted the version
+        //verify we deleted everything
+        //verify delete was invoked
+        verify( uvss ).delete( same( appScope ), same( oldUniqueField ) );
+
+        //verify the delete was invoked
+        verify( less ).delete( same( appScope ), same( entityId ), same( expectedDeletedEntry.getVersion() ) );
+
+        // verify it was run
+        verify( entityBatch ).execute();
+
+        //the latch was executed
+        latch.await();
+
+
+        //the latch was executed
+        latch.await();
+    }
+
+
+    /**
+     * Tests that our task will run in the caller if there's no threads, ensures that the task runs
+     */
+    @Test( timeout = 10000 )
+    public void singleListenerSingleVersionRejected()
+            throws ExecutionException, InterruptedException, ConnectionException {
+
+
+
+        //create a latch for the event listener, and add it to the list of events
+        final int sizeToReturn = 1;
+
+        final CountDownLatch latch = new CountDownLatch( sizeToReturn );
+
+        final EntityVersionDeletedTest eventListener = new EntityVersionDeletedTest( latch );
+
+        final Set<EntityVersionDeleted> listeners = new HashSet<>();
+
+        listeners.add( eventListener );
+
+
+        final SerializationFig serializationFig = mock( SerializationFig.class );
+
+        when( serializationFig.getBufferSize() ).thenReturn( 10 );
+
+        final MvccLogEntrySerializationStrategy less = mock( MvccLogEntrySerializationStrategy.class );
+
+        final UniqueValueSerializationStrategy uvss = mock( UniqueValueSerializationStrategy.class );
+
+        final Keyspace keyspace = mock( Keyspace.class );
+
+        final MutationBatch entityBatch = mock( MutationBatch.class );
+
+        when( keyspace.prepareMutationBatch() ).thenReturn(
+                mock( MutationBatch.class ) ) // don't care what happens to this one
+                .thenReturn( entityBatch );
+
+
+        final Id applicationId = new SimpleId( "application" );
+
+        final CollectionScope appScope = new CollectionScopeImpl( applicationId, applicationId, "users" );
+
+        final Id entityId = new SimpleId( "user" );
+
+
+        final List<UUID> versions = VersionGenerator.generateVersions( 2 );
+
+
+        // mock up a single log entry for our first test
+        final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock( less, appScope, entityId, versions );
+
+
+        //get the version we're keeping, it's first in our list
+        final UUID version = logEntryMock.getEntryAtIndex( 0 ).getVersion();
+
+        //mock up unique version output
+        final UniqueValueEntryMock uniqueValueEntryMock =
+                UniqueValueEntryMock.createUniqueMock( uvss, appScope, entityId, versions );
+
+
+        EntityVersionCleanupTask cleanupTask =
+                new EntityVersionCleanupTask( serializationFig, less, uvss, keyspace, listeners, appScope, entityId,
+                        version );
+
+        final MutationBatch newBatch = mock( MutationBatch.class );
+
+
+        // set up returning a mutator
+        when( uvss.delete( same( appScope ), any( UniqueValue.class ) ) ).thenReturn( newBatch );
+
+        //return a new batch when it's called
+        when( less.delete( same( appScope ), same( entityId ), any( UUID.class ) ) ).thenReturn( newBatch );
+
+
+        cleanupTask.rejected();
+
+
+        //get the second field, this should be deleted
+        final UniqueValue oldUniqueField = uniqueValueEntryMock.getEntryAtIndex( 1 );
+
+        final MvccLogEntry expectedDeletedEntry = logEntryMock.getEntryAtIndex( 1 );
+
+
+        //verify delete was invoked
+        verify( uvss ).delete( same( appScope ), same( oldUniqueField ) );
+
+        //verify the delete was invoked
+        verify( less ).delete( same( appScope ), same( entityId ), same( expectedDeletedEntry.getVersion() ) );
+
+        // verify it was run
+        verify( entityBatch ).execute();
+
+
+        //the latch was executed
+        latch.await();
+    }
+
+
+    private static class EntityVersionDeletedTest implements EntityVersionDeleted {
+        final CountDownLatch invocationLatch;
+
+
+        private EntityVersionDeletedTest( final CountDownLatch invocationLatch ) {
+            this.invocationLatch = invocationLatch;
+        }
+
+
+        @Override
+        public void versionDeleted( final CollectionScope scope, final Id entityId,
+                                    final List<MvccLogEntry> entityVersion ) {
+            invocationLatch.countDown();
+        }
+    }
+
+
+    private static class SlowListener extends EntityVersionDeletedTest {
+        final Semaphore blockLatch;
+
+
+        private SlowListener( final CountDownLatch invocationLatch, final Semaphore blockLatch ) {
+            super( invocationLatch );
+            this.blockLatch = blockLatch;
+        }
+
+
+        @Override
+        public void versionDeleted( final CollectionScope scope, final Id entityId,
+                                    final List<MvccLogEntry> entityVersion ) {
+
+            //wait for unblock to happen before counting down invocation latches
+            try {
+                blockLatch.acquire();
+            }
+            catch ( InterruptedException e ) {
+                throw new RuntimeException( e );
+            }
+            super.versionDeleted( scope, entityId, entityVersion );
+        }
     }
-//    @Test(timeout=10000)
-//    public void noListenerOneVersion()
-//            throws ExecutionException, InterruptedException, ConnectionException {
-//
-//
-//        final SerializationFig serializationFig = mock( SerializationFig.class );
-//
-//        when( serializationFig.getBufferSize() ).thenReturn( 10 );
-//
-//        final MvccEntitySerializationStrategy ess =
-//                mock( MvccEntitySerializationStrategy.class );
-//
-//        final MvccLogEntrySerializationStrategy less =
-//                mock( MvccLogEntrySerializationStrategy.class );
-//
-//        final Keyspace keyspace = mock( Keyspace.class );
-//
-//        final MutationBatch entityBatch = mock( MutationBatch.class );
-//
-//        final MutationBatch logBatch = mock( MutationBatch.class );
-//
-//        when( keyspace.prepareMutationBatch() )
-//            .thenReturn( mock( MutationBatch.class ) ) // don't care what happens to this one
-//            .thenReturn( entityBatch )
-//            .thenReturn( logBatch );
-//
-//        // intentionally no events
-//        final Set<EntityVersionDeleted> listeners = new HashSet<EntityVersionDeleted>();
-//
-//        final Id applicationId = new SimpleId( "application" );
-//
-//        final CollectionScope appScope = new CollectionScopeImpl(
-//                applicationId, applicationId, "users" );
-//
-//        final Id entityId = new SimpleId( "user" );
-//
-//
-//        // mock up a single log entry for our first test
-//        final LogEntryMock logEntryMock =
-//                LogEntryMock.createLogEntryMock(less, appScope, entityId, 2 );
-//
-//        final UUID version = logEntryMock.getEntries().iterator().next().getVersion();
-//
-//        final UniqueValueSerializationStrategy uvss =
-//                mock( UniqueValueSerializationStrategy.class );
-//
-//        EntityVersionCleanupTask cleanupTask =
-//                new EntityVersionCleanupTask( serializationFig,
-//                        less,
-//                        ess,
-//                        uvss,
-//                        keyspace,
-//                        listeners,
-//                        appScope,
-//                        entityId,
-//                        version
-//                );
-//
-//        final MutationBatch newBatch = mock( MutationBatch.class );
-//
-//
-//        // set up returning a mutator
-//        when(ess.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
-//                .thenReturn( newBatch );
-//
-//        when(less.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
-//                .thenReturn( newBatch );
-//
-//        final List<MvccEntity> mel = new ArrayList<MvccEntity>();
-//
-//        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(),
-//                MvccEntity.Status.DELETED, Optional.fromNullable((Entity)null)) );
-//
-//        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(),
-//                MvccEntity.Status.DELETED, Optional.fromNullable((Entity)null)) );
-//
-//        when( ess.loadDescendingHistory(
-//                same( appScope ), same( entityId ), any(UUID.class), any(Integer.class) ) )
-//                .thenReturn(mel.iterator() );
-//
-//        try {
-//            cleanupTask.call();
-//        }catch(Exception e){
-//            fail( e.getMessage() );
-//        }
-//
-//        // verify it was run
-//        verify( entityBatch ).execute();
-//
-//        verify( logBatch ).execute();
-//    }
-//
-//
-//    /**
-//     * Tests the cleanup task on the first version created
-//     */
-//    @Test(timeout=10000)
-//    public void noListenerNoVersions()
-//            throws ExecutionException, InterruptedException, ConnectionException {
-//
-//
-//        final SerializationFig serializationFig = mock( SerializationFig.class );
-//
-//        when( serializationFig.getBufferSize() ).thenReturn( 10 );
-//
-//        final MvccEntitySerializationStrategy ess =
-//                mock( MvccEntitySerializationStrategy.class );
-//
-//        final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy =
-//                mock( MvccLogEntrySerializationStrategy.class );
-//
-//        final Keyspace keyspace = mock( Keyspace.class );
-//
-//
-//        final MutationBatch entityBatch = mock( MutationBatch.class );
-//        final MutationBatch logBatch = mock( MutationBatch.class );
-//
-//        when( keyspace.prepareMutationBatch() )
-//            .thenReturn( mock( MutationBatch.class ) ) // don't care what happens to this one
-//            .thenReturn( entityBatch )
-//            .thenReturn( logBatch );
-//
-//
-//
-//        //intentionally no events
-//        final Set<EntityVersionDeleted> listeners = new HashSet<EntityVersionDeleted>();
-//
-//        final Id applicationId = new SimpleId( "application" );
-//
-//
-//        final CollectionScope appScope = new CollectionScopeImpl(
-//                applicationId, applicationId, "users" );
-//
-//        final Id entityId = new SimpleId( "user" );
-//
-//
-//        //mock up a single log entry for our first test
-//        final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock(
-//                mvccLogEntrySerializationStrategy, appScope, entityId, 1 );
-//
-//
-//        final UUID version = logEntryMock.getEntries().iterator().next().getVersion();
-//
-//        final UniqueValueSerializationStrategy uniqueValueSerializationStrategy =
-//                mock( UniqueValueSerializationStrategy.class );
-//
-//        EntityVersionCleanupTask cleanupTask =
-//                new EntityVersionCleanupTask( serializationFig,
-//                        mvccLogEntrySerializationStrategy,
-//                        ess,
-//                        uniqueValueSerializationStrategy,
-//                        keyspace,
-//                        listeners,
-//                        appScope,
-//                        entityId,
-//                        version
-//                );
-//
-//        final MutationBatch batch = mock( MutationBatch.class );
-//
-//
-//        //set up returning a mutator
-//        when(ess.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
-//                .thenReturn( batch );
-//
-//        when( mvccLogEntrySerializationStrategy
-//                .delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
-//                .thenReturn( batch );
-//
-//        final List<MvccEntity> mel = new ArrayList<MvccEntity>();
-//
-//        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(),
-//                MvccEntity.Status.DELETED, Optional.fromNullable((Entity)null)) );
-//
-//        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(),
-//                MvccEntity.Status.DELETED, Optional.fromNullable((Entity)null)) );
-//
-//        when( ess.loadDescendingHistory( same( appScope ), same( entityId ), any(UUID.class), any(Integer.class) ) )
-//                .thenReturn(mel.iterator() );
-//
-//        //start the task
-//        try {
-//            cleanupTask.call();
-//        }catch(Exception e){
-//            fail( e.getMessage() );
-//        }
-//
-//
-//        // These last two verify statements do not make sense. We cannot assert that the entity
-//        // and log batches are never called. Even if there are no listeners the entity delete
-//        // cleanup task will still run to do the normal cleanup.
-//        //
-//        // verify( entityBatch, never() ).execute();
-//        // verify( logBatch, never() ).execute();
-//    }
-//
-//
-//    @Test(timeout=10000)
-//    public void singleListenerSingleVersion()
-//            throws ExecutionException, InterruptedException, ConnectionException {
-//
-//
-//        final SerializationFig serializationFig = mock( SerializationFig.class );
-//
-//        when( serializationFig.getBufferSize() ).thenReturn( 10 );
-//
-//        final MvccEntitySerializationStrategy ess =
-//                mock( MvccEntitySerializationStrategy.class );
-//
-//        final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy =
-//                mock( MvccLogEntrySerializationStrategy.class );
-//
-//        final Keyspace keyspace = mock( Keyspace.class );
-//
-//
-//        final MutationBatch entityBatch = mock( MutationBatch.class );
-//        final MutationBatch logBatch = mock( MutationBatch.class );
-//
-//        when( keyspace.prepareMutationBatch() )
-//            .thenReturn( mock( MutationBatch.class ) ) // don't care what happens to this one
-//            .thenReturn( entityBatch )
-//            .thenReturn( logBatch );
-//
-//
-//
-//        //create a latch for the event listener, and add it to the list of events
-//        final int sizeToReturn = 1;
-//
-//        final CountDownLatch latch = new CountDownLatch( sizeToReturn );
-//
-//        final EntityVersionDeletedTest eventListener = new EntityVersionDeletedTest( latch );
-//
-//        final Set<EntityVersionDeleted> listeners = new HashSet<EntityVersionDeleted>();
-//
-//        listeners.add( eventListener );
-//
-//        final Id applicationId = new SimpleId( "application" );
-//
-//
-//        final CollectionScope appScope = new CollectionScopeImpl(
-//                applicationId, applicationId, "users" );
-//
-//        final Id entityId = new SimpleId( "user" );
-//
-//
-//        //mock up a single log entry for our first test
-//        final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock(
-//                mvccLogEntrySerializationStrategy, appScope, entityId, sizeToReturn + 1 );
-//
-//
-//        final UUID version = logEntryMock.getEntries().iterator().next().getVersion();
-//
-//
-//        final UniqueValueSerializationStrategy uniqueValueSerializationStrategy =
-//                mock( UniqueValueSerializationStrategy.class );
-//
-//        EntityVersionCleanupTask cleanupTask =
-//                new EntityVersionCleanupTask( serializationFig,
-//                        mvccLogEntrySerializationStrategy,
-//                        ess,
-//                        uniqueValueSerializationStrategy,
-//                        keyspace,
-//                        listeners,
-//                        appScope,
-//                        entityId,
-//                        version
-//                );
-//
-//        final MutationBatch batch = mock( MutationBatch.class );
-//
-//
-//        //set up returning a mutator
-//        when(ess.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
-//                .thenReturn( batch );
-//
-//
-//        when( mvccLogEntrySerializationStrategy
-//                .delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
-//                .thenReturn( batch );
-//
-//
-//        final List<MvccEntity> mel = new ArrayList<MvccEntity>();
-//
-//        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(),
-//                MvccEntity.Status.DELETED, Optional.fromNullable((Entity)null)) );
-//
-//        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(),
-//                MvccEntity.Status.DELETED, Optional.fromNullable((Entity)null)) );
-//
-//        when( ess.loadDescendingHistory( same( appScope ), same( entityId ), any(UUID.class), any(Integer.class) ) )
-//                .thenReturn(mel.iterator() );
-//
-//
-//        try {
-//            cleanupTask.call();
-//        }catch(Exception e){
-//            fail( e.getMessage() );
-//        }
-//
-//        //we deleted the version
-//        //verify it was run
-//        verify( entityBatch ).execute();
-//
-//        verify( logBatch ).execute();
-//
-//        //the latch was executed
-//        latch.await();
-//    }
-//
-//
-//    @Test//(timeout=10000)
-//    public void multipleListenerMultipleVersions()
-//            throws ExecutionException, InterruptedException, ConnectionException {
-//
-//
-//        final SerializationFig serializationFig = mock( SerializationFig.class );
-//
-//        when( serializationFig.getBufferSize() ).thenReturn( 10 );
-//
-//        final MvccEntitySerializationStrategy ess =
-//                mock( MvccEntitySerializationStrategy.class );
-//
-//        final UniqueValueSerializationStrategy uniqueValueSerializationStrategy =
-//                mock( UniqueValueSerializationStrategy.class );
-//
-//        final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy =
-//                mock( MvccLogEntrySerializationStrategy.class );
-//
-//        final Keyspace keyspace = mock( Keyspace.class );
-//
-//        final MutationBatch entityBatch = mock( MutationBatch.class );
-//        final MutationBatch logBatch = mock( MutationBatch.class );
-//
-//        when( keyspace.prepareMutationBatch() )
-//            .thenReturn( mock( MutationBatch.class ) ) // don't care what happens to this one
-//            .thenReturn( entityBatch )
-//            .thenReturn( logBatch );
-//
-//
-//        //create a latch for the event listener, and add it to the list of events
-//        final int sizeToReturn = 10;
-//
-//
-//        final CountDownLatch latch = new CountDownLatch(
-//                sizeToReturn/serializationFig.getBufferSize() * 3 );
-//
-//        final EntityVersionDeletedTest listener1 = new EntityVersionDeletedTest( latch );
-//        final EntityVersionDeletedTest listener2 = new EntityVersionDeletedTest( latch );
-//        final EntityVersionDeletedTest listener3 = new EntityVersionDeletedTest( latch );
-//
-//        final Set<EntityVersionDeleted> listeners = new HashSet<EntityVersionDeleted>();
-//
-//        listeners.add( listener1 );
-//        listeners.add( listener2 );
-//        listeners.add( listener3 );
-//
-//        final Id applicationId = new SimpleId( "application" );
-//
-//        final CollectionScope appScope = new CollectionScopeImpl(
-//                applicationId, applicationId, "users" );
-//
-//        final Id entityId = new SimpleId( "user" );
-//
-//        // mock up a single log entry for our first test
-//        final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock(
-//                mvccLogEntrySerializationStrategy, appScope, entityId, sizeToReturn + 1 );
-//
-//        final UUID version = logEntryMock.getEntries().iterator().next().getVersion();
-//
-//        EntityVersionCleanupTask cleanupTask =
-//                new EntityVersionCleanupTask( serializationFig,
-//                        mvccLogEntrySerializationStrategy,
-//                        ess,
-//                        uniqueValueSerializationStrategy,
-//                        keyspace,
-//                        listeners,
-//                        appScope,
-//                        entityId,
-//                        version
-//                );
-//
-//        final MutationBatch batch = mock( MutationBatch.class );
-//
-//
-//        //set up returning a mutator
-//        when( ess.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
-//                .thenReturn( batch );
-//
-//        when( mvccLogEntrySerializationStrategy
-//                .delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
-//                .thenReturn( batch );
-//
-//        final List<MvccEntity> mel = new ArrayList<MvccEntity>();
-//
-//        Entity entity = new Entity( entityId );
-//
-//        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(),
-//                MvccEntity.Status.DELETED, Optional.of(entity)) );
-//
-//        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(),
-//                MvccEntity.Status.DELETED, Optional.of(entity)) );
-//
-//        when( ess.loadDescendingHistory( same( appScope ), same( entityId ), any(UUID.class), any(Integer.class) ) )
-//                .thenReturn(mel.iterator() );
-//
-//        try {
-//            cleanupTask.call();
-//        }catch(Exception e){
-//            fail( e.getMessage() );
-//        }
-//        //we deleted the version
-//        //verify we deleted everything
-//        verify( entityBatch, times( 1 ) ).mergeShallow( any( MutationBatch.class ) );
-//
-//        verify( logBatch, times( 1 ) ).mergeShallow( any( MutationBatch.class ) );
-//
-//        verify( logBatch ).execute();
-//
-//        verify( entityBatch ).execute();
-//
-//        //the latch was executed
-//        latch.await();
-//    }
-//
-//
-//    /**
-//     * Tests what happens when our listeners are VERY slow
-//     */
-//    @Ignore("Test is a work in progress")
-//    @Test(timeout=10000)
-//    public void multipleListenerMultipleVersionsNoThreadsToRun()
-//            throws ExecutionException, InterruptedException, ConnectionException {
-//
-//
-//        final SerializationFig serializationFig = mock( SerializationFig.class );
-//
-//        when( serializationFig.getBufferSize() ).thenReturn( 10 );
-//
-//        final MvccEntitySerializationStrategy mvccEntitySerializationStrategy =
-//                mock( MvccEntitySerializationStrategy.class );
-//
-//        final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy =
-//                mock( MvccLogEntrySerializationStrategy.class );
-//
-//        final Keyspace keyspace = mock( Keyspace.class );
-//
-//
-//
-//        final MutationBatch entityBatch = mock( MutationBatch.class );
-//        final MutationBatch logBatch = mock( MutationBatch.class );
-//
-//        when( keyspace.prepareMutationBatch() )
-//            .thenReturn( mock( MutationBatch.class ) ) // don't care what happens to this one
-//            .thenReturn( entityBatch )
-//            .thenReturn( logBatch );
-//
-//
-//
-//
-//        //create a latch for the event listener, and add it to the list of events
-//        final int sizeToReturn = 10;
-//
-//
-//        final int listenerCount = 5;
-//
-//        final CountDownLatch latch = new CountDownLatch(
-//                sizeToReturn/serializationFig.getBufferSize() * listenerCount );
-//        final Semaphore waitSemaphore = new Semaphore( 0 );
-//
-//
-//        final SlowListener listener1 = new SlowListener( latch, waitSemaphore );
-//        final SlowListener listener2 = new SlowListener( latch, waitSemaphore );
-//        final SlowListener listener3 = new SlowListener( latch, waitSemaphore );
-//        final SlowListener listener4 = new SlowListener( latch, waitSemaphore );
-//        final SlowListener listener5 = new SlowListener( latch, waitSemaphore );
-//
-//        final Set<EntityVersionDeleted> listeners = new HashSet<EntityVersionDeleted>();
-//
-//        listeners.add( listener1 );
-//        listeners.add( listener2 );
-//        listeners.add( listener3 );
-//        listeners.add( listener4 );
-//        listeners.add( listener5 );
-//
-//        final Id applicationId = new SimpleId( "application" );
-//
-//
-//        final CollectionScope appScope = new CollectionScopeImpl(
-//                applicationId, applicationId, "users" );
-//
-//        final Id entityId = new SimpleId( "user" );
-//
-//
-//        //mock up a single log entry for our first test
-//        final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock(
-//                mvccLogEntrySerializationStrategy, appScope, entityId, sizeToReturn + 1 );
-//
-//
-//        final UUID version = logEntryMock.getEntries().iterator().next().getVersion();
-//
-//
-//        final UniqueValueSerializationStrategy uniqueValueSerializationStrategy =
-//                mock( UniqueValueSerializationStrategy.class );
-//
-//        EntityVersionCleanupTask cleanupTask =
-//                new EntityVersionCleanupTask( serializationFig,
-//                        mvccLogEntrySerializationStrategy,
-//                        mvccEntitySerializationStrategy,
-//                        uniqueValueSerializationStrategy,
-//                        keyspace,
-//                        listeners,
-//                        appScope,
-//                        entityId,
-//                        version
-//                );
-//
-//        final MutationBatch batch = mock( MutationBatch.class );
-//
-//
-//        //set up returning a mutator
-//        when( mvccEntitySerializationStrategy
-//                .delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
-//                .thenReturn( batch );
-//
-//
-//        when( mvccLogEntrySerializationStrategy
-//                .delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
-//                .thenReturn( batch );
-//
-//
-//        //start the task
-//        ListenableFuture<Void> future = taskExecutor.submit( cleanupTask );
-//
-//        /**
-//         * While we're not done, release latches every 200 ms
-//         */
-//        while ( !future.isDone() ) {
-//            Thread.sleep( 200 );
-//            waitSemaphore.release( listenerCount );
-//        }
-//
-//        //wait for the task
-//        future.get();
-//
-//
-//
-//        //we deleted the version
-//        //verify we deleted everything
-//        verify( logBatch, times( sizeToReturn ) ).mergeShallow( any( MutationBatch.class ) );
-//
-//        verify( entityBatch, times( sizeToReturn ) ).mergeShallow( any( MutationBatch.class ) );
-//
-//
-//        verify( logBatch ).execute();
-//
-//        verify( entityBatch ).execute();
-//
-//
-//
-//        //the latch was executed
-//        latch.await();
-//    }
-//
-//    /**
-//     * Tests that our task will run in the caller if there's no threads, ensures that the task runs
-//     */
-//    @Test(timeout=10000)
-//    public void singleListenerSingleVersionRejected()
-//            throws ExecutionException, InterruptedException, ConnectionException {
-//
-//
-//        final TaskExecutor taskExecutor = new NamedTaskExecutorImpl( "test", 0, 0 );
-//
-//        final SerializationFig serializationFig = mock( SerializationFig.class );
-//
-//        when( serializationFig.getBufferSize() ).thenReturn( 10 );
-//
-//        final MvccEntitySerializationStrategy ess =
-//                mock( MvccEntitySerializationStrategy.class );
-//
-//        final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy =
-//                mock( MvccLogEntrySerializationStrategy.class );
-//
-//        final Keyspace keyspace = mock( Keyspace.class );
-//
-//
-//        final MutationBatch entityBatch = mock( MutationBatch.class );
-//        final MutationBatch logBatch = mock( MutationBatch.class );
-//
-//        when( keyspace.prepareMutationBatch() )
-//                .thenReturn( mock( MutationBatch.class ) ) // don't care what happens to this one
-//                .thenReturn( entityBatch )
-//                .thenReturn( logBatch );
-//
-//
-//
-//        //create a latch for the event listener, and add it to the list of events
-//        final int sizeToReturn = 1;
-//
-//        final CountDownLatch latch = new CountDownLatch( sizeToReturn );
-//
-//        final EntityVersionDeletedTest eventListener = new EntityVersionDeletedTest( latch );
-//
-//        final Set<EntityVersionDeleted> listeners = new HashSet<EntityVersionDeleted>();
-//
-//        listeners.add( eventListener );
-//
-//        final Id applicationId = new SimpleId( "application" );
-//
-//
-//        final CollectionScope appScope = new CollectionScopeImpl(
-//                applicationId, applicationId, "users" );
-//
-//        final Id entityId = new SimpleId( "user" );
-//
-//
-//        //mock up a single log entry for our first test
-//        final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock(
-//                mvccLogEntrySerializationStrategy, appScope, entityId, sizeToReturn + 1 );
-//
-//
-//        final UUID version = logEntryMock.getEntries().iterator().next().getVersion();
-//
-//
-//        final UniqueValueSerializationStrategy uniqueValueSerializationStrategy =
-//                mock( UniqueValueSerializationStrategy.class );
-//
-//        EntityVersionCleanupTask cleanupTask =
-//                new EntityVersionCleanupTask( serializationFig,
-//                        mvccLogEntrySerializationStrategy,
-//                        ess,
-//                        uniqueValueSerializationStrategy,
-//                        keyspace,
-//                        listeners,
-//                        appScope,
-//                        entityId,
-//                        version
-//                );
-//
-//        final MutationBatch batch = mock( MutationBatch.class );
-//
-//
-//        //set up returning a mutator
-//        when(ess.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
-//                .thenReturn( batch );
-//
-//
-//        when( mvccLogEntrySerializationStrategy
-//                .delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
-//                .thenReturn( batch );
-//
-//
-//        final List<MvccEntity> mel = new ArrayList<MvccEntity>();
-//
-//        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(),
-//                MvccEntity.Status.DELETED, Optional.fromNullable((Entity)null)) );
-//
-//        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(),
-//                MvccEntity.Status.DELETED, Optional.fromNullable((Entity)null)) );
-//
-//        when( ess.loadDescendingHistory( same( appScope ), same( entityId ), any(UUID.class), any(Integer.class) ) )
-//                .thenReturn(mel.iterator() );
-//
-//
-//        try {
-//            cleanupTask.rejected();
-//        }catch(Exception e){
-//            fail( e.getMessage() );
-//        }
-//
-//        //we deleted the version
-//        //verify it was run
-//        verify( entityBatch ).execute();
-//
-//        verify( logBatch ).execute();
-//
-//        //the latch was executed
-//        latch.await();
-//    }
-//
-//    private static class EntityVersionDeletedTest implements EntityVersionDeleted {
-//        final CountDownLatch invocationLatch;
-//
-//
-//        private EntityVersionDeletedTest( final CountDownLatch invocationLatch ) {
-//            this.invocationLatch = invocationLatch;
-//        }
-//
-//
-//        @Override
-//        public void versionDeleted( final CollectionScope scope, final Id entityId,
-//                final List<MvccEntity> entityVersion ) {
-//            invocationLatch.countDown();
-//        }
-//
-//    }
-//
-//
-//    private static class SlowListener extends EntityVersionDeletedTest {
-//        final Semaphore blockLatch;
-//
-//        private SlowListener( final CountDownLatch invocationLatch, final Semaphore blockLatch ) {
-//            super( invocationLatch );
-//            this.blockLatch = blockLatch;
-//        }
-//
-//
-//        @Override
-//        public void versionDeleted( final CollectionScope scope, final Id entityId,
-//                final List<MvccEntity> entityVersion ) {
-//
-//            //wait for unblock to happen before counting down invocation latches
-//            try {
-//                blockLatch.acquire();
-//            }
-//            catch ( InterruptedException e ) {
-//                throw new RuntimeException( e );
-//            }
-//            super.versionDeleted( scope, entityId, entityVersion );
-//        }
-//    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/787bc091/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIteratorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIteratorTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIteratorTest.java
index 20d478d..6081b48 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIteratorTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIteratorTest.java
@@ -12,6 +12,7 @@ import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
 import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
 import org.apache.usergrid.persistence.collection.MvccLogEntry;
 import org.apache.usergrid.persistence.collection.util.LogEntryMock;
+import org.apache.usergrid.persistence.collection.util.VersionGenerator;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
@@ -105,7 +106,7 @@ public class LogEntryIteratorTest {
 
 
         final LogEntryMock mockResults =
-                LogEntryMock.createLogEntryMock( logEntrySerializationStrategy, scope, entityId, toGenerate );
+                LogEntryMock.createLogEntryMock( logEntrySerializationStrategy, scope, entityId, VersionGenerator.generateVersions( toGenerate ) );
 
         Iterator<MvccLogEntry> expectedEntries = mockResults.getEntries().iterator();