You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/10/01 22:01:05 UTC

[06/19] git commit: Finished testing of the EntityVersionCleanupTask

Finished testing of the EntityVersionCleanupTask


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/d4f80e7c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/d4f80e7c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/d4f80e7c

Branch: refs/heads/two-dot-o-rebuildable-index
Commit: d4f80e7c85093e17081a4210491022b9001a02b7
Parents: 018dbee
Author: Todd Nine <to...@apache.org>
Authored: Sat Sep 27 15:25:09 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Sat Sep 27 17:07:50 2014 -0600

----------------------------------------------------------------------
 .../impl/EntityVersionCleanupTask.java          | 121 ++--
 .../impl/EntityVersionCleanupTaskTest.java      | 583 ++++++++++++++++++-
 .../persistence/core/task/ImmediateTask.java    |  12 +-
 .../core/task/NamedTaskExecutorImpl.java        |  10 +-
 .../usergrid/persistence/core/task/Task.java    |  27 +-
 .../persistence/core/task/TaskExecutor.java     |   7 +-
 .../core/task/NamedTaskExecutorImplTest.java    |   9 +-
 .../impl/shard/ShardGroupCompaction.java        |   2 +-
 .../shard/impl/ShardGroupCompactionImpl.java    |  13 +-
 9 files changed, 684 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d4f80e7c/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
index 29ca3ac..3d1483d 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
@@ -1,10 +1,10 @@
 package org.apache.usergrid.persistence.collection.impl;
 
 
-import java.util.ArrayList;
 import java.util.List;
+import java.util.Stack;
 import java.util.UUID;
-import java.util.concurrent.ForkJoinTask;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.RecursiveTask;
 
 import org.slf4j.Logger;
@@ -15,10 +15,8 @@ import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
 import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
 import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
-import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
 import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
 import org.apache.usergrid.persistence.collection.serialization.impl.LogEntryIterator;
-import org.apache.usergrid.persistence.core.entity.EntityVersion;
 import org.apache.usergrid.persistence.core.task.Task;
 import org.apache.usergrid.persistence.model.entity.Id;
 
@@ -27,12 +25,11 @@ import org.apache.usergrid.persistence.model.entity.Id;
  * Cleans up previous versions from the specified version. Note that this means the version passed in the io event is
  * retained, the range is exclusive.
  */
