You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/05/10 12:59:36 UTC

[5/8] incubator-usergrid git commit: First pass at refactoring mark + sweep

First pass at refactoring mark + sweep


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/3e2afe23
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/3e2afe23
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/3e2afe23

Branch: refs/heads/USERGRID-614
Commit: 3e2afe23bb14761b395acbe1b6577ef024b0c3dd
Parents: 36b5bad
Author: Todd Nine <tn...@apigee.com>
Authored: Tue May 5 17:30:08 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Sun May 10 04:59:16 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/CpEntityManager.java        |   2 +-
 .../corepersistence/CpEntityManagerFactory.java |   2 +-
 .../corepersistence/CpRelationManager.java      |   4 +-
 .../migration/AppInfoMigrationPlugin.java       |   3 +-
 .../collection/EntityCollectionManager.java     |  20 +-
 .../cache/CachedEntityCollectionManager.java    |  18 +-
 .../collection/event/EntityDeleted.java         |  45 --
 .../collection/event/EntityVersionCreated.java  |  39 -
 .../collection/event/EntityVersionDeleted.java  |  46 --
 .../collection/guice/CollectionModule.java      |  39 -
 .../EntityCollectionManagerFactoryImpl.java     |  31 +-
 .../impl/EntityCollectionManagerImpl.java       | 386 ++++------
 .../collection/impl/EntityDeletedTask.java      | 137 ----
 .../impl/EntityVersionCleanupTask.java          | 247 -------
 .../impl/EntityVersionCreatedTask.java          | 115 ---
 .../impl/EntityVersionTaskFactory.java          |  60 --
 .../mvcc/stage/delete/UniqueCleanup.java        | 133 ++++
 .../mvcc/stage/delete/VersionCompact.java       | 120 ++++
 .../MvccLogEntrySerializationStrategy.java      |  14 +
 .../serialization/SerializationFig.java         |  30 +-
 .../serialization/impl/LogEntryIterator.java    | 114 ---
 .../impl/MinMaxLogEntryIterator.java            | 121 ++++
 .../MvccLogEntrySerializationProxyImpl.java     |  14 +
 .../MvccLogEntrySerializationStrategyImpl.java  |  83 ++-
 .../migration/MvccEntityDataMigrationImpl.java  |  30 +-
 .../collection/EntityCollectionManagerIT.java   | 215 ++++--
 .../impl/EntityVersionCleanupTaskTest.java      | 715 -------------------
 .../impl/EntityVersionCreatedTaskTest.java      | 241 -------
 .../mvcc/stage/delete/UniqueCleanupTest.java    | 712 ++++++++++++++++++
 .../mvcc/stage/delete/VersionCompactTest.java   | 238 ++++++
 .../impl/LogEntryIteratorTest.java              | 132 ----
 .../impl/MinMaxLogEntryIteratorTest.java        | 131 ++++
 ...ccLogEntrySerializationStrategyImplTest.java | 132 +++-
 .../collection/util/LogEntryMock.java           | 103 +--
 .../src/test/resources/log4j.properties         |   1 +
 .../core/executor/TaskExecutorFactory.java      |  95 +++
 .../persistence/core/rx/RxTaskScheduler.java    |   2 -
 .../core/task/NamedTaskExecutorImpl.java        | 286 --------
 .../usergrid/persistence/core/task/Task.java    |  48 --
 .../persistence/core/task/TaskExecutor.java     |  41 --
 .../core/task/NamedTaskExecutorImplTest.java    | 271 -------
 .../usergrid/persistence/graph/GraphFig.java    |  26 +-
 .../persistence/graph/GraphManager.java         |   8 +-
 .../persistence/graph/GraphManagerFactory.java  |   2 +-
 .../persistence/graph/SearchByEdge.java         |   6 +
 .../persistence/graph/SearchByEdgeType.java     |   6 +
 .../persistence/graph/guice/GraphModule.java    |  11 -
 .../graph/impl/GraphManagerImpl.java            | 404 ++++-------
 .../graph/impl/SimpleSearchByEdge.java          |  40 +-
 .../graph/impl/SimpleSearchByEdgeType.java      |  29 +-
 .../shard/impl/ShardGroupCompactionImpl.java    |  90 ++-
 .../graph/CommittedGraphManagerIT.java          |   8 +-
 .../persistence/graph/GraphManagerIT.java       |  26 +-
 .../graph/StorageGraphManagerIT.java            |   8 +-
 .../impl/shard/ShardGroupCompactionTest.java    |   6 +-
 55 files changed, 2464 insertions(+), 3422 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 460fc11..8bcc73a 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -664,7 +664,7 @@ public class CpEntityManager implements EntityManager {
             //delete it asynchronously
             indexService.queueEntityDelete( applicationScope, entityId );
 
-            return ecm.delete( entityId );
+            return ecm.mark( entityId );
         }
         else {
             return Observable.empty();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index 6c375ef..e7ad682 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -369,7 +369,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
         }
         final ApplicationEntityIndex aei = entityIndexFactory.createApplicationEntityIndex(applicationScope);
         final GraphManager managementGraphManager = managerCache.getGraphManager(managementAppScope);
