You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/05/07 18:40:57 UTC

[2/5] incubator-usergrid git commit: First pass at refactoring mark + sweep

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanupTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanupTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanupTest.java
new file mode 100644
index 0000000..c1f76f2
--- /dev/null
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanupTest.java
@@ -0,0 +1,712 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.collection.mvcc.stage.delete;
+
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Semaphore;
+
+import org.junit.AfterClass;
+import org.junit.Test;
+
+import org.apache.usergrid.persistence.collection.MvccLogEntry;
+import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
+import org.apache.usergrid.persistence.collection.util.LogEntryMock;
+import org.apache.usergrid.persistence.collection.util.UniqueValueEntryMock;
+import org.apache.usergrid.persistence.collection.util.VersionGenerator;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+
+public class UniqueCleanupTest {
+
+//
+//    private static final TaskExecutor taskExecutor = new NamedTaskExecutorImpl( "test", 4, 0 );
+//
+//
+//    @AfterClass
+//    public static void shutdown() {
+//        taskExecutor.shutdown();
+//    }
+//
+//
+//    @Test( timeout = 10000 )
+//    public void noListenerOneVersion() throws Exception {
+//
+//
+//        final SerializationFig serializationFig = mock( SerializationFig.class );
+//
+//        when( serializationFig.getBufferSize() ).thenReturn( 10 );
+//
+//        final MvccLogEntrySerializationStrategy less = mock( MvccLogEntrySerializationStrategy.class );
+//
+//
+//        final UniqueValueSerializationStrategy uvss = mock( UniqueValueSerializationStrategy.class );
+//
+//        final Keyspace keyspace = mock( Keyspace.class );
+//
+//        final MutationBatch entityBatch = mock( MutationBatch.class );
+//
+//        when( keyspace.prepareMutationBatch() ).thenReturn(
+//                mock( MutationBatch.class ) ) // don't care what happens to this one
+//                .thenReturn( entityBatch );
+//
+//        // intentionally no events
+//        final Set<EntityVersionDeleted> listeners = new HashSet<EntityVersionDeleted>();
+//
+//        final Id applicationId = new SimpleId( "application" );
+//
+//        final ApplicationScope appScope = new ApplicationScopeImpl( applicationId );
+//
+//        final Id entityId = new SimpleId( "user" );
+//
+//        final List<UUID> versions = VersionGenerator.generateVersions( 2 );
+//
+//        // mock up log entries for both generated versions
+//        final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock( less, appScope, entityId, versions );
+//
+//
+//        //get the version we're keeping, it's first in our list
+//        final UUID version = logEntryMock.getEntryAtIndex( 0 ).getVersion();
+//
+//        //mock up unique version output
+//        final UniqueValueEntryMock uniqueValueEntryMock =
+//                UniqueValueEntryMock.createUniqueMock( uvss, appScope, entityId, versions );
+//
+//
+//        EntityVersionCleanupTask cleanupTask =
+//                new EntityVersionCleanupTask( serializationFig, less, uvss, keyspace, listeners, appScope, entityId,
+//                        version, false );
+//
+//        final MutationBatch newBatch = mock( MutationBatch.class );
+//
+//
+//        // set up returning a mutator
+//        when( uvss.delete( same( appScope ), any( UniqueValue.class ) ) ).thenReturn( newBatch );
+//
+//        //return a new batch when it's called
+//        when( less.delete( same( appScope ), same( entityId ), any( UUID.class ) ) ).thenReturn( newBatch );
+//
+//
+//        cleanupTask.call();
+//
+//
+//        //get the second field, this should be deleted
+//        final UniqueValue oldUniqueField = uniqueValueEntryMock.getEntryAtIndex( 1 );
+//
+//        final MvccLogEntry expectedDeletedEntry = logEntryMock.getEntryAtIndex( 1 );
+//
+//
+//        //verify delete was invoked
+//        verify( uvss ).delete( same( appScope ), same( oldUniqueField ) );
+//
+//        //verify the delete was invoked
+//        verify( less ).delete( same( appScope ), same( entityId ), same( expectedDeletedEntry.getVersion() ) );
+//
+//        // verify it was run
+//        verify( entityBatch ).execute();
+//    }
+//
+//
+//    /**
+//     * Tests the cleanup task on the first version created
+//     */
+//    @Test( timeout = 10000 )
+//    public void noListenerNoVersions() throws Exception {
+//
+//
+//        final SerializationFig serializationFig = mock( SerializationFig.class );
+//
+//        when( serializationFig.getBufferSize() ).thenReturn( 10 );
+//
+//        final MvccLogEntrySerializationStrategy less = mock( MvccLogEntrySerializationStrategy.class );
+//
+//
+//        final UniqueValueSerializationStrategy uvss = mock( UniqueValueSerializationStrategy.class );
+//
+//        final Keyspace keyspace = mock( Keyspace.class );
+//
+//        final MutationBatch entityBatch = mock( MutationBatch.class );
+//
+//        when( keyspace.prepareMutationBatch() ).thenReturn(
+//                mock( MutationBatch.class ) ) // don't care what happens to this one
+//                .thenReturn( entityBatch );
+//
+//        // intentionally no events
+//        final Set<EntityVersionDeleted> listeners = new HashSet<>();
+//
+//        final Id applicationId = new SimpleId( "application" );
+//
+//        final ApplicationScope appScope = new ApplicationScopeImpl( applicationId );
+//
+//        final Id entityId = new SimpleId( "user" );
+//
+//
+//        final List<UUID> versions = VersionGenerator.generateVersions( 1 );
+//
+//        // mock up a single log entry, with no other entries
+//        final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock( less, appScope, entityId, versions );
+//
+//
+//        //get the version we're keeping, it's first in our list
+//        final UUID version = logEntryMock.getEntryAtIndex( 0 ).getVersion();
+//
+//        //mock up unique version output
+//        final UniqueValueEntryMock uniqueValueEntryMock =
+//                UniqueValueEntryMock.createUniqueMock( uvss, appScope, entityId, versions );
+//
+//
+//        EntityVersionCleanupTask cleanupTask =
+//                new EntityVersionCleanupTask( serializationFig, less, uvss, keyspace, listeners, appScope, entityId, version, false );
+//
+//        final MutationBatch newBatch = mock( MutationBatch.class );
+//
+//
+//        // set up returning a mutator
+//        when( uvss.delete( same( appScope ), any( UniqueValue.class ) ) ).thenReturn( newBatch );
+//
+//        //return a new batch when it's called
+//        when( less.delete( same( appScope ), same( entityId ), any( UUID.class ) ) ).thenReturn( newBatch );
+//
+//
+//        cleanupTask.call();
+//
+//
+//        //verify delete was never invoked
+//        verify( uvss, never() ).delete( any( ApplicationScope.class ), any( UniqueValue.class ) );
+//
+//        //verify the delete was never invoked
+//        verify( less, never() ).delete( any( ApplicationScope.class ), any( Id.class ), any( UUID.class ) );
+//    }
+//
+//
+//    @Test( timeout = 10000 )
+//    public void singleListenerSingleVersion() throws Exception {
+//
+//
+//        //create a latch for the event listener, and add the listener to the set of listeners
+//        final int sizeToReturn = 1;
+//
+//        final CountDownLatch latch = new CountDownLatch( sizeToReturn );
+//
+//        final EntityVersionDeletedTest eventListener = new EntityVersionDeletedTest( latch );
+//
+//        final Set<EntityVersionDeleted> listeners = new HashSet<>();
+//
+//        listeners.add( eventListener );
+//
+//
+//        final SerializationFig serializationFig = mock( SerializationFig.class );
+//
+//        when( serializationFig.getBufferSize() ).thenReturn( 10 );
+//
+//        final MvccLogEntrySerializationStrategy less = mock( MvccLogEntrySerializationStrategy.class );
+//
+//        final UniqueValueSerializationStrategy uvss = mock( UniqueValueSerializationStrategy.class );
+//
+//        final Keyspace keyspace = mock( Keyspace.class );
+//
+//        final MutationBatch entityBatch = mock( MutationBatch.class );
+//
+//        when( keyspace.prepareMutationBatch() ).thenReturn(
+//                mock( MutationBatch.class ) ) // don't care what happens to this one
+//                .thenReturn( entityBatch );
+//
+//
+//        final Id applicationId = new SimpleId( "application" );
+//
+//        final ApplicationScope appScope = new ApplicationScopeImpl( applicationId );
+//
+//        final Id entityId = new SimpleId( "user" );
+//
+//
+//        final List<UUID> versions = VersionGenerator.generateVersions( 2 );
+//
+//
+//        // mock up a single log entry for our first test
+//        final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock( less, appScope, entityId, versions );
+//
+//
+//        //get the version we're keeping, it's first in our list
+//        final UUID version = logEntryMock.getEntryAtIndex( 0 ).getVersion();
+//
+//        //mock up unique version output
+//        final UniqueValueEntryMock uniqueValueEntryMock =
+//                UniqueValueEntryMock.createUniqueMock( uvss, appScope, entityId, versions );
+//
+//
+//        EntityVersionCleanupTask cleanupTask =
+//                new EntityVersionCleanupTask( serializationFig, less, uvss, keyspace, listeners, appScope, entityId,
+//                        version, false );
+//
+//        final MutationBatch newBatch = mock( MutationBatch.class );
+//
+//
+//        // set up returning a mutator
+//        when( uvss.delete( same( appScope ), any( UniqueValue.class ) ) ).thenReturn( newBatch );
+//
+//        //return a new batch when it's called
+//        when( less.delete( same( appScope ), same( entityId ), any( UUID.class ) ) ).thenReturn( newBatch );
+//
+//
+//        cleanupTask.call();
+//
+//
+//        //get the second field, this should be deleted
+//        final UniqueValue oldUniqueField = uniqueValueEntryMock.getEntryAtIndex( 1 );
+//
+//        final MvccLogEntry expectedDeletedEntry = logEntryMock.getEntryAtIndex( 1 );
+//
+//
+//        //verify delete was invoked
+//        verify( uvss ).delete( same( appScope ), same( oldUniqueField ) );
+//
+//        //verify the delete was invoked
+//        verify( less ).delete( same( appScope ), same( entityId ), same( expectedDeletedEntry.getVersion() ) );
+//
+//        // verify it was run
+//        verify( entityBatch ).execute();
+//
+//
+//        //block until the listener has counted the latch down
+//        latch.await();
+//    }
+//
+//
+//    @Test//(timeout=10000)
+//    public void multipleListenerMultipleVersions() throws Exception {
+//
+//        final SerializationFig serializationFig = mock( SerializationFig.class );
+//
+//        when( serializationFig.getBufferSize() ).thenReturn( 10 );
+//
+//
+//        //create a latch for the event listener, and add it to the list of events
+//        final int sizeToReturn = 10;
+//
+//        final CountDownLatch latch = new CountDownLatch( sizeToReturn / serializationFig.getBufferSize() * 3 );
+//
+//        final EntityVersionDeletedTest listener1 = new EntityVersionDeletedTest( latch );
+//        final EntityVersionDeletedTest listener2 = new EntityVersionDeletedTest( latch );
+//        final EntityVersionDeletedTest listener3 = new EntityVersionDeletedTest( latch );
+//
+//        final Set<EntityVersionDeleted> listeners = new HashSet<>();
+//
+//        listeners.add( listener1 );
+//        listeners.add( listener2 );
+//        listeners.add( listener3 );
+//
+//
+//
+//
+//        final MvccLogEntrySerializationStrategy less = mock( MvccLogEntrySerializationStrategy.class );
+//
+//        final UniqueValueSerializationStrategy uvss = mock( UniqueValueSerializationStrategy.class );
+//
+//        final Keyspace keyspace = mock( Keyspace.class );
+//
+//        final MutationBatch entityBatch = mock( MutationBatch.class );
+//
+//        when( keyspace.prepareMutationBatch() ).thenReturn(
+//                mock( MutationBatch.class ) ) // don't care what happens to this one
+//                .thenReturn( entityBatch );
+//
+//
+//
+//
+//        final Id applicationId = new SimpleId( "application" );
+//
+//        final ApplicationScope appScope = new ApplicationScopeImpl( applicationId );
+//
+//        final Id entityId = new SimpleId( "user" );
+//
+//        final List<UUID> versions = VersionGenerator.generateVersions( 2 );
+//
+//
+//        // mock up a single log entry for our first test
+//        final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock( less, appScope, entityId, versions );
+//
+//
+//        //get the version we're keeping, it's first in our list
+//        final UUID version = logEntryMock.getEntryAtIndex( 0 ).getVersion();
+//
+//        //mock up unique version output
+//        final UniqueValueEntryMock uniqueValueEntryMock =
+//                UniqueValueEntryMock.createUniqueMock( uvss, appScope, entityId, versions );
+//
+//
+//        EntityVersionCleanupTask cleanupTask =
+//                new EntityVersionCleanupTask( serializationFig, less, uvss, keyspace, listeners, appScope, entityId,
+//                        version, false );
+//
+//        final MutationBatch newBatch = mock( MutationBatch.class );
+//
+//
+//        // set up returning a mutator
+//        when( uvss.delete( same( appScope ), any( UniqueValue.class ) ) ).thenReturn( newBatch );
+//
+//        //return a new batch when it's called
+//        when( less.delete( same( appScope ), same( entityId ), any( UUID.class ) ) ).thenReturn( newBatch );
+//
+//
+//        cleanupTask.call();
+//
+//
+//        //get the second field, this should be deleted
+//        final UniqueValue oldUniqueField = uniqueValueEntryMock.getEntryAtIndex( 1 );
+//
+//        final MvccLogEntry expectedDeletedEntry = logEntryMock.getEntryAtIndex( 1 );
+//
+//
+//        //verify delete was invoked
+//        verify( uvss ).delete( same( appScope ), same( oldUniqueField ) );
+//
+//        //verify the delete was invoked
+//        verify( less ).delete( same( appScope ), same( entityId ), same( expectedDeletedEntry.getVersion() ) );
+//
+//        // verify it was run
+//        verify( entityBatch ).execute();
+//
+//
+//        //the latch was executed
+//        latch.await();
+//
+//        //we deleted the version
+//        //verify we deleted everything
+//          //verify delete was invoked
+//        verify( uvss ).delete( same( appScope ), same( oldUniqueField ) );
+//
+//        //verify the delete was invoked
+//        verify( less ).delete( same( appScope ), same( entityId ), same( expectedDeletedEntry.getVersion() ) );
+//
+//        // verify it was run
+//        verify( entityBatch ).execute();
+//
+//        //the latch was executed
+//        latch.await();
+//    }
+//
+//
+//    /**
+//     * Tests what happens when our listeners are VERY slow
+//     */
+////    @Ignore( "Test is a work in progress" )
+//    @Test( timeout = 10000 )
+//    public void multipleListenerMultipleVersionsNoThreadsToRun()
+//            throws ExecutionException, InterruptedException, ConnectionException {
+//
+//
+//        final SerializationFig serializationFig = mock( SerializationFig.class );
+//
+//        when( serializationFig.getBufferSize() ).thenReturn( 10 );
+//
+//
+//        //create a latch for the event listener, and add it to the list of events
+//        final int sizeToReturn = 10;
+//
+//
+//        final int listenerCount = 5;
+//
+//        final CountDownLatch latch =
+//                new CountDownLatch( sizeToReturn / serializationFig.getBufferSize() * listenerCount );
+//        final Semaphore waitSemaphore = new Semaphore( 0 );
+//
+//
+//        final SlowListener listener1 = new SlowListener( latch, waitSemaphore );
+//        final SlowListener listener2 = new SlowListener( latch, waitSemaphore );
+//        final SlowListener listener3 = new SlowListener( latch, waitSemaphore );
+//        final SlowListener listener4 = new SlowListener( latch, waitSemaphore );
+//        final SlowListener listener5 = new SlowListener( latch, waitSemaphore );
+//
+//        final Set<EntityVersionDeleted> listeners = new HashSet<EntityVersionDeleted>();
+//
+//        listeners.add( listener1 );
+//        listeners.add( listener2 );
+//        listeners.add( listener3 );
+//        listeners.add( listener4 );
+//        listeners.add( listener5 );
+//
+//
+//        final MvccLogEntrySerializationStrategy less = mock( MvccLogEntrySerializationStrategy.class );
+//
+//        final UniqueValueSerializationStrategy uvss = mock( UniqueValueSerializationStrategy.class );
+//
+//        final Keyspace keyspace = mock( Keyspace.class );
+//
+//        final MutationBatch entityBatch = mock( MutationBatch.class );
+//
+//        when( keyspace.prepareMutationBatch() ).thenReturn(
+//                mock( MutationBatch.class ) ) // don't care what happens to this one
+//                .thenReturn( entityBatch );
+//
+//
+//        final Id applicationId = new SimpleId( "application" );
+//
+//        final ApplicationScope appScope = new ApplicationScopeImpl( applicationId );
+//
+//        final Id entityId = new SimpleId( "user" );
+//
+//
+//        final List<UUID> versions = VersionGenerator.generateVersions( 2 );
+//
+//        // mock up a single log entry for our first test
+//        final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock( less, appScope, entityId, versions );
+//
+//
+//        //get the version we're keeping, it's first in our list
+//        final UUID version = logEntryMock.getEntryAtIndex( 0 ).getVersion();
+//
+//
+//        //mock up unique version output
+//        final UniqueValueEntryMock uniqueValueEntryMock =
+//                UniqueValueEntryMock.createUniqueMock( uvss, appScope, entityId, versions );
+//
+//
+//        EntityVersionCleanupTask cleanupTask =
+//                new EntityVersionCleanupTask( serializationFig, less, uvss, keyspace, listeners, appScope, entityId,
+//                        version, false);
+//
+//        final MutationBatch newBatch = mock( MutationBatch.class );
+//
+//
+//        // set up returning a mutator
+//        when( uvss.delete( same( appScope ), any( UniqueValue.class ) ) ).thenReturn( newBatch );
+//
+//        //return a new batch when it's called
+//        when( less.delete( same( appScope ), same( entityId ), any( UUID.class ) ) ).thenReturn( newBatch );
+//
+//
+//        //start the task
+//        ListenableFuture<Void> future = taskExecutor.submit( cleanupTask );
+//
+//        /**
+//         * While we're not done, release listenerCount semaphore permits every 200 ms so the blocked listeners can proceed
+//         */
+//        while ( !future.isDone() ) {
+//            Thread.sleep( 200 );
+//            waitSemaphore.release( listenerCount );
+//        }
+//
+//        //wait for the task
+//        future.get();
+//
+//
+//        //get the second field, this should be deleted
+//        final UniqueValue oldUniqueField = uniqueValueEntryMock.getEntryAtIndex( 1 );
+//
+//        final MvccLogEntry expectedDeletedEntry = logEntryMock.getEntryAtIndex( 1 );
+//
+//
+//        //verify delete was invoked
+//        verify( uvss ).delete( same( appScope ), same( oldUniqueField ) );
+//
+//        //verify the delete was invoked
+//        verify( less ).delete( same( appScope ), same( entityId ), same( expectedDeletedEntry.getVersion() ) );
+//
+//        // verify it was run
+//        verify( entityBatch ).execute();
+//
+//
+//        //the latch was executed
+//        latch.await();
+//
+//        //we deleted the version
+//        //verify we deleted everything
+//        //verify delete was invoked
+//        verify( uvss ).delete( same( appScope ), same( oldUniqueField ) );
+//
+//        //verify the delete was invoked
+//        verify( less ).delete( same( appScope ), same( entityId ), same( expectedDeletedEntry.getVersion() ) );
+//
+//        // verify it was run
+//        verify( entityBatch ).execute();
+//
+//        //the latch was executed
+//        latch.await();
+//
+//
+//        //the latch was executed
+//        latch.await();
+//    }
+//
+//
+//    /**
+//     * Tests that our task runs in the caller when there are no threads available, ensuring that the task still runs
+//     */
+//    @Test( timeout = 10000 )
+//    public void singleListenerSingleVersionRejected()
+//            throws ExecutionException, InterruptedException, ConnectionException {
+//
+//
+//
+//        //create a latch for the event listener, and add it to the list of events
+//        final int sizeToReturn = 1;
+//
+//        final CountDownLatch latch = new CountDownLatch( sizeToReturn );
+//
+//        final EntityVersionDeletedTest eventListener = new EntityVersionDeletedTest( latch );
+//
+//        final Set<EntityVersionDeleted> listeners = new HashSet<>();
+//
+//        listeners.add( eventListener );
+//
+//
+//        final SerializationFig serializationFig = mock( SerializationFig.class );
+//
+//        when( serializationFig.getBufferSize() ).thenReturn( 10 );
+//
+//        final MvccLogEntrySerializationStrategy less = mock( MvccLogEntrySerializationStrategy.class );
+//
+//        final UniqueValueSerializationStrategy uvss = mock( UniqueValueSerializationStrategy.class );
+//
+//        final Keyspace keyspace = mock( Keyspace.class );
+//
+//        final MutationBatch entityBatch = mock( MutationBatch.class );
+//
+//        when( keyspace.prepareMutationBatch() ).thenReturn(
+//                mock( MutationBatch.class ) ) // don't care what happens to this one
+//                .thenReturn( entityBatch );
+//
+//
+//        final Id applicationId = new SimpleId( "application" );
+//
+//        final ApplicationScope appScope = new ApplicationScopeImpl( applicationId );
+//
+//        final Id entityId = new SimpleId( "user" );
+//
+//
+//        final List<UUID> versions = VersionGenerator.generateVersions( 2 );
+//
+//
+//        // mock up a single log entry for our first test
+//        final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock( less, appScope, entityId, versions );
+//
+//
+//        //get the version we're keeping, it's first in our list
+//        final UUID version = logEntryMock.getEntryAtIndex( 0 ).getVersion();
+//
+//        //mock up unique version output
+//        final UniqueValueEntryMock uniqueValueEntryMock =
+//                UniqueValueEntryMock.createUniqueMock( uvss, appScope, entityId, versions );
+//
+//
+//        EntityVersionCleanupTask cleanupTask =
+//                new EntityVersionCleanupTask( serializationFig, less, uvss, keyspace, listeners, appScope, entityId,
+//                        version, false );
+//
+//        final MutationBatch newBatch = mock( MutationBatch.class );
+//
+//
+//        // set up returning a mutator
+//        when( uvss.delete( same( appScope ), any( UniqueValue.class ) ) ).thenReturn( newBatch );
+//
+//        //return a new batch when it's called
+//        when( less.delete( same( appScope ), same( entityId ), any( UUID.class ) ) ).thenReturn( newBatch );
+//
+//
+//        cleanupTask.rejected();
+//
+//
+//        //get the second field, this should be deleted
+//        final UniqueValue oldUniqueField = uniqueValueEntryMock.getEntryAtIndex( 1 );
+//
+//        final MvccLogEntry expectedDeletedEntry = logEntryMock.getEntryAtIndex( 1 );
+//
+//
+//        //verify delete was invoked
+//        verify( uvss ).delete( same( appScope ), same( oldUniqueField ) );
+//
+//        //verify the delete was invoked
+//        verify( less ).delete( same( appScope ), same( entityId ), same( expectedDeletedEntry.getVersion() ) );
+//
+//        // verify it was run
+//        verify( entityBatch ).execute();
+//
+//
+//        //the latch was executed
+//        latch.await();
+//    }
+//
+//
+//    private static class EntityVersionDeletedTest implements EntityVersionDeleted {
+//        final CountDownLatch invocationLatch;
+//
+//
+//        private EntityVersionDeletedTest( final CountDownLatch invocationLatch ) {
+//            this.invocationLatch = invocationLatch;
+//        }
+//
+//
+//        @Override
+//        public void versionDeleted( final ApplicationScope scope, final Id entityId,
+//                                    final List<MvccLogEntry> entityVersion ) {
+//            invocationLatch.countDown();
+//        }
+//    }
+//
+//
+//    private static class SlowListener extends EntityVersionDeletedTest {
+//        final Semaphore blockLatch;
+//
+//
+//        private SlowListener( final CountDownLatch invocationLatch, final Semaphore blockLatch ) {
+//            super( invocationLatch );
+//            this.blockLatch = blockLatch;
+//        }
+//
+//
+//        @Override
+//        public void versionDeleted( final ApplicationScope scope, final Id entityId,
+//                                    final List<MvccLogEntry> entityVersion ) {
+//
+//            //wait for unblock to happen before counting down invocation latches
+//            try {
+//                blockLatch.acquire();
+//            }
+//            catch ( InterruptedException e ) {
+//                throw new RuntimeException( e );
+//            }
+//            super.versionDeleted( scope, entityId, entityVersion );
+//        }
+//    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/VersionCompactTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/VersionCompactTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/VersionCompactTest.java
new file mode 100644
index 0000000..8ffba71
--- /dev/null
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/VersionCompactTest.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.collection.mvcc.stage.delete;
+
+
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+
+public class VersionCompactTest {
+    //NOTE(review): every test below is commented out and targets EntityVersionCreatedTask; this class verifies nothing
+//    private static final TaskExecutor taskExecutor = new NamedTaskExecutorImpl( "test", 4, 0 );
+//
+//       @AfterClass
+//       public static void shutdown() {
+//           taskExecutor.shutdown();
+//       }
+//
+//
+//       @Test(timeout=10000)
+//       public void noListener()
+//               throws ExecutionException, InterruptedException, ConnectionException {
+//
+//           // create a latch for the event listener, and add it to the list of events
+//
+//           final Set<EntityVersionCreated> listeners = mock( Set.class );
+//
+//           when ( listeners.size()).thenReturn( 0 );
+//
+//           final Id applicationId = new SimpleId( "application" );
+//
+//           final ApplicationScope appScope = new ApplicationScopeImpl(applicationId);
+//
+//           final Id entityId = new SimpleId( "user" );
+//           final Entity entity = new Entity( entityId );
+//
+//           // start the task
+//
+//           EntityVersionCreatedTask entityVersionCreatedTask =
+//                   new EntityVersionCreatedTask( appScope, listeners, entity);
+//
+//           try {
+//               entityVersionCreatedTask.call();
+//           }catch(Exception e){
+//               Assert.fail( e.getMessage() );
+//           }
+//
+//
+//           // wait for the task
+//          // future.get();
+//
+//           //mocked listener makes sure that the task is called
+//           verify( listeners ).size();
+//
+//       }
+//       @Test(timeout=10000)
+//       public void oneListener()
+//               throws ExecutionException, InterruptedException, ConnectionException {
+//
+//           // create a latch for the event listener, and add it to the list of events
+//
+//           final int sizeToReturn = 1;
+//
+//           final CountDownLatch latch = new CountDownLatch( sizeToReturn );
+//
+//           final EntityVersionCreatedTest eventListener = new EntityVersionCreatedTest(latch);
+//
+//           final Set<EntityVersionCreated> listeners = mock( Set.class );
+//           final Iterator<EntityVersionCreated> helper = mock(Iterator.class);
+//
+//           when ( listeners.size()).thenReturn( 1 );
+//           when ( listeners.iterator()).thenReturn( helper );
+//           when ( helper.next() ).thenReturn( eventListener );
+//
+//           final Id applicationId = new SimpleId( "application" );
+//
+//           final ApplicationScope appScope = new ApplicationScopeImpl(applicationId);
+//
+//           final Id entityId = new SimpleId( "user" );
+//           final Entity entity = new Entity( entityId );
+//
+//           // start the task
+//
+//           EntityVersionCreatedTask entityVersionCreatedTask =
+//               new EntityVersionCreatedTask( appScope, listeners, entity);
+//
+//           try {
+//               entityVersionCreatedTask.call();
+//           }catch(Exception e){
+//
+//               Assert.fail(e.getMessage());
+//           }
+//           //mocked listener makes sure that the task is called
+//           verify( listeners ).size();
+//           verify( listeners ).iterator();
+//           verify( helper ).next();
+//
+//       }
+//
+//       @Test(timeout=10000)
+//       public void multipleListener()
+//               throws ExecutionException, InterruptedException, ConnectionException {
+//
+//           final int sizeToReturn = 3;
+//
+//           final Set<EntityVersionCreated> listeners = mock( Set.class );
+//           final Iterator<EntityVersionCreated> helper = mock(Iterator.class);
+//
+//           when ( listeners.size()).thenReturn( 3 );
+//           when ( listeners.iterator()).thenReturn( helper );
+//
+//           final Id applicationId = new SimpleId( "application" );
+//
+//           final ApplicationScope appScope = new ApplicationScopeImpl(applicationId);
+//
+//           final Id entityId = new SimpleId( "user" );
+//           final Entity entity = new Entity( entityId );
+//
+//           // start the task
+//
+//           EntityVersionCreatedTask entityVersionCreatedTask =
+//                   new EntityVersionCreatedTask( appScope, listeners, entity);
+//
+//           final CountDownLatch latch = new CountDownLatch( sizeToReturn );
+//
+//           final EntityVersionCreatedTest listener1 = new EntityVersionCreatedTest(latch);
+//           final EntityVersionCreatedTest listener2 = new EntityVersionCreatedTest(latch);
+//           final EntityVersionCreatedTest listener3 = new EntityVersionCreatedTest(latch);
+//
+//           when ( helper.next() ).thenReturn( listener1,listener2,listener3);
+//
+//           try {
+//               entityVersionCreatedTask.call();
+//           }catch(Exception e){
+//               ;
+//           }
+//           //ListenableFuture<Void> future = taskExecutor.submit( entityVersionCreatedTask );
+//
+//           //wait for the task
+//           //intentionally fails due to difficulty mocking observable
+//
+//           //mocked listener makes sure that the task is called
+//           verify( listeners ).size();
+//           //verifies that the observable made listener iterate.
+//           verify( listeners ).iterator();
+//       }
+//
+//       @Test(timeout=10000)
+//       public void oneListenerRejected()
+//               throws ExecutionException, InterruptedException, ConnectionException {
+//
+//           // create a latch for the event listener, and add it to the list of events
+//
+//           final TaskExecutor taskExecutor = new NamedTaskExecutorImpl( "test", 0, 0 );
+//
+//           final int sizeToReturn = 1;
+//
+//           final CountDownLatch latch = new CountDownLatch( sizeToReturn );
+//
+//           final EntityVersionCreatedTest eventListener = new EntityVersionCreatedTest(latch);
+//
+//           final Set<EntityVersionCreated> listeners = mock( Set.class );
+//           final Iterator<EntityVersionCreated> helper = mock(Iterator.class);
+//
+//           when ( listeners.size()).thenReturn( 1 );
+//           when ( listeners.iterator()).thenReturn( helper );
+//           when ( helper.next() ).thenReturn( eventListener );
+//
+//           final Id applicationId = new SimpleId( "application" );
+//
+//           final ApplicationScope appScope = new ApplicationScopeImpl(applicationId);
+//
+//           final Id entityId = new SimpleId( "user" );
+//           final Entity entity = new Entity( entityId );
+//
+//           // start the task
+//
+//           EntityVersionCreatedTask entityVersionCreatedTask =
+//                   new EntityVersionCreatedTask( appScope, listeners, entity);
+//
+//           entityVersionCreatedTask.rejected();
+//
+//           //mocked listener makes sure that the task is called
+//           verify( listeners ).size();
+//           verify( listeners ).iterator();
+//           verify( helper ).next();
+//
+//       }
+//
+//       private static class EntityVersionCreatedTest implements EntityVersionCreated {
+//           final CountDownLatch invocationLatch;
+//
+//           private EntityVersionCreatedTest( final CountDownLatch invocationLatch) {
+//               this.invocationLatch = invocationLatch;
+//           }
+//
+//           @Override
+//           public void versionCreated( final ApplicationScope scope, final Entity entity ) {
+//               invocationLatch.countDown();
+//           }
+//       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIteratorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIteratorTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIteratorTest.java
deleted file mode 100644
index 720e602..0000000
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIteratorTest.java
+++ /dev/null
@@ -1,132 +0,0 @@
-package org.apache.usergrid.persistence.collection.serialization.impl;
-
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.UUID;
-
-import org.junit.Test;
-
-import org.apache.usergrid.persistence.collection.MvccLogEntry;
-import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
-import org.apache.usergrid.persistence.collection.util.LogEntryMock;
-import org.apache.usergrid.persistence.collection.util.VersionGenerator;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
-import org.apache.usergrid.persistence.model.entity.Id;
-import org.apache.usergrid.persistence.model.entity.SimpleId;
-import org.apache.usergrid.persistence.model.util.UUIDGenerator;
-
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.mockito.Matchers.same;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-/**
- * Tests iterator paging
- */
-public class LogEntryIteratorTest {
-
-
-    @Test
-    public void empty() throws ConnectionException {
-
-        final MvccLogEntrySerializationStrategy logEntrySerializationStrategy =
-                mock( MvccLogEntrySerializationStrategy.class );
-
-        final ApplicationScope scope =
-                new ApplicationScopeImpl( new SimpleId( "application" ));
-
-        final Id entityId = new SimpleId( "entity" );
-
-        final int pageSize = 100;
-
-
-        //set the start version, it should be discarded
-        UUID start = UUIDGenerator.newTimeUUID();
-
-        when( logEntrySerializationStrategy.load( same( scope ), same( entityId ), same( start ), same( pageSize ) ) )
-                .thenReturn( new ArrayList<MvccLogEntry>() );
-
-
-        //now iterate we should get everything
-        LogEntryIterator itr = new LogEntryIterator( logEntrySerializationStrategy, scope, entityId, start, pageSize );
-
-
-        assertFalse( itr.hasNext() );
-    }
-
-
-    @Test
-    public void partialLastPage() throws ConnectionException {
-
-
-        final int pageSize = 10;
-        final int totalPages = 3;
-        final int lastPageSize = pageSize / 2;
-
-        //have one half page
-
-        pageElements( pageSize, totalPages, lastPageSize );
-    }
-
-
-    @Test
-    public void emptyLastPage() throws ConnectionException {
-
-
-        final int pageSize = 10;
-        final int totalPages = 3;
-        final int lastPageSize = 0;
-
-        //have one half page
-
-        pageElements( pageSize, totalPages, lastPageSize );
-    }
-
-
-    public void pageElements( final int pageSize, final int totalPages, final int lastPageSize )
-            throws ConnectionException {
-
-        final MvccLogEntrySerializationStrategy logEntrySerializationStrategy =
-                mock( MvccLogEntrySerializationStrategy.class );
-
-        final ApplicationScope scope =
-                new ApplicationScopeImpl( new SimpleId( "application" ) );
-
-        final Id entityId = new SimpleId( "entity" );
-
-
-        //have one half page
-        final int toGenerate = pageSize * totalPages + lastPageSize;
-
-
-        final LogEntryMock mockResults =
-                LogEntryMock.createLogEntryMock( logEntrySerializationStrategy, scope, entityId, VersionGenerator.generateVersions( toGenerate ) );
-
-        Iterator<MvccLogEntry> expectedEntries = mockResults.getEntries().iterator();
-
-        //this element should be skipped
-        UUID start = expectedEntries.next().getVersion();
-
-        //now iterate we should get everything
-        LogEntryIterator itr = new LogEntryIterator( logEntrySerializationStrategy, scope, entityId, start, pageSize );
-
-
-        while ( expectedEntries.hasNext() && itr.hasNext() ) {
-            final MvccLogEntry expected = expectedEntries.next();
-
-            final MvccLogEntry returned = itr.next();
-
-            assertEquals( expected, returned );
-        }
-
-
-        assertFalse( itr.hasNext() );
-        assertFalse( expectedEntries.hasNext() );
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MinMaxLogEntryIteratorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MinMaxLogEntryIteratorTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MinMaxLogEntryIteratorTest.java
new file mode 100644
index 0000000..c82e1bf
--- /dev/null
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MinMaxLogEntryIteratorTest.java
@@ -0,0 +1,134 @@
+package org.apache.usergrid.persistence.collection.serialization.impl;
+
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.UUID;
+
+import org.junit.Test;
+
+import org.apache.usergrid.persistence.collection.MvccLogEntry;
+import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.collection.util.LogEntryMock;
+import org.apache.usergrid.persistence.collection.util.VersionGenerator;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Tests iterator paging
+ */
+public class MinMaxLogEntryIteratorTest {
+
+
+    @Test
+    public void empty() throws ConnectionException {
+
+        final MvccLogEntrySerializationStrategy logEntrySerializationStrategy =
+                mock( MvccLogEntrySerializationStrategy.class );
+
+        final ApplicationScope scope =
+                new ApplicationScopeImpl( new SimpleId( "application" ));
+
+        final Id entityId = new SimpleId( "entity" );
+
+        final int pageSize = 100;
+
+
+        //set the start version, it should be discarded
+        UUID start = UUIDGenerator.newTimeUUID();
+        //NOTE(review): same( pageSize ) matches a boxed int by identity; eq() is the usual matcher for primitives
+        when( logEntrySerializationStrategy.load( same( scope ), same( entityId ), same( start ), same( pageSize ) ) )
+                .thenReturn( new ArrayList<MvccLogEntry>() );
+
+
+        //the stubbed load returns no entries, so the iterator must report nothing to iterate
+        MinMaxLogEntryIterator
+            itr = new MinMaxLogEntryIterator( logEntrySerializationStrategy, scope, entityId, start, pageSize );
+
+
+        assertFalse( itr.hasNext() );
+    }
+
+
+    @Test
+    public void partialLastPage() throws ConnectionException {
+
+
+        final int pageSize = 10;
+        final int totalPages = 3;
+        final int lastPageSize = pageSize / 2;
+
+        //3 full pages followed by a trailing half page
+
+        pageElements( pageSize, totalPages, lastPageSize );
+    }
+
+
+    @Test
+    public void emptyLastPage() throws ConnectionException {
+
+
+        final int pageSize = 10;
+        final int totalPages = 3;
+        final int lastPageSize = 0;
+
+        //exactly 3 full pages with nothing left over (lastPageSize is 0)
+
+        pageElements( pageSize, totalPages, lastPageSize );
+    }
+
+    /** Generates pageSize * totalPages + lastPageSize entries and checks the iterator returns all but the first */
+    public void pageElements( final int pageSize, final int totalPages, final int lastPageSize )
+            throws ConnectionException {
+
+        final MvccLogEntrySerializationStrategy logEntrySerializationStrategy =
+                mock( MvccLogEntrySerializationStrategy.class );
+
+        final ApplicationScope scope =
+                new ApplicationScopeImpl( new SimpleId( "application" ) );
+
+        final Id entityId = new SimpleId( "entity" );
+
+
+        //total entries: every full page plus whatever the last (possibly empty) page holds
+        final int toGenerate = pageSize * totalPages + lastPageSize;
+
+
+        final LogEntryMock mockResults =
+                LogEntryMock.createLogEntryMock( logEntrySerializationStrategy, scope, entityId, VersionGenerator.generateVersions( toGenerate ) );
+
+        Iterator<MvccLogEntry> expectedEntries = mockResults.getEntries().iterator();
+
+        //the first entry only supplies the start version; it is consumed here and not compared below
+        UUID start = expectedEntries.next().getVersion();
+
+        //iterating from start should yield every remaining expected entry, in the same order
+        MinMaxLogEntryIterator
+            itr = new MinMaxLogEntryIterator( logEntrySerializationStrategy, scope, entityId, start, pageSize );
+
+
+        while ( expectedEntries.hasNext() && itr.hasNext() ) {
+            final MvccLogEntry expected = expectedEntries.next();
+
+            final MvccLogEntry returned = itr.next();
+
+            assertEquals( expected, returned );
+        }
+
+
+        assertFalse( itr.hasNext() );
+        assertFalse( expectedEntries.hasNext() );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImplTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImplTest.java
index 107b2a0..673903c 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImplTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImplTest.java
@@ -20,6 +20,7 @@
 package org.apache.usergrid.persistence.collection.serialization.impl;
 
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
@@ -43,6 +44,7 @@ import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 
+import com.fasterxml.uuid.UUIDComparator;
 import com.google.inject.Inject;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
@@ -65,15 +67,15 @@ public abstract class MvccLogEntrySerializationStrategyImplTest {
 
     private MvccLogEntrySerializationStrategy logEntryStrategy;
 
+
     @Before
-    public void wireLogEntryStrategy(){
+    public void wireLogEntryStrategy() {
         logEntryStrategy = getLogEntryStrategy();
     }
 
 
     /**
      * Get the log entry strategy from
-     * @return
      */
     protected abstract MvccLogEntrySerializationStrategy getLogEntryStrategy();
 
@@ -181,6 +183,85 @@ public abstract class MvccLogEntrySerializationStrategyImplTest {
     }
 
 
+    @Test
+    public void getReversedEntries() throws ConnectionException {
+        //writes count log entries, pages through them with loadReversed, then deletes them and expects none back
+        final Id applicationId = new SimpleId( "application" );
+
+        ApplicationScope context = new ApplicationScopeImpl( applicationId );
+
+
+        final Id id = new SimpleId( "test" );
+
+        int count = 10;
+
+        final UUID[] versions = new UUID[count];
+        final Stage COMPLETE = Stage.COMPLETE;
+        final MvccLogEntry[] entries = new MvccLogEntry[count];
+
+
+        for ( int i = 0; i < count; i++ ) {
+            versions[i] = UUIDGenerator.newTimeUUID();
+
+            entries[i] = new MvccLogEntryImpl( id, versions[i], COMPLETE, MvccLogEntry.State.COMPLETE );
+            logEntryStrategy.write( context, entries[i] ).execute();
+
+            //Read it back
+
+            MvccLogEntry returned =
+                logEntryStrategy.load( context, Collections.singleton( id ), versions[i] ).getMaxVersion( id );
+
+            assertNotNull( "Returned value should not be null", returned );
+
+            assertEquals( "Returned should equal the saved", entries[i], returned );
+        }
+
+        //NOTE(review): assertVersions is sorted in descending order below but never read by any assertion
+        final UUID[] assertVersions = Arrays.copyOf( versions, versions.length );
+
+        Arrays.sort( assertVersions, ( v1, v2 ) -> UUIDComparator.staticCompare( v1, v2 ) * -1 );
+
+        //load the first half, starting at the first version written; results are expected in write order
+
+        final int half = count/2;
+
+        final List<MvccLogEntry> results = logEntryStrategy.loadReversed( context, id, versions[0], half);
+
+        assertEquals( half, results.size() );
+
+        for ( int i = 0; i < count / 2; i++ ) {
+            final MvccLogEntry saved = entries[i];
+            final MvccLogEntry returned = results.get( i );
+
+            assertEquals( "Entry was not equal to the saved value", saved, returned );
+        }
+
+
+        //now get the second half, starting at versions[half]; a limit of count should still yield only half entries
+        final List<MvccLogEntry> results2 =
+            logEntryStrategy.loadReversed( context, id, versions[half], count );
+
+        assertEquals( half, results2.size());
+
+        for ( int i = 0; i < half; i++ ) {
+            final MvccLogEntry saved = entries[half + i];
+            final MvccLogEntry returned = results2.get( i );
+
+            assertEquals( "Entry was not equal to the saved value", saved, returned );
+        }
+
+
+        //now delete them all and ensure we get no results back
+        for ( int i = 0; i < count; i++ ) {
+            logEntryStrategy.delete( context, id, versions[i] ).execute();
+        }
+
+        final  List<MvccLogEntry>  results3 = logEntryStrategy.loadReversed( context, id, null, versions.length );
+
+        assertEquals( "All log entries were deleted", 0, results3.size() );
+    }
+
+
     @Test( expected = NullPointerException.class )
     public void writeParamsNoContext() throws ConnectionException {
         logEntryStrategy.write( null, mock( MvccLogEntry.class ) );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java
new file mode 100644
index 0000000..c653458
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.core.executor;
+
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+/**
+ * Factory for named, fixed-size {@link ThreadPoolExecutor}s backed by a bounded work queue
+ */
+public class TaskExecutorFactory {
+
+
+    /**
+     * Create a fixed-size task executor; a maxQueueSize of 0 hands tasks straight to idle threads
+     * @param schedulerName prefix used to name the pool's threads ("name-1", "name-2", ...)
+     * @param maxThreadCount both the core and the maximum number of threads in the pool
+     * @param maxQueueSize capacity of the pending-task queue, or 0 for no queueing at all
+     * @return the executor; it keeps the JDK default handler, so work is rejected once it is saturated
+     */
+    public static ThreadPoolExecutor createTaskExecutor( final String schedulerName, final int maxThreadCount,
+                                                         final int maxQueueSize ) {
+
+        //ArrayBlockingQueue throws on a capacity of 0, so a "no queue" pool uses a direct hand-off queue instead
+        final BlockingQueue<Runnable> queue = maxQueueSize == 0
+            ? new java.util.concurrent.SynchronousQueue<Runnable>()
+            : new ArrayBlockingQueue<Runnable>( maxQueueSize );
+
+        final MaxSizeThreadPool threadPool = new MaxSizeThreadPool( queue, schedulerName, maxThreadCount );
+
+        return threadPool;
+    }
+
+
+    /**
+     * Fixed-size pool (core == max) that rejects work once every thread is busy and the queue is full
+     */
+    private static final class MaxSizeThreadPool extends ThreadPoolExecutor {
+
+        public MaxSizeThreadPool( final BlockingQueue<Runnable> queue, final String poolName, final int maxPoolSize ) {
+            super( maxPoolSize, maxPoolSize, 30, TimeUnit.SECONDS, queue, new CountingThreadFactory( poolName ) );
+        }
+    }
+
+
+    /**
+     * Thread factory that will name and count threads for easier debugging
+     */
+    private static final class CountingThreadFactory implements ThreadFactory {
+
+        private final AtomicLong threadCounter = new AtomicLong();
+        private final String poolName;
+
+
+        private CountingThreadFactory( final String poolName ) {this.poolName = poolName;}
+
+
+        @Override
+        public Thread newThread( final Runnable r ) {
+            final long newValue = threadCounter.incrementAndGet();
+
+            final String threadName = poolName + "-" + newValue;
+
+            Thread t = new Thread( r, threadName );
+
+            //set it to be a daemon thread so it doesn't block shutdown
+            t.setDaemon( true );
+
+            return t;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskScheduler.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskScheduler.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskScheduler.java
index d6cc5e8..f28e190 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskScheduler.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskScheduler.java
@@ -20,8 +20,6 @@
 package org.apache.usergrid.persistence.core.rx;
 
 
-import org.apache.usergrid.persistence.core.task.Task;
-
 import rx.Scheduler;
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
deleted file mode 100644
index 9007167..0000000
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
+++ /dev/null
@@ -1,286 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.persistence.core.task;
-
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.annotation.Nullable;
-
-import org.slf4j.Logger;
-
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-
-
-/**
- * Implementation of the task executor with a unique name and size
- */
-public class NamedTaskExecutorImpl implements TaskExecutor {
-
-    private static final Logger LOG = org.slf4j.LoggerFactory.getLogger( NamedTaskExecutorImpl.class );
-
-    private final ListeningExecutorService executorService;
-
-    private final String name;
-    private final int poolSize;
-
-
-    /**
-     * @param name The name of this instance of the task executor
-     * @param poolSize The size of the pool.  This is the number of concurrent tasks that can execute at once.
-     * @param queueLength The length of tasks to keep in the queue
-     */
-    public NamedTaskExecutorImpl( final String name, final int poolSize, final int queueLength ) {
-        Preconditions.checkNotNull( name );
-        Preconditions.checkArgument( name.length() > 0, "name must have a length" );
-        Preconditions.checkArgument( poolSize > -1, "poolSize must be > than -1" );
-        Preconditions.checkArgument( queueLength > -1, "queueLength must be 0 or more" );
-
-        this.name = name;
-        this.poolSize = poolSize;
-
-        // The user has chosen to disable asynchronous execution, 
-        // to create an executor service that will reject all requests
-        if ( poolSize == 0 ) {
-            executorService = MoreExecutors.listeningDecorator( new RejectingExecutorService());
-        }
-
-        //queue executions as normal
-        else {
-            final BlockingQueue<Runnable> queue = queueLength > 0 
-                ? new ArrayBlockingQueue<Runnable>(queueLength) : new SynchronousQueue<Runnable>();
-
-            executorService = MoreExecutors.listeningDecorator( new MaxSizeThreadPool( queue ) );
-        }
-    }
-
-
-    @Override
-    public <V> ListenableFuture<V> submit( final Task<V> task ) {
-
-        final ListenableFuture<V> future;
-
-        try {
-            future = executorService.submit( task );
-
-            // Log our success or failures for debugging purposes
-            Futures.addCallback( future, new TaskFutureCallBack<V>( task ) );
-        }
-        catch ( RejectedExecutionException ree ) {
-            return Futures.immediateFuture( task.rejected());
-        }
-
-        return future;
-    }
-
-
-    @Override
-    public void shutdown() {
-        this.executorService.shutdownNow();
-    }
-
-
-    /**
-     * Callback for when the task succeeds or fails.
-     */
-    private static final class TaskFutureCallBack<V> implements FutureCallback<V> {
-
-        private final Task<V> task;
-
-
-        private TaskFutureCallBack( Task<V> task ) {
-            this.task = task;
-        }
-
-
-        @Override
-        public void onSuccess( @Nullable final V result ) {
-            LOG.debug( "Successfully completed task ", task );
-        }
-
-
-        @Override
-        public void onFailure( final Throwable t ) {
-            LOG.error( "Unable to execute task.  Exception is ", t );
-
-            task.exceptionThrown( t );
-        }
-    }
-
-
-    /**
-     * Create a thread pool that will reject work if our audit tasks become overwhelmed
-     */
-    private final class MaxSizeThreadPool extends ThreadPoolExecutor {
-
-        public MaxSizeThreadPool( BlockingQueue<Runnable> queue ) {
-
-            super( 1, poolSize, 30, TimeUnit.SECONDS, queue, new CountingThreadFactory( ),
-                    new RejectedHandler() );
-        }
-    }
-
-
-    /**
-     * Thread factory that will name and count threads for easier debugging
-     */
-    private final class CountingThreadFactory implements ThreadFactory {
-
-        private final AtomicLong threadCounter = new AtomicLong();
-
-
-        @Override
-        public Thread newThread( final Runnable r ) {
-            final long newValue = threadCounter.incrementAndGet();
-
-            Thread t = new Thread( r, name + "-" + newValue );
-
-            t.setDaemon( true );
-
-            return t;
-        }
-    }
-
-
-    /**
-     * The handler that will handle rejected executions and signal the interface
-     */
-    private final class RejectedHandler implements RejectedExecutionHandler {
-
-
-        @Override
-        public void rejectedExecution( final Runnable r, final ThreadPoolExecutor executor ) {
-            LOG.warn( "{} task queue full, rejecting task {}", name, r );
-
-            throw new RejectedExecutionException( "Unable to run task, queue full" );
-        }
-
-    }
-
-
-    /**
-     * Executor implementation that simply rejects all incoming tasks
-     */
-    private static final class RejectingExecutorService implements ExecutorService{
-
-        @Override
-        public void shutdown() {
-
-        }
-
-
-        @Override
-        public List<Runnable> shutdownNow() {
-            return Collections.EMPTY_LIST;
-        }
-
-
-        @Override
-        public boolean isShutdown() {
-            return false;
-        }
-
-
-        @Override
-        public boolean isTerminated() {
-            return false;
-        }
-
-
-        @Override
-        public boolean awaitTermination( final long timeout, final TimeUnit unit ) 
-            throws InterruptedException {
-            return false;
-        }
-
-
-        @Override
-        public <T> Future<T> submit( final Callable<T> task ) {
-            throw new RejectedExecutionException("No Asynchronous tasks allowed");
-        }
-
-
-        @Override
-        public <T> Future<T> submit( final Runnable task, final T result ) {
-            throw new RejectedExecutionException("No Asynchronous tasks allowed");
-        }
-
-
-        @Override
-        public Future<?> submit( final Runnable task ) {
-            throw new RejectedExecutionException("No Asynchronous tasks allowed");
-        }
-
-
-        @Override
-        public <T> List<Future<T>> invokeAll( final Collection<? extends Callable<T>> tasks )
-                throws InterruptedException {
-            throw new RejectedExecutionException("No Asynchronous tasks allowed");
-        }
-
-
-        @Override
-        public <T> List<Future<T>> invokeAll( final Collection<? extends Callable<T>> tasks, 
-                final long timeout, final TimeUnit unit ) throws InterruptedException {
-
-            throw new RejectedExecutionException("No Asynchronous tasks allowed");
-        }
-
-
-        @Override
-        public <T> T invokeAny( final Collection<? extends Callable<T>> tasks )
-            throws InterruptedException, ExecutionException {
-            throw new RejectedExecutionException("No Asynchronous tasks allowed");
-        }
-
-
-        @Override
-        public <T> T invokeAny( final Collection<? extends Callable<T>> tasks, final long timeout, 
-            final TimeUnit unit ) throws InterruptedException, ExecutionException, TimeoutException {
-            throw new RejectedExecutionException("No Asynchronous tasks allowed");
-        }
-
-
-        @Override
-        public void execute( final Runnable command ) {
-            throw new RejectedExecutionException("No Asynchronous tasks allowed");
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
deleted file mode 100644
index 5582161..0000000
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.persistence.core.task;
-
-
-import java.util.concurrent.Callable;
-
-
-/**
- * The task to execute
- */
-public interface Task<V> extends Callable<V> {
-
-
-    /**
-     * Invoked when this task throws an uncaught exception.
-     * @param throwable
-     */
-    void exceptionThrown(final Throwable throwable);
-
-    /**
-     * Invoked when we weren't able to run this task by the the thread attempting to schedule the task.
-     * If this task MUST be run immediately, you can invoke the call method from within this event to invoke the
-     * task in the scheduling thread.  Note that this has performance implications to the user.  If you can drop the
-     * request and process later (lazy repair for instance ) do so.
-     *
-     */
-    V rejected();
-
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
deleted file mode 100644
index 5728d2e..0000000
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.persistence.core.task;
-
-
-import com.google.common.util.concurrent.ListenableFuture;
-
-
-/**
- * An interface for execution of tasks
- */
-public interface  TaskExecutor {
-
-    /**
-     * Submit the task asynchronously
-     * @param task
-     */
-    public <V> ListenableFuture<V> submit( Task<V> task );
-
-    /**
-     * Stop the task executor without waiting for scheduled threads to run
-     */
-    public void shutdown();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
deleted file mode 100644
index 4f95918..0000000
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
+++ /dev/null
@@ -1,271 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.persistence.core.task;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import org.junit.Test;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertSame;
-
-
-/**
- * Tests for the namedtask execution impl
- */
-public class NamedTaskExecutorImplTest {
-
-
-    @Test
-    public void jobSuccess() throws InterruptedException {
-        final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1, 0 );
-
-        final CountDownLatch exceptionLatch = new CountDownLatch( 0 );
-        final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
-        final CountDownLatch runLatch = new CountDownLatch( 1 );
-
-        final Task<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {};
-
-        executor.submit( task );
-
-
-        runLatch.await( 1000, TimeUnit.MILLISECONDS );
-
-        assertEquals( 0l, exceptionLatch.getCount() );
-
-        assertEquals( 0l, rejectedLatch.getCount() );
-    }
-
-
-    @Test
-    public void exceptionThrown() throws InterruptedException {
-        final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1, 0 );
-
-        final CountDownLatch exceptionLatch = new CountDownLatch( 1 );
-        final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
-        final CountDownLatch runLatch = new CountDownLatch( 1 );
-
-        final RuntimeException re = new RuntimeException( "throwing exception" );
-
-        final TestTask<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {
-            @Override
-            public Void call() throws Exception {
-                super.call();
-                throw re;
-            }
-        };
-
-        executor.submit( task );
-
-
-        runLatch.await( 1000, TimeUnit.MILLISECONDS );
-        exceptionLatch.await( 1000, TimeUnit.MILLISECONDS );
-
-        assertSame( re, task.exceptions.get( 0 ) );
-
-
-        assertEquals( 0l, rejectedLatch.getCount() );
-    }
-
-
-    @Test
-    public void noCapacity() throws InterruptedException {
-        final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1, 0 );
-
-        final CountDownLatch exceptionLatch = new CountDownLatch( 0 );
-        final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
-        final CountDownLatch runLatch = new CountDownLatch( 1 );
-
-
-        final TestTask<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {
-            @Override
-            public Void call() throws Exception {
-                super.call();
-
-                //park this thread so it takes up a task and the next is rejected
-                final Object mutex = new Object();
-
-                synchronized ( mutex ) {
-                    mutex.wait();
-                }
-
-                return null;
-            }
-        };
-
-        executor.submit( task );
-
-
-        runLatch.await( 1000, TimeUnit.MILLISECONDS );
-
-        //now submit the second task
-
-
-        final CountDownLatch secondRejectedLatch = new CountDownLatch( 1 );
-        final CountDownLatch secondExceptionLatch = new CountDownLatch( 0 );
-        final CountDownLatch secondRunLatch = new CountDownLatch( 1 );
-
-
-        final TestTask<Void> testTask = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {};
-
-        executor.submit( testTask );
-
-
-        secondRejectedLatch.await( 1000, TimeUnit.MILLISECONDS );
-
-        //if we get here we've been rejected, just double check we didn't run
-
-        assertEquals( 1l, secondRunLatch.getCount() );
-        assertEquals( 0l, secondExceptionLatch.getCount() );
-    }
-
-
-    @Test
-    public void noCapacityWithQueue() throws InterruptedException {
-
-        final int threadPoolSize = 1;
-        final int queueSize = 10;
-
-        final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", threadPoolSize, queueSize );
-
-        final CountDownLatch exceptionLatch = new CountDownLatch( 0 );
-        final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
-        final CountDownLatch runLatch = new CountDownLatch( 1 );
-
-        int iterations = threadPoolSize + queueSize;
-
-        for(int i = 0; i < iterations; i ++) {
-
-            final TestTask<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {
-                @Override
-                public Void call() throws Exception {
-                    super.call();
-
-                    //park this thread so it takes up a task and the next is rejected
-                    final Object mutex = new Object();
-
-                    synchronized ( mutex ) {
-                        mutex.wait();
-                    }
-
-                    return null;
-                }
-            };
-            executor.submit( task );
-        }
-
-
-
-        runLatch.await( 1000, TimeUnit.MILLISECONDS );
-
-        //now submit the second task
-
-
-        final CountDownLatch secondRejectedLatch = new CountDownLatch( 1 );
-        final CountDownLatch secondExceptionLatch = new CountDownLatch( 0 );
-        final CountDownLatch secondRunLatch = new CountDownLatch( 1 );
-
-
-        final TestTask<Void> testTask = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {};
-
-        executor.submit( testTask );
-
-
-        secondRejectedLatch.await( 1000, TimeUnit.MILLISECONDS );
-
-        //if we get here we've been rejected, just double check we didn't run
-
-        assertEquals( 1l, secondRunLatch.getCount() );
-        assertEquals( 0l, secondExceptionLatch.getCount() );
-    }
-
-
-    @Test
-    public void rejectingTaskExecutor() throws InterruptedException {
-
-        final int threadPoolSize = 0;
-        final int queueSize = 0;
-
-        final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", threadPoolSize, queueSize );
-
-        final CountDownLatch exceptionLatch = new CountDownLatch( 0 );
-        final CountDownLatch rejectedLatch = new CountDownLatch( 1 );
-        final CountDownLatch runLatch = new CountDownLatch( 0 );
-
-
-        //now submit the second task
-
-
-
-        final TestTask<Void> testTask = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {};
-
-        executor.submit( testTask );
-
-
-        //should be immediately rejected
-        rejectedLatch.await( 1000, TimeUnit.MILLISECONDS );
-
-        //if we get here we've been rejected, just double check we didn't run
-
-        assertEquals( 0l, exceptionLatch.getCount() );
-        assertEquals( 0l, runLatch.getCount() );
-    }
-
-
-    private static abstract class TestTask<V> implements Task<V> {
-
-        private final List<Throwable> exceptions;
-        private final CountDownLatch exceptionLatch;
-        private final CountDownLatch rejectedLatch;
-        private final CountDownLatch runLatch;
-
-
-        private TestTask( final CountDownLatch exceptionLatch, final CountDownLatch rejectedLatch,
-                          final CountDownLatch runLatch ) {
-            this.exceptionLatch = exceptionLatch;
-            this.rejectedLatch = rejectedLatch;
-            this.runLatch = runLatch;
-
-            this.exceptions = new ArrayList<>();
-        }
-
-
-
-        @Override
-        public void exceptionThrown( final Throwable throwable ) {
-            this.exceptions.add( throwable );
-            exceptionLatch.countDown();
-        }
-
-
-        @Override
-        public V rejected() {
-            rejectedLatch.countDown();
-            return null;
-        }
-
-
-        @Override
-        public V call() throws Exception {
-            runLatch.countDown();
-            return null;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java
index 4c38c13..987a36c 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java
@@ -64,23 +64,23 @@ public interface GraphManager extends CPManager {
 
 
     /**
-     * @param edge The edge to delete
+     * @param edge The edge to mark as deleted in the graph
      *
      *
      * EdgeDelete the edge. Implementation should also delete the incoming (reversed) edge. Only deletes the specific version
      */
-    Observable<Edge> deleteEdge( Edge edge );
+    Observable<Edge> markEdge( Edge edge );
 
     /**
      *
-     * Remove the node from the graph.
+     * Mark the node as removed from the graph.
      *
      * @param node The node to remove
      * @param timestamp The timestamp to apply the delete operation.  Any edges connected to this node with a timestmap
      * <= the specified time will be removed from the graph
      * @return
      */
-    Observable<Id> deleteNode(Id node, long timestamp);
+    Observable<Id> markNode( Id node, long timestamp );
 
     /**
      * Get all versions of this edge where versions <= max version

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManagerFactory.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManagerFactory.java
index eb49711..d73f767 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManagerFactory.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManagerFactory.java
@@ -36,7 +36,7 @@ public interface GraphManagerFactory
      *
      * @param collectionScope The context to use when creating the graph manager
      */
-    public GraphManager createEdgeManager( ApplicationScope collectionScope );
+    GraphManager createEdgeManager( ApplicationScope collectionScope );
 
     void invalidate();
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdge.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdge.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdge.java
index 114440f..38f62b5 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdge.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdge.java
@@ -59,6 +59,12 @@ public interface SearchByEdge {
     long getMaxTimestamp();
 
     /**
+     * Indicates whether edges marked for deletion should be filtered.
+     * @return true if edges marked for deletion should be filtered
+     */
+    boolean filterMarked();
+
+    /**
      * The optional start parameter.  All edges emitted with be > the specified start edge.
      * This is useful for paging.  Simply use the last value returned in the previous call in the start parameter
      * @return

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdgeType.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdgeType.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdgeType.java
index 5e47ae0..f213b00 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdgeType.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdgeType.java
@@ -65,6 +65,12 @@ public interface SearchByEdgeType {
      */
     Order getOrder();
 
+    /**
+     * Indicates whether marked edges should be filtered from the results.
+     * @return true if marked edges should be filtered from the results
+     */
+    boolean filterMarked();
+
 
     /**
      * Options for ordering.  By default, we want to perform descending for common use cases and read speed.  This is our our data

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
index 6cdaef0..4b628d1 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
@@ -28,8 +28,6 @@ import org.apache.usergrid.persistence.core.migration.data.MigrationPlugin;
 import org.apache.usergrid.persistence.core.migration.data.MigrationRelationship;
 import org.apache.usergrid.persistence.core.migration.data.VersionedMigrationSet;
 import org.apache.usergrid.persistence.core.migration.schema.Migration;
-import org.apache.usergrid.persistence.core.task.NamedTaskExecutorImpl;
-import org.apache.usergrid.persistence.core.task.TaskExecutor;
 import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 import org.apache.usergrid.persistence.graph.impl.stage.EdgeDeleteListener;
@@ -196,15 +194,6 @@ public abstract class GraphModule extends AbstractModule {
     }
 
 
-    @Inject
-    @Singleton
-    @Provides
-    @GraphTaskExecutor
-    public TaskExecutor graphTaskExecutor( final GraphFig graphFig ) {
-        return new NamedTaskExecutorImpl( "graphTaskExecutor", graphFig.getShardAuditWorkerCount(),
-                graphFig.getShardAuditWorkerQueueSize() );
-    }
-
 
 
     /**