You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/05/07 18:40:59 UTC

[4/5] incubator-usergrid git commit: First pass at refactoring mark + sweep

First pass at refactoring mark + sweep


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/45aed6cc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/45aed6cc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/45aed6cc

Branch: refs/heads/USERGRID-614
Commit: 45aed6ccd5b459aa6c52ad10f1f7f04e1d5cb1f5
Parents: 36b5bad
Author: Todd Nine <tn...@apigee.com>
Authored: Tue May 5 17:30:08 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue May 5 17:30:08 2015 -0600

----------------------------------------------------------------------
 .../collection/EntityCollectionManager.java     |  14 +
 .../cache/CachedEntityCollectionManager.java    |  14 +
 .../collection/event/EntityDeleted.java         |  45 --
 .../collection/event/EntityVersionCreated.java  |  39 -
 .../collection/event/EntityVersionDeleted.java  |  46 --
 .../collection/guice/CollectionModule.java      |  39 -
 .../EntityCollectionManagerFactoryImpl.java     |  13 +-
 .../impl/EntityCollectionManagerImpl.java       | 352 ++++-----
 .../collection/impl/EntityDeletedTask.java      | 137 ----
 .../impl/EntityVersionCleanupTask.java          | 247 -------
 .../impl/EntityVersionCreatedTask.java          | 115 ---
 .../impl/EntityVersionTaskFactory.java          |  60 --
 .../mvcc/stage/delete/UniqueCleanup.java        | 133 ++++
 .../mvcc/stage/delete/VersionCompact.java       | 120 ++++
 .../MvccLogEntrySerializationStrategy.java      |  14 +
 .../serialization/impl/LogEntryIterator.java    | 114 ---
 .../serialization/impl/LogEntryObservable.java  |  54 ++
 .../impl/MinMaxLogEntryIterator.java            | 114 +++
 .../MvccLogEntrySerializationProxyImpl.java     |  14 +
 .../MvccLogEntrySerializationStrategyImpl.java  |  57 +-
 .../migration/MvccEntityDataMigrationImpl.java  |  30 +-
 .../impl/EntityVersionCleanupTaskTest.java      | 715 -------------------
 .../impl/EntityVersionCreatedTaskTest.java      | 241 -------
 .../mvcc/stage/delete/UniqueCleanupTest.java    | 712 ++++++++++++++++++
 .../mvcc/stage/delete/VersionCompactTest.java   | 238 ++++++
 .../impl/LogEntryIteratorTest.java              | 132 ----
 .../impl/MinMaxLogEntryIteratorTest.java        | 134 ++++
 ...ccLogEntrySerializationStrategyImplTest.java |  85 ++-
 .../core/executor/TaskExecutorFactory.java      |  95 +++
 .../persistence/core/rx/RxTaskScheduler.java    |   2 -
 .../core/task/NamedTaskExecutorImpl.java        | 286 --------
 .../usergrid/persistence/core/task/Task.java    |  48 --
 .../persistence/core/task/TaskExecutor.java     |  41 --
 .../core/task/NamedTaskExecutorImplTest.java    | 271 -------
 .../persistence/graph/GraphManager.java         |   8 +-
 .../persistence/graph/GraphManagerFactory.java  |   2 +-
 .../persistence/graph/SearchByEdge.java         |   6 +
 .../persistence/graph/SearchByEdgeType.java     |   6 +
 .../persistence/graph/guice/GraphModule.java    |  11 -
 .../graph/impl/GraphManagerImpl.java            | 404 ++++-------
 .../graph/impl/SimpleSearchByEdge.java          |  40 +-
 .../graph/impl/SimpleSearchByEdgeType.java      |  29 +-
 .../shard/impl/ShardGroupCompactionImpl.java    |  90 ++-
 .../graph/CommittedGraphManagerIT.java          |   8 +-
 .../persistence/graph/GraphManagerIT.java       |  26 +-
 .../graph/StorageGraphManagerIT.java            |   8 +-
 .../impl/shard/ShardGroupCompactionTest.java    |   6 +-
 47 files changed, 2196 insertions(+), 3219 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