-public class EntityVersionCleanupTask extends Task<CollectionIoEvent<EntityVersion>, CollectionIoEvent<EntityVersion>> {
+public class EntityVersionCleanupTask extends Task<Void> {
 
     private static final Logger LOG = LoggerFactory.getLogger( EntityVersionCleanupTask.class );
 
 
-    private final CollectionIoEvent<EntityVersion> collectionIoEvent;
     private final List<EntityVersionDeleted> listeners;
 
     private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
@@ -40,29 +37,31 @@ public class EntityVersionCleanupTask extends Task<CollectionIoEvent<EntityVersi
 
     private final SerializationFig serializationFig;
 
+    private final CollectionScope scope;
+    private final Id entityId;
+    private final UUID version;
 
-    private EntityVersionCleanupTask( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
-                                      final MvccEntitySerializationStrategy entitySerializationStrategy,
-                                      final CollectionIoEvent<EntityVersion> collectionIoEvent,
-                                      final List<EntityVersionDeleted> listeners,
-                                      final SerializationFig serializationFig ) {
-        this.collectionIoEvent = collectionIoEvent;
-        this.listeners = listeners;
-        this.logEntrySerializationStrategy = logEntrySerializationStrategy;
-        this.entitySerializationStrategy = entitySerializationStrategy;
-        this.serializationFig = serializationFig;
-    }
 
+    public EntityVersionCleanupTask( final SerializationFig serializationFig,
+                                     final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
+                                     final MvccEntitySerializationStrategy entitySerializationStrategy,
+                                     final List<EntityVersionDeleted> listeners, final CollectionScope scope,
+                                     final Id entityId, final UUID version ) {
 
-    @Override
-    public CollectionIoEvent<EntityVersion> getId() {
-        return collectionIoEvent;
+        this.serializationFig = serializationFig;
+        this.logEntrySerializationStrategy = logEntrySerializationStrategy;
+        this.entitySerializationStrategy = entitySerializationStrategy;
+        this.listeners = listeners;
+        this.scope = scope;
+        this.entityId = entityId;
+        this.version = version;
     }
 
 
     @Override
     public void exceptionThrown( final Throwable throwable ) {
-        LOG.error( "Unable to run update task for event {}", collectionIoEvent, throwable );
+        LOG.error( "Unable to run update task for collection {} with entity {} and version {}",
+                new Object[] { scope, entityId, version }, throwable );
     }
 
 
@@ -81,15 +80,15 @@ public class EntityVersionCleanupTask extends Task<CollectionIoEvent<EntityVersi
 
 
     @Override
-    public CollectionIoEvent<EntityVersion> executeTask() throws Exception {
+    public Void executeTask() throws Exception {
 
-        final CollectionScope scope = collectionIoEvent.getEntityCollection();
-        final Id entityId = collectionIoEvent.getEvent().getId();
-        final UUID maxVersion = collectionIoEvent.getEvent().getVersion();
+
+        final UUID maxVersion = version;
 
 
         LogEntryIterator logEntryIterator =
-                new LogEntryIterator( logEntrySerializationStrategy, scope, entityId, maxVersion, serializationFig.getHistorySize() );
+                new LogEntryIterator( logEntrySerializationStrategy, scope, entityId, maxVersion,
+                        serializationFig.getHistorySize() );
 
 
         //for every entry, we want to clean it up with listeners
@@ -100,39 +99,73 @@ public class EntityVersionCleanupTask extends Task<CollectionIoEvent<EntityVersi
 
 
             final UUID version = logEntry.getVersion();
-            List<ForkJoinTask<Void>> tasks = new ArrayList<>();
 
 
-            //execute all the listeners
-            for (final  EntityVersionDeleted listener : listeners ) {
+            fireEvents();
 
-                tasks.add( new RecursiveTask<Void>() {
-                    @Override
-                    protected Void compute() {
-                        listener.versionDeleted( scope, entityId, version );
-                        return null;
-                    }
-                }.fork() );
+            //we do multiple invocations on purpose.  Our log is our source of versions, only delete from it
+            //after every successful invocation of listeners and entity removal
+            entitySerializationStrategy.delete( scope, entityId, version ).execute();
 
+            logEntrySerializationStrategy.delete( scope, entityId, version ).execute();
+        }
 
-            }
 
-            //wait for them to complete
+        return null;
+    }
 
-            joinAll(tasks);
 
-            //we do multiple invocations on purpose.  Our log is our source of versions, only delete from it
-            //after every successful invocation of listeners and entity removal
-            entitySerializationStrategy.delete( scope, entityId, version ).execute();
+    private void fireEvents() throws ExecutionException, InterruptedException {
 
-            logEntrySerializationStrategy.delete( scope, entityId, version ).execute();
+        if ( listeners.size() == 0 ) {
+            return;
         }
 
+        //stack to track forked tasks
+        final Stack<RecursiveTask<Void>> tasks = new Stack<>();
+
+
+        //we don't want to fork the final listener, we'll run that in our current thread
+        final int forkedTaskSize = listeners.size() - 1;
+
+
+        //execute all the listeners
+        for ( int i = 0; i < forkedTaskSize; i++ ) {
+
+            final EntityVersionDeleted listener = listeners.get( i );
+
+            final RecursiveTask<Void> task = createTask( listener );
+
+            task.fork();
 
-        return collectionIoEvent;
+            tasks.push( task );
+        }
+
+
+        final RecursiveTask<Void> lastTask = createTask( listeners.get( forkedTaskSize ) );
+
+        lastTask.invoke();
+
+
+        //wait for them to complete
+        while ( !tasks.isEmpty() ) {
+            tasks.pop().get();
+        }
     }
 
 
+    /**
+     * Return the new task to execute
+     */
+    private RecursiveTask<Void> createTask( final EntityVersionDeleted listener ) {
+        return new RecursiveTask<Void>() {
+            @Override
+            protected Void compute() {
+                listener.versionDeleted( scope, entityId, version );
+                return null;
+            }
+        };
+    }
 }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d4f80e7c/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
index 050ea9e..2df75f1 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
@@ -22,10 +22,34 @@ package org.apache.usergrid.persistence.collection.impl;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Semaphore;
 
+import org.junit.AfterClass;
 import org.junit.Test;
 
-import static org.junit.Assert.*;
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
+import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
+import org.apache.usergrid.persistence.collection.util.LogEntryMock;
+import org.apache.usergrid.persistence.core.task.NamedTaskExecutorImpl;
+import org.apache.usergrid.persistence.core.task.TaskExecutor;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 
 /**
@@ -33,8 +57,563 @@ import static org.junit.Assert.*;
  */
 public class EntityVersionCleanupTaskTest {
 
+    private static final TaskExecutor taskExecutor = new NamedTaskExecutorImpl( "test", 4 );
+
+
+    @AfterClass
+    public static void shutdown() {
+        taskExecutor.shutdown();
+    }
+
+
+    @Test
+    public void noListenerOneVersion() throws ExecutionException, InterruptedException, ConnectionException {
+
+
+        final SerializationFig serializationFig = mock( SerializationFig.class );
+
+        when( serializationFig.getHistorySize() ).thenReturn( 10 );
+
+        final MvccEntitySerializationStrategy mvccEntitySerializationStrategy =
+                mock( MvccEntitySerializationStrategy.class );
+
+        final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy =
+                mock( MvccLogEntrySerializationStrategy.class );
+
+
+        //intentionally no events
+        final List<EntityVersionDeleted> listeners = new ArrayList<EntityVersionDeleted>();
+
+        final Id applicationId = new SimpleId( "application" );
+
+
+        final CollectionScope appScope = new CollectionScopeImpl( applicationId, applicationId, "users" );
+
+        final Id entityId = new SimpleId( "user" );
+
+
+        //mock up a single log entry for our first test
+        final LogEntryMock logEntryMock =
+                LogEntryMock.createLogEntryMock( mvccLogEntrySerializationStrategy, appScope, entityId, 2 );
+
+
+        final UUID version = logEntryMock.getEntries().iterator().next().getVersion();
+
+
+        EntityVersionCleanupTask cleanupTask =
+                new EntityVersionCleanupTask( serializationFig, mvccLogEntrySerializationStrategy,
+                        mvccEntitySerializationStrategy, listeners, appScope, entityId, version );
+
+        final MutationBatch firstBatch = mock( MutationBatch.class );
+
+
+        //set up returning a mutator
+        when( mvccEntitySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
+                .thenReturn( firstBatch );
+
+
+        final MutationBatch secondBatch = mock( MutationBatch.class );
+
+        when( mvccLogEntrySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
+                .thenReturn( secondBatch );
+
+
+        //start the task
+        taskExecutor.submit( cleanupTask );
+
+        //wait for the task
+        cleanupTask.get();
+
+        //verify it was run
+        verify( firstBatch ).execute();
+
+        verify( secondBatch ).execute();
+    }
+
+
+    /**
+     * Tests the cleanup task on the first version ceated
+     */
+    @Test
+    public void noListenerNoVersions() throws ExecutionException, InterruptedException, ConnectionException {
+
+
+        final SerializationFig serializationFig = mock( SerializationFig.class );
+
+        when( serializationFig.getHistorySize() ).thenReturn( 10 );
+
+        final MvccEntitySerializationStrategy mvccEntitySerializationStrategy =
+                mock( MvccEntitySerializationStrategy.class );
+
+        final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy =
+                mock( MvccLogEntrySerializationStrategy.class );
+
+
+        //intentionally no events
+        final List<EntityVersionDeleted> listeners = new ArrayList<EntityVersionDeleted>();
+
+        final Id applicationId = new SimpleId( "application" );
+
+
+        final CollectionScope appScope = new CollectionScopeImpl( applicationId, applicationId, "users" );
+
+        final Id entityId = new SimpleId( "user" );
+
+
+        //mock up a single log entry for our first test
+        final LogEntryMock logEntryMock =
+                LogEntryMock.createLogEntryMock( mvccLogEntrySerializationStrategy, appScope, entityId, 1 );
+
+
+        final UUID version = logEntryMock.getEntries().iterator().next().getVersion();
+
+
+        EntityVersionCleanupTask cleanupTask =
+                new EntityVersionCleanupTask( serializationFig, mvccLogEntrySerializationStrategy,
+                        mvccEntitySerializationStrategy, listeners, appScope, entityId, version );
+
+        final MutationBatch firstBatch = mock( MutationBatch.class );
+
+
+        //set up returning a mutator
+        when( mvccEntitySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
+                .thenReturn( firstBatch );
+
+
+        final MutationBatch secondBatch = mock( MutationBatch.class );
+
+        when( mvccLogEntrySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
+                .thenReturn( secondBatch );
+
+
+        //start the task
+        taskExecutor.submit( cleanupTask );
+
+        //wait for the task
+        cleanupTask.get();
+
+        //verify it was run
+        verify( firstBatch, never() ).execute();
+
+        verify( secondBatch, never() ).execute();
+    }
+
+
     @Test
-    public void multiPageTask(){
+    public void singleListenerSingleVersion() throws ExecutionException, InterruptedException, ConnectionException {
+
+
+        final SerializationFig serializationFig = mock( SerializationFig.class );
+
+        when( serializationFig.getHistorySize() ).thenReturn( 10 );
+
+        final MvccEntitySerializationStrategy mvccEntitySerializationStrategy =
+                mock( MvccEntitySerializationStrategy.class );
+
+        final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy =
+                mock( MvccLogEntrySerializationStrategy.class );
+
+
+        //create a latch for the event listener, and add it to the list of events
+        final int sizeToReturn = 1;
+
+        final CountDownLatch latch = new CountDownLatch( sizeToReturn );
+
+        final EntityVersionDeletedTest eventListener = new EntityVersionDeletedTest( latch );
+
+        final List<EntityVersionDeleted> listeners = new ArrayList<EntityVersionDeleted>();
+
+        listeners.add( eventListener );
+
+        final Id applicationId = new SimpleId( "application" );
+
+
+        final CollectionScope appScope = new CollectionScopeImpl( applicationId, applicationId, "users" );
+
+        final Id entityId = new SimpleId( "user" );
+
+
+        //mock up a single log entry for our first test
+        final LogEntryMock logEntryMock = LogEntryMock
+                .createLogEntryMock( mvccLogEntrySerializationStrategy, appScope, entityId, sizeToReturn + 1 );
+
+
+        final UUID version = logEntryMock.getEntries().iterator().next().getVersion();
+
+
+        EntityVersionCleanupTask cleanupTask =
+                new EntityVersionCleanupTask( serializationFig, mvccLogEntrySerializationStrategy,
+                        mvccEntitySerializationStrategy, listeners, appScope, entityId, version );
+
+        final MutationBatch firstBatch = mock( MutationBatch.class );
+
+
+        //set up returning a mutator
+        when( mvccEntitySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
+                .thenReturn( firstBatch );
+
+
+        final MutationBatch secondBatch = mock( MutationBatch.class );
+
+        when( mvccLogEntrySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
+                .thenReturn( secondBatch );
+
+
+        //start the task
+        taskExecutor.submit( cleanupTask );
+
+        //wait for the task
+        cleanupTask.get();
+
+        //we deleted the version
+        //verify it was run
+        verify( firstBatch ).execute();
+
+        verify( secondBatch ).execute();
+
+        //the latch was executed
+        latch.await();
+    }
+
+
+    @Test
+    public void multipleListenerMultipleVersions()
+            throws ExecutionException, InterruptedException, ConnectionException {
+
+
+        final SerializationFig serializationFig = mock( SerializationFig.class );
+
+        when( serializationFig.getHistorySize() ).thenReturn( 10 );
+
+        final MvccEntitySerializationStrategy mvccEntitySerializationStrategy =
+                mock( MvccEntitySerializationStrategy.class );
+
+        final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy =
+                mock( MvccLogEntrySerializationStrategy.class );
+
+
+        //create a latch for the event listener, and add it to the list of events
+        final int sizeToReturn = 10;
+
+
+        final CountDownLatch latch = new CountDownLatch( sizeToReturn * 3 );
+
+        final EntityVersionDeletedTest listener1 = new EntityVersionDeletedTest( latch );
+        final EntityVersionDeletedTest listener2 = new EntityVersionDeletedTest( latch );
+        final EntityVersionDeletedTest listener3 = new EntityVersionDeletedTest( latch );
+
+        final List<EntityVersionDeleted> listeners = new ArrayList<EntityVersionDeleted>();
+
+        listeners.add( listener1 );
+        listeners.add( listener2 );
+        listeners.add( listener3 );
+
+        final Id applicationId = new SimpleId( "application" );
+
+
+        final CollectionScope appScope = new CollectionScopeImpl( applicationId, applicationId, "users" );
+
+        final Id entityId = new SimpleId( "user" );
+
+
+        //mock up a single log entry for our first test
+        final LogEntryMock logEntryMock = LogEntryMock
+                .createLogEntryMock( mvccLogEntrySerializationStrategy, appScope, entityId, sizeToReturn + 1 );
+
+
+        final UUID version = logEntryMock.getEntries().iterator().next().getVersion();
+
+
+        EntityVersionCleanupTask cleanupTask =
+                new EntityVersionCleanupTask( serializationFig, mvccLogEntrySerializationStrategy,
+                        mvccEntitySerializationStrategy, listeners, appScope, entityId, version );
+
+        final MutationBatch firstBatch = mock( MutationBatch.class );
+
+
+        //set up returning a mutator
+        when( mvccEntitySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
+                .thenReturn( firstBatch );
+
+
+        final MutationBatch secondBatch = mock( MutationBatch.class );
+
+        when( mvccLogEntrySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
+                .thenReturn( secondBatch );
+
+
+        //start the task
+        taskExecutor.submit( cleanupTask );
+
+        //wait for the task
+        cleanupTask.get();
+
+        //we deleted the version
+        //verify we deleted everything
+        verify( firstBatch, times( sizeToReturn ) ).execute();
+
+        verify( secondBatch, times( sizeToReturn ) ).execute();
+
+        //the latch was executed
+        latch.await();
+    }
+
+
+    /**
+     * Tests what happens when our listeners are VERY slow
+     * @throws ExecutionException
+     * @throws InterruptedException
+     * @throws ConnectionException
+     */
+    @Test
+    public void multipleListenerMultipleVersionsNoThreadsToRun()
+            throws ExecutionException, InterruptedException, ConnectionException {
+
+
+        final SerializationFig serializationFig = mock( SerializationFig.class );
+
+        when( serializationFig.getHistorySize() ).thenReturn( 10 );
+
+        final MvccEntitySerializationStrategy mvccEntitySerializationStrategy =
+                mock( MvccEntitySerializationStrategy.class );
+
+        final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy =
+                mock( MvccLogEntrySerializationStrategy.class );
+
+
+        //create a latch for the event listener, and add it to the list of events
+        final int sizeToReturn = 10;
+
+
+        final int listenerCount = 5;
+
+        final CountDownLatch latch = new CountDownLatch( sizeToReturn * listenerCount );
+        final Semaphore waitSemaphore = new Semaphore( 0 );
+
+
+        final SlowListener listener1 = new SlowListener( latch, waitSemaphore );
+        final SlowListener listener2 = new SlowListener( latch, waitSemaphore );
+        final SlowListener listener3 = new SlowListener( latch, waitSemaphore );
+        final SlowListener listener4 = new SlowListener( latch, waitSemaphore );
+        final SlowListener listener5 = new SlowListener( latch, waitSemaphore );
+
+        final List<EntityVersionDeleted> listeners = new ArrayList<>();
+
+        listeners.add( listener1 );
+        listeners.add( listener2 );
+        listeners.add( listener3 );
+        listeners.add( listener4 );
+        listeners.add( listener5 );
+
+        final Id applicationId = new SimpleId( "application" );
+
+
+        final CollectionScope appScope = new CollectionScopeImpl( applicationId, applicationId, "users" );
+
+        final Id entityId = new SimpleId( "user" );
+
+
+        //mock up a single log entry for our first test
+        final LogEntryMock logEntryMock = LogEntryMock
+                .createLogEntryMock( mvccLogEntrySerializationStrategy, appScope, entityId, sizeToReturn + 1 );
+
+
+        final UUID version = logEntryMock.getEntries().iterator().next().getVersion();
+
+
+        EntityVersionCleanupTask cleanupTask =
+                new EntityVersionCleanupTask( serializationFig, mvccLogEntrySerializationStrategy,
+                        mvccEntitySerializationStrategy, listeners, appScope, entityId, version );
+
+        final MutationBatch firstBatch = mock( MutationBatch.class );
+
+
+        //set up returning a mutator
+        when( mvccEntitySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
+                .thenReturn( firstBatch );
+
+
+        final MutationBatch secondBatch = mock( MutationBatch.class );
+
+        when( mvccLogEntrySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
+                .thenReturn( secondBatch );
+
+
+        //start the task
+        taskExecutor.submit( cleanupTask );
+
+        /**
+         * While we're not done, release latches every 200 ms
+         */
+        while(!cleanupTask.isDone()) {
+            Thread.sleep( 200 );
+            waitSemaphore.release( listenerCount );
+        }
+
+        //wait for the task
+        cleanupTask.get();
+
+        //we deleted the version
+        //verify we deleted everything
+        verify( firstBatch, times( sizeToReturn ) ).execute();
+
+        verify( secondBatch, times( sizeToReturn ) ).execute();
+
+        //the latch was executed
+        latch.await();
+    }
+
+
+
+    /**
+     * Tests that our task will run in the caller if there's no threads, ensures that the task runs
+     * @throws ExecutionException
+     * @throws InterruptedException
+     * @throws ConnectionException
+     */
+    @Test
+    public void runsWhenRejected()
+            throws ExecutionException, InterruptedException, ConnectionException {
+
+
+        /**
+         * only 1 thread on purpose, we want to saturate the task
+         */
+        final TaskExecutor taskExecutor = new NamedTaskExecutorImpl( "test", 1);
+
+        final SerializationFig serializationFig = mock( SerializationFig.class );
+
+        when( serializationFig.getHistorySize() ).thenReturn( 10 );
+
+        final MvccEntitySerializationStrategy mvccEntitySerializationStrategy =
+                mock( MvccEntitySerializationStrategy.class );
+
+        final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy =
+                mock( MvccLogEntrySerializationStrategy.class );
+
+
+        //create a latch for the event listener, and add it to the list of events
+        final int sizeToReturn = 10;
+
+
+        final int listenerCount = 1;
+
+        final CountDownLatch latch = new CountDownLatch( sizeToReturn * listenerCount );
+        final Semaphore waitSemaphore = new Semaphore( 0 );
+
+
+        final SlowListener listener1 = new SlowListener( latch, waitSemaphore );
+
+        final List<EntityVersionDeleted> listeners = new ArrayList<>();
+
+        listeners.add( listener1 );
+
+        final Id applicationId = new SimpleId( "application" );
+
+
+        final CollectionScope appScope = new CollectionScopeImpl( applicationId, applicationId, "users" );
+
+        final Id entityId = new SimpleId( "user" );
+
+
+        //mock up a single log entry for our first test
+        final LogEntryMock logEntryMock = LogEntryMock
+                .createLogEntryMock( mvccLogEntrySerializationStrategy, appScope, entityId, sizeToReturn + 1 );
+
+
+        final UUID version = logEntryMock.getEntries().iterator().next().getVersion();
+
+
+        EntityVersionCleanupTask firstTask =
+                new EntityVersionCleanupTask( serializationFig, mvccLogEntrySerializationStrategy,
+                        mvccEntitySerializationStrategy, listeners, appScope, entityId, version );
+
+        EntityVersionCleanupTask secondTask =
+                      new EntityVersionCleanupTask( serializationFig, mvccLogEntrySerializationStrategy,
+                              mvccEntitySerializationStrategy, listeners, appScope, entityId, version );
+
+
+        final MutationBatch firstBatch = mock( MutationBatch.class );
+
+
+        //set up returning a mutator
+        when( mvccEntitySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
+                .thenReturn( firstBatch );
+
+
+        final MutationBatch secondBatch = mock( MutationBatch.class );
+
+        when( mvccLogEntrySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
+                .thenReturn( secondBatch );
+
+
+        //start the task
+        taskExecutor.submit( firstTask );
+
+        //now start another task while the slow running task is running
+        taskExecutor.submit( secondTask );
+
+        //get the second task, we shouldn't have been able to queue it, therefore it should just run in process
+        secondTask.get();
+
+        /**
+         * While we're not done, release latches every 200 ms
+         */
+        while(!firstTask.isDone()) {
+            Thread.sleep( 200 );
+            waitSemaphore.release( listenerCount );
+        }
+
+        //wait for the task
+        firstTask.get();
+
+        //we deleted the version
+        //verify we deleted everything
+        verify( firstBatch, times( sizeToReturn*2 ) ).execute();
+
+        verify( secondBatch, times( sizeToReturn*2 ) ).execute();
+
+        //the latch was executed
+        latch.await();
+    }
+
+
+    private static class EntityVersionDeletedTest implements EntityVersionDeleted {
+        final CountDownLatch invocationLatch;
+
+
+        private EntityVersionDeletedTest( final CountDownLatch invocationLatch ) {
+            this.invocationLatch = invocationLatch;
+        }
+
+
+        @Override
+        public void versionDeleted( final CollectionScope scope, final Id entityId, final UUID entityVersion ) {
+            invocationLatch.countDown();
+        }
+    }
+
+
+    private static class SlowListener extends EntityVersionDeletedTest {
+        final Semaphore blockLatch;
+
+
+        private SlowListener( final CountDownLatch invocationLatch, final Semaphore blockLatch ) {
+            super( invocationLatch );
+            this.blockLatch = blockLatch;
+        }
+
 
+        @Override
+        public void versionDeleted( final CollectionScope scope, final Id entityId, final UUID entityVersion ) {
+            //wait for unblock to happen before counting down invocation latches
+            try {
+                blockLatch.acquire();
+            }
+            catch ( InterruptedException e ) {
+                throw new RuntimeException( e );
+            }
+            super.versionDeleted( scope, entityId, entityVersion );
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d4f80e7c/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/ImmediateTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/ImmediateTask.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/ImmediateTask.java
index 627e7e8..f6c28a3 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/ImmediateTask.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/ImmediateTask.java
@@ -5,23 +5,17 @@ package org.apache.usergrid.persistence.core.task;
  * Does not perform computation, just returns the value passed to it
  *
  */
-public class ImmediateTask<V, I> extends Task<V, I> {
+public class ImmediateTask<V> extends Task<V> {
+
 
-    private final I id;
     private final V returned;
 
 
-    protected ImmediateTask( final I id, final V returned ) {
-        this.id = id;
+    protected ImmediateTask( final V returned ) {
         this.returned = returned;
     }
 
 
-    @Override
-    public I getId() {
-        return id;
-    }
-
 
     @Override
     public V executeTask() throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d4f80e7c/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
index 4fc72c8..bb9cac8 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
@@ -55,6 +55,8 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
      * @param poolSize The size of the pool.  This is the number of concurrent tasks that can execute at once.
      */
     public NamedTaskExecutorImpl( final String name, final int poolSize ) {
+
+        //TODO, figure out how to name the fork/join threads in the pool
         Preconditions.checkNotNull( name );
         Preconditions.checkArgument( name.length() > 0, "name must have a length" );
         Preconditions.checkArgument( poolSize > 0, "poolSize must be > than 0" );
@@ -67,7 +69,7 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
 
 
     @Override
-    public <V, I> Task<V, I> submit( final Task<V, I> task ) {
+    public <V> Task<V> submit( final Task<V> task ) {
 
         try {
             executorService.submit( task );
@@ -80,6 +82,12 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
     }
 
 
+    @Override
+    public void shutdown() {
+        executorService.shutdownNow();
+    }
+
+
     private final class NamedForkJoinPool extends ForkJoinPool {
 
         private NamedForkJoinPool( final int workerThreadCount ) {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d4f80e7c/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
index 4dccc72..5d35ce4 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
@@ -29,13 +29,7 @@ import java.util.concurrent.RecursiveTask;
 /**
  * The task to execute
  */
-public abstract class Task<V, I> extends RecursiveTask<V> {
-
-    /**
-     * Get the unique identifier of this task.  This may be used to collapse runnables over a time period in the future
-     */
-    public abstract I getId();
-
+public abstract class Task<V> extends RecursiveTask<V> {
 
     @Override
     protected V compute() {
@@ -49,25 +43,6 @@ public abstract class Task<V, I> extends RecursiveTask<V> {
     }
 
 
-    /**
-     * Fork all tasks in the list except the last one.  The last will be run in the caller thread
-     * The others will wait for join
-     * @param tasks
-     *
-     */
-    public <V, T extends ForkJoinTask<V>> List<V> joinAll(List<T> tasks) throws ExecutionException, InterruptedException {
-
-        //don't fork the last one
-       List<V> results = new ArrayList<>(tasks.size());
-
-        for(T task: tasks){
-            results.add( task.join() );
-        }
-
-        return results;
-
-    }
-
 
     /**
      * Execute the task

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d4f80e7c/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
index a3553c7..c6f5915 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
@@ -13,5 +13,10 @@ public interface TaskExecutor {
      * Submit the task asynchronously
      * @param task
      */
-    public <V, I> Task<V, I > submit( Task<V, I> task );
+    public <V> Task<V > submit( Task<V> task );
+
+    /**
+     * Stop the task executor without waiting for scheduled threads to run
+     */
+    public void shutdown();
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d4f80e7c/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
index f65d5a6..9656b5f 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
@@ -29,7 +29,7 @@ public class NamedTaskExecutorImplTest {
         final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
         final CountDownLatch runLatch = new CountDownLatch( 1 );
 
-        final Task<Void, UUID> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {};
+        final Task<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {};
 
         executor.submit( task );
 
@@ -254,7 +254,7 @@ public class NamedTaskExecutorImplTest {
     }
 
 
-    private static abstract class TestTask<V> extends Task<V, UUID> {
+    private static abstract class TestTask<V> extends Task<V> {
 
         protected final List<Throwable> exceptions;
         protected final CountDownLatch exceptionLatch;
@@ -272,11 +272,6 @@ public class NamedTaskExecutorImplTest {
         }
 
 
-        @Override
-        public UUID getId() {
-            return UUIDGenerator.newTimeUUID();
-        }
-
 
         @Override
         public void exceptionThrown( final Throwable throwable ) {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d4f80e7c/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
index 15acaa8..ed63587 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
@@ -44,7 +44,7 @@ public interface ShardGroupCompaction {
      *
      * @return A ListenableFuture with the result.  Note that some
      */
-    public Task<AuditResult, ShardGroupCompactionImpl.ShardAuditKey> evaluateShardGroup(
+    public Task<AuditResult> evaluateShardGroup(
             final ApplicationScope scope, final DirectedEdgeMeta edgeMeta, final ShardEntryGroup group );
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d4f80e7c/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
index 751b4d9..a84a0f0 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
@@ -273,7 +273,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
 
 
     @Override
-    public Task<AuditResult, ShardAuditKey> evaluateShardGroup( final ApplicationScope scope,
+    public Task<AuditResult> evaluateShardGroup( final ApplicationScope scope,
                                                                 final DirectedEdgeMeta edgeMeta,
                                                                 final ShardEntryGroup group ) {
 
@@ -282,7 +282,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
 
         //don't audit, we didn't hit our chance
         if ( repairChance > graphFig.getShardRepairChance() ) {
-            return new ImmediateTask<AuditResult, ShardAuditKey>(  new ShardAuditKey( scope, edgeMeta, group ), AuditResult.NOT_CHECKED ) {};
+            return new ImmediateTask<AuditResult>(  AuditResult.NOT_CHECKED ) {};
         }
 
         /**
@@ -295,7 +295,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
     }
 
 
-    private final class ShardAuditTask extends Task<AuditResult, ShardAuditKey> {
+    private final class ShardAuditTask extends Task<AuditResult> {
 
         private final ApplicationScope scope;
         private final DirectedEdgeMeta edgeMeta;
@@ -310,11 +310,6 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
         }
 
 
-        @Override
-        public ShardAuditKey getId() {
-            return new ShardAuditKey( scope, edgeMeta, group );
-        }
-
 
         @Override
         public void exceptionThrown( final Throwable throwable ) {
@@ -325,7 +320,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
         @Override
         public void rejected() {
             //ignore, if this happens we don't care, we're saturated, we can check later
-            LOG.error( "Rejected audit for shard of {}", getId() );
+            LOG.error( "Rejected audit for shard of with scope {}, edgeMeta of {} and group of {}", scope, edgeMeta, group );
         }