You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/05/07 18:41:00 UTC

[5/5] incubator-usergrid git commit: WIP overwrite

WIP overwrite


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/70e0e75a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/70e0e75a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/70e0e75a

Branch: refs/heads/USERGRID-614
Commit: 70e0e75aae435df0a5045008625e2dc3d7ead37e
Parents: 45aed6c
Author: Todd Nine <tn...@apigee.com>
Authored: Thu May 7 10:40:52 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu May 7 10:40:52 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/CpEntityManager.java        |   2 +-
 .../corepersistence/CpEntityManagerFactory.java |   2 +-
 .../corepersistence/CpRelationManager.java      |   4 +-
 .../migration/AppInfoMigrationPlugin.java       |   3 +-
 .../collection/EntityCollectionManager.java     |  10 +-
 .../cache/CachedEntityCollectionManager.java    |  10 +-
 .../EntityCollectionManagerFactoryImpl.java     |  20 +-
 .../impl/EntityCollectionManagerImpl.java       |  74 ++++---
 .../mvcc/stage/delete/VersionCompact.java       |  14 +-
 .../serialization/SerializationFig.java         |  30 +--
 .../serialization/impl/LogEntryObservable.java  |  54 -----
 .../impl/MinMaxLogEntryIterator.java            |  27 ++-
 .../MvccLogEntrySerializationStrategyImpl.java  |  28 +--
 .../collection/EntityCollectionManagerIT.java   | 215 ++++++++++++-------
 .../impl/MinMaxLogEntryIteratorTest.java        |   7 +-
 ...ccLogEntrySerializationStrategyImplTest.java |  59 ++++-
 .../collection/util/LogEntryMock.java           | 103 ++++-----
 .../src/test/resources/log4j.properties         |   1 +
 .../usergrid/persistence/graph/GraphFig.java    |  26 +--
 19 files changed, 377 insertions(+), 312 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/70e0e75a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 460fc11..8bcc73a 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -664,7 +664,7 @@ public class CpEntityManager implements EntityManager {
             //delete it asynchronously
             indexService.queueEntityDelete( applicationScope, entityId );
 
-            return ecm.delete( entityId );
+            return ecm.mark( entityId );
         }
         else {
             return Observable.empty();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/70e0e75a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index 6c375ef..e7ad682 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -369,7 +369,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
         }
         final ApplicationEntityIndex aei = entityIndexFactory.createApplicationEntityIndex(applicationScope);
         final GraphManager managementGraphManager = managerCache.getGraphManager(managementAppScope);
