You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/05/07 18:40:58 UTC

[3/5] incubator-usergrid git commit: First pass at refactoring mark + sweep

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java
deleted file mode 100644
index e5c2896..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java
+++ /dev/null
@@ -1,114 +0,0 @@
-package org.apache.usergrid.persistence.collection.serialization.impl;
-
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-import java.util.UUID;
-
-import org.apache.usergrid.persistence.collection.MvccLogEntry;
-import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.common.base.Preconditions;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-
-
-/**
- * Iterator that will iterate all versions of the entity from the log from < the specified maxVersion
- */
-public class LogEntryIterator implements Iterator<MvccLogEntry> {
-
-
-    private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
-    private final ApplicationScope scope;
-    private final Id entityId;
-    private final int pageSize;
-
-
-    private Iterator<MvccLogEntry> elementItr;
-    private UUID nextStart;
-
-
-    /**
-     * @param logEntrySerializationStrategy The serialization strategy to get the log entries
-     * @param scope The scope of the entity
-     * @param entityId The id of the entity
-     * @param maxVersion The max version of the entity.  Iterator will iterate from max to min starting with the version
-     * < max
-     * @param pageSize The fetch size to get when querying the serialization strategy
-     */
-    public LogEntryIterator( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
-                             final ApplicationScope scope, final Id entityId, final UUID maxVersion,
-                             final int pageSize ) {
-
-        Preconditions.checkArgument( pageSize > 0, "pageSize must be > 0" );
-
-        this.logEntrySerializationStrategy = logEntrySerializationStrategy;
-        this.scope = scope;
-        this.entityId = entityId;
-        this.nextStart = maxVersion;
-        this.pageSize = pageSize;
-    }
-
-
-    @Override
-    public boolean hasNext() {
-        if ( elementItr == null || !elementItr.hasNext() && nextStart != null ) {
-            try {
-                advance();
-            }
-            catch ( ConnectionException e ) {
-                throw new RuntimeException( "Unable to query cassandra", e );
-            }
-        }
-
-        return elementItr.hasNext();
-    }
-
-
-    @Override
-    public MvccLogEntry next() {
-        if ( !hasNext() ) {
-            throw new NoSuchElementException( "No more elements exist" );
-        }
-
-        return elementItr.next();
-    }
-
-
-    @Override
-    public void remove() {
-        throw new UnsupportedOperationException( "Remove is unsupported" );
-    }
-
-
-    /**
-     * Advance our iterator
-     */
-    public void advance() throws ConnectionException {
-
-        final int requestedSize = pageSize + 1;
-
-        //loop through even entry that's < this one and remove it
-        List<MvccLogEntry> results = logEntrySerializationStrategy.load( scope, entityId, nextStart, requestedSize );
-
-        //we always remove the first version if it's equal since it's returned
-        if ( results.size() > 0 && results.get( 0 ).getVersion().equals( nextStart ) ) {
-            results.remove( 0 );
-        }
-
-
-        //we have results, set our next start
-        if ( results.size() == pageSize ) {
-            nextStart = results.get( results.size() - 1 ).getVersion();
-        }
-        //nothing left to do
-        else {
-            nextStart = null;
-        }
-
-        elementItr = results.iterator();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryObservable.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryObservable.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryObservable.java
new file mode 100644
index 0000000..072e0ea
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryObservable.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.collection.serialization.impl;
+
+
+import org.apache.usergrid.persistence.collection.MvccLogEntry;
+import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
+
+import rx.Observable;
+import rx.Subscriber;
+
+
+/**
+ * An observable that emits log entries from MIN to MAX
+ */
+public class LogEntryObservable implements Observable.OnSubscribe<MvccLogEntry>{
+
+    private final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy;
+
+
+    public LogEntryObservable( final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy ) {
+        this.mvccLogEntrySerializationStrategy = mvccLogEntrySerializationStrategy;
+    }
+
+
+    @Override
+    public void call( final Subscriber<? super MvccLogEntry> subscriber ) {
+
+        subscriber.onStart();
+
+        while(!subscriber.isUnsubscribed()){
+
+
+
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MinMaxLogEntryIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MinMaxLogEntryIterator.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MinMaxLogEntryIterator.java
new file mode 100644
index 0000000..a8e15a7
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MinMaxLogEntryIterator.java
@@ -0,0 +1,114 @@
+package org.apache.usergrid.persistence.collection.serialization.impl;
+
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.MvccLogEntry;
+import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Preconditions;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+
+/**
+ * Iterator that will iterate all versions of the entity in the log that are < the specified maxVersion
+ */
+public class MinMaxLogEntryIterator implements Iterator<MvccLogEntry> {
+
+
+    private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
+    private final ApplicationScope scope;
+    private final Id entityId;
+    private final int pageSize;
+
+
+    private Iterator<MvccLogEntry> elementItr;
+    private UUID nextStart;
+
+
+    /**
+     * @param logEntrySerializationStrategy The serialization strategy to get the log entries
+     * @param scope The scope of the entity
+     * @param entityId The id of the entity
+     * @param maxVersion The max version of the entity.  Iterator will iterate from max to min starting with the version
+     * < max
+     * @param pageSize The fetch size to get when querying the serialization strategy
+     */
+    public MinMaxLogEntryIterator( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
+                                   final ApplicationScope scope, final Id entityId, final UUID maxVersion,
+                                   final int pageSize ) {
+
+        Preconditions.checkArgument( pageSize > 0, "pageSize must be > 0" );
+
+        this.logEntrySerializationStrategy = logEntrySerializationStrategy;
+        this.scope = scope;
+        this.entityId = entityId;
+        this.nextStart = maxVersion;
+        this.pageSize = pageSize;
+    }
+
+
+    @Override
+    public boolean hasNext() {
+        if ( elementItr == null || !elementItr.hasNext() && nextStart != null ) {
+            try {
+                advance();
+            }
+            catch ( ConnectionException e ) {
+                throw new RuntimeException( "Unable to query cassandra", e );
+            }
+        }
+
+        return elementItr.hasNext();
+    }
+
+
+    @Override
+    public MvccLogEntry next() {
+        if ( !hasNext() ) {
+            throw new NoSuchElementException( "No more elements exist" );
+        }
+
+        return elementItr.next();
+    }
+
+
+    @Override
+    public void remove() {
+        throw new UnsupportedOperationException( "Remove is unsupported" );
+    }
+
+
+    /**
+     * Advance our iterator
+     */
+    public void advance() throws ConnectionException {
+
+        final int requestedSize = pageSize + 1;
+
+        //loop through every entry that's < this one and remove it
+        List<MvccLogEntry> results = logEntrySerializationStrategy.load( scope, entityId, nextStart, requestedSize );
+
+        //we always remove the first version if it's equal since it's returned
+        if ( results.size() > 0 && results.get( 0 ).getVersion().equals( nextStart ) ) {
+            results.remove( 0 );
+        }
+
+
+        //we have results, set our next start
+        if ( results.size() == pageSize ) {
+            nextStart = results.get( results.size() - 1 ).getVersion();
+        }
+        //nothing left to do
+        else {
+            nextStart = null;
+        }
+
+        elementItr = results.iterator();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationProxyImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationProxyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationProxyImpl.java
index 81c0248..fdef3c3 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationProxyImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationProxyImpl.java
@@ -111,6 +111,20 @@ public class MvccLogEntrySerializationProxyImpl implements MvccLogEntrySerializa
 
 
     @Override
+    public List<MvccLogEntry> loadReversed( final ApplicationScope applicationScope, final Id entityId,
+                                            final UUID minVersion, final int maxSize ) {
+
+        final MigrationRelationship<MvccLogEntrySerializationStrategy> migration = getMigrationRelationShip();
+
+        if ( migration.needsMigration() ) {
+            return migration.from.loadReversed( applicationScope, entityId, minVersion, maxSize );
+        }
+
+        return migration.to.loadReversed( applicationScope, entityId, minVersion, maxSize );
+    }
+
+
+    @Override
     public MutationBatch delete( final ApplicationScope applicationScope, final Id entityId, final UUID version ) {
         final MigrationRelationship<MvccLogEntrySerializationStrategy> migration = getMigrationRelationShip();
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java
index 76e9dba..73804e4 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java
@@ -21,7 +21,6 @@ package org.apache.usergrid.persistence.collection.serialization.impl;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -31,11 +30,6 @@ import java.util.UUID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.cassandra.db.marshal.IntegerType;
-import org.apache.cassandra.db.marshal.ReversedType;
-import org.apache.cassandra.db.marshal.UUIDType;
-
 import org.apache.usergrid.persistence.collection.MvccLogEntry;
 import org.apache.usergrid.persistence.collection.VersionSet;
 import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
@@ -43,10 +37,7 @@ import org.apache.usergrid.persistence.collection.mvcc.entity.Stage;
 import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl;
 import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
-import org.apache.usergrid.persistence.collection.serialization.impl.util.LegacyScopeUtils;
-import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
 import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
-import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
 import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.model.entity.Id;
@@ -126,12 +117,10 @@ public abstract class MvccLogEntrySerializationStrategyImpl<K> implements MvccLo
 
         //didnt put the max in the error message, I don't want to take the string construction hit every time
         Preconditions.checkArgument( entityIds.size() <= fig.getMaxLoadSize(),
-                "requested size cannot be over configured maximum" );
+            "requested size cannot be over configured maximum" );
 
 
         final Id applicationId = collectionScope.getApplication();
-        final Id ownerId = applicationId;
-
 
 
         final List<ScopedRowKey<K>> rowKeys = new ArrayList<>( entityIds.size() );
@@ -155,7 +144,7 @@ public abstract class MvccLogEntrySerializationStrategyImpl<K> implements MvccLo
         }
         catch ( ConnectionException e ) {
             throw new CollectionRuntimeException( null, collectionScope, "An error occurred connecting to cassandra",
-                    e );
+                e );
         }
 
 
@@ -181,7 +170,7 @@ public abstract class MvccLogEntrySerializationStrategyImpl<K> implements MvccLo
             final StageStatus stageStatus = column.getValue( SER );
 
             final MvccLogEntry logEntry =
-                    new MvccLogEntryImpl( entityId, version, stageStatus.stage, stageStatus.state );
+                new MvccLogEntryImpl( entityId, version, stageStatus.stage, stageStatus.state );
 
 
             versionResults.addEntry( logEntry );
@@ -208,13 +197,41 @@ public abstract class MvccLogEntrySerializationStrategyImpl<K> implements MvccLo
             final ScopedRowKey<K> rowKey = createKey( applicationId, entityId );
 
 
+            columns =
+                keyspace.prepareQuery( CF_ENTITY_LOG ).getKey( rowKey ).withColumnRange( version, null, false, maxSize )
+                        .execute().getResult();
+        }
+        catch ( ConnectionException e ) {
+            throw new RuntimeException( "Unable to load log entries", e );
+        }
+
+        return parseResults( columns, entityId );
+    }
+
+
+    @Override
+    public List<MvccLogEntry> loadReversed( final ApplicationScope applicationScope, final Id entityId,
+                                            final UUID minVersion, final int maxSize ) {
+        ColumnList<UUID> columns;
+        try {
+
+            final Id applicationId = applicationScope.getApplication();
+
+            final ScopedRowKey<K> rowKey = createKey( applicationId, entityId );
+
+
             columns = keyspace.prepareQuery( CF_ENTITY_LOG ).getKey( rowKey )
-                              .withColumnRange( version, null, false, maxSize ).execute().getResult();
+                              .withColumnRange( minVersion, null, true, maxSize ).execute().getResult();
         }
         catch ( ConnectionException e ) {
             throw new RuntimeException( "Unable to load log entries", e );
         }
 
+        return parseResults( columns, entityId );
+    }
+
+
+    private List<MvccLogEntry> parseResults( final ColumnList<UUID> columns, final Id entityId ) {
 
         List<MvccLogEntry> results = new ArrayList<MvccLogEntry>( columns.size() );
 
@@ -245,8 +262,6 @@ public abstract class MvccLogEntrySerializationStrategyImpl<K> implements MvccLo
     }
 
 
-
-
     /**
      * Simple callback to perform puts and deletes with a common row setup code
      */
@@ -281,12 +296,14 @@ public abstract class MvccLogEntrySerializationStrategyImpl<K> implements MvccLo
         return batch;
     }
 
+
     protected abstract MultiTennantColumnFamily<ScopedRowKey<K>, UUID> getColumnFamily();
 
 
-    protected abstract ScopedRowKey<K> createKey(final Id applicationId, final Id entityId);
+    protected abstract ScopedRowKey<K> createKey( final Id applicationId, final Id entityId );
+
+    protected abstract Id getEntityIdFromKey( final ScopedRowKey<K> key );
 
-    protected abstract Id getEntityIdFromKey(final ScopedRowKey<K> key);
 
     /**
      * Internal stage shard
@@ -319,7 +336,7 @@ public abstract class MvccLogEntrySerializationStrategyImpl<K> implements MvccLo
      */
     private static class StatusCache {
         private Map<Integer, MvccLogEntry.State> values =
-                new HashMap<Integer, MvccLogEntry.State>( MvccLogEntry.State.values().length );
+            new HashMap<Integer, MvccLogEntry.State>( MvccLogEntry.State.values().length );
 
 
         private StatusCache() {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
index 3168817..e825bbc 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
@@ -31,16 +31,12 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.collection.MvccEntity;
 import org.apache.usergrid.persistence.collection.MvccLogEntry;
-import org.apache.usergrid.persistence.collection.impl.EntityVersionCleanupTask;
-import org.apache.usergrid.persistence.collection.impl.EntityVersionTaskFactory;
 import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategyV3Impl;
-import org.apache.usergrid.persistence.collection.serialization.impl.MvccLogEntrySerializationStrategyV2Impl;
 import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl;
-import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueSerializationStrategyV2Impl;
 import org.apache.usergrid.persistence.core.migration.data.DataMigration;
 import org.apache.usergrid.persistence.core.migration.data.DataMigrationException;
 import org.apache.usergrid.persistence.core.migration.data.MigrationDataProvider;
@@ -76,7 +72,6 @@ public class MvccEntityDataMigrationImpl implements DataMigration<EntityIdScope>
 
     private final Keyspace keyspace;
     private final VersionedMigrationSet<MvccEntitySerializationStrategy> allVersions;
-    private final EntityVersionTaskFactory entityVersionCleanupFactory;
     private final MvccEntitySerializationStrategyV3Impl mvccEntitySerializationStrategyV3;
     private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
     private final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy;
@@ -85,13 +80,11 @@ public class MvccEntityDataMigrationImpl implements DataMigration<EntityIdScope>
     @Inject
     public MvccEntityDataMigrationImpl( final Keyspace keyspace,
                                         final VersionedMigrationSet<MvccEntitySerializationStrategy> allVersions,
-                                        final EntityVersionTaskFactory entityVersionCleanupFactory,
                                         final MvccEntitySerializationStrategyV3Impl mvccEntitySerializationStrategyV3,
                                         final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
                                         final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy ) {
         this.keyspace = keyspace;
         this.allVersions = allVersions;
-        this.entityVersionCleanupFactory = entityVersionCleanupFactory;
         this.mvccEntitySerializationStrategyV3 = mvccEntitySerializationStrategyV3;
         this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
         this.mvccLogEntrySerializationStrategy = mvccLogEntrySerializationStrategy;
@@ -162,7 +155,8 @@ public class MvccEntityDataMigrationImpl implements DataMigration<EntityIdScope>
 
                         atomicLong.addAndGet( entities.size() );
 
-                        List<EntityVersionCleanupTask> entityVersionCleanupTasks = new ArrayList( entities.size() );
+                        final List<Id> toSaveIds = new ArrayList<>( entities.size() );
+
 
                         for ( EntityToSaveMessage message : entities ) {
                             final MutationBatch entityRewrite = migration.to.write( message.scope, message.entity );
@@ -187,6 +181,9 @@ public class MvccEntityDataMigrationImpl implements DataMigration<EntityIdScope>
 
                             final UUID version = message.entity.getVersion();
 
+
+                            toSaveIds.add( entityId );
+
                             // re-write the unique
                             // values
                             // but this
@@ -215,7 +212,6 @@ public class MvccEntityDataMigrationImpl implements DataMigration<EntityIdScope>
                              * Migrate the log entry to the new format
                              */
                             for(final MvccLogEntry entry: logEntries){
-
                                 final MutationBatch mb = mvccLogEntrySerializationStrategy.write( message.scope, entry );
 
                                 totalBatch.mergeShallow( mb );
@@ -223,24 +219,18 @@ public class MvccEntityDataMigrationImpl implements DataMigration<EntityIdScope>
 
 
 
-                            //schedule our cleanup task to clean up all the data
-                            final EntityVersionCleanupTask task = entityVersionCleanupFactory
-                                .getCleanupTask( message.scope, message.entity.getId(), version, false );
 
-                            entityVersionCleanupTasks.add( task );
                         }
 
                         executeBatch( migration.to.getImplementationVersion(), totalBatch, observer, atomicLong );
 
                         //now run our cleanup task
 
-                        for ( EntityVersionCleanupTask entityVersionCleanupTask : entityVersionCleanupTasks ) {
-                            try {
-                                entityVersionCleanupTask.call();
-                            }
-                            catch ( Exception e ) {
-                                LOGGER.error( "Unable to run cleanup task", e );
-                            }
+                        for ( Id updatedId : toSaveIds ) {
+
+
+
+
                         }
                     } ).subscribeOn( Schedulers.io() );
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
deleted file mode 100644
index 8c26c5b..0000000
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
+++ /dev/null
@@ -1,715 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.persistence.collection.impl;
-
-
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Semaphore;
-
-import org.junit.AfterClass;
-import org.junit.Test;
-
-import org.apache.usergrid.persistence.collection.MvccLogEntry;
-import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
-import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
-import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
-import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
-import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
-import org.apache.usergrid.persistence.collection.util.LogEntryMock;
-import org.apache.usergrid.persistence.collection.util.UniqueValueEntryMock;
-import org.apache.usergrid.persistence.collection.util.VersionGenerator;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
-import org.apache.usergrid.persistence.core.task.NamedTaskExecutorImpl;
-import org.apache.usergrid.persistence.core.task.TaskExecutor;
-import org.apache.usergrid.persistence.model.entity.Id;
-import org.apache.usergrid.persistence.model.entity.SimpleId;
-
-import com.google.common.util.concurrent.ListenableFuture;
-import com.netflix.astyanax.Keyspace;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.same;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-
-/**
- * Cleanup task tests
- */
-public class EntityVersionCleanupTaskTest {
-
-    private static final TaskExecutor taskExecutor = new NamedTaskExecutorImpl( "test", 4, 0 );
-
-
-    @AfterClass
-    public static void shutdown() {
-        taskExecutor.shutdown();
-    }
-
-
-    @Test( timeout = 10000 )
-    public void noListenerOneVersion() throws Exception {
-
-
-        final SerializationFig serializationFig = mock( SerializationFig.class );
-
-        when( serializationFig.getBufferSize() ).thenReturn( 10 );
-
-        final MvccLogEntrySerializationStrategy less = mock( MvccLogEntrySerializationStrategy.class );
-
-
-        final UniqueValueSerializationStrategy uvss = mock( UniqueValueSerializationStrategy.class );
-
-        final Keyspace keyspace = mock( Keyspace.class );
-
-        final MutationBatch entityBatch = mock( MutationBatch.class );
-
-        when( keyspace.prepareMutationBatch() ).thenReturn(
-                mock( MutationBatch.class ) ) // don't care what happens to this one
-                .thenReturn( entityBatch );
-
-        // intentionally no events
-        final Set<EntityVersionDeleted> listeners = new HashSet<EntityVersionDeleted>();
-
-        final Id applicationId = new SimpleId( "application" );
-
-        final ApplicationScope appScope = new ApplicationScopeImpl( applicationId );
-
-        final Id entityId = new SimpleId( "user" );
-
-        final List<UUID> versions = VersionGenerator.generateVersions( 2 );
-
-        // mock up a single log entry for our first test
-        final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock( less, appScope, entityId, versions );
-
-
-        //get the version we're keeping, it's first in our list
-        final UUID version = logEntryMock.getEntryAtIndex( 0 ).getVersion();
-
-        //mock up unique version output
-        final UniqueValueEntryMock uniqueValueEntryMock =
-                UniqueValueEntryMock.createUniqueMock( uvss, appScope, entityId, versions );
-
-
-        EntityVersionCleanupTask cleanupTask =
-                new EntityVersionCleanupTask( serializationFig, less, uvss, keyspace, listeners, appScope, entityId,
-                        version, false );
-
-        final MutationBatch newBatch = mock( MutationBatch.class );
-
-
-        // set up returning a mutator
-        when( uvss.delete( same( appScope ), any( UniqueValue.class ) ) ).thenReturn( newBatch );
-
-        //return a new batch when it's called
-        when( less.delete( same( appScope ), same( entityId ), any( UUID.class ) ) ).thenReturn( newBatch );
-
-
-        cleanupTask.call();
-
-
-        //get the second field, this should be deleted
-        final UniqueValue oldUniqueField = uniqueValueEntryMock.getEntryAtIndex( 1 );
-
-        final MvccLogEntry expectedDeletedEntry = logEntryMock.getEntryAtIndex( 1 );
-
-
-        //verify delete was invoked
-        verify( uvss ).delete( same( appScope ), same( oldUniqueField ) );
-
-        //verify the delete was invoked
-        verify( less ).delete( same( appScope ), same( entityId ), same( expectedDeletedEntry.getVersion() ) );
-
-        // verify it was run
-        verify( entityBatch ).execute();
-    }
-
-
-    /**
-     * Tests the cleanup task on the first version created
-     */
-    @Test( timeout = 10000 )
-    public void noListenerNoVersions() throws Exception {
-
-
-        final SerializationFig serializationFig = mock( SerializationFig.class );
-
-        when( serializationFig.getBufferSize() ).thenReturn( 10 );
-
-        final MvccLogEntrySerializationStrategy less = mock( MvccLogEntrySerializationStrategy.class );
-
-
-        final UniqueValueSerializationStrategy uvss = mock( UniqueValueSerializationStrategy.class );
-
-        final Keyspace keyspace = mock( Keyspace.class );
-
-        final MutationBatch entityBatch = mock( MutationBatch.class );
-
-        when( keyspace.prepareMutationBatch() ).thenReturn(
-                mock( MutationBatch.class ) ) // don't care what happens to this one
-                .thenReturn( entityBatch );
-
-        // intentionally no events
-        final Set<EntityVersionDeleted> listeners = new HashSet<>();
-
-        final Id applicationId = new SimpleId( "application" );
-
-        final ApplicationScope appScope = new ApplicationScopeImpl( applicationId );
-
-        final Id entityId = new SimpleId( "user" );
-
-
-        final List<UUID> versions = VersionGenerator.generateVersions( 1 );
-
-        // mock up a single log entry, with no other entries
-        final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock( less, appScope, entityId, versions );
-
-
-        //get the version we're keeping, it's first in our list
-        final UUID version = logEntryMock.getEntryAtIndex( 0 ).getVersion();
-
-        //mock up unique version output
-        final UniqueValueEntryMock uniqueValueEntryMock =
-                UniqueValueEntryMock.createUniqueMock( uvss, appScope, entityId, versions );
-
-
-        EntityVersionCleanupTask cleanupTask =
-                new EntityVersionCleanupTask( serializationFig, less, uvss, keyspace, listeners, appScope, entityId, version, false );
-
-        final MutationBatch newBatch = mock( MutationBatch.class );
-
-
-        // set up returning a mutator
-        when( uvss.delete( same( appScope ), any( UniqueValue.class ) ) ).thenReturn( newBatch );
-
-        //return a new batch when it's called
-        when( less.delete( same( appScope ), same( entityId ), any( UUID.class ) ) ).thenReturn( newBatch );
-
-
-        cleanupTask.call();
-
-
-        //verify delete was never invoked
-        verify( uvss, never() ).delete( any( ApplicationScope.class ), any( UniqueValue.class ) );
-
-        //verify the delete was never invoked
-        verify( less, never() ).delete( any( ApplicationScope.class ), any( Id.class ), any( UUID.class ) );
-    }
-
-
-    @Test( timeout = 10000 )
-    public void singleListenerSingleVersion() throws Exception {
-
-
-        //create a latch for the event listener, and add it to the list of events
-        final int sizeToReturn = 1;
-
-        final CountDownLatch latch = new CountDownLatch( sizeToReturn );
-
-        final EntityVersionDeletedTest eventListener = new EntityVersionDeletedTest( latch );
-
-        final Set<EntityVersionDeleted> listeners = new HashSet<>();
-
-        listeners.add( eventListener );
-
-
-        final SerializationFig serializationFig = mock( SerializationFig.class );
-
-        when( serializationFig.getBufferSize() ).thenReturn( 10 );
-
-        final MvccLogEntrySerializationStrategy less = mock( MvccLogEntrySerializationStrategy.class );
-
-        final UniqueValueSerializationStrategy uvss = mock( UniqueValueSerializationStrategy.class );
-
-        final Keyspace keyspace = mock( Keyspace.class );
-
-        final MutationBatch entityBatch = mock( MutationBatch.class );
-
-        when( keyspace.prepareMutationBatch() ).thenReturn(
-                mock( MutationBatch.class ) ) // don't care what happens to this one
-                .thenReturn( entityBatch );
-
-
-        final Id applicationId = new SimpleId( "application" );
-
-        final ApplicationScope appScope = new ApplicationScopeImpl( applicationId );
-
-        final Id entityId = new SimpleId( "user" );
-
-
-        final List<UUID> versions = VersionGenerator.generateVersions( 2 );
-
-
-        // mock up a single log entry for our first test
-        final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock( less, appScope, entityId, versions );
-
-
-        //get the version we're keeping, it's first in our list
-        final UUID version = logEntryMock.getEntryAtIndex( 0 ).getVersion();
-
-        //mock up unique version output
-        final UniqueValueEntryMock uniqueValueEntryMock =
-                UniqueValueEntryMock.createUniqueMock( uvss, appScope, entityId, versions );
-
-
-        EntityVersionCleanupTask cleanupTask =
-                new EntityVersionCleanupTask( serializationFig, less, uvss, keyspace, listeners, appScope, entityId,
-                        version, false );
-
-        final MutationBatch newBatch = mock( MutationBatch.class );
-
-
-        // set up returning a mutator
-        when( uvss.delete( same( appScope ), any( UniqueValue.class ) ) ).thenReturn( newBatch );
-
-        //return a new batch when it's called
-        when( less.delete( same( appScope ), same( entityId ), any( UUID.class ) ) ).thenReturn( newBatch );
-
-
-        cleanupTask.call();
-
-
-        //get the second field, this should be deleted
-        final UniqueValue oldUniqueField = uniqueValueEntryMock.getEntryAtIndex( 1 );
-
-        final MvccLogEntry expectedDeletedEntry = logEntryMock.getEntryAtIndex( 1 );
-
-
-        //verify delete was invoked
-        verify( uvss ).delete( same( appScope ), same( oldUniqueField ) );
-
-        //verify the delete was invoked
-        verify( less ).delete( same( appScope ), same( entityId ), same( expectedDeletedEntry.getVersion() ) );
-
-        // verify it was run
-        verify( entityBatch ).execute();
-
-
-        //the latch was executed
-        latch.await();
-    }
-
-
-    @Test//(timeout=10000)
-    public void multipleListenerMultipleVersions() throws Exception {
-
-        final SerializationFig serializationFig = mock( SerializationFig.class );
-
-        when( serializationFig.getBufferSize() ).thenReturn( 10 );
-
-
-        //create a latch for the event listener, and add it to the list of events
-        final int sizeToReturn = 10;
-
-        final CountDownLatch latch = new CountDownLatch( sizeToReturn / serializationFig.getBufferSize() * 3 );
-
-        final EntityVersionDeletedTest listener1 = new EntityVersionDeletedTest( latch );
-        final EntityVersionDeletedTest listener2 = new EntityVersionDeletedTest( latch );
-        final EntityVersionDeletedTest listener3 = new EntityVersionDeletedTest( latch );
-
-        final Set<EntityVersionDeleted> listeners = new HashSet<>();
-
-        listeners.add( listener1 );
-        listeners.add( listener2 );
-        listeners.add( listener3 );
-
-
-
-
-        final MvccLogEntrySerializationStrategy less = mock( MvccLogEntrySerializationStrategy.class );
-
-        final UniqueValueSerializationStrategy uvss = mock( UniqueValueSerializationStrategy.class );
-
-        final Keyspace keyspace = mock( Keyspace.class );
-
-        final MutationBatch entityBatch = mock( MutationBatch.class );
-
-        when( keyspace.prepareMutationBatch() ).thenReturn(
-                mock( MutationBatch.class ) ) // don't care what happens to this one
-                .thenReturn( entityBatch );
-
-
-
-
-        final Id applicationId = new SimpleId( "application" );
-
-        final ApplicationScope appScope = new ApplicationScopeImpl( applicationId );
-
-        final Id entityId = new SimpleId( "user" );
-
-        final List<UUID> versions = VersionGenerator.generateVersions( 2 );
-
-
-        // mock up a single log entry for our first test
-        final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock( less, appScope, entityId, versions );
-
-
-        //get the version we're keeping, it's first in our list
-        final UUID version = logEntryMock.getEntryAtIndex( 0 ).getVersion();
-
-        //mock up unique version output
-        final UniqueValueEntryMock uniqueValueEntryMock =
-                UniqueValueEntryMock.createUniqueMock( uvss, appScope, entityId, versions );
-
-
-        EntityVersionCleanupTask cleanupTask =
-                new EntityVersionCleanupTask( serializationFig, less, uvss, keyspace, listeners, appScope, entityId,
-                        version, false );
-
-        final MutationBatch newBatch = mock( MutationBatch.class );
-
-
-        // set up returning a mutator
-        when( uvss.delete( same( appScope ), any( UniqueValue.class ) ) ).thenReturn( newBatch );
-
-        //return a new batch when it's called
-        when( less.delete( same( appScope ), same( entityId ), any( UUID.class ) ) ).thenReturn( newBatch );
-
-
-        cleanupTask.call();
-
-
-        //get the second field, this should be deleted
-        final UniqueValue oldUniqueField = uniqueValueEntryMock.getEntryAtIndex( 1 );
-
-        final MvccLogEntry expectedDeletedEntry = logEntryMock.getEntryAtIndex( 1 );
-
-
-        //verify delete was invoked
-        verify( uvss ).delete( same( appScope ), same( oldUniqueField ) );
-
-        //verify the delete was invoked
-        verify( less ).delete( same( appScope ), same( entityId ), same( expectedDeletedEntry.getVersion() ) );
-
-        // verify it was run
-        verify( entityBatch ).execute();
-
-
-        //the latch was executed
-        latch.await();
-
-        //we deleted the version
-        //verify we deleted everything
-          //verify delete was invoked
-        verify( uvss ).delete( same( appScope ), same( oldUniqueField ) );
-
-        //verify the delete was invoked
-        verify( less ).delete( same( appScope ), same( entityId ), same( expectedDeletedEntry.getVersion() ) );
-
-        // verify it was run
-        verify( entityBatch ).execute();
-
-        //the latch was executed
-        latch.await();
-    }
-
-
-    /**
-     * Tests what happens when our listeners are VERY slow
-     */
-//    @Ignore( "Test is a work in progress" )
-    @Test( timeout = 10000 )
-    public void multipleListenerMultipleVersionsNoThreadsToRun()
-            throws ExecutionException, InterruptedException, ConnectionException {
-
-
-        final SerializationFig serializationFig = mock( SerializationFig.class );
-
-        when( serializationFig.getBufferSize() ).thenReturn( 10 );
-
-
-        //create a latch for the event listener, and add it to the list of events
-        final int sizeToReturn = 10;
-
-
-        final int listenerCount = 5;
-
-        final CountDownLatch latch =
-                new CountDownLatch( sizeToReturn / serializationFig.getBufferSize() * listenerCount );
-        final Semaphore waitSemaphore = new Semaphore( 0 );
-
-
-        final SlowListener listener1 = new SlowListener( latch, waitSemaphore );
-        final SlowListener listener2 = new SlowListener( latch, waitSemaphore );
-        final SlowListener listener3 = new SlowListener( latch, waitSemaphore );
-        final SlowListener listener4 = new SlowListener( latch, waitSemaphore );
-        final SlowListener listener5 = new SlowListener( latch, waitSemaphore );
-
-        final Set<EntityVersionDeleted> listeners = new HashSet<EntityVersionDeleted>();
-
-        listeners.add( listener1 );
-        listeners.add( listener2 );
-        listeners.add( listener3 );
-        listeners.add( listener4 );
-        listeners.add( listener5 );
-
-
-        final MvccLogEntrySerializationStrategy less = mock( MvccLogEntrySerializationStrategy.class );
-
-        final UniqueValueSerializationStrategy uvss = mock( UniqueValueSerializationStrategy.class );
-
-        final Keyspace keyspace = mock( Keyspace.class );
-
-        final MutationBatch entityBatch = mock( MutationBatch.class );
-
-        when( keyspace.prepareMutationBatch() ).thenReturn(
-                mock( MutationBatch.class ) ) // don't care what happens to this one
-                .thenReturn( entityBatch );
-
-
-        final Id applicationId = new SimpleId( "application" );
-
-        final ApplicationScope appScope = new ApplicationScopeImpl( applicationId );
-
-        final Id entityId = new SimpleId( "user" );
-
-
-        final List<UUID> versions = VersionGenerator.generateVersions( 2 );
-
-        // mock up a single log entry for our first test
-        final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock( less, appScope, entityId, versions );
-
-
-        //get the version we're keeping, it's first in our list
-        final UUID version = logEntryMock.getEntryAtIndex( 0 ).getVersion();
-
-
-        //mock up unique version output
-        final UniqueValueEntryMock uniqueValueEntryMock =
-                UniqueValueEntryMock.createUniqueMock( uvss, appScope, entityId, versions );
-
-
-        EntityVersionCleanupTask cleanupTask =
-                new EntityVersionCleanupTask( serializationFig, less, uvss, keyspace, listeners, appScope, entityId,
-                        version, false);
-
-        final MutationBatch newBatch = mock( MutationBatch.class );
-
-
-        // set up returning a mutator
-        when( uvss.delete( same( appScope ), any( UniqueValue.class ) ) ).thenReturn( newBatch );
-
-        //return a new batch when it's called
-        when( less.delete( same( appScope ), same( entityId ), any( UUID.class ) ) ).thenReturn( newBatch );
-
-
-        //start the task
-        ListenableFuture<Void> future = taskExecutor.submit( cleanupTask );
-
-        /**
-         * While we're not done, release latches every 200 ms
-         */
-        while ( !future.isDone() ) {
-            Thread.sleep( 200 );
-            waitSemaphore.release( listenerCount );
-        }
-
-        //wait for the task
-        future.get();
-
-
-        //get the second field, this should be deleted
-        final UniqueValue oldUniqueField = uniqueValueEntryMock.getEntryAtIndex( 1 );
-
-        final MvccLogEntry expectedDeletedEntry = logEntryMock.getEntryAtIndex( 1 );
-
-
-        //verify delete was invoked
-        verify( uvss ).delete( same( appScope ), same( oldUniqueField ) );
-
-        //verify the delete was invoked
-        verify( less ).delete( same( appScope ), same( entityId ), same( expectedDeletedEntry.getVersion() ) );
-
-        // verify it was run
-        verify( entityBatch ).execute();
-
-
-        //the latch was executed
-        latch.await();
-
-        //we deleted the version
-        //verify we deleted everything
-        //verify delete was invoked
-        verify( uvss ).delete( same( appScope ), same( oldUniqueField ) );
-
-        //verify the delete was invoked
-        verify( less ).delete( same( appScope ), same( entityId ), same( expectedDeletedEntry.getVersion() ) );
-
-        // verify it was run
-        verify( entityBatch ).execute();
-
-        //the latch was executed
-        latch.await();
-
-
-        //the latch was executed
-        latch.await();
-    }
-
-
-    /**
-     * Tests that our task will run in the caller if there's no threads, ensures that the task runs
-     */
-    @Test( timeout = 10000 )
-    public void singleListenerSingleVersionRejected()
-            throws ExecutionException, InterruptedException, ConnectionException {
-
-
-
-        //create a latch for the event listener, and add it to the list of events
-        final int sizeToReturn = 1;
-
-        final CountDownLatch latch = new CountDownLatch( sizeToReturn );
-
-        final EntityVersionDeletedTest eventListener = new EntityVersionDeletedTest( latch );
-
-        final Set<EntityVersionDeleted> listeners = new HashSet<>();
-
-        listeners.add( eventListener );
-
-
-        final SerializationFig serializationFig = mock( SerializationFig.class );
-
-        when( serializationFig.getBufferSize() ).thenReturn( 10 );
-
-        final MvccLogEntrySerializationStrategy less = mock( MvccLogEntrySerializationStrategy.class );
-
-        final UniqueValueSerializationStrategy uvss = mock( UniqueValueSerializationStrategy.class );
-
-        final Keyspace keyspace = mock( Keyspace.class );
-
-        final MutationBatch entityBatch = mock( MutationBatch.class );
-
-        when( keyspace.prepareMutationBatch() ).thenReturn(
-                mock( MutationBatch.class ) ) // don't care what happens to this one
-                .thenReturn( entityBatch );
-
-
-        final Id applicationId = new SimpleId( "application" );
-
-        final ApplicationScope appScope = new ApplicationScopeImpl( applicationId );
-
-        final Id entityId = new SimpleId( "user" );
-
-
-        final List<UUID> versions = VersionGenerator.generateVersions( 2 );
-
-
-        // mock up a single log entry for our first test
-        final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock( less, appScope, entityId, versions );
-
-
-        //get the version we're keeping, it's first in our list
-        final UUID version = logEntryMock.getEntryAtIndex( 0 ).getVersion();
-
-        //mock up unique version output
-        final UniqueValueEntryMock uniqueValueEntryMock =
-                UniqueValueEntryMock.createUniqueMock( uvss, appScope, entityId, versions );
-
-
-        EntityVersionCleanupTask cleanupTask =
-                new EntityVersionCleanupTask( serializationFig, less, uvss, keyspace, listeners, appScope, entityId,
-                        version, false );
-
-        final MutationBatch newBatch = mock( MutationBatch.class );
-
-
-        // set up returning a mutator
-        when( uvss.delete( same( appScope ), any( UniqueValue.class ) ) ).thenReturn( newBatch );
-
-        //return a new batch when it's called
-        when( less.delete( same( appScope ), same( entityId ), any( UUID.class ) ) ).thenReturn( newBatch );
-
-
-        cleanupTask.rejected();
-
-
-        //get the second field, this should be deleted
-        final UniqueValue oldUniqueField = uniqueValueEntryMock.getEntryAtIndex( 1 );
-
-        final MvccLogEntry expectedDeletedEntry = logEntryMock.getEntryAtIndex( 1 );
-
-
-        //verify delete was invoked
-        verify( uvss ).delete( same( appScope ), same( oldUniqueField ) );
-
-        //verify the delete was invoked
-        verify( less ).delete( same( appScope ), same( entityId ), same( expectedDeletedEntry.getVersion() ) );
-
-        // verify it was run
-        verify( entityBatch ).execute();
-
-
-        //the latch was executed
-        latch.await();
-    }
-
-
-    private static class EntityVersionDeletedTest implements EntityVersionDeleted {
-        final CountDownLatch invocationLatch;
-
-
-        private EntityVersionDeletedTest( final CountDownLatch invocationLatch ) {
-            this.invocationLatch = invocationLatch;
-        }
-
-
-        @Override
-        public void versionDeleted( final ApplicationScope scope, final Id entityId,
-                                    final List<MvccLogEntry> entityVersion ) {
-            invocationLatch.countDown();
-        }
-    }
-
-
-    private static class SlowListener extends EntityVersionDeletedTest {
-        final Semaphore blockLatch;
-
-
-        private SlowListener( final CountDownLatch invocationLatch, final Semaphore blockLatch ) {
-            super( invocationLatch );
-            this.blockLatch = blockLatch;
-        }
-
-
-        @Override
-        public void versionDeleted( final ApplicationScope scope, final Id entityId,
-                                    final List<MvccLogEntry> entityVersion ) {
-
-            //wait for unblock to happen before counting down invocation latches
-            try {
-                blockLatch.acquire();
-            }
-            catch ( InterruptedException e ) {
-                throw new RuntimeException( e );
-            }
-            super.versionDeleted( scope, entityId, entityVersion );
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTaskTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTaskTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTaskTest.java
deleted file mode 100644
index e993fad..0000000
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTaskTest.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.  For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.collection.impl;
-
-
-import java.util.Iterator;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Test;
-
-import org.apache.usergrid.persistence.collection.event.EntityVersionCreated;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
-import org.apache.usergrid.persistence.core.task.NamedTaskExecutorImpl;
-import org.apache.usergrid.persistence.core.task.TaskExecutor;
-import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.entity.Id;
-import org.apache.usergrid.persistence.model.entity.SimpleId;
-
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-
-/**
- * Created task tests.
- */
-public class EntityVersionCreatedTaskTest {
-
-    private static final TaskExecutor taskExecutor = new NamedTaskExecutorImpl( "test", 4, 0 );
-
-    @AfterClass
-    public static void shutdown() {
-        taskExecutor.shutdown();
-    }
-
-
-    @Test(timeout=10000)
-    public void noListener()
-            throws ExecutionException, InterruptedException, ConnectionException {
-
-        // create a latch for the event listener, and add it to the list of events
-
-        final Set<EntityVersionCreated> listeners = mock( Set.class );
-
-        when ( listeners.size()).thenReturn( 0 );
-
-        final Id applicationId = new SimpleId( "application" );
-
-        final ApplicationScope appScope = new ApplicationScopeImpl(applicationId);
-
-        final Id entityId = new SimpleId( "user" );
-        final Entity entity = new Entity( entityId );
-
-        // start the task
-
-        EntityVersionCreatedTask entityVersionCreatedTask =
-                new EntityVersionCreatedTask( appScope, listeners, entity);
-
-        try {
-            entityVersionCreatedTask.call();
-        }catch(Exception e){
-            Assert.fail(e.getMessage());
-        }
-
-
-        // wait for the task
-       // future.get();
-
-        //mocked listener makes sure that the task is called
-        verify( listeners ).size();
-
-    }
-    @Test(timeout=10000)
-    public void oneListener()
-            throws ExecutionException, InterruptedException, ConnectionException {
-
-        // create a latch for the event listener, and add it to the list of events
-
-        final int sizeToReturn = 1;
-
-        final CountDownLatch latch = new CountDownLatch( sizeToReturn );
-
-        final EntityVersionCreatedTest eventListener = new EntityVersionCreatedTest(latch);
-
-        final Set<EntityVersionCreated> listeners = mock( Set.class );
-        final Iterator<EntityVersionCreated> helper = mock(Iterator.class);
-
-        when ( listeners.size()).thenReturn( 1 );
-        when ( listeners.iterator()).thenReturn( helper );
-        when ( helper.next() ).thenReturn( eventListener );
-
-        final Id applicationId = new SimpleId( "application" );
-
-        final ApplicationScope appScope = new ApplicationScopeImpl(applicationId);
-
-        final Id entityId = new SimpleId( "user" );
-        final Entity entity = new Entity( entityId );
-
-        // start the task
-
-        EntityVersionCreatedTask entityVersionCreatedTask =
-            new EntityVersionCreatedTask( appScope, listeners, entity);
-
-        try {
-            entityVersionCreatedTask.call();
-        }catch(Exception e){
-
-            Assert.fail(e.getMessage());
-        }
-        //mocked listener makes sure that the task is called
-        verify( listeners ).size();
-        verify( listeners ).iterator();
-        verify( helper ).next();
-
-    }
-
-    @Test(timeout=10000)
-    public void multipleListener()
-            throws ExecutionException, InterruptedException, ConnectionException {
-
-        final int sizeToReturn = 3;
-
-        final Set<EntityVersionCreated> listeners = mock( Set.class );
-        final Iterator<EntityVersionCreated> helper = mock(Iterator.class);
-
-        when ( listeners.size()).thenReturn( 3 );
-        when ( listeners.iterator()).thenReturn( helper );
-
-        final Id applicationId = new SimpleId( "application" );
-
-        final ApplicationScope appScope = new ApplicationScopeImpl(applicationId);
-
-        final Id entityId = new SimpleId( "user" );
-        final Entity entity = new Entity( entityId );
-
-        // start the task
-
-        EntityVersionCreatedTask entityVersionCreatedTask =
-                new EntityVersionCreatedTask( appScope, listeners, entity);
-
-        final CountDownLatch latch = new CountDownLatch( sizeToReturn );
-
-        final EntityVersionCreatedTest listener1 = new EntityVersionCreatedTest(latch);
-        final EntityVersionCreatedTest listener2 = new EntityVersionCreatedTest(latch);
-        final EntityVersionCreatedTest listener3 = new EntityVersionCreatedTest(latch);
-
-        when ( helper.next() ).thenReturn( listener1,listener2,listener3);
-
-        try {
-            entityVersionCreatedTask.call();
-        }catch(Exception e){
-            ;
-        }
-        //ListenableFuture<Void> future = taskExecutor.submit( entityVersionCreatedTask );
-
-        //wait for the task
-        //intentionally fails due to difficulty mocking observable
-
-        //mocked listener makes sure that the task is called
-        verify( listeners ).size();
-        //verifies that the observable made listener iterate.
-        verify( listeners ).iterator();
-    }
-
-    @Test(timeout=10000)
-    public void oneListenerRejected()
-            throws ExecutionException, InterruptedException, ConnectionException {
-
-        // create a latch for the event listener, and add it to the list of events
-
-        final TaskExecutor taskExecutor = new NamedTaskExecutorImpl( "test", 0, 0 );
-
-        final int sizeToReturn = 1;
-
-        final CountDownLatch latch = new CountDownLatch( sizeToReturn );
-
-        final EntityVersionCreatedTest eventListener = new EntityVersionCreatedTest(latch);
-
-        final Set<EntityVersionCreated> listeners = mock( Set.class );
-        final Iterator<EntityVersionCreated> helper = mock(Iterator.class);
-
-        when ( listeners.size()).thenReturn( 1 );
-        when ( listeners.iterator()).thenReturn( helper );
-        when ( helper.next() ).thenReturn( eventListener );
-
-        final Id applicationId = new SimpleId( "application" );
-
-        final ApplicationScope appScope = new ApplicationScopeImpl(applicationId);
-
-        final Id entityId = new SimpleId( "user" );
-        final Entity entity = new Entity( entityId );
-
-        // start the task
-
-        EntityVersionCreatedTask entityVersionCreatedTask =
-                new EntityVersionCreatedTask( appScope, listeners, entity);
-
-        entityVersionCreatedTask.rejected();
-
-        //mocked listener makes sure that the task is called
-        verify( listeners ).size();
-        verify( listeners ).iterator();
-        verify( helper ).next();
-
-    }
-
-    private static class EntityVersionCreatedTest implements EntityVersionCreated {
-        final CountDownLatch invocationLatch;
-
-        private EntityVersionCreatedTest( final CountDownLatch invocationLatch) {
-            this.invocationLatch = invocationLatch;
-        }
-
-        @Override
-        public void versionCreated( final ApplicationScope scope, final Entity entity ) {
-            invocationLatch.countDown();
-        }
-    }
-}