index 5a329e3..8c27825 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
@@ -96,6 +96,20 @@ public interface EntityCollectionManager {
      */
     Observable<EntitySet> load( Collection<Id> entityIds );
 
+    /**
+     * Get all versions of the log entry, from max to min
+     * @param entityId
+     * @return An observable stream of mvccLog entries
+     */
+    Observable<MvccLogEntry> getVersions(final Id entityId);
+
+    /**
+     * Remove these versions.  Must be atomic so that read log entries are removed
+     * @param entries
+     * @return An observable of all successfully compacted log entries
+     */
+    Observable<MvccLogEntry> compact(final Collection<MvccLogEntry> entries);
+
 
     /**
      * Returns health of entity data store.

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/cache/CachedEntityCollectionManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/cache/CachedEntityCollectionManager.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/cache/CachedEntityCollectionManager.java
index fa35580..9412516 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/cache/CachedEntityCollectionManager.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/cache/CachedEntityCollectionManager.java
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.usergrid.persistence.collection.EntityCollectionManager;
 import org.apache.usergrid.persistence.collection.EntitySet;
 import org.apache.usergrid.persistence.collection.FieldSet;
+import org.apache.usergrid.persistence.collection.MvccLogEntry;
 import org.apache.usergrid.persistence.collection.VersionSet;
 import org.apache.usergrid.persistence.core.util.Health;
 import org.apache.usergrid.persistence.model.entity.Entity;
@@ -125,6 +126,19 @@ public class CachedEntityCollectionManager implements EntityCollectionManager {
         return targetEntityCollectionManager.load( entityIds );
     }
 
+
+    @Override
+    public Observable<MvccLogEntry> getVersions( final Id entityId ) {
+        return targetEntityCollectionManager.getVersions( entityId ); // delegate to the wrapped manager instead of returning null
+    }
+
+
+    @Override
+    public Observable<MvccLogEntry> compact( final Collection<MvccLogEntry> entries ) {
+        return targetEntityCollectionManager.compact( entries );
+    }
+
+
     @Override
     public Health getHealth() {
         return targetEntityCollectionManager.getHealth();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityDeleted.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityDeleted.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityDeleted.java
deleted file mode 100644
index 0e9b62e..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityDeleted.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.  For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.collection.event;
-
-
-import java.util.UUID;
-
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-
-/**
- *
- * Invoked when an entity is deleted.  The delete log entry is not removed until all instances of this listener has completed.
- * If any listener fails with an exception, the entity will not be removed.
- *
- */
-public interface EntityDeleted {
-
-
-    /**
-     * The event fired when an entity is deleted
-     *
-     * @param scope The scope of the entity
-     * @param entityId The id of the entity
-     * @param version the entity version
-     */
-    public void deleted( final ApplicationScope scope, final Id entityId, final UUID version);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionCreated.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionCreated.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionCreated.java
deleted file mode 100644
index 7f1be1a..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionCreated.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.  For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.collection.event;
-
-
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.model.entity.Entity;
-
-
-/**
- * Invoked after a new version of an entity has been created.
- * The entity should be a complete view of the entity.
- */
-public interface EntityVersionCreated {
-
-    /**
-     * The new version of the entity. Note that this should be a fully merged view of the entity.
-     * In the case of partial updates, the passed entity should be fully merged with it's previous
-     * entries.
-     * @param scope The scope of the entity
-     * @param entity The fully loaded and merged entity
-     */
-    public void versionCreated( final ApplicationScope scope, final Entity entity );
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionDeleted.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionDeleted.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionDeleted.java
deleted file mode 100644
index 7fd8fe7..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionDeleted.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.  For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.collection.event;
-
-
-import java.util.List;
-
-import org.apache.usergrid.persistence.collection.MvccLogEntry;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-
-/**
- *
- * Invoked when an entity version is removed.  Note that this is not a deletion of the entity
- * itself, only the version itself.
- *
- */
-public interface EntityVersionDeleted {
-
-    /**
-     * The version specified was removed.
-     *
-     * @param scope The scope of the entity
-     * @param entityId The entity Id that was removed
-     * @param entityVersions The versions that are to be removed
-     */
-    public void versionDeleted(final ApplicationScope scope, final Id entityId,
-            final List<MvccLogEntry> entityVersions);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
index b256a44..78c7f37 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
@@ -22,27 +22,14 @@ import org.safehaus.guicyfig.GuicyFigModule;
 
 import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
 import org.apache.usergrid.persistence.collection.cache.EntityCacheFig;
-import org.apache.usergrid.persistence.collection.event.EntityDeleted;
-import org.apache.usergrid.persistence.collection.event.EntityVersionCreated;
-import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
 import org.apache.usergrid.persistence.collection.impl.EntityCollectionManagerFactoryImpl;
-import org.apache.usergrid.persistence.collection.impl.EntityVersionTaskFactory;
 import org.apache.usergrid.persistence.collection.mvcc.changelog.ChangeLogGenerator;
 import org.apache.usergrid.persistence.collection.mvcc.changelog.ChangeLogGeneratorImpl;
 import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
-import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.impl.SerializationModule;
-import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueSerializationStrategyImpl;
 import org.apache.usergrid.persistence.collection.service.impl.ServiceModule;
-import org.apache.usergrid.persistence.core.task.NamedTaskExecutorImpl;
-import org.apache.usergrid.persistence.core.task.TaskExecutor;
 
 import com.google.inject.AbstractModule;
-import com.google.inject.Inject;
-import com.google.inject.Provides;
-import com.google.inject.Singleton;
-import com.google.inject.assistedinject.FactoryModuleBuilder;
-import com.google.inject.multibindings.Multibinder;
 
 
 /**
@@ -61,15 +48,7 @@ public abstract class CollectionModule extends AbstractModule {
         install( new SerializationModule() );
         install( new ServiceModule() );
 
-        install ( new FactoryModuleBuilder().build( EntityVersionTaskFactory.class ));
-
         // users of this module can add their own implemementations
-        // for more information: https://github.com/google/guice/wiki/Multibindings
-
-        Multibinder.newSetBinder( binder(), EntityVersionDeleted.class );
-        Multibinder.newSetBinder( binder(), EntityVersionCreated.class );
-        Multibinder.newSetBinder( binder(), EntityDeleted.class );
-
         // create a guice factor for getting our collection manager
          bind(EntityCollectionManagerFactory.class).to(EntityCollectionManagerFactoryImpl.class);
 
@@ -81,24 +60,6 @@ public abstract class CollectionModule extends AbstractModule {
         configureMigrationProvider();
 
     }
-//
-//    @Provides
-//    @Singleton
-//    @Inject
-//    public WriteStart write (final MvccLogEntrySerializationStrategy logStrategy) {
-//        final WriteStart writeStart = new WriteStart( logStrategy, MvccEntity.Status.COMPLETE);
-//
-//        return writeStart;
-//    }
-
-    @Inject
-    @Singleton
-    @Provides
-    @CollectionTaskExecutor
-    public TaskExecutor collectionTaskExecutor(final SerializationFig serializationFig){
-        return new NamedTaskExecutorImpl( "collectiontasks",
-                serializationFig.getTaskPoolThreadSize(), serializationFig.getTaskPoolQueueSize() );
-    }
 
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
index 761c4b5..c4422f7 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
@@ -27,7 +27,6 @@ import org.apache.usergrid.persistence.collection.EntityCollectionManager;
 import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
 import org.apache.usergrid.persistence.collection.cache.CachedEntityCollectionManager;
 import org.apache.usergrid.persistence.collection.cache.EntityCacheFig;
-import org.apache.usergrid.persistence.collection.guice.CollectionTaskExecutor;
 import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkCommit;
 import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkStart;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.RollbackAction;
@@ -41,7 +40,6 @@ import org.apache.usergrid.persistence.collection.serialization.UniqueValueSeria
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.task.TaskExecutor;
 
 import com.google.common.base.Preconditions;
 import com.google.common.cache.CacheBuilder;
@@ -71,8 +69,6 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
     private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
     private final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy;
     private final Keyspace keyspace;
-    private final EntityVersionTaskFactory entityVersionTaskFactory;
-    private final TaskExecutor taskExecutor;
     private final EntityCacheFig entityCacheFig;
     private final MetricsFactory metricsFactory;
     private final RxTaskScheduler rxTaskScheduler;
@@ -86,7 +82,7 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
                                 writeStart, writeVerifyUnique,
                                 writeOptimisticVerify, writeCommit, rollback, markStart, markCommit,
                                 entitySerializationStrategy, uniqueValueSerializationStrategy,
-                                mvccLogEntrySerializationStrategy, keyspace,entityVersionTaskFactory, taskExecutor, scope, metricsFactory,
+                                mvccLogEntrySerializationStrategy, keyspace, scope, metricsFactory,
                                 rxTaskScheduler );
 
 
@@ -105,10 +101,7 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
                                                final MvccEntitySerializationStrategy entitySerializationStrategy,
                                                final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
                                                final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy,
-                                               final Keyspace keyspace,
-                                               final EntityVersionTaskFactory entityVersionTaskFactory,
-                                               @CollectionTaskExecutor final TaskExecutor taskExecutor, final
-                                                   EntityCacheFig entityCacheFig,
+                                               final Keyspace keyspace, final EntityCacheFig entityCacheFig,
                                                MetricsFactory metricsFactory, final RxTaskScheduler rxTaskScheduler ) {
 
         this.writeStart = writeStart;
@@ -122,8 +115,6 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
         this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
         this.mvccLogEntrySerializationStrategy = mvccLogEntrySerializationStrategy;
         this.keyspace = keyspace;
-        this.entityVersionTaskFactory = entityVersionTaskFactory;
-        this.taskExecutor = taskExecutor;
         this.entityCacheFig = entityCacheFig;
         this.metricsFactory = metricsFactory;
         this.rxTaskScheduler = rxTaskScheduler;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index 6f10e86..83c2035 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@ -32,8 +32,8 @@ import org.apache.usergrid.persistence.collection.EntityCollectionManager;
 import org.apache.usergrid.persistence.collection.EntitySet;
 import org.apache.usergrid.persistence.collection.FieldSet;
 import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.collection.MvccLogEntry;
 import org.apache.usergrid.persistence.collection.VersionSet;
-import org.apache.usergrid.persistence.collection.guice.CollectionTaskExecutor;
 import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
 import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkCommit;
 import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkStart;
@@ -49,10 +49,9 @@ import org.apache.usergrid.persistence.collection.serialization.UniqueValueSeria
 import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet;
 import org.apache.usergrid.persistence.collection.serialization.impl.MutableFieldSet;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.core.metrics.ObservableTimer;
 import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.task.Task;
-import org.apache.usergrid.persistence.core.task.TaskExecutor;
 import org.apache.usergrid.persistence.core.util.Health;
 import org.apache.usergrid.persistence.core.util.ValidationUtils;
 import org.apache.usergrid.persistence.model.entity.Entity;
@@ -60,7 +59,6 @@ import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.field.Field;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 
-import com.codahale.metrics.Meter;
 import com.codahale.metrics.Timer;
 import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
@@ -73,13 +71,9 @@ import com.netflix.astyanax.model.ColumnFamily;
 import com.netflix.astyanax.model.CqlResult;
 import com.netflix.astyanax.serializers.StringSerializer;
 
-import rx.Notification;
 import rx.Observable;
 import rx.Subscriber;
 import rx.functions.Action0;
-import rx.functions.Action1;
-import rx.functions.Func1;
-import rx.schedulers.Schedulers;
 
 
 /**
@@ -107,36 +101,31 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
     private final MvccEntitySerializationStrategy entitySerializationStrategy;
     private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
 
-    private final EntityVersionTaskFactory entityVersionTaskFactory;
-    private final TaskExecutor taskExecutor;
 
     private final RxTaskScheduler rxTaskScheduler;
 
     private final Keyspace keyspace;
     private final Timer writeTimer;
-    private final Meter writeMeter;
     private final Timer deleteTimer;
+    private final Timer fieldIdTimer;
+    private final Timer fieldEntityTimer;
     private final Timer updateTimer;
     private final Timer loadTimer;
     private final Timer getLatestTimer;
-    private final Meter deleteMeter;
-    private final Meter getLatestMeter;
-    private final Meter loadMeter;
-    private final Meter updateMeter;
 
     private final ApplicationScope applicationScope;
 
 
+
     @Inject
     public EntityCollectionManagerImpl( final WriteStart writeStart, final WriteUniqueVerify writeVerifyUnique,
-                                        final WriteOptimisticVerify writeOptimisticVerify, final WriteCommit
-                                                writeCommit,
-                                        final RollbackAction rollback, final MarkStart markStart,
-                                        final MarkCommit markCommit, final MvccEntitySerializationStrategy entitySerializationStrategy,
+                                        final WriteOptimisticVerify writeOptimisticVerify,
+                                        final WriteCommit writeCommit, final RollbackAction rollback,
+                                        final MarkStart markStart, final MarkCommit markCommit,
+                                        final MvccEntitySerializationStrategy entitySerializationStrategy,
                                         final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
                                         final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy,
-                                        final Keyspace keyspace, final EntityVersionTaskFactory entityVersionTaskFactory,
-                                        @CollectionTaskExecutor final TaskExecutor taskExecutor, @Assisted final ApplicationScope applicationScope,
+                                        final Keyspace keyspace, @Assisted final ApplicationScope applicationScope,
                                         final MetricsFactory metricsFactory,
 
                                         final RxTaskScheduler rxTaskScheduler ) {
@@ -158,21 +147,16 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
 
         this.keyspace = keyspace;
 
-        this.entityVersionTaskFactory = entityVersionTaskFactory;
-        this.taskExecutor = taskExecutor;
 
         this.applicationScope = applicationScope;
         this.mvccLogEntrySerializationStrategy = mvccLogEntrySerializationStrategy;
-        this.writeTimer = metricsFactory.getTimer(EntityCollectionManagerImpl.class,"write.timer");
-        this.writeMeter = metricsFactory.getMeter(EntityCollectionManagerImpl.class, "write.meter");
-        this.deleteTimer = metricsFactory.getTimer(EntityCollectionManagerImpl.class, "delete.timer");
-        this.deleteMeter= metricsFactory.getMeter(EntityCollectionManagerImpl.class, "delete.meter");
-        this.updateTimer = metricsFactory.getTimer(EntityCollectionManagerImpl.class, "update.timer");
-        this.updateMeter = metricsFactory.getMeter(EntityCollectionManagerImpl.class, "update.meter");
-        this.loadTimer = metricsFactory.getTimer(EntityCollectionManagerImpl.class,"load.timer");
-        this.loadMeter = metricsFactory.getMeter(EntityCollectionManagerImpl.class, "load.meter");
-        this.getLatestTimer = metricsFactory.getTimer(EntityCollectionManagerImpl.class,"latest.timer");
-        this.getLatestMeter = metricsFactory.getMeter(EntityCollectionManagerImpl.class, "latest.meter");
+        this.writeTimer = metricsFactory.getTimer( EntityCollectionManagerImpl.class, "write" );
+        this.deleteTimer = metricsFactory.getTimer( EntityCollectionManagerImpl.class, "delete" );
+        this.fieldIdTimer = metricsFactory.getTimer( EntityCollectionManagerImpl.class, "fieldId" );
+        this.fieldEntityTimer = metricsFactory.getTimer( EntityCollectionManagerImpl.class, "fieldEntity" );
+        this.updateTimer = metricsFactory.getTimer( EntityCollectionManagerImpl.class, "update" );
+        this.loadTimer = metricsFactory.getTimer( EntityCollectionManagerImpl.class, "load" );
+        this.getLatestTimer = metricsFactory.getTimer( EntityCollectionManagerImpl.class, "latest" );
     }
 
 
@@ -192,34 +176,10 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
 
         Observable<CollectionIoEvent<MvccEntity>> observable = stageRunner( writeData, writeStart );
 
-        // execute all validation stages concurrently.  Needs refactored when this is done.
-        // https://github.com/Netflix/RxJava/issues/627
-        // observable = Concurrent.concurrent( observable, Schedulers.io(), new WaitZip(),
-        //                  writeVerifyUnique, writeOptimisticVerify );
 
-        final Timer.Context timer = writeTimer.time();
-        return observable.map(writeCommit).doOnNext(new Action1<Entity>() {
-            @Override
-            public void call(final Entity entity) {
-                //TODO fire the created task first then the entityVersioncleanup
-                taskExecutor.submit( entityVersionTaskFactory.getCreatedTask( applicationScope, entity ));
-                taskExecutor.submit( entityVersionTaskFactory.getCleanupTask( applicationScope, entityId,
-                    entity.getVersion(), false ));
-                //post-processing to come later. leave it empty for now.
-            }
-        }).doOnError( rollback )
-            .doOnEach( new Action1<Notification<? super Entity>>() {
-                @Override
-                public void call( Notification<? super Entity> notification ) {
-                    writeMeter.mark();
-                }
-            } )
-            .doOnCompleted( new Action0() {
-                @Override
-                public void call() {
-                    timer.stop();
-                }
-            } );
+        final Observable<Entity> write = observable.map( writeCommit );
+
+        return ObservableTimer.time( write, writeTimer );
     }
 
 
@@ -230,36 +190,11 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
         Preconditions.checkNotNull( entityId.getUuid(), "Entity id is required in this stage" );
         Preconditions.checkNotNull( entityId.getType(), "Entity type is required in this stage" );
 
-        final Timer.Context timer = deleteTimer.time();
-        Observable<Id> o = Observable.just( new CollectionIoEvent<Id>( applicationScope, entityId ) )
-            .map(markStart)
-            .doOnNext( markCommit )
-            .map(new Func1<CollectionIoEvent<MvccEntity>, Id>() {
-
-                     @Override
-                     public Id call(final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent) {
-                         MvccEntity entity = mvccEntityCollectionIoEvent.getEvent();
-                         Task<Void> task = entityVersionTaskFactory
-                             .getDeleteTask( applicationScope, entity.getId(), entity.getVersion() );
-                         taskExecutor.submit(task);
-                         return entity.getId();
-                     }
-                 }
-            )
-            .doOnNext(new Action1<Id>() {
-                @Override
-                public void call(Id id) {
-                    deleteMeter.mark();
-                }
-            })
-            .doOnCompleted( new Action0() {
-                @Override
-                public void call() {
-                    timer.stop();
-                }
-            } );
+        Observable<Id> o = Observable.just( new CollectionIoEvent<Id>( applicationScope, entityId ) ).map( markStart )
+                                     .doOnNext( markCommit ).map( entityEvent -> entityEvent.getEvent().getId() );
 
-        return o;
+
+        return ObservableTimer.time( o, deleteTimer );
     }
 
 
@@ -270,35 +205,19 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
         Preconditions.checkNotNull( entityId.getUuid(), "Entity id uuid required in load stage" );
         Preconditions.checkNotNull( entityId.getType(), "Entity id type required in load stage" );
 
-        final Timer.Context timer = loadTimer.time();
-        return load( Collections.singleton( entityId ) ).flatMap(new Func1<EntitySet, Observable<Entity>>() {
-            @Override
-            public Observable<Entity> call(final EntitySet entitySet) {
-                final MvccEntity entity = entitySet.getEntity(entityId);
-
-                if (entity == null || !entity.getEntity().isPresent()) {
-                    return Observable.empty();
-                }
+        final Observable<Entity> entityObservable=  load( Collections.singleton( entityId ) ).flatMap( entitySet -> {
+            final MvccEntity entity = entitySet.getEntity( entityId );
 
-                return Observable.just( entity.getEntity().get() );
+            if ( entity == null || !entity.getEntity().isPresent() ) {
+                return Observable.empty();
             }
-        })
-            .doOnNext( new Action1<Entity>() {
-                @Override
-                public void call( Entity entity ) {
-                    loadMeter.mark();
-                }
-            } )
-            .doOnCompleted( new Action0() {
-                @Override
-                public void call() {
-                    timer.stop();
-                }
-            } );
-    }
 
+            return Observable.just( entity.getEntity().get() );
+        } );
 
 
+        return ObservableTimer.time( entityObservable, loadTimer );
+    }
 
 
     @Override
@@ -306,15 +225,13 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
 
         Preconditions.checkNotNull( entityIds, "entityIds cannot be null" );
 
-        final Timer.Context timer = loadTimer.time();
-
-        return Observable.create( new Observable.OnSubscribe<EntitySet>() {
+        final Observable<EntitySet> entitySetObservable =  Observable.create( new Observable.OnSubscribe<EntitySet>() {
 
             @Override
             public void call( final Subscriber<? super EntitySet> subscriber ) {
                 try {
                     final EntitySet results =
-                            entitySerializationStrategy.load( applicationScope, entityIds, UUIDGenerator.newTimeUUID() );
+                        entitySerializationStrategy.load( applicationScope, entityIds, UUIDGenerator.newTimeUUID() );
 
                     subscriber.onNext( results );
                     subscriber.onCompleted();
@@ -323,148 +240,144 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
                     subscriber.onError( e );
                 }
             }
-        } )
-            .doOnNext(new Action1<EntitySet>() {
-                @Override
-                public void call(EntitySet entitySet) {
-                    loadMeter.mark();
-                }
-            })
-            .doOnCompleted( new Action0() {
-                @Override
-                public void call() {
-                    timer.stop();
-                }
-            } );
+        } );
+
+
+        return ObservableTimer.time( entitySetObservable, loadTimer );
     }
 
 
     @Override
-    public Observable<Id> getIdField(final String type,  final Field field ) {
+    public Observable<MvccLogEntry> getVersions( final Id entityId ) {
+//        mvccLogEntrySerializationStrategy.load(  )
+        return null;
+    }
+
+
+    @Override
+    public Observable<MvccLogEntry> compact( final Collection<MvccLogEntry> entries ) {
+        return null;
+    }
+
+
+    @Override
+    public Observable<Id> getIdField( final String type, final Field field ) {
         final List<Field> fields = Collections.singletonList( field );
-        return rx.Observable.from( fields ).map( new Func1<Field, Id>() {
-            @Override
-            public Id call( Field field ) {
-                try {
-                    final UniqueValueSet set = uniqueValueSerializationStrategy.load( applicationScope, type, fields );
-                    final UniqueValue value = set.getValue( field.getName() );
-                    return value == null ? null : value.getEntityId();
-                }
-                catch ( ConnectionException e ) {
-                    logger.error( "Failed to getIdField", e );
-                    throw new RuntimeException( e );
-                }
+        final Observable<Id> idObservable =  Observable.from( fields ).map( field1 -> {
+            try {
+                final UniqueValueSet set = uniqueValueSerializationStrategy.load( applicationScope, type, fields );
+                final UniqueValue value = set.getValue( field1.getName() );
+                return value == null ? null : value.getEntityId();
+            }
+            catch ( ConnectionException e ) {
+                logger.error( "Failed to getIdField", e );
+                throw new RuntimeException( e );
             }
         } );
+
+        return ObservableTimer.time( idObservable, fieldIdTimer );
     }
 
 
     /**
      * Retrieves all entities that correspond to each field given in the Collection.
-     * @param fields
-     * @return
      */
     @Override
-    public Observable<FieldSet> getEntitiesFromFields(final String type, final Collection<Field> fields ) {
-        return rx.Observable.just(fields).map( new Func1<Collection<Field>, FieldSet>() {
-            @Override
-            public FieldSet call( Collection<Field> fields ) {
-                try {
+    public Observable<FieldSet> getEntitiesFromFields( final String type, final Collection<Field> fields ) {
+        final Observable<FieldSet> fieldSetObservable = Observable.just( fields ).map( fields1 -> {
+            try {
 
-                    final UUID startTime = UUIDGenerator.newTimeUUID();
+                final UUID startTime = UUIDGenerator.newTimeUUID();
 
-                    //Get back set of unique values that correspond to collection of fields
-                    UniqueValueSet set = uniqueValueSerializationStrategy.load( applicationScope,type,  fields );
-
-                    //Short circut if we don't have any uniqueValues from the given fields.
-                    if(!set.iterator().hasNext()){
-                        return new MutableFieldSet( 0 );
-                    }
+                //Get back set of unique values that correspond to collection of fields
+                UniqueValueSet set = uniqueValueSerializationStrategy.load( applicationScope, type, fields1 );
 
+                //Short circuit if we don't have any uniqueValues from the given fields.
+                if ( !set.iterator().hasNext() ) {
+                    return new MutableFieldSet( 0 );
+                }
 
-                    //loop through each field, and construct an entity load
-                    List<Id> entityIds = new ArrayList<>(fields.size());
-                    List<UniqueValue> uniqueValues = new ArrayList<>(fields.size());
 
-                    for(final Field expectedField: fields) {
+                //loop through each field, and construct an entity load
+                List<Id> entityIds = new ArrayList<>( fields1.size() );
+                List<UniqueValue> uniqueValues = new ArrayList<>( fields1.size() );
 
-                        UniqueValue value = set.getValue(expectedField.getName());
+                for ( final Field expectedField : fields1 ) {
 
-                        if(value ==null){
-                            logger.debug( "Field does not correspond to a unique value" );
-                        }
+                    UniqueValue value = set.getValue( expectedField.getName() );
 
-                        entityIds.add(value.getEntityId());
-                        uniqueValues.add(value);
+                    if ( value == null ) {
+                        logger.debug( "Field does not correspond to a unique value" );
                     }
 
-                    //Load a entity for each entityId we retrieved.
-                    final EntitySet entitySet = entitySerializationStrategy.load(applicationScope, entityIds, startTime);
-
-                    //now loop through and ensure the entities are there.
-                    final MutationBatch deleteBatch = keyspace.prepareMutationBatch();
-
-                    final MutableFieldSet response = new MutableFieldSet(fields.size());
+                    entityIds.add( value.getEntityId() );
+                    uniqueValues.add( value );
+                }
 
-                    for(final UniqueValue expectedUnique: uniqueValues) {
-                        final MvccEntity entity = entitySet.getEntity(expectedUnique.getEntityId());
+                //Load an entity for each entityId we retrieved.
+                final EntitySet entitySet =
+                    entitySerializationStrategy.load( applicationScope, entityIds, startTime );
 
-                        //bad unique value, delete this, it's inconsistent
-                        if(entity == null || !entity.getEntity().isPresent()){
-                            final MutationBatch valueDelete = uniqueValueSerializationStrategy.delete(applicationScope, expectedUnique);
-                            deleteBatch.mergeShallow(valueDelete);
-                            continue;
-                        }
+                //now loop through and ensure the entities are there.
+                final MutationBatch deleteBatch = keyspace.prepareMutationBatch();
 
+                final MutableFieldSet response = new MutableFieldSet( fields1.size() );
 
-                        //else add it to our result set
-                        response.addEntity(expectedUnique.getField(),entity);
+                for ( final UniqueValue expectedUnique : uniqueValues ) {
+                    final MvccEntity entity = entitySet.getEntity( expectedUnique.getEntityId() );
 
+                    //bad unique value, delete this, it's inconsistent
+                    if ( entity == null || !entity.getEntity().isPresent() ) {
+                        final MutationBatch valueDelete =
+                            uniqueValueSerializationStrategy.delete( applicationScope, expectedUnique );
+                        deleteBatch.mergeShallow( valueDelete );
+                        continue;
                     }
 
-                    //TODO: explore making this an Async process
-                    //We'll repair it again if we have to
-                    deleteBatch.execute();
 
-                    return response;
+                    //else add it to our result set
+                    response.addEntity( expectedUnique.getField(), entity );
+                }
 
+                //TODO: explore making this an Async process
+                //We'll repair it again if we have to
+                deleteBatch.execute();
 
-                }
-                catch ( ConnectionException e ) {
-                    logger.error( "Failed to getIdField", e );
-                    throw new RuntimeException( e );
-                }
+                return response;
+            }
+            catch ( ConnectionException e ) {
+                logger.error( "Failed to getIdField", e );
+                throw new RuntimeException( e );
             }
         } );
-    }
 
 
+        return ObservableTimer.time( fieldSetObservable, fieldEntityTimer );
+    }
 
 
     // fire the stages
     public Observable<CollectionIoEvent<MvccEntity>> stageRunner( CollectionIoEvent<Entity> writeData,
                                                                   WriteStart writeState ) {
 
-        return Observable.just( writeData ).map( writeState ).doOnNext( new Action1<CollectionIoEvent<MvccEntity>>() {
+        return Observable.just( writeData ).map( writeState ).flatMap( mvccEntityCollectionIoEvent -> {
 
-                    @Override
-                    public void call( final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent ) {
+            Observable<CollectionIoEvent<MvccEntity>> uniqueObservable =
+                Observable.just( mvccEntityCollectionIoEvent ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() )
+                          .doOnNext( writeVerifyUnique );
 
-                        Observable<CollectionIoEvent<MvccEntity>> unique =
-                                Observable.just( mvccEntityCollectionIoEvent ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() )
-                                          .doOnNext( writeVerifyUnique );
 
+            // optimistic verification
+            Observable<CollectionIoEvent<MvccEntity>> optimisticObservable =
+                Observable.just( mvccEntityCollectionIoEvent ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() )
+                          .doOnNext( writeOptimisticVerify );
 
-                        // optimistic verification
-                        Observable<CollectionIoEvent<MvccEntity>> optimistic =
-                                Observable.just( mvccEntityCollectionIoEvent ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() )
-                                          .doOnNext( writeOptimisticVerify );
+            final Observable<CollectionIoEvent<MvccEntity>> zip =  Observable.zip( uniqueObservable, optimisticObservable,
+                ( unique, optimistic ) -> optimistic );
 
+            return zip;
 
-                        //wait for both to finish
-                        Observable.merge( unique, optimistic ).toBlocking().last();
-                    }
-                } );
+        } );
     }
 
 
@@ -478,7 +391,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
             public void call( final Subscriber<? super VersionSet> subscriber ) {
                 try {
                     final VersionSet logEntries = mvccLogEntrySerializationStrategy
-                            .load( applicationScope, entityIds, UUIDGenerator.newTimeUUID() );
+                        .load( applicationScope, entityIds, UUIDGenerator.newTimeUUID() );
 
                     subscriber.onNext( logEntries );
                     subscriber.onCompleted();
@@ -487,13 +400,12 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
                     subscriber.onError( e );
                 }
             }
-        } )
-            .doOnCompleted( new Action0() {
-                @Override
-                public void call() {
-                    timer.stop();
-                }
-            } );
+        } ).doOnCompleted( new Action0() {
+            @Override
+            public void call() {
+                timer.stop();
+            }
+        } );
     }
 
 
@@ -502,11 +414,11 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
 
         try {
             ColumnFamily<String, String> CF_SYSTEM_LOCAL =
-                    new ColumnFamily<String, String>( "system.local", StringSerializer.get(), StringSerializer.get(),
-                            StringSerializer.get() );
+                new ColumnFamily<String, String>( "system.local", StringSerializer.get(), StringSerializer.get(),
+                    StringSerializer.get() );
 
             OperationResult<CqlResult<String, String>> result =
-                    keyspace.prepareQuery( CF_SYSTEM_LOCAL ).withCql( "SELECT now() FROM system.local;" ).execute();
+                keyspace.prepareQuery( CF_SYSTEM_LOCAL ).withCql( "SELECT now() FROM system.local;" ).execute();
 
             if ( result.getResult().getRows().size() == 1 ) {
                 return Health.GREEN;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
deleted file mode 100644
index 1753d26..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.persistence.collection.impl;
-
-
-import java.util.Set;
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.collection.event.EntityDeleted;
-import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
-import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.task.Task;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
-import com.netflix.astyanax.MutationBatch;
-
-import rx.Observable;
-import rx.schedulers.Schedulers;
-
-
-/**
- * Fires Cleanup Task
- */
-public class EntityDeletedTask implements Task<Void> {
-    private static final Logger LOG =  LoggerFactory.getLogger(EntityDeletedTask.class);
-
-    private final EntityVersionTaskFactory entityVersionTaskFactory;
-    private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
-    private final MvccEntitySerializationStrategy entitySerializationStrategy;
-    private final Set<EntityDeleted> listeners;
-    private final ApplicationScope collectionScope;
-    private final Id entityId;
-    private final UUID version;
-
-
-    @Inject
-    public EntityDeletedTask(
-        EntityVersionTaskFactory entityVersionTaskFactory,
-        final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
-        final MvccEntitySerializationStrategy entitySerializationStrategy,
-        final Set<EntityDeleted>                listeners, // MUST be a set or Guice will not inject
-        @Assisted final ApplicationScope  collectionScope,
-        @Assisted final Id                      entityId,
-        @Assisted final UUID                    version) {
-
-        this.entityVersionTaskFactory = entityVersionTaskFactory;
-        this.logEntrySerializationStrategy = logEntrySerializationStrategy;
-        this.entitySerializationStrategy = entitySerializationStrategy;
-        this.listeners = listeners;
-        this.collectionScope = collectionScope;
-        this.entityId = entityId;
-        this.version = version;
-    }
-
-
-    @Override
-    public void exceptionThrown(Throwable throwable) {
-        LOG.error( "Unable to run update task for collection {} with entity {} and version {}",
-                new Object[] { collectionScope, entityId, version }, throwable );
-    }
-
-
-    @Override
-    public Void rejected() {
-        try {
-            call();
-        }
-        catch ( Exception e ) {
-            throw new RuntimeException( "Exception thrown in call task", e );
-        }
-
-        return null;
-    }
-
-
-    @Override
-    public Void call() throws Exception {
-
-        entityVersionTaskFactory.getCleanupTask( collectionScope, entityId, version, true ).call();
-
-        fireEvents();
-        final MutationBatch entityDelete = entitySerializationStrategy.delete(collectionScope, entityId, version);
-        final MutationBatch logDelete = logEntrySerializationStrategy.delete(collectionScope, entityId, version);
-
-        entityDelete.execute();
-        logDelete.execute();
-//
-        return null;
-    }
-
-
-    private void fireEvents() {
-        final int listenerSize = listeners.size();
-
-        if ( listenerSize == 0 ) {
-            return;
-        }
-
-        if ( listenerSize == 1 ) {
-            listeners.iterator().next().deleted( collectionScope, entityId,version );
-            return;
-        }
-
-        LOG.debug( "Started firing {} listeners", listenerSize );
-
-        //if we have more than 1, run them on the rx scheduler for a max of 10 operations at a time
-        Observable.from(listeners).flatMap( currentListener -> Observable.just( currentListener ).doOnNext( listener -> {
-            listener.deleted( collectionScope, entityId, version );
-        } ).subscribeOn( Schedulers.io() ), 10 ).toBlocking().last();
-
-        LOG.debug( "Finished firing {} listeners", listenerSize );
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
deleted file mode 100644
index b5f9085..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
+++ /dev/null
@@ -1,247 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.  For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.collection.impl;
-
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.collection.MvccLogEntry;
-import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
-import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
-import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
-import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
-import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
-import org.apache.usergrid.persistence.collection.serialization.impl.LogEntryIterator;
-import org.apache.usergrid.persistence.core.rx.ObservableIterator;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.task.Task;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
-import com.netflix.astyanax.Keyspace;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-
-import rx.Observable;
-import rx.functions.Action1;
-import rx.functions.Func1;
-import rx.observables.BlockingObservable;
-import rx.schedulers.Schedulers;
-
-
-/**
- * Cleans up previous versions from the specified version. Note that this means the version passed in the io event is
- * retained, the range is exclusive.
- */
-public class EntityVersionCleanupTask implements Task<Void> {
-
-    private static final Logger logger = LoggerFactory.getLogger( EntityVersionCleanupTask.class );
-
-    private final Set<EntityVersionDeleted> listeners;
-
-    private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
-    private UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
-    private final Keyspace keyspace;
-
-    private final SerializationFig serializationFig;
-
-    private final ApplicationScope scope;
-    private final Id entityId;
-    private final UUID version;
-    private final boolean includeVersion;
-
-
-    @Inject
-    public EntityVersionCleanupTask(
-        final SerializationFig serializationFig,
-        final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
-        final UniqueValueSerializationStrategy  uniqueValueSerializationStrategy,
-        final Keyspace                          keyspace,
-        final Set<EntityVersionDeleted>         listeners, // MUST be a set or Guice will not inject
-        @Assisted final ApplicationScope scope,
-        @Assisted final Id                      entityId,
-        @Assisted final UUID                    version,
-        @Assisted final boolean includeVersion) {
-
-        this.serializationFig = serializationFig;
-        this.logEntrySerializationStrategy = logEntrySerializationStrategy;
-        this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
-        this.keyspace = keyspace;
-        this.listeners = listeners;
-        this.scope = scope;
-        this.entityId = entityId;
-        this.version = version;
-
-        this.includeVersion = includeVersion;
-    }
-
-
-    @Override
-    public void exceptionThrown( final Throwable throwable ) {
-        logger.error( "Unable to run update task for collection {} with entity {} and version {}",
-                new Object[] { scope, entityId, version }, throwable );
-    }
-
-
-    @Override
-    public Void rejected() {
-        //Our task was rejected meaning our queue was full.  We need this operation to run,
-        // so we'll run it in our current thread
-        try {
-            call();
-        }
-        catch ( Exception e ) {
-            throw new RuntimeException( "Exception thrown in call task", e );
-        }
-
-        return null;
-    }
-
-
-    @Override
-    public Void call() throws Exception {
-        //TODO Refactor this logic into a a class that can be invoked from anywhere
-       //iterate all unique values
-        final BlockingObservable<Long> uniqueValueCleanup =
-                Observable.create( new ObservableIterator<UniqueValue>( "Unique value load" ) {
-                    @Override
-                    protected Iterator<UniqueValue> getIterator() {
-                        return uniqueValueSerializationStrategy.getAllUniqueFields( scope, entityId );
-                    }
-                } )
-
-                        //skip current versions
-                        .skipWhile( new Func1<UniqueValue, Boolean>() {
-                            @Override
-                            public Boolean call( final UniqueValue uniqueValue ) {
-                                return !includeVersion && version.equals( uniqueValue.getEntityVersion() );
-                            }
-                        } )
-                                //buffer our buffer size, then roll them all up in a single batch mutation
-                        .buffer( serializationFig.getBufferSize() ).doOnNext( new Action1<List<UniqueValue>>() {
-                    @Override
-                    public void call( final List<UniqueValue> uniqueValues ) {
-                        final MutationBatch uniqueCleanupBatch = keyspace.prepareMutationBatch();
-
-
-                        for ( UniqueValue value : uniqueValues ) {
-                            uniqueCleanupBatch.mergeShallow( uniqueValueSerializationStrategy.delete( scope, value ) );
-                        }
-
-                        try {
-                            uniqueCleanupBatch.execute();
-                        }
-                        catch ( ConnectionException e ) {
-                            throw new RuntimeException( "Unable to execute batch mutation", e );
-                        }
-                    }
-                } ).subscribeOn( Schedulers.io() ).countLong().toBlocking();
-
-
-        //start calling the listeners for remove log entries
-        BlockingObservable<Long> versionsDeletedObservable =
-
-                Observable.create( new ObservableIterator<MvccLogEntry>( "Log entry iterator" ) {
-                    @Override
-                    protected Iterator<MvccLogEntry> getIterator() {
-
-                        return new LogEntryIterator( logEntrySerializationStrategy, scope, entityId, version,
-                                serializationFig.getBufferSize() );
-                    }
-                } )
-                        //skip current version
-                        .skipWhile( new Func1<MvccLogEntry, Boolean>() {
-                            @Override
-                            public Boolean call( final MvccLogEntry mvccLogEntry ) {
-                                return !includeVersion && version.equals( mvccLogEntry.getVersion() );
-                            }
-                        } )
-                                //buffer them for efficiency
-                        .buffer( serializationFig.getBufferSize() ).doOnNext( new Action1<List<MvccLogEntry>>() {
-                    @Override
-                    public void call( final List<MvccLogEntry> mvccEntities ) {
-
-                        fireEvents( mvccEntities );
-
-                        final MutationBatch logCleanupBatch = keyspace.prepareMutationBatch();
-
-
-                        for ( MvccLogEntry entry : mvccEntities ) {
-                            logCleanupBatch.mergeShallow( logEntrySerializationStrategy.delete( scope, entityId, entry.getVersion() ));
-                        }
-
-                        try {
-                            logCleanupBatch.execute();
-                        }
-                        catch ( ConnectionException e ) {
-                            throw new RuntimeException( "Unable to execute batch mutation", e );
-                        }
-                    }
-                } ).subscribeOn( Schedulers.io() ).countLong().toBlocking();
-
-        //wait or this to complete
-        final Long removedCount = uniqueValueCleanup.last();
-
-        logger.debug( "Removed unique values for {} entities of entity {}", removedCount, entityId );
-
-        final Long versionCleanupCount = versionsDeletedObservable.last();
-
-        logger.debug( "Removed {} previous entity versions of entity {}", versionCleanupCount, entityId );
-
-        return null;
-    }
-
-
-    private void fireEvents( final List<MvccLogEntry> versions ) {
-
-        final int listenerSize = listeners.size();
-
-        if ( listenerSize == 0 ) {
-            return;
-        }
-
-        if ( listenerSize == 1 ) {
-            listeners.iterator().next().versionDeleted( scope, entityId, versions );
-            return;
-        }
-
-        logger.debug( "Started firing {} listeners", listenerSize );
-
-        //if we have more than 1, run them on the rx scheduler for a max of 8 operations at a time
-
-
-        //if we have more than 1, run them on the rx scheduler for a max of 10 operations at a time
-        Observable.from(listeners).flatMap( currentListener -> Observable.just( currentListener ).doOnNext( listener -> {
-            listener.versionDeleted( scope, entityId, versions );
-        } ).subscribeOn( Schedulers.io() ), 10 ).toBlocking().last();
-
-
-
-        logger.debug( "Finished firing {} listeners", listenerSize );
-    }
-}
-
-
-

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTask.java
deleted file mode 100644
index fbbcdbd..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTask.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.  For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.collection.impl;
-
-
-import java.util.Set;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.collection.event.EntityVersionCreated;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.task.Task;
-import org.apache.usergrid.persistence.model.entity.Entity;
-
-import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
-
-import rx.Observable;
-import rx.schedulers.Schedulers;
-
-
-/**
- * Fires events so that all EntityVersionCreated handlers area called.
- */
-public class EntityVersionCreatedTask implements Task<Void> {
-    private static final Logger logger = LoggerFactory.getLogger( EntityVersionCreatedTask.class );
-
-    private Set<EntityVersionCreated> listeners;
-    private final ApplicationScope collectionScope;
-    private final Entity entity;
-
-
-    @Inject
-    public EntityVersionCreatedTask( @Assisted final ApplicationScope collectionScope,
-                                     final Set<EntityVersionCreated> listeners,
-                                     @Assisted final Entity entity ) {
-
-        this.listeners = listeners;
-        this.collectionScope = collectionScope;
-        this.entity = entity;
-    }
-
-
-    @Override
-    public void exceptionThrown( final Throwable throwable ) {
-        logger.error( "Unable to run update task for collection {} with entity {} and version {}",
-                new Object[] { collectionScope, entity}, throwable );
-    }
-
-
-    @Override
-    public Void rejected() {
-
-        // Our task was rejected meaning our queue was full.
-        // We need this operation to run, so we'll run it in our current thread
-        try {
-            call();
-        }
-        catch ( Exception e ) {
-            throw new RuntimeException( "Exception thrown in call task", e );
-        }
-
-        return null;
-    }
-
-
-    @Override
-    public Void call() throws Exception {
-
-        fireEvents();
-        return null;
-    }
-
-
-    private void fireEvents() {
-
-        final int listenerSize = listeners.size();
-
-        if ( listenerSize == 0 ) {
-            return;
-        }
-
-        if ( listenerSize == 1 ) {
-            listeners.iterator().next().versionCreated( collectionScope, entity );
-            return;
-        }
-
-        logger.debug( "Started firing {} listeners", listenerSize );
-
-
-        Observable.from( listeners )
-                  .flatMap( currentListener -> Observable.just( currentListener ).doOnNext( listener -> {
-                      listener.versionCreated( collectionScope, entity );
-                  } ).subscribeOn( Schedulers.io() ), 10 ).toBlocking().last();
-
-
-        logger.debug( "Finished firing {} listeners", listenerSize );
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionTaskFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionTaskFactory.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionTaskFactory.java
deleted file mode 100644
index 51a4607..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionTaskFactory.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.  For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.collection.impl;
-
-
-import java.util.UUID;
-
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-
-public interface EntityVersionTaskFactory {
-
-    /**
-     * Get a task for cleaning up latent entity data.  If includeVersion = true, the passed version will be cleaned up as well
-     * Otherwise this is a V-1 operation
-     *
-     * @param scope
-     * @param entityId
-     * @param version
-     * @param includeVersion
-     * @return
-     */
-    EntityVersionCleanupTask getCleanupTask( final ApplicationScope scope, final Id entityId, final UUID version,
-                                             final boolean includeVersion );
-
-    /**
-     * Get an entityVersionCreatedTask
-     * @param scope
-     * @param entity
-     * @return
-     */
-    EntityVersionCreatedTask getCreatedTask( final ApplicationScope scope, final Entity entity );
-
-    /**
-     * Get an entity deleted task
-     * @param collectionScope
-     * @param entityId
-     * @param version
-     * @return
-     */
-    EntityDeletedTask getDeleteTask( final ApplicationScope collectionScope, final Id entityId, final UUID version );
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java
new file mode 100644
index 0000000..0034f03
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.collection.mvcc.stage.delete;
+
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
+import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.core.metrics.ObservableTimer;
+import org.apache.usergrid.persistence.core.rx.ObservableIterator;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.codahale.metrics.Timer;
+import com.fasterxml.uuid.UUIDComparator;
+import com.google.inject.Inject;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import rx.Observable;
+
+
+/**
+ * Runs on an entity that has just been mark committed, and removes all unique values <= this entity's version
+ */
+public class UniqueCleanup
+    implements Observable.Transformer<CollectionIoEvent<MvccEntity>, CollectionIoEvent<MvccEntity>> {
+
+
+    private static final Logger logger = LoggerFactory.getLogger( UniqueCleanup.class );
+    private final Timer uniqueCleanupTimer;
+
+
+    private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
+    private final Keyspace keyspace;
+
+    private final SerializationFig serializationFig;
+
+
+    @Inject
+    public UniqueCleanup( final SerializationFig serializationFig,
+                          final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
+                          final Keyspace keyspace, final MetricsFactory metricsFactory ) {
+
+        this.serializationFig = serializationFig;
+        this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
+        this.keyspace = keyspace;
+        this.uniqueCleanupTimer = metricsFactory.getTimer( UniqueCleanup.class, "uniquecleanup" );
+    }
+
+
+    /**
+     * For each entity event, skips the leading unique values that are newer than the entity's version, deletes
+     * the remaining ones in buffered batch mutations, and re-emits the event once that cleanup has completed.
+     */
+    @Override
+    public Observable<CollectionIoEvent<MvccEntity>> call(
+        final Observable<CollectionIoEvent<MvccEntity>> collectionIoEventObservable ) {
+
+        //concatMap rather than doOnNext: the per-event cleanup observable only runs if it is subscribed to
+        final Observable<CollectionIoEvent<MvccEntity>> outputObservable =
+            collectionIoEventObservable.concatMap( mvccEntityCollectionIoEvent -> {
+                final Id entityId = mvccEntityCollectionIoEvent.getEvent().getId();
+                final ApplicationScope applicationScope = mvccEntityCollectionIoEvent.getEntityCollection();
+                final UUID entityVersion = mvccEntityCollectionIoEvent.getEvent().getVersion();
+
+                //TODO Refactor this logic into a class that can be invoked from anywhere
+                //iterate all unique values
+                return Observable.create( new ObservableIterator<UniqueValue>( "Unique value load" ) {
+                        @Override
+                        protected Iterator<UniqueValue> getIterator() {
+                            return uniqueValueSerializationStrategy.getAllUniqueFields( applicationScope, entityId );
+                        }
+                    } )
+
+                        //skip  versions > the specified version
+                        .skipWhile( uniqueValue -> {
+                            final UUID uniqueValueVersion = uniqueValue.getEntityVersion();
+                            return UUIDComparator.staticCompare( uniqueValueVersion, entityVersion ) > 0;
+                        } )
+
+                            //buffer our buffer size, then roll them all up in a single batch mutation
+                        .buffer( serializationFig.getBufferSize() )
+
+                            //roll them up
+                        .doOnNext( uniqueValues -> {
+                            final MutationBatch uniqueCleanupBatch = keyspace.prepareMutationBatch();
+
+                            for ( UniqueValue value : uniqueValues ) {
+                                uniqueCleanupBatch
+                                    .mergeShallow( uniqueValueSerializationStrategy.delete( applicationScope, value ) );
+                            }
+
+                            try {
+                                uniqueCleanupBatch.execute();
+                            }
+                            catch ( ConnectionException e ) {
+                                throw new RuntimeException( "Unable to execute batch mutation", e );
+                            }
+                        } ).lastOrDefault( Collections.emptyList() ).map( list -> mvccEntityCollectionIoEvent );
+            } );
+
+        return ObservableTimer.time( outputObservable, uniqueCleanupTimer );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/VersionCompact.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/VersionCompact.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/VersionCompact.java
new file mode 100644
index 0000000..0945827
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/VersionCompact.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.collection.mvcc.stage.delete;
+
+
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.collection.MvccLogEntry;
+import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
+import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.core.metrics.ObservableTimer;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.codahale.metrics.Timer;
+import com.google.inject.Inject;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import rx.Observable;
+
+
+/**
+ * Compact all versions on the input observable by removing them from the log, and from the
+ * entity version data
+ */
+public class VersionCompact
+    implements Observable.Transformer<CollectionIoEvent<MvccLogEntry>, CollectionIoEvent<MvccLogEntry>> {
+
+    private static final Logger logger = LoggerFactory.getLogger( VersionCompact.class );
+
+    private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
+    private final MvccEntitySerializationStrategy mvccEntitySerializationStrategy;
+    private final SerializationFig serializationFig;
+    private final Keyspace keyspace;
+    private final Timer compactTimer;
+
+
+    @Inject
+    public VersionCompact( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
+                           final SerializationFig serializationFig, final Keyspace keyspace,
+                           final MetricsFactory metricsFactory,
+                           final MvccEntitySerializationStrategy mvccEntitySerializationStrategy ) {
+        this.logEntrySerializationStrategy = logEntrySerializationStrategy;
+        this.serializationFig = serializationFig;
+        this.keyspace = keyspace;
+        this.mvccEntitySerializationStrategy = mvccEntitySerializationStrategy;
+        this.compactTimer = metricsFactory.getTimer( VersionCompact.class, "compact" );
+    }
+
+
+    /**
+     * Buffers the incoming log entry events, merges each buffer's log entry deletes and entity version deletes
+     * into a single batch mutation, executes it, then re-emits the buffered events downstream.
+     */
+    @Override
+    public Observable<CollectionIoEvent<MvccLogEntry>> call(
+        final Observable<CollectionIoEvent<MvccLogEntry>> collectionIoEventObservable ) {
+
+
+        final Observable<CollectionIoEvent<MvccLogEntry>> entryBuffer =
+            collectionIoEventObservable.buffer( serializationFig.getBufferSize() ).flatMap(
+                buffer -> Observable.from( buffer ).collect( () -> keyspace.prepareMutationBatch(),
+                    ( ( mutationBatch, mvccLogEntryCollectionIoEvent ) -> {
+
+                        final ApplicationScope scope = mvccLogEntryCollectionIoEvent.getEntityCollection();
+                        final MvccLogEntry mvccLogEntry = mvccLogEntryCollectionIoEvent.getEvent();
+                        final Id entityId = mvccLogEntry.getEntityId();
+                        final UUID version = mvccLogEntry.getVersion();
+
+                        //delete from our log
+                        mutationBatch.mergeShallow( logEntrySerializationStrategy.delete( scope, entityId, version ) );
+
+                        //merge our entity delete in
+                        mutationBatch
+                            .mergeShallow( mvccEntitySerializationStrategy.delete( scope, entityId, version ) );
+
+                        if ( logger.isDebugEnabled() ) {
+                            logger.debug(
+                                "Deleting log entry and version data for entity id {} and version {} in app scope {}",
+                                new Object[] { entityId, version, scope } );
+                        }
+                    } ) )
+                    //execute the accumulated log entry + entity version deletes for this buffer
+                    .doOnNext( mutationBatch -> {
+                        try {
+                            mutationBatch.execute();
+                        }
+                        catch ( ConnectionException e ) {
+                            throw new RuntimeException( "Unable to perform batch mutation", e );
+                        }
+                } ).flatMap( batches -> Observable.from( buffer ) ) );
+
+
+        return ObservableTimer.time( entryBuffer, compactTimer );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/MvccLogEntrySerializationStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/MvccLogEntrySerializationStrategy.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/MvccLogEntrySerializationStrategy.java
index 92669a7..5e249ae 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/MvccLogEntrySerializationStrategy.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/MvccLogEntrySerializationStrategy.java
@@ -71,6 +71,20 @@ public interface MvccLogEntrySerializationStrategy extends Migration, VersionedD
      */
     List<MvccLogEntry> load( ApplicationScope applicationScope, Id entityId, UUID version, int maxSize );
 
+
+
+    /**
+     * Load a list, from lowest to highest, of the stage seeking from minVersion, up to maxSize elements
+     *
+     * @param applicationScope The applicationScope to load the entity from
+     * @param entityId The entity id to load
+     * @param minVersion The min version to seek from.  Null is allowed
+     * @param maxSize The maximum size to return.  If you receive this size, there may be more versions to load.
+     *
+     * @return A list of entries up to max size, presumably ordered min(UUID)=> max(UUID) (TODO confirm in impl)
+     */
+    List<MvccLogEntry> loadReversed( ApplicationScope applicationScope, Id entityId, UUID minVersion, int maxSize );
+
     /**
      * MarkCommit the stage from the applicationScope with the given entityId and version
      *