-        final Observable deleteNodeGraph = managementGraphManager.deleteNode(applicationId, Long.MAX_VALUE);
+        final Observable deleteNodeGraph = managementGraphManager.markNode( applicationId, Long.MAX_VALUE );
         final Observable deleteAppFromIndex = aei.deleteApplication();
 
         return Observable.concat(copyConnections, deleteNodeGraph, deleteAppFromIndex)

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/70e0e75a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 6adeefc..df3fa82 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -528,7 +528,7 @@ public class CpRelationManager implements RelationManager {
 
         //run our delete
         final Edge collectionToItemEdge = createCollectionEdge( cpHeadEntity.getId(), collName, memberEntity.getId() );
-        gm.deleteEdge( collectionToItemEdge ).toBlocking().last();
+        gm.markEdge( collectionToItemEdge ).toBlocking().last();
 
 
         /**
@@ -782,7 +782,7 @@ public class CpRelationManager implements RelationManager {
 
         //delete all the edges
         final Edge lastEdge =
-            gm.loadEdgeVersions( search ).flatMap( returnedEdge -> gm.deleteEdge( returnedEdge ) ).toBlocking()
+            gm.loadEdgeVersions( search ).flatMap( returnedEdge -> gm.markEdge( returnedEdge ) ).toBlocking()
               .lastOrDefault( null );
 
         if ( lastEdge != null ) {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/70e0e75a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/AppInfoMigrationPlugin.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/AppInfoMigrationPlugin.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/AppInfoMigrationPlugin.java
index 30955af..97b87b3 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/AppInfoMigrationPlugin.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/AppInfoMigrationPlugin.java
@@ -42,7 +42,6 @@ import org.apache.usergrid.utils.UUIDUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import rx.Observable;
-import rx.functions.Func1;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -200,7 +199,7 @@ public class AppInfoMigrationPlugin implements MigrationPlugin {
         final ApplicationScope systemAppScope = getApplicationScope(CpNamingUtils.SYSTEM_APP_ID );
         final EntityCollectionManager systemCollectionManager =
             entityCollectionManagerFactory.createCollectionManager( systemAppScope );
-        systemCollectionManager.delete(new SimpleId(uuid, "appinfos")).toBlocking().last();
+        systemCollectionManager.mark( new SimpleId( uuid, "appinfos" ) ).toBlocking().last();
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/70e0e75a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
index 8c27825..9de8f41 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
@@ -46,11 +46,12 @@ public interface EntityCollectionManager {
 
 
     /**
-     * @param entityId MarkCommit the entity as deleted
+     * @param entityId MarkCommit the entity as deleted.  Will not actually remove it from cassandra.  This operation will
+     * also remove all unique properties for this entity
      *
      * @return The observable of the id after the operation has completed
      */
-    Observable<Id> delete( Id entityId );
+    Observable<Id> mark( Id entityId );
 
     /**
      * @param entityId The entity id to load.
@@ -104,11 +105,12 @@ public interface EntityCollectionManager {
     Observable<MvccLogEntry> getVersions(final Id entityId);
 
     /**
-     * Remove these versions.  Must be atomic so that read log entries are removed
+     * Delete these versions from cassandra.  Must be atomic so that read log entries are only removed.  Entity data
+     * and log entry will be deleted
      * @param entries
      * @return Any observable of all successfully compacted log entries
      */
-    Observable<MvccLogEntry> compact(final Collection<MvccLogEntry> entries);
+    Observable<MvccLogEntry> delete( final Collection<MvccLogEntry> entries );
 
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/70e0e75a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/cache/CachedEntityCollectionManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/cache/CachedEntityCollectionManager.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/cache/CachedEntityCollectionManager.java
index 9412516..7a04b8d 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/cache/CachedEntityCollectionManager.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/cache/CachedEntityCollectionManager.java
@@ -85,8 +85,8 @@ public class CachedEntityCollectionManager implements EntityCollectionManager {
 
 
     @Override
-    public Observable<Id> delete( final Id entityId ) {
-        return targetEntityCollectionManager.delete( entityId ).doOnNext( new Action1<Id>() {
+    public Observable<Id> mark( final Id entityId ) {
+        return targetEntityCollectionManager.mark( entityId ).doOnNext( new Action1<Id>() {
             @Override
             public void call( final Id id ) {
                 entityCache.invalidate( id );
@@ -129,13 +129,13 @@ public class CachedEntityCollectionManager implements EntityCollectionManager {
 
     @Override
     public Observable<MvccLogEntry> getVersions( final Id entityId ) {
-        return null;
+        return targetEntityCollectionManager.getVersions( entityId );
     }
 
 
     @Override
-    public Observable<MvccLogEntry> compact( final Collection<MvccLogEntry> entries ) {
-        return targetEntityCollectionManager.compact( entries );
+    public Observable<MvccLogEntry> delete( final Collection<MvccLogEntry> entries ) {
+        return targetEntityCollectionManager.delete( entries );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/70e0e75a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
index c4422f7..50a4bfc 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
@@ -29,6 +29,8 @@ import org.apache.usergrid.persistence.collection.cache.CachedEntityCollectionMa
 import org.apache.usergrid.persistence.collection.cache.EntityCacheFig;
 import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkCommit;
 import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkStart;
+import org.apache.usergrid.persistence.collection.mvcc.stage.delete.UniqueCleanup;
+import org.apache.usergrid.persistence.collection.mvcc.stage.delete.VersionCompact;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.RollbackAction;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteCommit;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteOptimisticVerify;
@@ -36,6 +38,7 @@ import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteStart;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteUniqueVerify;
 import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
@@ -65,6 +68,9 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
     private final RollbackAction rollback;
     private final MarkStart markStart;
     private final MarkCommit markCommit;
+    private final UniqueCleanup uniqueCleanup;
+    private final VersionCompact versionCompact;
+    private final SerializationFig serializationFig;
     private final MvccEntitySerializationStrategy entitySerializationStrategy;
     private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
     private final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy;
@@ -80,10 +86,11 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
                                   //create the target EM that will perform logic
                             final EntityCollectionManager target = new EntityCollectionManagerImpl(
                                 writeStart, writeVerifyUnique,
-                                writeOptimisticVerify, writeCommit, rollback, markStart, markCommit,
+                                writeOptimisticVerify, writeCommit, rollback, markStart, markCommit,  uniqueCleanup, versionCompact,
                                 entitySerializationStrategy, uniqueValueSerializationStrategy,
-                                mvccLogEntrySerializationStrategy, keyspace, scope, metricsFactory,
-                                rxTaskScheduler );
+                                mvccLogEntrySerializationStrategy, keyspace,
+                                metricsFactory, serializationFig,
+                                rxTaskScheduler, scope );
 
 
                             final EntityCollectionManager proxy = new CachedEntityCollectionManager(entityCacheFig, target  );
@@ -98,7 +105,9 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
                                                final WriteOptimisticVerify writeOptimisticVerify,
                                                final WriteCommit writeCommit, final RollbackAction rollback,
                                                final MarkStart markStart, final MarkCommit markCommit,
-                                               final MvccEntitySerializationStrategy entitySerializationStrategy,
+                                               final UniqueCleanup uniqueCleanup, final VersionCompact versionCompact,
+                                               final SerializationFig serializationFig, final
+                                                   MvccEntitySerializationStrategy entitySerializationStrategy,
                                                final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
                                                final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy,
                                                final Keyspace keyspace, final EntityCacheFig entityCacheFig,
@@ -111,6 +120,9 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
         this.rollback = rollback;
         this.markStart = markStart;
         this.markCommit = markCommit;
+        this.uniqueCleanup = uniqueCleanup;
+        this.versionCompact = versionCompact;
+        this.serializationFig = serializationFig;
         this.entitySerializationStrategy = entitySerializationStrategy;
         this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
         this.mvccLogEntrySerializationStrategy = mvccLogEntrySerializationStrategy;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/70e0e75a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index 83c2035..7a32d72 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@ -22,6 +22,7 @@ package org.apache.usergrid.persistence.collection.impl;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.UUID;
 
@@ -37,6 +38,8 @@ import org.apache.usergrid.persistence.collection.VersionSet;
 import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
 import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkCommit;
 import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkStart;
+import org.apache.usergrid.persistence.collection.mvcc.stage.delete.UniqueCleanup;
+import org.apache.usergrid.persistence.collection.mvcc.stage.delete.VersionCompact;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.RollbackAction;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteCommit;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteOptimisticVerify;
@@ -44,12 +47,15 @@ import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteStart;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteUniqueVerify;
 import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet;
+import org.apache.usergrid.persistence.collection.serialization.impl.MinMaxLogEntryIterator;
 import org.apache.usergrid.persistence.collection.serialization.impl.MutableFieldSet;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.core.metrics.ObservableTimer;
+import org.apache.usergrid.persistence.core.rx.ObservableIterator;
 import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.util.Health;
@@ -91,6 +97,8 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
     private final WriteOptimisticVerify writeOptimisticVerify;
     private final WriteCommit writeCommit;
     private final RollbackAction rollback;
+    private final UniqueCleanup uniqueCleanup;
+    private final VersionCompact versionCompact;
 
 
     //delete stages
@@ -101,8 +109,8 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
     private final MvccEntitySerializationStrategy entitySerializationStrategy;
     private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
 
+    private final SerializationFig serializationFig;
 
-    private final RxTaskScheduler rxTaskScheduler;
 
     private final Keyspace keyspace;
     private final Timer writeTimer;
@@ -114,7 +122,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
     private final Timer getLatestTimer;
 
     private final ApplicationScope applicationScope;
-
+    private final RxTaskScheduler rxTaskScheduler;
 
 
     @Inject
@@ -122,15 +130,18 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
                                         final WriteOptimisticVerify writeOptimisticVerify,
                                         final WriteCommit writeCommit, final RollbackAction rollback,
                                         final MarkStart markStart, final MarkCommit markCommit,
+                                        final UniqueCleanup uniqueCleanup, final VersionCompact versionCompact,
                                         final MvccEntitySerializationStrategy entitySerializationStrategy,
                                         final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
                                         final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy,
-                                        final Keyspace keyspace, @Assisted final ApplicationScope applicationScope,
-                                        final MetricsFactory metricsFactory,
-
-                                        final RxTaskScheduler rxTaskScheduler ) {
+                                        final Keyspace keyspace, final MetricsFactory metricsFactory,
+                                        final SerializationFig serializationFig, final RxTaskScheduler rxTaskScheduler,
+                                        @Assisted final ApplicationScope applicationScope ) {
         this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
         this.entitySerializationStrategy = entitySerializationStrategy;
+        this.uniqueCleanup = uniqueCleanup;
+        this.versionCompact = versionCompact;
+        this.serializationFig = serializationFig;
         this.rxTaskScheduler = rxTaskScheduler;
 
         ValidationUtils.validateApplicationScope( applicationScope );
@@ -184,14 +195,15 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
 
 
     @Override
-    public Observable<Id> delete( final Id entityId ) {
+    public Observable<Id> mark( final Id entityId ) {
 
         Preconditions.checkNotNull( entityId, "Entity id is required in this stage" );
         Preconditions.checkNotNull( entityId.getUuid(), "Entity id is required in this stage" );
         Preconditions.checkNotNull( entityId.getType(), "Entity type is required in this stage" );
 
-        Observable<Id> o = Observable.just( new CollectionIoEvent<Id>( applicationScope, entityId ) ).map( markStart )
-                                     .doOnNext( markCommit ).map( entityEvent -> entityEvent.getEvent().getId() );
+        Observable<Id> o = Observable.just( new CollectionIoEvent<>( applicationScope, entityId ) ).map( markStart )
+                                     .doOnNext( markCommit ).compose( uniqueCleanup ).map(
+                entityEvent -> entityEvent.getEvent().getId() );
 
 
         return ObservableTimer.time( o, deleteTimer );
@@ -205,7 +217,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
         Preconditions.checkNotNull( entityId.getUuid(), "Entity id uuid required in load stage" );
         Preconditions.checkNotNull( entityId.getType(), "Entity id type required in load stage" );
 
-        final Observable<Entity> entityObservable=  load( Collections.singleton( entityId ) ).flatMap( entitySet -> {
+        final Observable<Entity> entityObservable = load( Collections.singleton( entityId ) ).flatMap( entitySet -> {
             final MvccEntity entity = entitySet.getEntity( entityId );
 
             if ( entity == null || !entity.getEntity().isPresent() ) {
@@ -225,7 +237,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
 
         Preconditions.checkNotNull( entityIds, "entityIds cannot be null" );
 
-        final Observable<EntitySet> entitySetObservable =  Observable.create( new Observable.OnSubscribe<EntitySet>() {
+        final Observable<EntitySet> entitySetObservable = Observable.create( new Observable.OnSubscribe<EntitySet>() {
 
             @Override
             public void call( final Subscriber<? super EntitySet> subscriber ) {
@@ -249,21 +261,32 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
 
     @Override
     public Observable<MvccLogEntry> getVersions( final Id entityId ) {
-//        mvccLogEntrySerializationStrategy.load(  )
-        return null;
+        ValidationUtils.verifyIdentity( entityId );
+
+        return Observable.create( new ObservableIterator<MvccLogEntry>( "Log entry iterator" ) {
+            @Override
+            protected Iterator<MvccLogEntry> getIterator() {
+                return new MinMaxLogEntryIterator( mvccLogEntrySerializationStrategy, applicationScope, entityId,
+                    serializationFig.getBufferSize() );
+            }
+        } );
     }
 
 
     @Override
-    public Observable<MvccLogEntry> compact( final Collection<MvccLogEntry> entries ) {
-        return null;
+    public Observable<MvccLogEntry> delete( final Collection<MvccLogEntry> entries ) {
+        Preconditions.checkNotNull( entries, "entries must not be null" );
+
+
+        return Observable.from( entries ).map( logEntry -> new CollectionIoEvent<>( applicationScope, logEntry ) )
+                         .compose( versionCompact ).map( event -> event.getEvent() );
     }
 
 
     @Override
     public Observable<Id> getIdField( final String type, final Field field ) {
         final List<Field> fields = Collections.singletonList( field );
-        final Observable<Id> idObservable =  Observable.from( fields ).map( field1 -> {
+        final Observable<Id> idObservable = Observable.from( fields ).map( field1 -> {
             try {
                 final UniqueValueSet set = uniqueValueSerializationStrategy.load( applicationScope, type, fields );
                 final UniqueValue value = set.getValue( field1.getName() );
@@ -315,8 +338,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
                 }
 
                 //Load a entity for each entityId we retrieved.
-                final EntitySet entitySet =
-                    entitySerializationStrategy.load( applicationScope, entityIds, startTime );
+                final EntitySet entitySet = entitySerializationStrategy.load( applicationScope, entityIds, startTime );
 
                 //now loop through and ensure the entities are there.
                 final MutationBatch deleteBatch = keyspace.prepareMutationBatch();
@@ -372,11 +394,10 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
                 Observable.just( mvccEntityCollectionIoEvent ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() )
                           .doOnNext( writeOptimisticVerify );
 
-            final Observable<CollectionIoEvent<MvccEntity>> zip =  Observable.zip( uniqueObservable, optimisticObservable,
-                ( unique, optimistic ) -> optimistic );
+            final Observable<CollectionIoEvent<MvccEntity>> zip =
+                Observable.zip( uniqueObservable, optimisticObservable, ( unique, optimistic ) -> optimistic );
 
             return zip;
-
         } );
     }
 
@@ -384,8 +405,8 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
     @Override
     public Observable<VersionSet> getLatestVersion( final Collection<Id> entityIds ) {
 
-        final Timer.Context timer = getLatestTimer.time();
-        return Observable.create( new Observable.OnSubscribe<VersionSet>() {
+
+        final Observable<VersionSet> observable =  Observable.create( new Observable.OnSubscribe<VersionSet>() {
 
             @Override
             public void call( final Subscriber<? super VersionSet> subscriber ) {
@@ -400,12 +421,9 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
                     subscriber.onError( e );
                 }
             }
-        } ).doOnCompleted( new Action0() {
-            @Override
-            public void call() {
-                timer.stop();
-            }
         } );
+
+        return ObservableTimer.time( observable, getLatestTimer );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/70e0e75a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/VersionCompact.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/VersionCompact.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/VersionCompact.java
index 0945827..424ec86 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/VersionCompact.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/VersionCompact.java
@@ -87,13 +87,6 @@ public class VersionCompact
                         final Id entityId = mvccLogEntry.getEntityId();
                         final UUID version = mvccLogEntry.getVersion();
 
-                        //delete from our log
-                        mutationBatch.mergeShallow( logEntrySerializationStrategy.delete( scope, entityId, version ) );
-
-                        //merge our entity delete in
-                        mutationBatch
-                            .mergeShallow( mvccEntitySerializationStrategy.delete( scope, entityId, version ) );
-
                         if ( logger.isDebugEnabled() ) {
                             logger.debug(
                                 "Deleting log entry and version data for entity id {} and version {} in app scope {}",
@@ -101,6 +94,13 @@ public class VersionCompact
                         }
 
 
+                        //delete from our log
+                        mutationBatch.mergeShallow( logEntrySerializationStrategy.delete( scope, entityId, version ) );
+
+                        //merge our entity delete in
+                        mutationBatch
+                            .mergeShallow( mvccEntitySerializationStrategy.delete( scope, entityId, version ) );
+
 
 
                     } ) )

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/70e0e75a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java
index 381a24e..6591781 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java
@@ -24,42 +24,18 @@ public interface SerializationFig extends GuicyFig {
     @Default("5")
     int getTimeout();
 
-    /**
-     * Number of history items to return for delete.
-     *
-     * @return Timeout in seconds.
-     */
-    @Key("collection.delete.history.size")
-    @Default("100")
-    int getHistorySize();
 
     /**
      * Number of items to buffer.
      *
-     * @return Timeout in seconds.
+     * @return Number of items to buffer in memory
      */
-    @Key("collection.buffer.size")
-    @Default("10")
+    @Key("buffer.size")
+    @Default("100")
     int getBufferSize();
 
 
     /**
-     * The size of threads to have in the task pool
-     */
-    @Key( "collection.task.pool.threadsize" )
-    @Default( "20" )
-    int getTaskPoolThreadSize();
-
-
-
-    /**
-     * The size of threads to have in the task pool
-     */
-    @Key( "collection.task.pool.queuesize" )
-    @Default( "20" )
-    int getTaskPoolQueueSize();
-
-    /**
      * The maximum amount of entities we can load in a single request
      * TODO, change this and move it into a common setting that both query and collection share
      */

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/70e0e75a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryObservable.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryObservable.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryObservable.java
deleted file mode 100644
index 072e0ea..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryObservable.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.collection.serialization.impl;
-
-
-import org.apache.usergrid.persistence.collection.MvccLogEntry;
-import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
-
-import rx.Observable;
-import rx.Subscriber;
-
-
-/**
- * An observable that emits log entries from MIN to MAX
- */
-public class LogEntryObservable implements Observable.OnSubscribe<MvccLogEntry>{
-
-    private final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy;
-
-
-    public LogEntryObservable( final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy ) {
-        this.mvccLogEntrySerializationStrategy = mvccLogEntrySerializationStrategy;
-    }
-
-
-    @Override
-    public void call( final Subscriber<? super MvccLogEntry> subscriber ) {
-
-        subscriber.onStart();
-
-        while(!subscriber.isUnsubscribed()){
-
-
-
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/70e0e75a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MinMaxLogEntryIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MinMaxLogEntryIterator.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MinMaxLogEntryIterator.java
index a8e15a7..eae8c06 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MinMaxLogEntryIterator.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MinMaxLogEntryIterator.java
@@ -35,20 +35,16 @@ public class MinMaxLogEntryIterator implements Iterator<MvccLogEntry> {
      * @param logEntrySerializationStrategy The serialization strategy to get the log entries
      * @param scope The scope of the entity
      * @param entityId The id of the entity
-     * @param maxVersion The max version of the entity.  Iterator will iterate from min to min starting with the version
-     * < max
      * @param pageSize The fetch size to get when querying the serialization strategy
      */
     public MinMaxLogEntryIterator( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
-                                   final ApplicationScope scope, final Id entityId, final UUID maxVersion,
-                                   final int pageSize ) {
+                                   final ApplicationScope scope, final Id entityId, final int pageSize ) {
 
         Preconditions.checkArgument( pageSize > 0, "pageSize must be > 0" );
 
         this.logEntrySerializationStrategy = logEntrySerializationStrategy;
         this.scope = scope;
         this.entityId = entityId;
-        this.nextStart = maxVersion;
         this.pageSize = pageSize;
     }
 
@@ -89,19 +85,27 @@ public class MinMaxLogEntryIterator implements Iterator<MvccLogEntry> {
      */
     public void advance() throws ConnectionException {
 
-        final int requestedSize = pageSize + 1;
+        final int requestedSize;
+
+        if ( nextStart != null ) {
+            requestedSize = pageSize + 1;
+        }
+        else {
+            requestedSize = pageSize;
+        }
 
         //loop through even entry that's < this one and remove it
-        List<MvccLogEntry> results = logEntrySerializationStrategy.load( scope, entityId, nextStart, requestedSize );
+        List<MvccLogEntry> results = logEntrySerializationStrategy.loadReversed( scope, entityId, nextStart, requestedSize );
 
         //we always remove the first version if it's equal since it's returned
-        if ( results.size() > 0 && results.get( 0 ).getVersion().equals( nextStart ) ) {
+        if ( nextStart != null && results.size() > 0 && results.get( 0 ).getVersion().equals( nextStart ) ) {
             results.remove( 0 );
         }
 
 
-        //we have results, set our next start
-        if ( results.size() == pageSize ) {
+
+        //we have results, set our next start.  If we miss our start version (due to deletion) and we request a +1, we want to ensure we set our next, hence the >=
+        if ( results.size() >= pageSize ) {
             nextStart = results.get( results.size() - 1 ).getVersion();
         }
         //nothing left to do
@@ -109,6 +113,9 @@ public class MinMaxLogEntryIterator implements Iterator<MvccLogEntry> {
             nextStart = null;
         }
 
+
+
+
         elementItr = results.iterator();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/70e0e75a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java
index 73804e4..0c0d961 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java
@@ -89,19 +89,16 @@ public abstract class MvccLogEntrySerializationStrategyImpl<K> implements MvccLo
         final UUID colName = entry.getVersion();
         final StageStatus stageStatus = new StageStatus( stage, entry.getState() );
 
-        return doWrite( collectionScope, entry.getEntityId(), entry.getVersion(), new RowOp() {
-            @Override
-            public void doOp( final ColumnListMutation<UUID> colMutation ) {
-
-                //Write the stage with a timeout, it's set as transient
-                if ( stage.isTransient() ) {
-                    colMutation.putColumn( colName, stageStatus, SER, fig.getTimeout() );
-                    return;
-                }
-
-                //otherwise it's persistent, write it with no expiration
-                colMutation.putColumn( colName, stageStatus, SER, null );
+        return doWrite( collectionScope, entry.getEntityId(), entry.getVersion(), colMutation -> {
+
+            //Write the stage with a timeout, it's set as transient
+            if ( stage.isTransient() ) {
+                colMutation.putColumn( colName, stageStatus, SER, fig.getTimeout() );
+                return;
             }
+
+            //otherwise it's persistent, write it with no expiration
+            colMutation.putColumn( colName, stageStatus, SER, null );
         } );
     }
 
@@ -253,12 +250,7 @@ public abstract class MvccLogEntrySerializationStrategyImpl<K> implements MvccLo
         Preconditions.checkNotNull( entityId, "entityId is required" );
         Preconditions.checkNotNull( version, "version context is required" );
 
-        return doWrite( context, entityId, version, new RowOp() {
-            @Override
-            public void doOp( final ColumnListMutation<UUID> colMutation ) {
-                colMutation.deleteColumn( version );
-            }
-        } );
+        return doWrite( context, entityId, version, colMutation -> colMutation.deleteColumn( version ) );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/70e0e75a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
index baceeb4..36faf62 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
@@ -91,8 +91,7 @@ public class EntityCollectionManagerIT {
     public void write() {
 
 
-        ApplicationScope context =
-                new ApplicationScopeImpl( new SimpleId( "organization" ) );
+        ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
 
 
         Entity newEntity = new Entity( new SimpleId( "test" ) );
@@ -113,8 +112,7 @@ public class EntityCollectionManagerIT {
     public void writeWithUniqueValues() {
 
 
-        ApplicationScope context =
-                new ApplicationScopeImpl( new SimpleId( "organization" ) );
+        ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
 
         EntityCollectionManager manager = factory.createCollectionManager( context );
 
@@ -146,8 +144,7 @@ public class EntityCollectionManagerIT {
     public void writeAndLoad() {
 
 
-        ApplicationScope context =
-                new ApplicationScopeImpl( new SimpleId( "organization" ) );
+        ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
 
         Entity newEntity = new Entity( new SimpleId( "test" ) );
 
@@ -174,7 +171,7 @@ public class EntityCollectionManagerIT {
     public void writeLoadDelete() {
 
 
-        ApplicationScope context =   new ApplicationScopeImpl( new SimpleId( "organization" ) );
+        ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
         Entity newEntity = new Entity( new SimpleId( "test" ) );
 
         EntityCollectionManager manager = factory.createCollectionManager( context );
@@ -185,30 +182,27 @@ public class EntityCollectionManagerIT {
 
 
         assertNotNull( "Id was assigned", createReturned.getId() );
-
-        UUID version = createReturned.getVersion();
-
         Observable<Entity> loadObservable = manager.load( createReturned.getId() );
 
         Entity loadReturned = loadObservable.toBlocking().lastOrDefault( null );
 
         assertEquals( "Same value", createReturned, loadReturned );
 
-        manager.delete( createReturned.getId() ).toBlocking().last();
+        manager.mark( createReturned.getId() ).toBlocking().last();
 
         loadObservable = manager.load( createReturned.getId() );
 
         //load may return null, use last or default
         loadReturned = loadObservable.toBlocking().lastOrDefault( null );
 
-        assertNull("Entity was deleted", loadReturned);
+        assertNull( "Entity was deleted", loadReturned );
     }
 
 
     @Test
     public void writeLoadUpdateLoad() {
 
-        ApplicationScope context =  new ApplicationScopeImpl( new SimpleId( "organization" ) );
+        ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
 
         Entity newEntity = new Entity( new SimpleId( "test" ) );
         newEntity.setField( new IntegerField( "counter", 1 ) );
@@ -217,36 +211,36 @@ public class EntityCollectionManagerIT {
 
         Observable<Entity> observable = manager.write( newEntity );
 
-        Entity createReturned = observable.toBlocking().lastOrDefault(null);
+        Entity createReturned = observable.toBlocking().lastOrDefault( null );
 
 
         assertNotNull( "Id was assigned", createReturned.getId() );
 
-        Observable<Entity> loadObservable = manager.load(createReturned.getId());
+        Observable<Entity> loadObservable = manager.load( createReturned.getId() );
 
         Entity loadReturned = loadObservable.toBlocking().lastOrDefault( null );
 
         assertEquals( "Same value", createReturned, loadReturned );
 
 
-        assertEquals("Field value correct", createReturned.getField("counter"), loadReturned.getField("counter"));
+        assertEquals( "Field value correct", createReturned.getField( "counter" ), loadReturned.getField( "counter" ) );
 
 
         //update the field to 2
         createReturned.setField( new IntegerField( "counter", 2 ) );
 
         //wait for the write to complete
-        manager.write( createReturned ).toBlocking().lastOrDefault(null);
+        manager.write( createReturned ).toBlocking().lastOrDefault( null );
 
 
         loadObservable = manager.load( createReturned.getId() );
 
-        loadReturned = loadObservable.toBlocking().lastOrDefault(null);
+        loadReturned = loadObservable.toBlocking().lastOrDefault( null );
 
         assertEquals( "Same value", createReturned, loadReturned );
 
 
-        assertEquals("Field value correct", createReturned.getField("counter"), loadReturned.getField("counter"));
+        assertEquals( "Field value correct", createReturned.getField( "counter" ), loadReturned.getField( "counter" ) );
     }
 
 
@@ -254,30 +248,30 @@ public class EntityCollectionManagerIT {
     public void writeAndLoadScopeClosure() {
 
 
-        ApplicationScope collectionScope1 = new ApplicationScopeImpl(new SimpleId("organization"));
+        ApplicationScope collectionScope1 = new ApplicationScopeImpl( new SimpleId( "organization" ) );
 
 
         Entity newEntity = new Entity( new SimpleId( "test" ) );
 
-        EntityCollectionManager manager = factory.createCollectionManager(collectionScope1);
+        EntityCollectionManager manager = factory.createCollectionManager( collectionScope1 );
 
-        Observable<Entity> observable = manager.write(newEntity);
+        Observable<Entity> observable = manager.write( newEntity );
 
         Entity createReturned = observable.toBlocking().lastOrDefault( null );
 
 
-        assertNotNull("Id was assigned", createReturned.getId());
-        assertNotNull("Version was assigned", createReturned.getVersion());
+        assertNotNull( "Id was assigned", createReturned.getId() );
+        assertNotNull( "Version was assigned", createReturned.getVersion() );
 
 
         Observable<Entity> loadObservable = manager.load( createReturned.getId() );
 
         Entity loadReturned = loadObservable.toBlocking().lastOrDefault( null );
 
-        assertEquals("Same value", createReturned, loadReturned);
+        assertEquals( "Same value", createReturned, loadReturned );
 
 
-        ApplicationScope collectionScope2 = new ApplicationScopeImpl(new SimpleId("organization"));
+        ApplicationScope collectionScope2 = new ApplicationScopeImpl( new SimpleId( "organization" ) );
 
         //now make sure we can't load it from another scope, using the same org
 
@@ -286,9 +280,7 @@ public class EntityCollectionManagerIT {
 
         Entity loaded = manager2.load( createReturned.getId() ).toBlocking().lastOrDefault( null );
 
-        assertNull("CollectionScope works correctly", loaded);
-
-
+        assertNull( "CollectionScope works correctly", loaded );
     }
 
 
@@ -296,34 +288,32 @@ public class EntityCollectionManagerIT {
     public void writeAndGetField() {
 
 
-        ApplicationScope collectionScope1 = new ApplicationScopeImpl(new SimpleId("organization"));
+        ApplicationScope collectionScope1 = new ApplicationScopeImpl( new SimpleId( "organization" ) );
 
         Entity newEntity = new Entity( new SimpleId( "test" ) );
         Field field = new StringField( "testField", "unique", true );
-        newEntity.setField(field);
+        newEntity.setField( field );
 
         EntityCollectionManager manager = factory.createCollectionManager( collectionScope1 );
 
-        Observable<Entity> observable = manager.write(newEntity);
+        Observable<Entity> observable = manager.write( newEntity );
 
         Entity createReturned = observable.toBlocking().lastOrDefault( null );
 
 
         assertNotNull( "Id was assigned", createReturned.getId() );
-        assertNotNull("Version was assigned", createReturned.getVersion());
+        assertNotNull( "Version was assigned", createReturned.getVersion() );
 
         Id id = manager.getIdField( newEntity.getId().getType(), field ).toBlocking().lastOrDefault( null );
         assertNotNull( id );
         assertEquals( newEntity.getId(), id );
 
         Field fieldNull = new StringField( "testFieldNotThere", "uniquely", true );
-        id = manager.getIdField(  newEntity.getId().getType(), fieldNull ).toBlocking().lastOrDefault( null );
+        id = manager.getIdField( newEntity.getId().getType(), fieldNull ).toBlocking().lastOrDefault( null );
         assertNull( id );
     }
 
 
-
-
     @Test
     public void updateVersioning() {
 
@@ -331,10 +321,10 @@ public class EntityCollectionManagerIT {
         Entity origEntity = new Entity( new SimpleId( "testUpdate" ) );
         origEntity.setField( new StringField( "testField", "value" ) );
 
-        ApplicationScope context = new ApplicationScopeImpl(new SimpleId("organization"));
+        ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
 
         EntityCollectionManager manager = factory.createCollectionManager( context );
-        Entity returned = manager.write( origEntity ).toBlocking().lastOrDefault(null);
+        Entity returned = manager.write( origEntity ).toBlocking().lastOrDefault( null );
 
         // note its version
         UUID oldVersion = returned.getVersion();
@@ -345,7 +335,7 @@ public class EntityCollectionManagerIT {
         // partial update entity but we don't have version number
         Entity updateEntity = new Entity( origEntity.getId() );
         updateEntity.setField( new StringField( "addedField", "other value" ) );
-        manager.write( updateEntity ).toBlocking().lastOrDefault(null);
+        manager.write( updateEntity ).toBlocking().lastOrDefault( null );
 
         // get entity now, it must have a new version
         returned = manager.load( origEntity.getId() ).toBlocking().lastOrDefault( null );
@@ -354,14 +344,14 @@ public class EntityCollectionManagerIT {
         assertNotNull( "A new version must be assigned", newVersion );
 
         // new Version should be > old version
-        assertTrue(UUIDComparator.staticCompare(newVersion, oldVersion) > 0);
+        assertTrue( UUIDComparator.staticCompare( newVersion, oldVersion ) > 0 );
     }
 
 
     @Test
     public void writeMultiget() {
 
-        final ApplicationScope context =  new ApplicationScopeImpl( new SimpleId( "organization" ) );
+        final ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
         final EntityCollectionManager manager = factory.createCollectionManager( context );
 
         final int multigetSize = serializationFig.getMaxLoadSize();
@@ -381,10 +371,10 @@ public class EntityCollectionManagerIT {
 
         final EntitySet entitySet = manager.load( entityIds ).toBlocking().lastOrDefault( null );
 
-        assertNotNull(entitySet);
+        assertNotNull( entitySet );
 
         assertEquals( multigetSize, entitySet.size() );
-        assertFalse(entitySet.isEmpty());
+        assertFalse( entitySet.isEmpty() );
 
         /**
          * Validate every element exists
@@ -405,7 +395,7 @@ public class EntityCollectionManagerIT {
     @Test
     public void writeMultigetRepair() {
 
-        final ApplicationScope context =  new ApplicationScopeImpl( new SimpleId( "organization" ) );
+        final ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
         final EntityCollectionManager manager = factory.createCollectionManager( context );
 
         final int multigetSize = serializationFig.getMaxLoadSize();
@@ -444,7 +434,7 @@ public class EntityCollectionManagerIT {
 
             assertEquals( "Same entity returned", expected, returned.getEntity().get() );
 
-            assertTrue((Boolean) returned.getEntity().get().getField("updated").getValue());
+            assertTrue( ( Boolean ) returned.getEntity().get().getField( "updated" ).getValue() );
         }
     }
 
@@ -452,7 +442,7 @@ public class EntityCollectionManagerIT {
     @Test( expected = IllegalArgumentException.class )
     public void readTooLarge() {
 
-        final ApplicationScope context =  new ApplicationScopeImpl( new SimpleId( "organization" ) );
+        final ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
         final EntityCollectionManager manager = factory.createCollectionManager( context );
 
         final int multigetSize = serializationFig.getMaxLoadSize() + 1;
@@ -474,7 +464,7 @@ public class EntityCollectionManagerIT {
     @Test
     public void testGetVersion() {
 
-        ApplicationScope context =  new ApplicationScopeImpl( new SimpleId( "organization" ) );
+        ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
 
 
         final EntityCollectionManager manager = factory.createCollectionManager( context );
@@ -495,7 +485,7 @@ public class EntityCollectionManagerIT {
 
 
         VersionSet results =
-                manager.getLatestVersion( Arrays.asList( created1.getId(), created2.getId() ) ).toBlocking().last();
+            manager.getLatestVersion( Arrays.asList( created1.getId(), created2.getId() ) ).toBlocking().last();
 
 
         final MvccLogEntry version1Log = results.getMaxVersion( created1.getId() );
@@ -515,7 +505,7 @@ public class EntityCollectionManagerIT {
     @Test
     public void testVersionLogWrite() {
 
-        ApplicationScope context =  new ApplicationScopeImpl( new SimpleId( "organization" ) );
+        ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
 
 
         final EntityCollectionManager manager = factory.createCollectionManager( context );
@@ -529,10 +519,10 @@ public class EntityCollectionManagerIT {
 
         final UUID v1Version = v1Created.getVersion();
 
-        final VersionSet resultsV1 = manager.getLatestVersion(Arrays.asList(v1Created.getId())).toBlocking().last();
+        final VersionSet resultsV1 = manager.getLatestVersion( Arrays.asList( v1Created.getId() ) ).toBlocking().last();
 
 
-        final MvccLogEntry version1Log = resultsV1.getMaxVersion(v1Created.getId());
+        final MvccLogEntry version1Log = resultsV1.getMaxVersion( v1Created.getId() );
         assertEquals( v1Created.getId(), version1Log.getEntityId() );
         assertEquals( v1Version, version1Log.getVersion() );
         assertEquals( MvccLogEntry.State.COMPLETE, version1Log.getState() );
@@ -543,7 +533,7 @@ public class EntityCollectionManagerIT {
         final UUID v2Version = v2Created.getVersion();
 
 
-        assertTrue("Newer version in v2", UUIDComparator.staticCompare(v2Version, v1Version) > 0);
+        assertTrue( "Newer version in v2", UUIDComparator.staticCompare( v2Version, v1Version ) > 0 );
 
 
         final VersionSet resultsV2 = manager.getLatestVersion( Arrays.asList( v1Created.getId() ) ).toBlocking().last();
@@ -560,7 +550,7 @@ public class EntityCollectionManagerIT {
     @Test
     public void testVersionLogUpdate() {
 
-        ApplicationScope context =  new ApplicationScopeImpl( new SimpleId( "organization" ) );
+        ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
 
 
         final EntityCollectionManager manager = factory.createCollectionManager( context );
@@ -608,7 +598,7 @@ public class EntityCollectionManagerIT {
     @Test
     public void healthTest() {
 
-        ApplicationScope context =  new ApplicationScopeImpl( new SimpleId( "organization" ) );
+        ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
 
         final EntityCollectionManager manager = factory.createCollectionManager( context );
 
@@ -632,7 +622,7 @@ public class EntityCollectionManagerIT {
         final Entity entity = EntityHelper.generateEntity( setSize );
 
         //now we have one massive, entity, save it and retrieve it.
-        ApplicationScope context =  new ApplicationScopeImpl( new SimpleId( "organization" ) );
+        ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
 
         final EntityCollectionManager manager = factory.createCollectionManager( context );
 
@@ -652,20 +642,21 @@ public class EntityCollectionManagerIT {
         SetConfigTestBypass.setValueByPass( serializationFig, "getMaxEntitySize", currentMaxSize + "" );
     }
 
+
     @Test
     public void invalidNameRepair() throws ConnectionException {
 
         //write an entity with a unique field
-        ApplicationScope context =  new ApplicationScopeImpl( new SimpleId( "organization" ) );
+        ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
 
         Entity newEntity = new Entity( new SimpleId( "test" ) );
 
         //if we add a second field we get a second entity that is the exact same. Is this expected?
-        final IntegerField expectedInteger =  new IntegerField( "count", 5, true );
-       // final StringField expectedString = new StringField( "yes", "fred", true );
+        final IntegerField expectedInteger = new IntegerField( "count", 5, true );
+        // final StringField expectedString = new StringField( "yes", "fred", true );
 
         newEntity.setField( expectedInteger );
-       // newEntity.setField( expectedString );
+        // newEntity.setField( expectedString );
 
         EntityCollectionManager manager = factory.createCollectionManager( context );
 
@@ -677,23 +668,26 @@ public class EntityCollectionManagerIT {
         assertNotNull( "Id was assigned", createReturned.getId() );
         assertNotNull( "Version was assigned", createReturned.getVersion() );
 
-        FieldSet
-            fieldResults = manager.getEntitiesFromFields( newEntity.getId().getType(), Arrays.<Field>asList( expectedInteger ) ).toBlocking().last();
+        FieldSet fieldResults =
+            manager.getEntitiesFromFields( newEntity.getId().getType(), Arrays.<Field>asList( expectedInteger ) )
+                   .toBlocking().last();
 
-        assertEquals(1,fieldResults.size());
+        assertEquals( 1, fieldResults.size() );
 
 
         //verify the entity is correct.
-        assertEquals( "Same value", createReturned, fieldResults.getEntity( expectedInteger ).getEntity().get()); //loadReturned );
+        assertEquals( "Same value", createReturned,
+            fieldResults.getEntity( expectedInteger ).getEntity().get() ); //loadReturned );
 
         //use the entity serializationStrategy to remove the entity data.
 
         //do a mark as one test, and a delete as another
-        entitySerializationStrategy.delete( context,createReturned.getId(),createReturned.getVersion() ).execute();
+        entitySerializationStrategy.delete( context, createReturned.getId(), createReturned.getVersion() ).execute();
 
         //try to load via the unique field, should have triggered repair
-        final FieldSet
-            results = manager.getEntitiesFromFields( newEntity.getId().getType(), Arrays.<Field>asList( expectedInteger ) ).toBlocking().last();
+        final FieldSet results =
+            manager.getEntitiesFromFields( newEntity.getId().getType(), Arrays.<Field>asList( expectedInteger ) )
+                   .toBlocking().last();
 
 
         //verify no entity returned
@@ -701,37 +695,104 @@ public class EntityCollectionManagerIT {
 
         //user the unique serialization to verify it's been deleted from cassandra
 
-        UniqueValueSet uniqueValues = uniqueValueSerializationStrategy.load( context,  newEntity.getId().getType(), createReturned.getFields() );
+        UniqueValueSet uniqueValues =
+            uniqueValueSerializationStrategy.load( context, newEntity.getId().getType(), createReturned.getFields() );
         assertFalse( uniqueValues.iterator().hasNext() );
-
     }
 
 
     @Test
     public void testGetIdField() throws Exception {
 
-        ApplicationScope context =  new ApplicationScopeImpl( new SimpleId( "organization" ) );
+        ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
         EntityCollectionManager manager = factory.createCollectionManager( context );
 
         // create an entity of type "item" with a unique_id field value = 1
 
         Entity entity1 = new Entity( new SimpleId( "item" ) );
-        entity1.setField( new StringField( "unique_id", "1", true ));
+        entity1.setField( new StringField( "unique_id", "1", true ) );
         manager.write( entity1 ).toBlocking().last();
 
-        final Observable<Id> idObs = manager.getIdField("item", new StringField("unique_id", "1"));
-        Id id = idObs.toBlocking().lastOrDefault(null);
-        assertEquals(entity1.getId(), id);
+        final Observable<Id> idObs = manager.getIdField( "item", new StringField( "unique_id", "1" ) );
+        Id id = idObs.toBlocking().lastOrDefault( null );
+        assertEquals( entity1.getId(), id );
 
         // create an entity of type "deleted_item" with a unique_id field value = 1
 
         Entity entity2 = new Entity( new SimpleId( "deleted_item" ) );
-        entity2.setField( new StringField( "unique_id", "1", true ));
+        entity2.setField( new StringField( "unique_id", "1", true ) );
         manager = factory.createCollectionManager( context );
         manager.write( entity2 ).toBlocking().last();
 
-        final Observable<Id> id2Obs = manager.getIdField("deleted_item", new StringField("unique_id", "1"));
-        Id id2 = id2Obs.toBlocking().lastOrDefault(null);
-        assertEquals(entity2.getId(), id2);
+        final Observable<Id> id2Obs = manager.getIdField( "deleted_item", new StringField( "unique_id", "1" ) );
+        Id id2 = id2Obs.toBlocking().lastOrDefault( null );
+        assertEquals( entity2.getId(), id2 );
+    }
+
+
+    @Test
+    public void writeGetVersionsDelete() {
+
+        ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
+
+        Entity entity = new Entity( new SimpleId( "test" ) );
+        entity.setField( new IntegerField( "counter", 0 ) );
+
+        EntityCollectionManager manager = factory.createCollectionManager( context );
+
+        Entity createReturned = manager.write( entity ).toBlocking().lastOrDefault( null );
+
+        assertNotNull( "Id was assigned", createReturned.getId() );
+
+        final int size = 200;
+
+        final Id entityId = createReturned.getId();
+
+        List<UUID> versions = new ArrayList<>( size );
+        versions.add( entity.getVersion() );
+
+        //write new versions
+        for ( int i = 1; i < size; i++ ) {
+            final Entity newEntity = new Entity( entityId );
+
+            final Entity returnedEntity = manager.write( newEntity ).toBlocking().last();
+
+            versions.add( returnedEntity.getVersion() );
+        }
+
+
+        //now get our values, and load the latest version
+
+        final Entity lastVersion = manager.load( entityId ).toBlocking().last();
+
+        //ensure the latest version is correct
+        assertEquals( versions.get( versions.size() - 1 ), lastVersion.getVersion() );
+
+
+        // now ensure all versions are correct
+        final List<MvccLogEntry> entries = manager.getVersions( entityId ).toList().toBlocking().last();
+
+
+        assertEquals( "Same size expected", versions.size(), entries.size() );
+
+        for ( int i = 0; i < versions.size(); i++ ) {
+            assertEquals( versions.get( i ), entries.get( i ).getVersion() );
+        }
+
+
+        //now get all the log versions, and delete them all we do it in 2+ batches to ensure we clean up as expected
+        manager.getVersions( entityId ).buffer( 100 ).flatMap( bufferList -> manager.delete( bufferList ) )
+               .toBlocking().last();
+
+
+        //now load them, there shouldn't be any versions
+        final List<MvccLogEntry> postDeleteEntries = manager.getVersions( entityId ).toList().toBlocking().last();
+
+        assertEquals( "All log entries should be removed", 0, postDeleteEntries.size() );
+
+        final Entity postDeleteLastVersion = manager.load( entityId ).toBlocking().lastOrDefault( null );
+
+        //ensure the latest version is correct
+        assertNull( "Last version was deleted", postDeleteLastVersion );
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/70e0e75a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MinMaxLogEntryIteratorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MinMaxLogEntryIteratorTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MinMaxLogEntryIteratorTest.java
index c82e1bf..a7210a5 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MinMaxLogEntryIteratorTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MinMaxLogEntryIteratorTest.java
@@ -55,7 +55,7 @@ public class MinMaxLogEntryIteratorTest {
 
         //now iterate we should get everything
         MinMaxLogEntryIterator
-            itr = new MinMaxLogEntryIterator( logEntrySerializationStrategy, scope, entityId, start, pageSize );
+            itr = new MinMaxLogEntryIterator( logEntrySerializationStrategy, scope, entityId, pageSize );
 
 
         assertFalse( itr.hasNext() );
@@ -111,12 +111,9 @@ public class MinMaxLogEntryIteratorTest {
 
         Iterator<MvccLogEntry> expectedEntries = mockResults.getEntries().iterator();
 
-        //this element should be skipped
-        UUID start = expectedEntries.next().getVersion();
-
         //now iterate we should get everything
         MinMaxLogEntryIterator
-            itr = new MinMaxLogEntryIterator( logEntrySerializationStrategy, scope, entityId, start, pageSize );
+            itr = new MinMaxLogEntryIterator( logEntrySerializationStrategy, scope, entityId, pageSize );
 
 
         while ( expectedEntries.hasNext() && itr.hasNext() ) {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/70e0e75a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImplTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImplTest.java
index 673903c..9ed254c 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImplTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImplTest.java
@@ -20,6 +20,7 @@
 package org.apache.usergrid.persistence.collection.serialization.impl;
 
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -223,9 +224,9 @@ public abstract class MvccLogEntrySerializationStrategyImplTest {
 
         //now do a range scan from the end
 
-        final int half = count/2;
+        final int half = count / 2;
 
-        final List<MvccLogEntry> results = logEntryStrategy.loadReversed( context, id, versions[0], half);
+        final List<MvccLogEntry> results = logEntryStrategy.loadReversed( context, id, versions[0], half );
 
         assertEquals( half, results.size() );
 
@@ -238,10 +239,9 @@ public abstract class MvccLogEntrySerializationStrategyImplTest {
 
 
         //now get the next batch
-        final List<MvccLogEntry> results2 =
-            logEntryStrategy.loadReversed( context, id, versions[half], count );
+        final List<MvccLogEntry> results2 = logEntryStrategy.loadReversed( context, id, versions[half], count );
 
-        assertEquals( half, results2.size());
+        assertEquals( half, results2.size() );
 
         for ( int i = 0; i < half; i++ ) {
             final MvccLogEntry saved = entries[half + i];
@@ -256,12 +256,59 @@ public abstract class MvccLogEntrySerializationStrategyImplTest {
             logEntryStrategy.delete( context, id, versions[i] ).execute();
         }
 
-        final  List<MvccLogEntry>  results3 = logEntryStrategy.loadReversed( context, id, null, versions.length );
+        final List<MvccLogEntry> results3 = logEntryStrategy.loadReversed( context, id, null, versions.length );
 
         assertEquals( "All log entries were deleted", 0, results3.size() );
     }
 
 
+    @Test
+    public void createAndDeleteEntries() throws ConnectionException {
+
+        final Id applicationId = new SimpleId( "application" );
+
+        ApplicationScope context = new ApplicationScopeImpl( applicationId );
+
+
+        final Id id = new SimpleId( "test" );
+
+
+        final int size = 10;
+
+        final List<MvccLogEntry> savedEntries = new ArrayList<>( size );
+
+        for ( int i = 0; i < size; i++ ) {
+            final UUID version = UUIDGenerator.newTimeUUID();
+            MvccLogEntry saved = new MvccLogEntryImpl( id, version, Stage.COMMITTED, MvccLogEntry.State.COMPLETE );
+            logEntryStrategy.write( context, saved ).execute();
+
+            savedEntries.add( saved );
+        }
+
+        //now test we get them all back
+
+        final List<MvccLogEntry> results = logEntryStrategy.loadReversed( context, id, null, size );
+
+        assertEquals( size, results.size() );
+
+        //assert they're the same
+        for ( int i = 0; i < size; i++ ) {
+            assertEquals( savedEntries.get( i ), results.get( i ) );
+        }
+
+        //now delete them all
+
+        for ( final MvccLogEntry mvccLogEntry : savedEntries ) {
+            logEntryStrategy.delete( context, id, mvccLogEntry.getVersion() ).execute();
+        }
+
+        //now get them back, should be empty
+        final List<MvccLogEntry> emptyResults = logEntryStrategy.loadReversed( context, id, null, size );
+
+        assertEquals( 0, emptyResults.size() );
+    }
+
+
     @Test( expected = NullPointerException.class )
     public void writeParamsNoContext() throws ConnectionException {
         logEntryStrategy.write( null, mock( MvccLogEntry.class ) );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/70e0e75a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/util/LogEntryMock.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/util/LogEntryMock.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/util/LogEntryMock.java
index 93de9d4..cd6ad3d 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/util/LogEntryMock.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/util/LogEntryMock.java
@@ -20,15 +20,11 @@ package org.apache.usergrid.persistence.collection.util;/*
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.TreeMap;
 import java.util.UUID;
 
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
 import org.apache.usergrid.persistence.collection.MvccLogEntry;
 import org.apache.usergrid.persistence.collection.mvcc.entity.Stage;
 import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl;
@@ -50,7 +46,11 @@ import static org.mockito.Mockito.when;
 public class LogEntryMock {
 
 
-    private final TreeMap<UUID, MvccLogEntry> entries = new TreeMap<>(ReversedUUIDComparator.INSTANCE);
+    private final TreeMap<UUID, MvccLogEntry> reversedEntries =
+        new TreeMap<>( ( o1, o2 ) -> UUIDComparator.staticCompare( o1, o2 ) * -1 );
+
+    private final TreeMap<UUID, MvccLogEntry> entries =
+        new TreeMap<>( ( o1, o2 ) -> UUIDComparator.staticCompare( o1, o2 ) );
 
     private final Id entityId;
 
@@ -61,78 +61,92 @@ public class LogEntryMock {
      * @param entityId The entity Id to use
      * @param versions The versions to use
      */
-    private LogEntryMock(final Id entityId, final List<UUID> versions ) {
+    private LogEntryMock( final Id entityId, final List<UUID> versions ) {
 
         this.entityId = entityId;
 
-        for ( UUID version: versions) {
-            entries.put( version, new MvccLogEntryImpl( entityId, version, Stage.ACTIVE, MvccLogEntry.State.COMPLETE ) );
+        for ( UUID version : versions ) {
+            final MvccLogEntry mvccLogEntry =
+                new MvccLogEntryImpl( entityId, version, Stage.ACTIVE, MvccLogEntry.State.COMPLETE );
+            reversedEntries.put( version, mvccLogEntry );
+            entries.put( version, mvccLogEntry );
         }
     }
 
 
     /**
      * Init the mock with the given data structure
+     *
      * @param logEntrySerializationStrategy The strategy to moc
-     * @param scope
-     * @throws ConnectionException
      */
-    private void initMock(  final MvccLogEntrySerializationStrategy logEntrySerializationStrategy, final ApplicationScope scope )
+    private void initMock( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
+                           final ApplicationScope scope )
 
-            throws ConnectionException {
+        throws ConnectionException {
 
         //wire up the mocks
-        when(logEntrySerializationStrategy.load( same( scope ), same( entityId ), any(UUID.class), any(Integer.class)  )).thenAnswer( new Answer<List<MvccLogEntry>>() {
-
+        when( logEntrySerializationStrategy
+            .load( same( scope ), same( entityId ), any( UUID.class ), any( Integer.class ) ) ).thenAnswer(
 
-            @Override
-            public List<MvccLogEntry> answer( final InvocationOnMock invocation ) throws Throwable {
+            invocation -> {
                 final UUID startVersion = ( UUID ) invocation.getArguments()[2];
-                final int count = (Integer)invocation.getArguments()[3];
+                final int count = ( Integer ) invocation.getArguments()[3];
 
                 final List<MvccLogEntry> results = new ArrayList<>( count );
 
-                final Iterator<MvccLogEntry> itr = entries.tailMap( startVersion, true ).values().iterator();
+                final Iterator<MvccLogEntry> itr = reversedEntries.tailMap( startVersion, true ).values().iterator();
 
-                for(int i = 0; i < count && itr.hasNext(); i ++){
+                for ( int i = 0; i < count && itr.hasNext(); i++ ) {
                     results.add( itr.next() );
                 }
 
 
                 return results;
-            }
-        } );
-    }
+            } );
 
 
-    /**
-     * Get the entry at the specified index from high to low
-     * @param index
-     * @return
-     */
-    public MvccLogEntry getEntryAtIndex(final int index){
+        //mock in reverse
 
-        final Iterator<MvccLogEntry> itr = entries.values().iterator();
+        when( logEntrySerializationStrategy
+            .loadReversed( same( scope ), same( entityId ), any( UUID.class ), any( Integer.class ) ) ).thenAnswer(
 
-        for(int i = 0; i < index; i ++){
-           itr.next();
-        }
+            invocation -> {
+                final UUID startVersion = ( UUID ) invocation.getArguments()[2];
+                final int count = ( Integer ) invocation.getArguments()[3];
+
+
+                final List<MvccLogEntry> results = new ArrayList<>( count );
+
+                final Iterator<MvccLogEntry> itr;
+
+                if ( startVersion == null ) {
+                    itr = entries.values().iterator();
+                }
+                else {
+                    itr = entries.tailMap( startVersion, true ).values().iterator();
+                }
+
+                for ( int i = 0; i < count && itr.hasNext(); i++ ) {
+                    results.add( itr.next() );
+                }
 
-        return itr.next();
+
+                return results;
+            } );
     }
 
 
     /**
-     *
      * @param logEntrySerializationStrategy The mock to use
      * @param scope The scope to use
      * @param entityId The entityId to use
      * @param versions The versions to mock
-     * @throws ConnectionException
      */
-    public static LogEntryMock createLogEntryMock(final MvccLogEntrySerializationStrategy logEntrySerializationStrategy, final  ApplicationScope scope,final Id entityId, final List<UUID> versions )
+    public static LogEntryMock createLogEntryMock(
+        final MvccLogEntrySerializationStrategy logEntrySerializationStrategy, final ApplicationScope scope,
+        final Id entityId, final List<UUID> versions )
 
-            throws ConnectionException {
+        throws ConnectionException {
 
         LogEntryMock mock = new LogEntryMock( entityId, versions );
         mock.initMock( logEntrySerializationStrategy, scope );
@@ -141,19 +155,12 @@ public class LogEntryMock {
     }
 
 
-    public Collection<MvccLogEntry> getEntries() {
-        return entries.values();
+    public Collection<MvccLogEntry> getReversedEntries() {
+        return reversedEntries.values();
     }
 
 
-    private static final class ReversedUUIDComparator implements Comparator<UUID> {
-
-        public static final ReversedUUIDComparator INSTANCE = new ReversedUUIDComparator();
-
-
-        @Override
-        public int compare( final UUID o1, final UUID o2 ) {
-            return UUIDComparator.staticCompare( o1, o2 ) * -1;
-        }
+    public Collection<MvccLogEntry> getEntries() {
+        return entries.values();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/70e0e75a/stack/corepersistence/collection/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/resources/log4j.properties b/stack/corepersistence/collection/src/test/resources/log4j.properties
index acf5c39..7b55cf8 100644
--- a/stack/corepersistence/collection/src/test/resources/log4j.properties
+++ b/stack/corepersistence/collection/src/test/resources/log4j.properties
@@ -33,4 +33,5 @@ log4j.logger.cassandra.db=ERROR
 
 #log4j.logger.org.apache.usergrid=DEBUG
 #log4j.logger.org.apache.usergrid.persistence.collection=TRACE
+log4j.logger.org.apache.usergrid.persistence.collection.mvcc.stage.delete.VersionCompact=TRACE
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/70e0e75a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
index 894e74a..2a153e2 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
@@ -31,46 +31,46 @@ import org.safehaus.guicyfig.Key;
 public interface GraphFig extends GuicyFig {
 
 
-    public static final String SCAN_PAGE_SIZE = "usergrid.graph.scan.page.size";
+    String SCAN_PAGE_SIZE = "usergrid.graph.scan.page.size";
 
-    public static final String REPAIR_CONCURRENT_SIZE = "usergrid.graph.repair.concurrent.size";
+    String REPAIR_CONCURRENT_SIZE = "usergrid.graph.repair.concurrent.size";
 
     /**
      * The size of the shards.  This is approximate, and should be set lower than what you would like your max to be
      */
-    public static final String SHARD_SIZE = "usergrid.graph.shard.size";
+    String SHARD_SIZE = "usergrid.graph.shard.size";
 
 
     /**
      * Number of shards we can cache.
      */
-    public static final String SHARD_CACHE_SIZE = "usergrid.graph.shard.cache.size";
+    String SHARD_CACHE_SIZE = "usergrid.graph.shard.cache.size";
 
 
     /**
      * Get the cache timeout.  The local cache will exist for this amount of time max (in millis).
      */
-    public static final String SHARD_CACHE_TIMEOUT = "usergrid.graph.shard.cache.timeout";
+    String SHARD_CACHE_TIMEOUT = "usergrid.graph.shard.cache.timeout";
 
     /**
      * Number of worker threads to refresh the cache
      */
-    public static final String SHARD_CACHE_REFRESH_WORKERS = "usergrid.graph.shard.refresh.worker.count";
+    String SHARD_CACHE_REFRESH_WORKERS = "usergrid.graph.shard.refresh.worker.count";
 
 
     /**
      * The size of the worker count for shard auditing
      */
-    public static final String SHARD_AUDIT_QUEUE_SIZE = "usergrid.graph.shard.audit.worker.queue.size";
+    String SHARD_AUDIT_QUEUE_SIZE = "usergrid.graph.shard.audit.worker.queue.size";
 
 
     /**
      * The size of the worker count for shard auditing
      */
-    public static final String SHARD_AUDIT_WORKERS = "usergrid.graph.shard.audit.worker.count";
+    String SHARD_AUDIT_WORKERS = "usergrid.graph.shard.audit.worker.count";
 
 
-    public static final String SHARD_REPAIR_CHANCE = "usergrid.graph.shard.repair.chance";
+    String SHARD_REPAIR_CHANCE = "usergrid.graph.shard.repair.chance";
 
 
     /**
@@ -80,14 +80,14 @@ public interface GraphFig extends GuicyFig {
      * Note that you should also pad this for node clock drift.  A good value for this would be 2x the shard cache
      * timeout + 30 seconds, assuming you have NTP and allow a max drift of 30 seconds
      */
-    public static final String SHARD_MIN_DELTA = "usergrid.graph.shard.min.delta";
+    String SHARD_MIN_DELTA = "usergrid.graph.shard.min.delta";
 
 
-    public static final String COUNTER_WRITE_FLUSH_COUNT = "usergrid.graph.shard.counter.beginFlush.count";
+    String COUNTER_WRITE_FLUSH_COUNT = "usergrid.graph.shard.counter.beginFlush.count";
 
-    public static final String COUNTER_WRITE_FLUSH_INTERVAL = "usergrid.graph.shard.counter.beginFlush.interval";
+    String COUNTER_WRITE_FLUSH_INTERVAL = "usergrid.graph.shard.counter.beginFlush.interval";
 
-    public static final String COUNTER_WRITE_FLUSH_QUEUE_SIZE = "usergrid.graph.shard.counter.queue.size";
+    String COUNTER_WRITE_FLUSH_QUEUE_SIZE = "usergrid.graph.shard.counter.queue.size";