-        final Observable deleteNodeGraph = managementGraphManager.deleteNode(applicationId, Long.MAX_VALUE);
+        final Observable deleteNodeGraph = managementGraphManager.markNode( applicationId, Long.MAX_VALUE );
         final Observable deleteAppFromIndex = aei.deleteApplication();
 
         return Observable.concat(copyConnections, deleteNodeGraph, deleteAppFromIndex)

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 6adeefc..df3fa82 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -528,7 +528,7 @@ public class CpRelationManager implements RelationManager {
 
         //run our delete
         final Edge collectionToItemEdge = createCollectionEdge( cpHeadEntity.getId(), collName, memberEntity.getId() );
-        gm.deleteEdge( collectionToItemEdge ).toBlocking().last();
+        gm.markEdge( collectionToItemEdge ).toBlocking().last();
 
 
         /**
@@ -782,7 +782,7 @@ public class CpRelationManager implements RelationManager {
 
         //delete all the edges
         final Edge lastEdge =
-            gm.loadEdgeVersions( search ).flatMap( returnedEdge -> gm.deleteEdge( returnedEdge ) ).toBlocking()
+            gm.loadEdgeVersions( search ).flatMap( returnedEdge -> gm.markEdge( returnedEdge ) ).toBlocking()
               .lastOrDefault( null );
 
         if ( lastEdge != null ) {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/AppInfoMigrationPlugin.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/AppInfoMigrationPlugin.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/AppInfoMigrationPlugin.java
index 30955af..97b87b3 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/AppInfoMigrationPlugin.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/AppInfoMigrationPlugin.java
@@ -42,7 +42,6 @@ import org.apache.usergrid.utils.UUIDUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import rx.Observable;
-import rx.functions.Func1;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -200,7 +199,7 @@ public class AppInfoMigrationPlugin implements MigrationPlugin {
         final ApplicationScope systemAppScope = getApplicationScope(CpNamingUtils.SYSTEM_APP_ID );
         final EntityCollectionManager systemCollectionManager =
             entityCollectionManagerFactory.createCollectionManager( systemAppScope );
-        systemCollectionManager.delete(new SimpleId(uuid, "appinfos")).toBlocking().last();
+        systemCollectionManager.mark( new SimpleId( uuid, "appinfos" ) ).toBlocking().last();
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
index 5a329e3..9de8f41 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
@@ -46,11 +46,12 @@ public interface EntityCollectionManager {
 
 
     /**
-     * @param entityId MarkCommit the entity as deleted
+     * @param entityId The id of the entity to mark as deleted.  Marking does not actually remove it from Cassandra.  This
+     * operation will also remove all unique properties for this entity
      *
      * @return The observable of the id after the operation has completed
      */
-    Observable<Id> delete( Id entityId );
+    Observable<Id> mark( Id entityId );
 
     /**
      * @param entityId The entity id to load.
@@ -96,6 +97,21 @@ public interface EntityCollectionManager {
      */
     Observable<EntitySet> load( Collection<Id> entityIds );
 
+    /**
+     * Get all log entry versions for the given entity, ordered from max to min
+     * @param entityId
+     * @return An observable stream of mvccLog entries
+     */
+    Observable<MvccLogEntry> getVersions(final Id entityId);
+
+    /**
+     * Delete these versions from Cassandra.  Must be atomic so that only the log entries that were read are removed.
+     * Both the entity data and the log entry will be deleted
+     * @param entries
+     * @return An observable of all successfully compacted log entries
+     */
+    Observable<MvccLogEntry> delete( final Collection<MvccLogEntry> entries );
+
 
     /**
      * Returns health of entity data store.

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/cache/CachedEntityCollectionManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/cache/CachedEntityCollectionManager.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/cache/CachedEntityCollectionManager.java
index fa35580..7a04b8d 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/cache/CachedEntityCollectionManager.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/cache/CachedEntityCollectionManager.java
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.usergrid.persistence.collection.EntityCollectionManager;
 import org.apache.usergrid.persistence.collection.EntitySet;
 import org.apache.usergrid.persistence.collection.FieldSet;
+import org.apache.usergrid.persistence.collection.MvccLogEntry;
 import org.apache.usergrid.persistence.collection.VersionSet;
 import org.apache.usergrid.persistence.core.util.Health;
 import org.apache.usergrid.persistence.model.entity.Entity;
@@ -84,8 +85,8 @@ public class CachedEntityCollectionManager implements EntityCollectionManager {
 
 
     @Override
-    public Observable<Id> delete( final Id entityId ) {
-        return targetEntityCollectionManager.delete( entityId ).doOnNext( new Action1<Id>() {
+    public Observable<Id> mark( final Id entityId ) {
+        return targetEntityCollectionManager.mark( entityId ).doOnNext( new Action1<Id>() {
             @Override
             public void call( final Id id ) {
                 entityCache.invalidate( id );
@@ -125,6 +126,19 @@ public class CachedEntityCollectionManager implements EntityCollectionManager {
         return targetEntityCollectionManager.load( entityIds );
     }
 
+
+    @Override
+    public Observable<MvccLogEntry> getVersions( final Id entityId ) {
+        return targetEntityCollectionManager.getVersions( entityId );
+    }
+
+
+    @Override
+    public Observable<MvccLogEntry> delete( final Collection<MvccLogEntry> entries ) {
+        return targetEntityCollectionManager.delete( entries );
+    }
+
+
     @Override
     public Health getHealth() {
         return targetEntityCollectionManager.getHealth();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityDeleted.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityDeleted.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityDeleted.java
deleted file mode 100644
index 0e9b62e..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityDeleted.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.  For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.collection.event;
-
-
-import java.util.UUID;
-
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-
-/**
- *
- * Invoked when an entity is deleted.  The delete log entry is not removed until all instances of this listener has completed.
- * If any listener fails with an exception, the entity will not be removed.
- *
- */
-public interface EntityDeleted {
-
-
-    /**
-     * The event fired when an entity is deleted
-     *
-     * @param scope The scope of the entity
-     * @param entityId The id of the entity
-     * @param version the entity version
-     */
-    public void deleted( final ApplicationScope scope, final Id entityId, final UUID version);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionCreated.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionCreated.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionCreated.java
deleted file mode 100644
index 7f1be1a..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionCreated.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.  For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.collection.event;
-
-
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.model.entity.Entity;
-
-
-/**
- * Invoked after a new version of an entity has been created.
- * The entity should be a complete view of the entity.
- */
-public interface EntityVersionCreated {
-
-    /**
-     * The new version of the entity. Note that this should be a fully merged view of the entity.
-     * In the case of partial updates, the passed entity should be fully merged with it's previous
-     * entries.
-     * @param scope The scope of the entity
-     * @param entity The fully loaded and merged entity
-     */
-    public void versionCreated( final ApplicationScope scope, final Entity entity );
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionDeleted.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionDeleted.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionDeleted.java
deleted file mode 100644
index 7fd8fe7..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionDeleted.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.  For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.collection.event;
-
-
-import java.util.List;
-
-import org.apache.usergrid.persistence.collection.MvccLogEntry;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-
-/**
- *
- * Invoked when an entity version is removed.  Note that this is not a deletion of the entity
- * itself, only the version itself.
- *
- */
-public interface EntityVersionDeleted {
-
-    /**
-     * The version specified was removed.
-     *
-     * @param scope The scope of the entity
-     * @param entityId The entity Id that was removed
-     * @param entityVersions The versions that are to be removed
-     */
-    public void versionDeleted(final ApplicationScope scope, final Id entityId,
-            final List<MvccLogEntry> entityVersions);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
index b256a44..78c7f37 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
@@ -22,27 +22,14 @@ import org.safehaus.guicyfig.GuicyFigModule;
 
 import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
 import org.apache.usergrid.persistence.collection.cache.EntityCacheFig;
-import org.apache.usergrid.persistence.collection.event.EntityDeleted;
-import org.apache.usergrid.persistence.collection.event.EntityVersionCreated;
-import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
 import org.apache.usergrid.persistence.collection.impl.EntityCollectionManagerFactoryImpl;
-import org.apache.usergrid.persistence.collection.impl.EntityVersionTaskFactory;
 import org.apache.usergrid.persistence.collection.mvcc.changelog.ChangeLogGenerator;
 import org.apache.usergrid.persistence.collection.mvcc.changelog.ChangeLogGeneratorImpl;
 import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
-import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.impl.SerializationModule;
-import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueSerializationStrategyImpl;
 import org.apache.usergrid.persistence.collection.service.impl.ServiceModule;
-import org.apache.usergrid.persistence.core.task.NamedTaskExecutorImpl;
-import org.apache.usergrid.persistence.core.task.TaskExecutor;
 
 import com.google.inject.AbstractModule;
-import com.google.inject.Inject;
-import com.google.inject.Provides;
-import com.google.inject.Singleton;
-import com.google.inject.assistedinject.FactoryModuleBuilder;
-import com.google.inject.multibindings.Multibinder;
 
 
 /**
@@ -61,15 +48,7 @@ public abstract class CollectionModule extends AbstractModule {
         install( new SerializationModule() );
         install( new ServiceModule() );
 
-        install ( new FactoryModuleBuilder().build( EntityVersionTaskFactory.class ));
-
         // users of this module can add their own implemementations
-        // for more information: https://github.com/google/guice/wiki/Multibindings
-
-        Multibinder.newSetBinder( binder(), EntityVersionDeleted.class );
-        Multibinder.newSetBinder( binder(), EntityVersionCreated.class );
-        Multibinder.newSetBinder( binder(), EntityDeleted.class );
-
         // create a guice factor for getting our collection manager
          bind(EntityCollectionManagerFactory.class).to(EntityCollectionManagerFactoryImpl.class);
 
@@ -81,24 +60,6 @@ public abstract class CollectionModule extends AbstractModule {
         configureMigrationProvider();
 
     }
-//
-//    @Provides
-//    @Singleton
-//    @Inject
-//    public WriteStart write (final MvccLogEntrySerializationStrategy logStrategy) {
-//        final WriteStart writeStart = new WriteStart( logStrategy, MvccEntity.Status.COMPLETE);
-//
-//        return writeStart;
-//    }
-
-    @Inject
-    @Singleton
-    @Provides
-    @CollectionTaskExecutor
-    public TaskExecutor collectionTaskExecutor(final SerializationFig serializationFig){
-        return new NamedTaskExecutorImpl( "collectiontasks",
-                serializationFig.getTaskPoolThreadSize(), serializationFig.getTaskPoolQueueSize() );
-    }
 
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
index 761c4b5..50a4bfc 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
@@ -27,9 +27,10 @@ import org.apache.usergrid.persistence.collection.EntityCollectionManager;
 import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
 import org.apache.usergrid.persistence.collection.cache.CachedEntityCollectionManager;
 import org.apache.usergrid.persistence.collection.cache.EntityCacheFig;
-import org.apache.usergrid.persistence.collection.guice.CollectionTaskExecutor;
 import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkCommit;
 import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkStart;
+import org.apache.usergrid.persistence.collection.mvcc.stage.delete.UniqueCleanup;
+import org.apache.usergrid.persistence.collection.mvcc.stage.delete.VersionCompact;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.RollbackAction;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteCommit;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteOptimisticVerify;
@@ -37,11 +38,11 @@ import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteStart;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteUniqueVerify;
 import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.task.TaskExecutor;
 
 import com.google.common.base.Preconditions;
 import com.google.common.cache.CacheBuilder;
@@ -67,12 +68,13 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
     private final RollbackAction rollback;
     private final MarkStart markStart;
     private final MarkCommit markCommit;
+    private final UniqueCleanup uniqueCleanup;
+    private final VersionCompact versionCompact;
+    private final SerializationFig serializationFig;
     private final MvccEntitySerializationStrategy entitySerializationStrategy;
     private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
     private final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy;
     private final Keyspace keyspace;
-    private final EntityVersionTaskFactory entityVersionTaskFactory;
-    private final TaskExecutor taskExecutor;
     private final EntityCacheFig entityCacheFig;
     private final MetricsFactory metricsFactory;
     private final RxTaskScheduler rxTaskScheduler;
@@ -84,10 +86,11 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
                                   //create the target EM that will perform logic
                             final EntityCollectionManager target = new EntityCollectionManagerImpl(
                                 writeStart, writeVerifyUnique,
-                                writeOptimisticVerify, writeCommit, rollback, markStart, markCommit,
+                                writeOptimisticVerify, writeCommit, rollback, markStart, markCommit,  uniqueCleanup, versionCompact,
                                 entitySerializationStrategy, uniqueValueSerializationStrategy,
-                                mvccLogEntrySerializationStrategy, keyspace,entityVersionTaskFactory, taskExecutor, scope, metricsFactory,
-                                rxTaskScheduler );
+                                mvccLogEntrySerializationStrategy, keyspace,
+                                metricsFactory, serializationFig,
+                                rxTaskScheduler, scope );
 
 
                             final EntityCollectionManager proxy = new CachedEntityCollectionManager(entityCacheFig, target  );
@@ -102,13 +105,12 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
                                                final WriteOptimisticVerify writeOptimisticVerify,
                                                final WriteCommit writeCommit, final RollbackAction rollback,
                                                final MarkStart markStart, final MarkCommit markCommit,
-                                               final MvccEntitySerializationStrategy entitySerializationStrategy,
+                                               final UniqueCleanup uniqueCleanup, final VersionCompact versionCompact,
+                                               final SerializationFig serializationFig, final
+                                                   MvccEntitySerializationStrategy entitySerializationStrategy,
                                                final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
                                                final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy,
-                                               final Keyspace keyspace,
-                                               final EntityVersionTaskFactory entityVersionTaskFactory,
-                                               @CollectionTaskExecutor final TaskExecutor taskExecutor, final
-                                                   EntityCacheFig entityCacheFig,
+                                               final Keyspace keyspace, final EntityCacheFig entityCacheFig,
                                                MetricsFactory metricsFactory, final RxTaskScheduler rxTaskScheduler ) {
 
         this.writeStart = writeStart;
@@ -118,12 +120,13 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
         this.rollback = rollback;
         this.markStart = markStart;
         this.markCommit = markCommit;
+        this.uniqueCleanup = uniqueCleanup;
+        this.versionCompact = versionCompact;
+        this.serializationFig = serializationFig;
         this.entitySerializationStrategy = entitySerializationStrategy;
         this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
         this.mvccLogEntrySerializationStrategy = mvccLogEntrySerializationStrategy;
         this.keyspace = keyspace;
-        this.entityVersionTaskFactory = entityVersionTaskFactory;
-        this.taskExecutor = taskExecutor;
         this.entityCacheFig = entityCacheFig;
         this.metricsFactory = metricsFactory;
         this.rxTaskScheduler = rxTaskScheduler;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index 6f10e86..7a32d72 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@ -22,6 +22,7 @@ package org.apache.usergrid.persistence.collection.impl;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.UUID;
 
@@ -32,11 +33,13 @@ import org.apache.usergrid.persistence.collection.EntityCollectionManager;
 import org.apache.usergrid.persistence.collection.EntitySet;
 import org.apache.usergrid.persistence.collection.FieldSet;
 import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.collection.MvccLogEntry;
 import org.apache.usergrid.persistence.collection.VersionSet;
-import org.apache.usergrid.persistence.collection.guice.CollectionTaskExecutor;
 import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
 import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkCommit;
 import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkStart;
+import org.apache.usergrid.persistence.collection.mvcc.stage.delete.UniqueCleanup;
+import org.apache.usergrid.persistence.collection.mvcc.stage.delete.VersionCompact;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.RollbackAction;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteCommit;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteOptimisticVerify;
@@ -44,15 +47,17 @@ import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteStart;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteUniqueVerify;
 import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet;
+import org.apache.usergrid.persistence.collection.serialization.impl.MinMaxLogEntryIterator;
 import org.apache.usergrid.persistence.collection.serialization.impl.MutableFieldSet;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.core.metrics.ObservableTimer;
+import org.apache.usergrid.persistence.core.rx.ObservableIterator;
 import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.task.Task;
-import org.apache.usergrid.persistence.core.task.TaskExecutor;
 import org.apache.usergrid.persistence.core.util.Health;
 import org.apache.usergrid.persistence.core.util.ValidationUtils;
 import org.apache.usergrid.persistence.model.entity.Entity;
@@ -60,7 +65,6 @@ import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.field.Field;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 
-import com.codahale.metrics.Meter;
 import com.codahale.metrics.Timer;
 import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
@@ -73,13 +77,9 @@ import com.netflix.astyanax.model.ColumnFamily;
 import com.netflix.astyanax.model.CqlResult;
 import com.netflix.astyanax.serializers.StringSerializer;
 
-import rx.Notification;
 import rx.Observable;
 import rx.Subscriber;
 import rx.functions.Action0;
-import rx.functions.Action1;
-import rx.functions.Func1;
-import rx.schedulers.Schedulers;
 
 
 /**
@@ -97,6 +97,8 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
     private final WriteOptimisticVerify writeOptimisticVerify;
     private final WriteCommit writeCommit;
     private final RollbackAction rollback;
+    private final UniqueCleanup uniqueCleanup;
+    private final VersionCompact versionCompact;
 
 
     //delete stages
@@ -107,41 +109,39 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
     private final MvccEntitySerializationStrategy entitySerializationStrategy;
     private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
 
-    private final EntityVersionTaskFactory entityVersionTaskFactory;
-    private final TaskExecutor taskExecutor;
+    private final SerializationFig serializationFig;
 
-    private final RxTaskScheduler rxTaskScheduler;
 
     private final Keyspace keyspace;
     private final Timer writeTimer;
-    private final Meter writeMeter;
     private final Timer deleteTimer;
+    private final Timer fieldIdTimer;
+    private final Timer fieldEntityTimer;
     private final Timer updateTimer;
     private final Timer loadTimer;
     private final Timer getLatestTimer;
-    private final Meter deleteMeter;
-    private final Meter getLatestMeter;
-    private final Meter loadMeter;
-    private final Meter updateMeter;
 
     private final ApplicationScope applicationScope;
+    private final RxTaskScheduler rxTaskScheduler;
 
 
     @Inject
     public EntityCollectionManagerImpl( final WriteStart writeStart, final WriteUniqueVerify writeVerifyUnique,
-                                        final WriteOptimisticVerify writeOptimisticVerify, final WriteCommit
-                                                writeCommit,
-                                        final RollbackAction rollback, final MarkStart markStart,
-                                        final MarkCommit markCommit, final MvccEntitySerializationStrategy entitySerializationStrategy,
+                                        final WriteOptimisticVerify writeOptimisticVerify,
+                                        final WriteCommit writeCommit, final RollbackAction rollback,
+                                        final MarkStart markStart, final MarkCommit markCommit,
+                                        final UniqueCleanup uniqueCleanup, final VersionCompact versionCompact,
+                                        final MvccEntitySerializationStrategy entitySerializationStrategy,
                                         final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
                                         final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy,
-                                        final Keyspace keyspace, final EntityVersionTaskFactory entityVersionTaskFactory,
-                                        @CollectionTaskExecutor final TaskExecutor taskExecutor, @Assisted final ApplicationScope applicationScope,
-                                        final MetricsFactory metricsFactory,
-
-                                        final RxTaskScheduler rxTaskScheduler ) {
+                                        final Keyspace keyspace, final MetricsFactory metricsFactory,
+                                        final SerializationFig serializationFig, final RxTaskScheduler rxTaskScheduler,
+                                        @Assisted final ApplicationScope applicationScope ) {
         this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
         this.entitySerializationStrategy = entitySerializationStrategy;
+        this.uniqueCleanup = uniqueCleanup;
+        this.versionCompact = versionCompact;
+        this.serializationFig = serializationFig;
         this.rxTaskScheduler = rxTaskScheduler;
 
         ValidationUtils.validateApplicationScope( applicationScope );
@@ -158,21 +158,16 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
 
         this.keyspace = keyspace;
 
-        this.entityVersionTaskFactory = entityVersionTaskFactory;
-        this.taskExecutor = taskExecutor;
 
         this.applicationScope = applicationScope;
         this.mvccLogEntrySerializationStrategy = mvccLogEntrySerializationStrategy;
-        this.writeTimer = metricsFactory.getTimer(EntityCollectionManagerImpl.class,"write.timer");
-        this.writeMeter = metricsFactory.getMeter(EntityCollectionManagerImpl.class, "write.meter");
-        this.deleteTimer = metricsFactory.getTimer(EntityCollectionManagerImpl.class, "delete.timer");
-        this.deleteMeter= metricsFactory.getMeter(EntityCollectionManagerImpl.class, "delete.meter");
-        this.updateTimer = metricsFactory.getTimer(EntityCollectionManagerImpl.class, "update.timer");
-        this.updateMeter = metricsFactory.getMeter(EntityCollectionManagerImpl.class, "update.meter");
-        this.loadTimer = metricsFactory.getTimer(EntityCollectionManagerImpl.class,"load.timer");
-        this.loadMeter = metricsFactory.getMeter(EntityCollectionManagerImpl.class, "load.meter");
-        this.getLatestTimer = metricsFactory.getTimer(EntityCollectionManagerImpl.class,"latest.timer");
-        this.getLatestMeter = metricsFactory.getMeter(EntityCollectionManagerImpl.class, "latest.meter");
+        this.writeTimer = metricsFactory.getTimer( EntityCollectionManagerImpl.class, "write" );
+        this.deleteTimer = metricsFactory.getTimer( EntityCollectionManagerImpl.class, "delete" );
+        this.fieldIdTimer = metricsFactory.getTimer( EntityCollectionManagerImpl.class, "fieldId" );
+        this.fieldEntityTimer = metricsFactory.getTimer( EntityCollectionManagerImpl.class, "fieldEntity" );
+        this.updateTimer = metricsFactory.getTimer( EntityCollectionManagerImpl.class, "update" );
+        this.loadTimer = metricsFactory.getTimer( EntityCollectionManagerImpl.class, "load" );
+        this.getLatestTimer = metricsFactory.getTimer( EntityCollectionManagerImpl.class, "latest" );
     }
 
 
@@ -192,74 +187,26 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
 
         Observable<CollectionIoEvent<MvccEntity>> observable = stageRunner( writeData, writeStart );
 
-        // execute all validation stages concurrently.  Needs refactored when this is done.
-        // https://github.com/Netflix/RxJava/issues/627
-        // observable = Concurrent.concurrent( observable, Schedulers.io(), new WaitZip(),
-        //                  writeVerifyUnique, writeOptimisticVerify );
 
-        final Timer.Context timer = writeTimer.time();
-        return observable.map(writeCommit).doOnNext(new Action1<Entity>() {
-            @Override
-            public void call(final Entity entity) {
-                //TODO fire the created task first then the entityVersioncleanup
-                taskExecutor.submit( entityVersionTaskFactory.getCreatedTask( applicationScope, entity ));
-                taskExecutor.submit( entityVersionTaskFactory.getCleanupTask( applicationScope, entityId,
-                    entity.getVersion(), false ));
-                //post-processing to come later. leave it empty for now.
-            }
-        }).doOnError( rollback )
-            .doOnEach( new Action1<Notification<? super Entity>>() {
-                @Override
-                public void call( Notification<? super Entity> notification ) {
-                    writeMeter.mark();
-                }
-            } )
-            .doOnCompleted( new Action0() {
-                @Override
-                public void call() {
-                    timer.stop();
-                }
-            } );
+        final Observable<Entity> write = observable.map( writeCommit );
+
+        return ObservableTimer.time( write, writeTimer );
     }
 
 
     @Override
-    public Observable<Id> delete( final Id entityId ) {
+    public Observable<Id> mark( final Id entityId ) {
 
         Preconditions.checkNotNull( entityId, "Entity id is required in this stage" );
         Preconditions.checkNotNull( entityId.getUuid(), "Entity id is required in this stage" );
         Preconditions.checkNotNull( entityId.getType(), "Entity type is required in this stage" );
 
-        final Timer.Context timer = deleteTimer.time();
-        Observable<Id> o = Observable.just( new CollectionIoEvent<Id>( applicationScope, entityId ) )
-            .map(markStart)
-            .doOnNext( markCommit )
-            .map(new Func1<CollectionIoEvent<MvccEntity>, Id>() {
-
-                     @Override
-                     public Id call(final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent) {
-                         MvccEntity entity = mvccEntityCollectionIoEvent.getEvent();
-                         Task<Void> task = entityVersionTaskFactory
-                             .getDeleteTask( applicationScope, entity.getId(), entity.getVersion() );
-                         taskExecutor.submit(task);
-                         return entity.getId();
-                     }
-                 }
-            )
-            .doOnNext(new Action1<Id>() {
-                @Override
-                public void call(Id id) {
-                    deleteMeter.mark();
-                }
-            })
-            .doOnCompleted( new Action0() {
-                @Override
-                public void call() {
-                    timer.stop();
-                }
-            } );
+        Observable<Id> o = Observable.just( new CollectionIoEvent<>( applicationScope, entityId ) ).map( markStart )
+                                     .doOnNext( markCommit ).compose( uniqueCleanup ).map(
+                entityEvent -> entityEvent.getEvent().getId() );
+
 
-        return o;
+        return ObservableTimer.time( o, deleteTimer );
     }
 
 
@@ -270,35 +217,19 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
         Preconditions.checkNotNull( entityId.getUuid(), "Entity id uuid required in load stage" );
         Preconditions.checkNotNull( entityId.getType(), "Entity id type required in load stage" );
 
-        final Timer.Context timer = loadTimer.time();
-        return load( Collections.singleton( entityId ) ).flatMap(new Func1<EntitySet, Observable<Entity>>() {
-            @Override
-            public Observable<Entity> call(final EntitySet entitySet) {
-                final MvccEntity entity = entitySet.getEntity(entityId);
-
-                if (entity == null || !entity.getEntity().isPresent()) {
-                    return Observable.empty();
-                }
+        final Observable<Entity> entityObservable = load( Collections.singleton( entityId ) ).flatMap( entitySet -> {
+            final MvccEntity entity = entitySet.getEntity( entityId );
 
-                return Observable.just( entity.getEntity().get() );
+            if ( entity == null || !entity.getEntity().isPresent() ) {
+                return Observable.empty();
             }
-        })
-            .doOnNext( new Action1<Entity>() {
-                @Override
-                public void call( Entity entity ) {
-                    loadMeter.mark();
-                }
-            } )
-            .doOnCompleted( new Action0() {
-                @Override
-                public void call() {
-                    timer.stop();
-                }
-            } );
-    }
 
+            return Observable.just( entity.getEntity().get() );
+        } );
 
 
+        return ObservableTimer.time( entityObservable, loadTimer );
+    }
 
 
     @Override
@@ -306,15 +237,13 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
 
         Preconditions.checkNotNull( entityIds, "entityIds cannot be null" );
 
-        final Timer.Context timer = loadTimer.time();
-
-        return Observable.create( new Observable.OnSubscribe<EntitySet>() {
+        final Observable<EntitySet> entitySetObservable = Observable.create( new Observable.OnSubscribe<EntitySet>() {
 
             @Override
             public void call( final Subscriber<? super EntitySet> subscriber ) {
                 try {
                     final EntitySet results =
-                            entitySerializationStrategy.load( applicationScope, entityIds, UUIDGenerator.newTimeUUID() );
+                        entitySerializationStrategy.load( applicationScope, entityIds, UUIDGenerator.newTimeUUID() );
 
                     subscriber.onNext( results );
                     subscriber.onCompleted();
@@ -323,162 +252,167 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
                     subscriber.onError( e );
                 }
             }
-        } )
-            .doOnNext(new Action1<EntitySet>() {
-                @Override
-                public void call(EntitySet entitySet) {
-                    loadMeter.mark();
-                }
-            })
-            .doOnCompleted( new Action0() {
-                @Override
-                public void call() {
-                    timer.stop();
-                }
-            } );
+        } );
+
+
+        return ObservableTimer.time( entitySetObservable, loadTimer );
     }
 
 
     @Override
-    public Observable<Id> getIdField(final String type,  final Field field ) {
-        final List<Field> fields = Collections.singletonList( field );
-        return rx.Observable.from( fields ).map( new Func1<Field, Id>() {
+    public Observable<MvccLogEntry> getVersions( final Id entityId ) {
+        ValidationUtils.verifyIdentity( entityId );
+
+        return Observable.create( new ObservableIterator<MvccLogEntry>( "Log entry iterator" ) {
             @Override
-            public Id call( Field field ) {
-                try {
-                    final UniqueValueSet set = uniqueValueSerializationStrategy.load( applicationScope, type, fields );
-                    final UniqueValue value = set.getValue( field.getName() );
-                    return value == null ? null : value.getEntityId();
-                }
-                catch ( ConnectionException e ) {
-                    logger.error( "Failed to getIdField", e );
-                    throw new RuntimeException( e );
-                }
+            protected Iterator<MvccLogEntry> getIterator() {
+                return new MinMaxLogEntryIterator( mvccLogEntrySerializationStrategy, applicationScope, entityId,
+                    serializationFig.getBufferSize() );
             }
         } );
     }
 
 
+    @Override
+    public Observable<MvccLogEntry> delete( final Collection<MvccLogEntry> entries ) {
+        Preconditions.checkNotNull( entries, "entries must not be null" );
+
+
+        return Observable.from( entries ).map( logEntry -> new CollectionIoEvent<>( applicationScope, logEntry ) )
+                         .compose( versionCompact ).map( event -> event.getEvent() );
+    }
+
+
+    @Override
+    public Observable<Id> getIdField( final String type, final Field field ) {
+        final List<Field> fields = Collections.singletonList( field );
+        final Observable<Id> idObservable = Observable.from( fields ).map( field1 -> {
+            try {
+                final UniqueValueSet set = uniqueValueSerializationStrategy.load( applicationScope, type, fields );
+                final UniqueValue value = set.getValue( field1.getName() );
+                return value == null ? null : value.getEntityId();
+            }
+            catch ( ConnectionException e ) {
+                logger.error( "Failed to getIdField", e );
+                throw new RuntimeException( e );
+            }
+        } );
+
+        return ObservableTimer.time( idObservable, fieldIdTimer );
+    }
+
+
     /**
      * Retrieves all entities that correspond to each field given in the Collection.
-     * @param fields
-     * @return
      */
     @Override
-    public Observable<FieldSet> getEntitiesFromFields(final String type, final Collection<Field> fields ) {
-        return rx.Observable.just(fields).map( new Func1<Collection<Field>, FieldSet>() {
-            @Override
-            public FieldSet call( Collection<Field> fields ) {
-                try {
-
-                    final UUID startTime = UUIDGenerator.newTimeUUID();
+    public Observable<FieldSet> getEntitiesFromFields( final String type, final Collection<Field> fields ) {
+        final Observable<FieldSet> fieldSetObservable = Observable.just( fields ).map( fields1 -> {
+            try {
 
-                    //Get back set of unique values that correspond to collection of fields
-                    UniqueValueSet set = uniqueValueSerializationStrategy.load( applicationScope,type,  fields );
+                final UUID startTime = UUIDGenerator.newTimeUUID();
 
-                    //Short circut if we don't have any uniqueValues from the given fields.
-                    if(!set.iterator().hasNext()){
-                        return new MutableFieldSet( 0 );
-                    }
+                //Get back set of unique values that correspond to collection of fields
+                UniqueValueSet set = uniqueValueSerializationStrategy.load( applicationScope, type, fields1 );
 
+                //Short circuit if we don't have any uniqueValues from the given fields.
+                if ( !set.iterator().hasNext() ) {
+                    return new MutableFieldSet( 0 );
+                }
 
-                    //loop through each field, and construct an entity load
-                    List<Id> entityIds = new ArrayList<>(fields.size());
-                    List<UniqueValue> uniqueValues = new ArrayList<>(fields.size());
 
-                    for(final Field expectedField: fields) {
+                //loop through each field, and construct an entity load
+                List<Id> entityIds = new ArrayList<>( fields1.size() );
+                List<UniqueValue> uniqueValues = new ArrayList<>( fields1.size() );
 
-                        UniqueValue value = set.getValue(expectedField.getName());
+                for ( final Field expectedField : fields1 ) {
 
-                        if(value ==null){
-                            logger.debug( "Field does not correspond to a unique value" );
-                        }
+                    UniqueValue value = set.getValue( expectedField.getName() );
 
-                        entityIds.add(value.getEntityId());
-                        uniqueValues.add(value);
+                    if ( value == null ) {
+                        logger.debug( "Field does not correspond to a unique value" );
                     }
 
-                    //Load a entity for each entityId we retrieved.
-                    final EntitySet entitySet = entitySerializationStrategy.load(applicationScope, entityIds, startTime);
-
-                    //now loop through and ensure the entities are there.
-                    final MutationBatch deleteBatch = keyspace.prepareMutationBatch();
-
-                    final MutableFieldSet response = new MutableFieldSet(fields.size());
+                    entityIds.add( value.getEntityId() );
+                    uniqueValues.add( value );
+                }
 
-                    for(final UniqueValue expectedUnique: uniqueValues) {
-                        final MvccEntity entity = entitySet.getEntity(expectedUnique.getEntityId());
+                //Load an entity for each entityId we retrieved.
+                final EntitySet entitySet = entitySerializationStrategy.load( applicationScope, entityIds, startTime );
 
-                        //bad unique value, delete this, it's inconsistent
-                        if(entity == null || !entity.getEntity().isPresent()){
-                            final MutationBatch valueDelete = uniqueValueSerializationStrategy.delete(applicationScope, expectedUnique);
-                            deleteBatch.mergeShallow(valueDelete);
-                            continue;
-                        }
+                //now loop through and ensure the entities are there.
+                final MutationBatch deleteBatch = keyspace.prepareMutationBatch();
 
+                final MutableFieldSet response = new MutableFieldSet( fields1.size() );
 
-                        //else add it to our result set
-                        response.addEntity(expectedUnique.getField(),entity);
+                for ( final UniqueValue expectedUnique : uniqueValues ) {
+                    final MvccEntity entity = entitySet.getEntity( expectedUnique.getEntityId() );
 
+                    //bad unique value, delete this, it's inconsistent
+                    if ( entity == null || !entity.getEntity().isPresent() ) {
+                        final MutationBatch valueDelete =
+                            uniqueValueSerializationStrategy.delete( applicationScope, expectedUnique );
+                        deleteBatch.mergeShallow( valueDelete );
+                        continue;
                     }
 
-                    //TODO: explore making this an Async process
-                    //We'll repair it again if we have to
-                    deleteBatch.execute();
 
-                    return response;
+                    //else add it to our result set
+                    response.addEntity( expectedUnique.getField(), entity );
+                }
 
+                //TODO: explore making this an Async process
+                //We'll repair it again if we have to
+                deleteBatch.execute();
 
-                }
-                catch ( ConnectionException e ) {
-                    logger.error( "Failed to getIdField", e );
-                    throw new RuntimeException( e );
-                }
+                return response;
+            }
+            catch ( ConnectionException e ) {
+                logger.error( "Failed to getIdField", e );
+                throw new RuntimeException( e );
             }
         } );
-    }
 
 
+        return ObservableTimer.time( fieldSetObservable, fieldEntityTimer );
+    }
 
 
     // fire the stages
     public Observable<CollectionIoEvent<MvccEntity>> stageRunner( CollectionIoEvent<Entity> writeData,
                                                                   WriteStart writeState ) {
 
-        return Observable.just( writeData ).map( writeState ).doOnNext( new Action1<CollectionIoEvent<MvccEntity>>() {
-
-                    @Override
-                    public void call( final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent ) {
+        return Observable.just( writeData ).map( writeState ).flatMap( mvccEntityCollectionIoEvent -> {
 
-                        Observable<CollectionIoEvent<MvccEntity>> unique =
-                                Observable.just( mvccEntityCollectionIoEvent ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() )
-                                          .doOnNext( writeVerifyUnique );
+            Observable<CollectionIoEvent<MvccEntity>> uniqueObservable =
+                Observable.just( mvccEntityCollectionIoEvent ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() )
+                          .doOnNext( writeVerifyUnique );
 
 
-                        // optimistic verification
-                        Observable<CollectionIoEvent<MvccEntity>> optimistic =
-                                Observable.just( mvccEntityCollectionIoEvent ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() )
-                                          .doOnNext( writeOptimisticVerify );
+            // optimistic verification
+            Observable<CollectionIoEvent<MvccEntity>> optimisticObservable =
+                Observable.just( mvccEntityCollectionIoEvent ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() )
+                          .doOnNext( writeOptimisticVerify );
 
+            final Observable<CollectionIoEvent<MvccEntity>> zip =
+                Observable.zip( uniqueObservable, optimisticObservable, ( unique, optimistic ) -> optimistic );
 
-                        //wait for both to finish
-                        Observable.merge( unique, optimistic ).toBlocking().last();
-                    }
-                } );
+            return zip;
+        } );
     }
 
 
     @Override
     public Observable<VersionSet> getLatestVersion( final Collection<Id> entityIds ) {
 
-        final Timer.Context timer = getLatestTimer.time();
-        return Observable.create( new Observable.OnSubscribe<VersionSet>() {
+
+        final Observable<VersionSet> observable =  Observable.create( new Observable.OnSubscribe<VersionSet>() {
 
             @Override
             public void call( final Subscriber<? super VersionSet> subscriber ) {
                 try {
                     final VersionSet logEntries = mvccLogEntrySerializationStrategy
-                            .load( applicationScope, entityIds, UUIDGenerator.newTimeUUID() );
+                        .load( applicationScope, entityIds, UUIDGenerator.newTimeUUID() );
 
                     subscriber.onNext( logEntries );
                     subscriber.onCompleted();
@@ -487,13 +421,9 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
                     subscriber.onError( e );
                 }
             }
-        } )
-            .doOnCompleted( new Action0() {
-                @Override
-                public void call() {
-                    timer.stop();
-                }
-            } );
+        } );
+
+        return ObservableTimer.time( observable, getLatestTimer );
     }
 
 
@@ -502,11 +432,11 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
 
         try {
             ColumnFamily<String, String> CF_SYSTEM_LOCAL =
-                    new ColumnFamily<String, String>( "system.local", StringSerializer.get(), StringSerializer.get(),
-                            StringSerializer.get() );
+                new ColumnFamily<String, String>( "system.local", StringSerializer.get(), StringSerializer.get(),
+                    StringSerializer.get() );
 
             OperationResult<CqlResult<String, String>> result =
-                    keyspace.prepareQuery( CF_SYSTEM_LOCAL ).withCql( "SELECT now() FROM system.local;" ).execute();
+                keyspace.prepareQuery( CF_SYSTEM_LOCAL ).withCql( "SELECT now() FROM system.local;" ).execute();
 
             if ( result.getResult().getRows().size() == 1 ) {
                 return Health.GREEN;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
deleted file mode 100644
index 1753d26..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.persistence.collection.impl;
-
-
-import java.util.Set;
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.collection.event.EntityDeleted;
-import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
-import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.task.Task;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
-import com.netflix.astyanax.MutationBatch;
-
-import rx.Observable;
-import rx.schedulers.Schedulers;
-
-
-/**
- * Fires Cleanup Task
- */
-public class EntityDeletedTask implements Task<Void> {
-    private static final Logger LOG =  LoggerFactory.getLogger(EntityDeletedTask.class);
-
-    private final EntityVersionTaskFactory entityVersionTaskFactory;
-    private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
-    private final MvccEntitySerializationStrategy entitySerializationStrategy;
-    private final Set<EntityDeleted> listeners;
-    private final ApplicationScope collectionScope;
-    private final Id entityId;
-    private final UUID version;
-
-
-    @Inject
-    public EntityDeletedTask(
-        EntityVersionTaskFactory entityVersionTaskFactory,
-        final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
-        final MvccEntitySerializationStrategy entitySerializationStrategy,
-        final Set<EntityDeleted>                listeners, // MUST be a set or Guice will not inject
-        @Assisted final ApplicationScope  collectionScope,
-        @Assisted final Id                      entityId,
-        @Assisted final UUID                    version) {
-
-        this.entityVersionTaskFactory = entityVersionTaskFactory;
-        this.logEntrySerializationStrategy = logEntrySerializationStrategy;
-        this.entitySerializationStrategy = entitySerializationStrategy;
-        this.listeners = listeners;
-        this.collectionScope = collectionScope;
-        this.entityId = entityId;
-        this.version = version;
-    }
-
-
-    @Override
-    public void exceptionThrown(Throwable throwable) {
-        LOG.error( "Unable to run update task for collection {} with entity {} and version {}",
-                new Object[] { collectionScope, entityId, version }, throwable );
-    }
-
-
-    @Override
-    public Void rejected() {
-        try {
-            call();
-        }
-        catch ( Exception e ) {
-            throw new RuntimeException( "Exception thrown in call task", e );
-        }
-
-        return null;
-    }
-
-
-    @Override
-    public Void call() throws Exception {
-
-        entityVersionTaskFactory.getCleanupTask( collectionScope, entityId, version, true ).call();
-
-        fireEvents();
-        final MutationBatch entityDelete = entitySerializationStrategy.delete(collectionScope, entityId, version);
-        final MutationBatch logDelete = logEntrySerializationStrategy.delete(collectionScope, entityId, version);
-
-        entityDelete.execute();
-        logDelete.execute();
-//
-        return null;
-    }
-
-
-    private void fireEvents() {
-        final int listenerSize = listeners.size();
-
-        if ( listenerSize == 0 ) {
-            return;
-        }
-
-        if ( listenerSize == 1 ) {
-            listeners.iterator().next().deleted( collectionScope, entityId,version );
-            return;
-        }
-
-        LOG.debug( "Started firing {} listeners", listenerSize );
-
-        //if we have more than 1, run them on the rx scheduler for a max of 10 operations at a time
-        Observable.from(listeners).flatMap( currentListener -> Observable.just( currentListener ).doOnNext( listener -> {
-            listener.deleted( collectionScope, entityId, version );
-        } ).subscribeOn( Schedulers.io() ), 10 ).toBlocking().last();
-
-        LOG.debug( "Finished firing {} listeners", listenerSize );
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
deleted file mode 100644
index b5f9085..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
+++ /dev/null
@@ -1,247 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.  For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.collection.impl;
-
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.collection.MvccLogEntry;
-import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
-import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
-import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
-import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
-import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
-import org.apache.usergrid.persistence.collection.serialization.impl.LogEntryIterator;
-import org.apache.usergrid.persistence.core.rx.ObservableIterator;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.task.Task;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
-import com.netflix.astyanax.Keyspace;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-
-import rx.Observable;
-import rx.functions.Action1;
-import rx.functions.Func1;
-import rx.observables.BlockingObservable;
-import rx.schedulers.Schedulers;
-
-
-/**
- * Cleans up previous versions from the specified version. Note that this means the version passed in the io event is
- * retained, the range is exclusive.
- */
-public class EntityVersionCleanupTask implements Task<Void> {
-
-    private static final Logger logger = LoggerFactory.getLogger( EntityVersionCleanupTask.class );
-
-    private final Set<EntityVersionDeleted> listeners;
-
-    private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
-    private UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
-    private final Keyspace keyspace;
-
-    private final SerializationFig serializationFig;
-
-    private final ApplicationScope scope;
-    private final Id entityId;
-    private final UUID version;
-    private final boolean includeVersion;
-
-
-    @Inject
-    public EntityVersionCleanupTask(
-        final SerializationFig serializationFig,
-        final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
-        final UniqueValueSerializationStrategy  uniqueValueSerializationStrategy,
-        final Keyspace                          keyspace,
-        final Set<EntityVersionDeleted>         listeners, // MUST be a set or Guice will not inject
-        @Assisted final ApplicationScope scope,
-        @Assisted final Id                      entityId,
-        @Assisted final UUID                    version,
-        @Assisted final boolean includeVersion) {
-
-        this.serializationFig = serializationFig;
-        this.logEntrySerializationStrategy = logEntrySerializationStrategy;
-        this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
-        this.keyspace = keyspace;
-        this.listeners = listeners;
-        this.scope = scope;
-        this.entityId = entityId;
-        this.version = version;
-
-        this.includeVersion = includeVersion;
-    }
-
-
-    @Override
-    public void exceptionThrown( final Throwable throwable ) {
-        logger.error( "Unable to run update task for collection {} with entity {} and version {}",
-                new Object[] { scope, entityId, version }, throwable );
-    }
-
-
-    @Override
-    public Void rejected() {
-        //Our task was rejected meaning our queue was full.  We need this operation to run,
-        // so we'll run it in our current thread
-        try {
-            call();
-        }
-        catch ( Exception e ) {
-            throw new RuntimeException( "Exception thrown in call task", e );
-        }
-
-        return null;
-    }
-
-
-    @Override
-    public Void call() throws Exception {
-        //TODO Refactor this logic into a a class that can be invoked from anywhere
-       //iterate all unique values
-        final BlockingObservable<Long> uniqueValueCleanup =
-                Observable.create( new ObservableIterator<UniqueValue>( "Unique value load" ) {
-                    @Override
-                    protected Iterator<UniqueValue> getIterator() {
-                        return uniqueValueSerializationStrategy.getAllUniqueFields( scope, entityId );
-                    }
-                } )
-
-                        //skip current versions
-                        .skipWhile( new Func1<UniqueValue, Boolean>() {
-                            @Override
-                            public Boolean call( final UniqueValue uniqueValue ) {
-                                return !includeVersion && version.equals( uniqueValue.getEntityVersion() );
-                            }
-                        } )
-                                //buffer our buffer size, then roll them all up in a single batch mutation
-                        .buffer( serializationFig.getBufferSize() ).doOnNext( new Action1<List<UniqueValue>>() {
-                    @Override
-                    public void call( final List<UniqueValue> uniqueValues ) {
-                        final MutationBatch uniqueCleanupBatch = keyspace.prepareMutationBatch();
-
-
-                        for ( UniqueValue value : uniqueValues ) {
-                            uniqueCleanupBatch.mergeShallow( uniqueValueSerializationStrategy.delete( scope, value ) );
-                        }
-
-                        try {
-                            uniqueCleanupBatch.execute();
-                        }
-                        catch ( ConnectionException e ) {
-                            throw new RuntimeException( "Unable to execute batch mutation", e );
-                        }
-                    }
-                } ).subscribeOn( Schedulers.io() ).countLong().toBlocking();
-
-
-        //start calling the listeners for remove log entries
-        BlockingObservable<Long> versionsDeletedObservable =
-
-                Observable.create( new ObservableIterator<MvccLogEntry>( "Log entry iterator" ) {
-                    @Override
-                    protected Iterator<MvccLogEntry> getIterator() {
-
-                        return new LogEntryIterator( logEntrySerializationStrategy, scope, entityId, version,
-                                serializationFig.getBufferSize() );
-                    }
-                } )
-                        //skip current version
-                        .skipWhile( new Func1<MvccLogEntry, Boolean>() {
-                            @Override
-                            public Boolean call( final MvccLogEntry mvccLogEntry ) {
-                                return !includeVersion && version.equals( mvccLogEntry.getVersion() );
-                            }
-                        } )
-                                //buffer them for efficiency
-                        .buffer( serializationFig.getBufferSize() ).doOnNext( new Action1<List<MvccLogEntry>>() {
-                    @Override
-                    public void call( final List<MvccLogEntry> mvccEntities ) {
-
-                        fireEvents( mvccEntities );
-
-                        final MutationBatch logCleanupBatch = keyspace.prepareMutationBatch();
-
-
-                        for ( MvccLogEntry entry : mvccEntities ) {
-                            logCleanupBatch.mergeShallow( logEntrySerializationStrategy.delete( scope, entityId, entry.getVersion() ));
-                        }
-
-                        try {
-                            logCleanupBatch.execute();
-                        }
-                        catch ( ConnectionException e ) {
-                            throw new RuntimeException( "Unable to execute batch mutation", e );
-                        }
-                    }
-                } ).subscribeOn( Schedulers.io() ).countLong().toBlocking();
-
-        //wait or this to complete
-        final Long removedCount = uniqueValueCleanup.last();
-
-        logger.debug( "Removed unique values for {} entities of entity {}", removedCount, entityId );
-
-        final Long versionCleanupCount = versionsDeletedObservable.last();
-
-        logger.debug( "Removed {} previous entity versions of entity {}", versionCleanupCount, entityId );
-
-        return null;
-    }
-
-
-    private void fireEvents( final List<MvccLogEntry> versions ) {
-
-        final int listenerSize = listeners.size();
-
-        if ( listenerSize == 0 ) {
-            return;
-        }
-
-        if ( listenerSize == 1 ) {
-            listeners.iterator().next().versionDeleted( scope, entityId, versions );
-            return;
-        }
-
-        logger.debug( "Started firing {} listeners", listenerSize );
-
-        //if we have more than 1, run them on the rx scheduler for a max of 8 operations at a time
-
-
-        //if we have more than 1, run them on the rx scheduler for a max of 10 operations at a time
-        Observable.from(listeners).flatMap( currentListener -> Observable.just( currentListener ).doOnNext( listener -> {
-            listener.versionDeleted( scope, entityId, versions );
-        } ).subscribeOn( Schedulers.io() ), 10 ).toBlocking().last();
-
-
-
-        logger.debug( "Finished firing {} listeners", listenerSize );
-    }
-}
-
-
-

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTask.java
deleted file mode 100644
index fbbcdbd..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTask.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.  For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.collection.impl;
-
-
-import java.util.Set;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.collection.event.EntityVersionCreated;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.task.Task;
-import org.apache.usergrid.persistence.model.entity.Entity;
-
-import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
-
-import rx.Observable;
-import rx.schedulers.Schedulers;
-
-
-/**
- * Fires events so that all EntityVersionCreated handlers area called.
- */
-public class EntityVersionCreatedTask implements Task<Void> {
-    private static final Logger logger = LoggerFactory.getLogger( EntityVersionCreatedTask.class );
-
-    private Set<EntityVersionCreated> listeners;
-    private final ApplicationScope collectionScope;
-    private final Entity entity;
-
-
-    @Inject
-    public EntityVersionCreatedTask( @Assisted final ApplicationScope collectionScope,
-                                     final Set<EntityVersionCreated> listeners,
-                                     @Assisted final Entity entity ) {
-
-        this.listeners = listeners;
-        this.collectionScope = collectionScope;
-        this.entity = entity;
-    }
-
-
-    @Override
-    public void exceptionThrown( final Throwable throwable ) {
-        logger.error( "Unable to run update task for collection {} with entity {} and version {}",
-                new Object[] { collectionScope, entity}, throwable );
-    }
-
-
-    @Override
-    public Void rejected() {
-
-        // Our task was rejected meaning our queue was full.
-        // We need this operation to run, so we'll run it in our current thread
-        try {
-            call();
-        }
-        catch ( Exception e ) {
-            throw new RuntimeException( "Exception thrown in call task", e );
-        }
-
-        return null;
-    }
-
-
-    @Override
-    public Void call() throws Exception {
-
-        fireEvents();
-        return null;
-    }
-
-
-    private void fireEvents() {
-
-        final int listenerSize = listeners.size();
-
-        if ( listenerSize == 0 ) {
-            return;
-        }
-
-        if ( listenerSize == 1 ) {
-            listeners.iterator().next().versionCreated( collectionScope, entity );
-            return;
-        }
-
-        logger.debug( "Started firing {} listeners", listenerSize );
-
-
-        Observable.from( listeners )
-                  .flatMap( currentListener -> Observable.just( currentListener ).doOnNext( listener -> {
-                      listener.versionCreated( collectionScope, entity );
-                  } ).subscribeOn( Schedulers.io() ), 10 ).toBlocking().last();
-
-
-        logger.debug( "Finished firing {} listeners", listenerSize );
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionTaskFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionTaskFactory.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionTaskFactory.java
deleted file mode 100644
index 51a4607..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionTaskFactory.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.  For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.collection.impl;
-
-
-import java.util.UUID;
-
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-
-public interface EntityVersionTaskFactory {
-
-    /**
-     * Get a task for cleaning up latent entity data.  If includeVersion = true, the passed version will be cleaned up as well
-     * Otherwise this is a V-1 operation
-     *
-     * @param scope
-     * @param entityId
-     * @param version
-     * @param includeVersion
-     * @return
-     */
-    EntityVersionCleanupTask getCleanupTask( final ApplicationScope scope, final Id entityId, final UUID version,
-                                             final boolean includeVersion );
-
-    /**
-     * Get an entityVersionCreatedTask
-     * @param scope
-     * @param entity
-     * @return
-     */
-    EntityVersionCreatedTask getCreatedTask( final ApplicationScope scope, final Entity entity );
-
-    /**
-     * Get an entity deleted task
-     * @param collectionScope
-     * @param entityId
-     * @param version
-     * @return
-     */
-    EntityDeletedTask getDeleteTask( final ApplicationScope collectionScope, final Id entityId, final UUID version );
-
-}