You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/05/11 19:37:18 UTC

[04/14] incubator-usergrid git commit: First pass at refactoring mark + sweep

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java
new file mode 100644
index 0000000..0034f03
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.collection.mvcc.stage.delete;
+
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
+import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.core.metrics.ObservableTimer;
+import org.apache.usergrid.persistence.core.rx.ObservableIterator;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.codahale.metrics.Timer;
+import com.fasterxml.uuid.UUIDComparator;
+import com.google.inject.Inject;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import rx.Observable;
+
+
+/**
+ * Runs on an entity that has just been mark committed, and removes all unique values <= this entity
+ */
+public class UniqueCleanup
+    implements Observable.Transformer<CollectionIoEvent<MvccEntity>, CollectionIoEvent<MvccEntity>> {
+
+    private static final Logger logger = LoggerFactory.getLogger( UniqueCleanup.class );
+    private final Timer uniqueCleanupTimer;
+
+    private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
+    private final Keyspace keyspace;
+
+    private final SerializationFig serializationFig;
+
+    @Inject
+    public UniqueCleanup( final SerializationFig serializationFig,
+                          final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
+                          final Keyspace keyspace, final MetricsFactory metricsFactory ) {
+
+        this.serializationFig = serializationFig;
+        this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
+        this.keyspace = keyspace;
+        this.uniqueCleanupTimer = metricsFactory.getTimer( UniqueCleanup.class, "uniquecleanup" );
+    }
+
+    /**
+     * Deletes, in batches of the configured buffer size, the unique values of each event's entity once the
+     * leading versions newer than the event's are skipped, then re-emits the event.
+     */
+    @Override
+    public Observable<CollectionIoEvent<MvccEntity>> call(
+        final Observable<CollectionIoEvent<MvccEntity>> collectionIoEventObservable ) {
+
+        final Observable<CollectionIoEvent<MvccEntity>> outputObservable =
+            collectionIoEventObservable.flatMap( mvccEntityCollectionIoEvent -> {
+
+                final Id entityId = mvccEntityCollectionIoEvent.getEvent().getId();
+                final ApplicationScope applicationScope = mvccEntityCollectionIoEvent.getEntityCollection();
+                final UUID entityVersion = mvccEntityCollectionIoEvent.getEvent().getVersion();
+
+                //TODO Refactor this logic into a a class that can be invoked from anywhere
+                //iterate all unique values
+                final Observable<CollectionIoEvent<MvccEntity>> uniqueValueCleanup =
+                    Observable.create( new ObservableIterator<UniqueValue>( "Unique value load" ) {
+                        @Override
+                        protected Iterator<UniqueValue> getIterator() {
+                            return uniqueValueSerializationStrategy.getAllUniqueFields( applicationScope, entityId );
+                        }
+                    } )
+
+                        //skip  versions > the specified version
+                        .skipWhile( uniqueValue -> {
+                            final UUID uniqueValueVersion = uniqueValue.getEntityVersion();
+
+                            return UUIDComparator.staticCompare( uniqueValueVersion, entityVersion ) > 0;
+                        } )
+
+                            //buffer our buffer size, then roll them all up in a single batch mutation
+                        .buffer( serializationFig.getBufferSize() )
+
+                            //roll them up
+                        .doOnNext( uniqueValues -> {
+                            final MutationBatch uniqueCleanupBatch = keyspace.prepareMutationBatch();
+
+                            for ( UniqueValue value : uniqueValues ) {
+                                uniqueCleanupBatch
+                                    .mergeShallow( uniqueValueSerializationStrategy.delete( applicationScope, value ) );
+                            }
+
+                            try {
+                                uniqueCleanupBatch.execute();
+                            }
+                            catch ( ConnectionException e ) {
+                                throw new RuntimeException( "Unable to execute batch mutation", e );
+                            }
+                        } ).lastOrDefault( Collections.emptyList() ).map( list -> mvccEntityCollectionIoEvent );
+                //an Observable is lazy: return it so flatMap subscribes, otherwise no delete ever runs
+                return uniqueValueCleanup;
+            } );
+
+        return ObservableTimer.time( outputObservable, uniqueCleanupTimer );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/VersionCompact.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/VersionCompact.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/VersionCompact.java
new file mode 100644
index 0000000..424ec86
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/VersionCompact.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.collection.mvcc.stage.delete;
+
+
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.collection.MvccLogEntry;
+import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
+import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.core.metrics.ObservableTimer;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.codahale.metrics.Timer;
+import com.google.inject.Inject;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import rx.Observable;
+
+
+/**
+ * Compact all versions on the input observable by removing them from the log, and from the
+ * versions
+ */
+public class VersionCompact
+    implements Observable.Transformer<CollectionIoEvent<MvccLogEntry>, CollectionIoEvent<MvccLogEntry>> {
+
+    private static final Logger logger = LoggerFactory.getLogger( VersionCompact.class );
+
+    private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
+    private final MvccEntitySerializationStrategy mvccEntitySerializationStrategy;
+    private final SerializationFig serializationFig;
+    private final Keyspace keyspace;
+    private final Timer compactTimer;
+
+
+    @Inject
+    public VersionCompact( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
+                           final SerializationFig serializationFig, final Keyspace keyspace,
+                           final MetricsFactory metricsFactory,
+                           final MvccEntitySerializationStrategy mvccEntitySerializationStrategy ) {
+        this.logEntrySerializationStrategy = logEntrySerializationStrategy;
+        this.serializationFig = serializationFig;
+        this.keyspace = keyspace;
+        this.mvccEntitySerializationStrategy = mvccEntitySerializationStrategy;
+        this.compactTimer = metricsFactory.getTimer( VersionCompact.class, "compact" );
+    }
+
+
+    /**
+     * Buffers the incoming events, deletes each buffered entry's log record and entity version in one
+     * batch mutation per buffer, then re-emits the buffered events.
+     */
+    @Override
+    public Observable<CollectionIoEvent<MvccLogEntry>> call(
+        final Observable<CollectionIoEvent<MvccLogEntry>> collectionIoEventObservable ) {
+
+        final Observable<CollectionIoEvent<MvccLogEntry>> entryBuffer =
+            collectionIoEventObservable.buffer( serializationFig.getBufferSize() ).flatMap(
+                buffer -> Observable.from( buffer ).collect( () -> keyspace.prepareMutationBatch(),
+                    ( ( mutationBatch, mvccLogEntryCollectionIoEvent ) -> {
+
+                        final ApplicationScope scope = mvccLogEntryCollectionIoEvent.getEntityCollection();
+                        final MvccLogEntry mvccLogEntry = mvccLogEntryCollectionIoEvent.getEvent();
+                        final Id entityId = mvccLogEntry.getEntityId();
+                        final UUID version = mvccLogEntry.getVersion();
+
+                        if ( logger.isDebugEnabled() ) {
+                            logger.debug(
+                                "Deleting log entry and version data for entity id {} and version {} in app scope {}",
+                                new Object[] { entityId, version, scope } );
+                        }
+
+                        //delete from our log
+                        mutationBatch.mergeShallow( logEntrySerializationStrategy.delete( scope, entityId, version ) );
+
+                        //merge our entity delete in
+                        mutationBatch
+                            .mergeShallow( mvccEntitySerializationStrategy.delete( scope, entityId, version ) );
+
+                    } ) )
+                    //delete from the entities
+                    .doOnNext( mutationBatch -> {
+                        try {
+                            mutationBatch.execute();
+                        }
+                        catch ( ConnectionException e ) {
+                            //keep the cause, otherwise the cassandra failure is lost to callers and logs
+                            throw new RuntimeException( "Unable to perform batch mutation", e );
+                        }
+                } ).flatMap( batches -> Observable.from( buffer ) ) );
+
+        return ObservableTimer.time( entryBuffer, compactTimer );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/MvccLogEntrySerializationStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/MvccLogEntrySerializationStrategy.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/MvccLogEntrySerializationStrategy.java
index 92669a7..5e249ae 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/MvccLogEntrySerializationStrategy.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/MvccLogEntrySerializationStrategy.java
@@ -71,6 +71,20 @@ public interface MvccLogEntrySerializationStrategy extends Migration, VersionedD
      */
     List<MvccLogEntry> load( ApplicationScope applicationScope, Id entityId, UUID version, int maxSize );
 
+
+
+    /**
+     * Load a list, from lowest to highest of the stage with versions >= minVersion up to maxSize elements
+     * NOTE(review): the >= bound and min => max order are inferred from the signature; confirm in the impls
+     * @param applicationScope The applicationScope to load the entity from
+     * @param entityId The entity id to load
+     * @param minVersion The min version to seek from.  Null is allowed
+     * @param maxSize The maximum size to return.  If you receive this size, there may be more versions to load.
+     *
+     * @return A list of entities up to max size ordered from min(UUID)=> max(UUID)
+     */
+    List<MvccLogEntry> loadReversed( ApplicationScope applicationScope, Id entityId, UUID minVersion, int maxSize );
+
     /**
      * MarkCommit the stage from the applicationScope with the given entityId and version
      *

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java
index 381a24e..6591781 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java
@@ -24,42 +24,18 @@ public interface SerializationFig extends GuicyFig {
     @Default("5")
     int getTimeout();
 
-    /**
-     * Number of history items to return for delete.
-     *
-     * @return Timeout in seconds.
-     */
-    @Key("collection.delete.history.size")
-    @Default("100")
-    int getHistorySize();
 
     /**
      * Number of items to buffer.
      *
-     * @return Timeout in seconds.
+     * @return Number of items to buffer in memory
      */
-    @Key("collection.buffer.size")
-    @Default("10")
+    @Key("buffer.size")
+    @Default("100")
     int getBufferSize();
 
 
     /**
-     * The size of threads to have in the task pool
-     */
-    @Key( "collection.task.pool.threadsize" )
-    @Default( "20" )
-    int getTaskPoolThreadSize();
-
-
-
-    /**
-     * The size of threads to have in the task pool
-     */
-    @Key( "collection.task.pool.queuesize" )
-    @Default( "20" )
-    int getTaskPoolQueueSize();
-
-    /**
      * The maximum amount of entities we can load in a single request
      * TODO, change this and move it into a common setting that both query and collection share
      */

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java
deleted file mode 100644
index e5c2896..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java
+++ /dev/null
@@ -1,114 +0,0 @@
-package org.apache.usergrid.persistence.collection.serialization.impl;
-
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-import java.util.UUID;
-
-import org.apache.usergrid.persistence.collection.MvccLogEntry;
-import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.common.base.Preconditions;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-
-
-/**
- * Iterator that will iterate all versions of the entity from the log from < the specified maxVersion
- */
-public class LogEntryIterator implements Iterator<MvccLogEntry> {
-
-
-    private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
-    private final ApplicationScope scope;
-    private final Id entityId;
-    private final int pageSize;
-
-
-    private Iterator<MvccLogEntry> elementItr;
-    private UUID nextStart;
-
-
-    /**
-     * @param logEntrySerializationStrategy The serialization strategy to get the log entries
-     * @param scope The scope of the entity
-     * @param entityId The id of the entity
-     * @param maxVersion The max version of the entity.  Iterator will iterate from max to min starting with the version
-     * < max
-     * @param pageSize The fetch size to get when querying the serialization strategy
-     */
-    public LogEntryIterator( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
-                             final ApplicationScope scope, final Id entityId, final UUID maxVersion,
-                             final int pageSize ) {
-
-        Preconditions.checkArgument( pageSize > 0, "pageSize must be > 0" );
-
-        this.logEntrySerializationStrategy = logEntrySerializationStrategy;
-        this.scope = scope;
-        this.entityId = entityId;
-        this.nextStart = maxVersion;
-        this.pageSize = pageSize;
-    }
-
-
-    @Override
-    public boolean hasNext() {
-        if ( elementItr == null || !elementItr.hasNext() && nextStart != null ) {
-            try {
-                advance();
-            }
-            catch ( ConnectionException e ) {
-                throw new RuntimeException( "Unable to query cassandra", e );
-            }
-        }
-
-        return elementItr.hasNext();
-    }
-
-
-    @Override
-    public MvccLogEntry next() {
-        if ( !hasNext() ) {
-            throw new NoSuchElementException( "No more elements exist" );
-        }
-
-        return elementItr.next();
-    }
-
-
-    @Override
-    public void remove() {
-        throw new UnsupportedOperationException( "Remove is unsupported" );
-    }
-
-
-    /**
-     * Advance our iterator
-     */
-    public void advance() throws ConnectionException {
-
-        final int requestedSize = pageSize + 1;
-
-        //loop through even entry that's < this one and remove it
-        List<MvccLogEntry> results = logEntrySerializationStrategy.load( scope, entityId, nextStart, requestedSize );
-
-        //we always remove the first version if it's equal since it's returned
-        if ( results.size() > 0 && results.get( 0 ).getVersion().equals( nextStart ) ) {
-            results.remove( 0 );
-        }
-
-
-        //we have results, set our next start
-        if ( results.size() == pageSize ) {
-            nextStart = results.get( results.size() - 1 ).getVersion();
-        }
-        //nothing left to do
-        else {
-            nextStart = null;
-        }
-
-        elementItr = results.iterator();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MinMaxLogEntryIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MinMaxLogEntryIterator.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MinMaxLogEntryIterator.java
new file mode 100644
index 0000000..eae8c06
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MinMaxLogEntryIterator.java
@@ -0,0 +1,121 @@
+package org.apache.usergrid.persistence.collection.serialization.impl;
+
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.MvccLogEntry;
+import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Preconditions;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+
+/**
+ * Iterator that pages through the log entries of a single entity via
+ * {@link MvccLogEntrySerializationStrategy#loadReversed}, fetching pageSize entries at a time.
+ */
+public class MinMaxLogEntryIterator implements Iterator<MvccLogEntry> {
+
+
+    private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
+    private final ApplicationScope scope;
+    private final Id entityId;
+    private final int pageSize;
+
+
+    //the page currently being iterated; null until the first page has been loaded
+    private Iterator<MvccLogEntry> elementItr;
+    //the version to seek the next page from; null before the first load and after a short (final) page
+    private UUID nextStart;
+
+
+    /**
+     * @param logEntrySerializationStrategy The serialization strategy to get the log entries
+     * @param scope The scope of the entity
+     * @param entityId The id of the entity
+     * @param pageSize The fetch size to get when querying the serialization strategy
+     */
+    public MinMaxLogEntryIterator( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
+                                   final ApplicationScope scope, final Id entityId, final int pageSize ) {
+
+        Preconditions.checkArgument( pageSize > 0, "pageSize must be > 0" );
+
+        this.logEntrySerializationStrategy = logEntrySerializationStrategy;
+        this.scope = scope;
+        this.entityId = entityId;
+        this.pageSize = pageSize;
+    }
+
+
+    @Override
+    public boolean hasNext() {
+        //load a page when none was loaded yet, or the current one is drained and more may exist
+        final boolean pageNeeded = elementItr == null || ( !elementItr.hasNext() && nextStart != null );
+
+        if ( pageNeeded ) {
+            try {
+                advance();
+            }
+            catch ( ConnectionException e ) {
+                throw new RuntimeException( "Unable to query cassandra", e );
+            }
+        }
+
+        return elementItr.hasNext();
+    }
+
+
+    @Override
+    public MvccLogEntry next() {
+        if ( hasNext() ) {
+            return elementItr.next();
+        }
+
+        throw new NoSuchElementException( "No more elements exist" );
+    }
+
+
+    @Override
+    public void remove() {
+        throw new UnsupportedOperationException( "Remove is unsupported" );
+    }
+
+
+    /**
+     * Load the next page of log entries, and record where the page after it starts
+     */
+    public void advance() throws ConnectionException {
+
+        //after the first page the start version is returned again, so request one extra to make up for it
+        final int requestedSize = nextStart == null ? pageSize : pageSize + 1;
+
+        final List<MvccLogEntry> page =
+            logEntrySerializationStrategy.loadReversed( scope, entityId, nextStart, requestedSize );
+
+        //we always remove the first version if it's equal since it's returned
+        final boolean startReturned =
+            nextStart != null && page.size() > 0 && page.get( 0 ).getVersion().equals( nextStart );
+
+        if ( startReturned ) {
+            page.remove( 0 );
+        }
+
+        //a full page means there may be more to load.  If we miss our start version (due to deletion) and
+        //we requested a +1, the page can be larger than pageSize, hence the >=
+        if ( page.size() >= pageSize ) {
+            nextStart = page.get( page.size() - 1 ).getVersion();
+        }
+        //nothing left to do
+        else {
+            nextStart = null;
+        }
+
+
+        elementItr = page.iterator();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationProxyImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationProxyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationProxyImpl.java
index 81c0248..fdef3c3 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationProxyImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationProxyImpl.java
@@ -111,6 +111,20 @@ public class MvccLogEntrySerializationProxyImpl implements MvccLogEntrySerializa
 
 
     @Override
+    public List<MvccLogEntry> loadReversed( final ApplicationScope applicationScope, final Id entityId,
+                                            final UUID minVersion, final int maxSize ) {
+
+        final MigrationRelationship<MvccLogEntrySerializationStrategy> relationship = getMigrationRelationShip();
+
+        //pick the 'from' strategy while a migration is still needed, otherwise the 'to' strategy
+        final MvccLogEntrySerializationStrategy delegate =
+            relationship.needsMigration() ? relationship.from : relationship.to;
+
+        return delegate.loadReversed( applicationScope, entityId, minVersion, maxSize );
+    }
+
+
+    @Override
     public MutationBatch delete( final ApplicationScope applicationScope, final Id entityId, final UUID version ) {
         final MigrationRelationship<MvccLogEntrySerializationStrategy> migration = getMigrationRelationShip();
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java
index 76e9dba..0c0d961 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java
@@ -21,7 +21,6 @@ package org.apache.usergrid.persistence.collection.serialization.impl;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -31,11 +30,6 @@ import java.util.UUID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.cassandra.db.marshal.IntegerType;
-import org.apache.cassandra.db.marshal.ReversedType;
-import org.apache.cassandra.db.marshal.UUIDType;
-
 import org.apache.usergrid.persistence.collection.MvccLogEntry;
 import org.apache.usergrid.persistence.collection.VersionSet;
 import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
@@ -43,10 +37,7 @@ import org.apache.usergrid.persistence.collection.mvcc.entity.Stage;
 import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl;
 import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
-import org.apache.usergrid.persistence.collection.serialization.impl.util.LegacyScopeUtils;
-import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
 import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
-import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
 import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.model.entity.Id;
@@ -98,19 +89,16 @@ public abstract class MvccLogEntrySerializationStrategyImpl<K> implements MvccLo
         final UUID colName = entry.getVersion();
         final StageStatus stageStatus = new StageStatus( stage, entry.getState() );
 
-        return doWrite( collectionScope, entry.getEntityId(), entry.getVersion(), new RowOp() {
-            @Override
-            public void doOp( final ColumnListMutation<UUID> colMutation ) {
-
-                //Write the stage with a timeout, it's set as transient
-                if ( stage.isTransient() ) {
-                    colMutation.putColumn( colName, stageStatus, SER, fig.getTimeout() );
-                    return;
-                }
+        return doWrite( collectionScope, entry.getEntityId(), entry.getVersion(), colMutation -> {
 
-                //otherwise it's persistent, write it with no expiration
-                colMutation.putColumn( colName, stageStatus, SER, null );
+            //Write the stage with a timeout, it's set as transient
+            if ( stage.isTransient() ) {
+                colMutation.putColumn( colName, stageStatus, SER, fig.getTimeout() );
+                return;
             }
+
+            //otherwise it's persistent, write it with no expiration
+            colMutation.putColumn( colName, stageStatus, SER, null );
         } );
     }
 
@@ -126,12 +114,10 @@ public abstract class MvccLogEntrySerializationStrategyImpl<K> implements MvccLo
 
         //didnt put the max in the error message, I don't want to take the string construction hit every time
         Preconditions.checkArgument( entityIds.size() <= fig.getMaxLoadSize(),
-                "requested size cannot be over configured maximum" );
+            "requested size cannot be over configured maximum" );
 
 
         final Id applicationId = collectionScope.getApplication();
-        final Id ownerId = applicationId;
-
 
 
         final List<ScopedRowKey<K>> rowKeys = new ArrayList<>( entityIds.size() );
@@ -155,7 +141,7 @@ public abstract class MvccLogEntrySerializationStrategyImpl<K> implements MvccLo
         }
         catch ( ConnectionException e ) {
             throw new CollectionRuntimeException( null, collectionScope, "An error occurred connecting to cassandra",
-                    e );
+                e );
         }
 
 
@@ -181,7 +167,7 @@ public abstract class MvccLogEntrySerializationStrategyImpl<K> implements MvccLo
             final StageStatus stageStatus = column.getValue( SER );
 
             final MvccLogEntry logEntry =
-                    new MvccLogEntryImpl( entityId, version, stageStatus.stage, stageStatus.state );
+                new MvccLogEntryImpl( entityId, version, stageStatus.stage, stageStatus.state );
 
 
             versionResults.addEntry( logEntry );
@@ -208,13 +194,41 @@ public abstract class MvccLogEntrySerializationStrategyImpl<K> implements MvccLo
             final ScopedRowKey<K> rowKey = createKey( applicationId, entityId );
 
 
+            columns =
+                keyspace.prepareQuery( CF_ENTITY_LOG ).getKey( rowKey ).withColumnRange( version, null, false, maxSize )
+                        .execute().getResult();
+        }
+        catch ( ConnectionException e ) {
+            throw new RuntimeException( "Unable to load log entries", e );
+        }
+
+        return parseResults( columns, entityId );
+    }
+
+
+    @Override
+    public List<MvccLogEntry> loadReversed( final ApplicationScope applicationScope, final Id entityId,
+                                            final UUID minVersion, final int maxSize ) {
+        ColumnList<UUID> columns;
+        try {
+
+            final Id applicationId = applicationScope.getApplication();
+
+            final ScopedRowKey<K> rowKey = createKey( applicationId, entityId );
+
+
             columns = keyspace.prepareQuery( CF_ENTITY_LOG ).getKey( rowKey )
-                              .withColumnRange( version, null, false, maxSize ).execute().getResult();
+                              .withColumnRange( minVersion, null, true, maxSize ).execute().getResult();
         }
         catch ( ConnectionException e ) {
             throw new RuntimeException( "Unable to load log entries", e );
         }
 
+        return parseResults( columns, entityId );
+    }
+
+
+    private List<MvccLogEntry> parseResults( final ColumnList<UUID> columns, final Id entityId ) {
 
         List<MvccLogEntry> results = new ArrayList<MvccLogEntry>( columns.size() );
 
@@ -236,17 +250,10 @@ public abstract class MvccLogEntrySerializationStrategyImpl<K> implements MvccLo
         Preconditions.checkNotNull( entityId, "entityId is required" );
         Preconditions.checkNotNull( version, "version context is required" );
 
-        return doWrite( context, entityId, version, new RowOp() {
-            @Override
-            public void doOp( final ColumnListMutation<UUID> colMutation ) {
-                colMutation.deleteColumn( version );
-            }
-        } );
+        return doWrite( context, entityId, version, colMutation -> colMutation.deleteColumn( version ) );
     }
 
 
-
-
     /**
      * Simple callback to perform puts and deletes with a common row setup code
      */
@@ -281,12 +288,14 @@ public abstract class MvccLogEntrySerializationStrategyImpl<K> implements MvccLo
         return batch;
     }
 
+
     protected abstract MultiTennantColumnFamily<ScopedRowKey<K>, UUID> getColumnFamily();
 
 
-    protected abstract ScopedRowKey<K> createKey(final Id applicationId, final Id entityId);
+    protected abstract ScopedRowKey<K> createKey( final Id applicationId, final Id entityId );
+
+    protected abstract Id getEntityIdFromKey( final ScopedRowKey<K> key );
 
-    protected abstract Id getEntityIdFromKey(final ScopedRowKey<K> key);
 
     /**
      * Internal stage shard
@@ -319,7 +328,7 @@ public abstract class MvccLogEntrySerializationStrategyImpl<K> implements MvccLo
      */
     private static class StatusCache {
         private Map<Integer, MvccLogEntry.State> values =
-                new HashMap<Integer, MvccLogEntry.State>( MvccLogEntry.State.values().length );
+            new HashMap<Integer, MvccLogEntry.State>( MvccLogEntry.State.values().length );
 
 
         private StatusCache() {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
index 3168817..e825bbc 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
@@ -31,16 +31,12 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.collection.MvccEntity;
 import org.apache.usergrid.persistence.collection.MvccLogEntry;
-import org.apache.usergrid.persistence.collection.impl.EntityVersionCleanupTask;
-import org.apache.usergrid.persistence.collection.impl.EntityVersionTaskFactory;
 import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategyV3Impl;
-import org.apache.usergrid.persistence.collection.serialization.impl.MvccLogEntrySerializationStrategyV2Impl;
 import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl;
-import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueSerializationStrategyV2Impl;
 import org.apache.usergrid.persistence.core.migration.data.DataMigration;
 import org.apache.usergrid.persistence.core.migration.data.DataMigrationException;
 import org.apache.usergrid.persistence.core.migration.data.MigrationDataProvider;
@@ -76,7 +72,6 @@ public class MvccEntityDataMigrationImpl implements DataMigration<EntityIdScope>
 
     private final Keyspace keyspace;
     private final VersionedMigrationSet<MvccEntitySerializationStrategy> allVersions;
-    private final EntityVersionTaskFactory entityVersionCleanupFactory;
     private final MvccEntitySerializationStrategyV3Impl mvccEntitySerializationStrategyV3;
     private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
     private final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy;
@@ -85,13 +80,11 @@ public class MvccEntityDataMigrationImpl implements DataMigration<EntityIdScope>
     @Inject
     public MvccEntityDataMigrationImpl( final Keyspace keyspace,
                                         final VersionedMigrationSet<MvccEntitySerializationStrategy> allVersions,
-                                        final EntityVersionTaskFactory entityVersionCleanupFactory,
                                         final MvccEntitySerializationStrategyV3Impl mvccEntitySerializationStrategyV3,
                                         final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
                                         final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy ) {
         this.keyspace = keyspace;
         this.allVersions = allVersions;
-        this.entityVersionCleanupFactory = entityVersionCleanupFactory;
         this.mvccEntitySerializationStrategyV3 = mvccEntitySerializationStrategyV3;
         this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
         this.mvccLogEntrySerializationStrategy = mvccLogEntrySerializationStrategy;
@@ -162,7 +155,8 @@ public class MvccEntityDataMigrationImpl implements DataMigration<EntityIdScope>
 
                         atomicLong.addAndGet( entities.size() );
 
-                        List<EntityVersionCleanupTask> entityVersionCleanupTasks = new ArrayList( entities.size() );
+                        final List<Id> toSaveIds = new ArrayList<>( entities.size() );
+
 
                         for ( EntityToSaveMessage message : entities ) {
                             final MutationBatch entityRewrite = migration.to.write( message.scope, message.entity );
@@ -187,6 +181,9 @@ public class MvccEntityDataMigrationImpl implements DataMigration<EntityIdScope>
 
                             final UUID version = message.entity.getVersion();
 
+
+                            toSaveIds.add( entityId );
+
                             // re-write the unique
                             // values
                             // but this
@@ -215,7 +212,6 @@ public class MvccEntityDataMigrationImpl implements DataMigration<EntityIdScope>
                              * Migrate the log entry to the new format
                              */
                             for(final MvccLogEntry entry: logEntries){
-
                                 final MutationBatch mb = mvccLogEntrySerializationStrategy.write( message.scope, entry );
 
                                 totalBatch.mergeShallow( mb );
@@ -223,24 +219,18 @@ public class MvccEntityDataMigrationImpl implements DataMigration<EntityIdScope>
 
 
 
-                            //schedule our cleanup task to clean up all the data
-                            final EntityVersionCleanupTask task = entityVersionCleanupFactory
-                                .getCleanupTask( message.scope, message.entity.getId(), version, false );
 
-                            entityVersionCleanupTasks.add( task );
                         }
 
                         executeBatch( migration.to.getImplementationVersion(), totalBatch, observer, atomicLong );
 
                         //now run our cleanup task
 
-                        for ( EntityVersionCleanupTask entityVersionCleanupTask : entityVersionCleanupTasks ) {
-                            try {
-                                entityVersionCleanupTask.call();
-                            }
-                            catch ( Exception e ) {
-                                LOGGER.error( "Unable to run cleanup task", e );
-                            }
+                        for ( Id updatedId : toSaveIds ) {
+
+
+
+
                         }
                     } ).subscribeOn( Schedulers.io() );
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
index baceeb4..36faf62 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
@@ -91,8 +91,7 @@ public class EntityCollectionManagerIT {
     public void write() {
 
 
-        ApplicationScope context =
-                new ApplicationScopeImpl( new SimpleId( "organization" ) );
+        ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
 
 
         Entity newEntity = new Entity( new SimpleId( "test" ) );
@@ -113,8 +112,7 @@ public class EntityCollectionManagerIT {
     public void writeWithUniqueValues() {
 
 
-        ApplicationScope context =
-                new ApplicationScopeImpl( new SimpleId( "organization" ) );
+        ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
 
         EntityCollectionManager manager = factory.createCollectionManager( context );
 
@@ -146,8 +144,7 @@ public class EntityCollectionManagerIT {
     public void writeAndLoad() {
 
 
-        ApplicationScope context =
-                new ApplicationScopeImpl( new SimpleId( "organization" ) );
+        ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
 
         Entity newEntity = new Entity( new SimpleId( "test" ) );
 
@@ -174,7 +171,7 @@ public class EntityCollectionManagerIT {
     public void writeLoadDelete() {
 
 
-        ApplicationScope context =   new ApplicationScopeImpl( new SimpleId( "organization" ) );
+        ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
         Entity newEntity = new Entity( new SimpleId( "test" ) );
 
         EntityCollectionManager manager = factory.createCollectionManager( context );
@@ -185,30 +182,27 @@ public class EntityCollectionManagerIT {
 
 
         assertNotNull( "Id was assigned", createReturned.getId() );
-
-        UUID version = createReturned.getVersion();
-
         Observable<Entity> loadObservable = manager.load( createReturned.getId() );
 
         Entity loadReturned = loadObservable.toBlocking().lastOrDefault( null );
 
         assertEquals( "Same value", createReturned, loadReturned );
 
-        manager.delete( createReturned.getId() ).toBlocking().last();
+        manager.mark( createReturned.getId() ).toBlocking().last();
 
         loadObservable = manager.load( createReturned.getId() );
 
         //load may return null, use last or default
         loadReturned = loadObservable.toBlocking().lastOrDefault( null );
 
-        assertNull("Entity was deleted", loadReturned);
+        assertNull( "Entity was deleted", loadReturned );
     }
 
 
     @Test
     public void writeLoadUpdateLoad() {
 
-        ApplicationScope context =  new ApplicationScopeImpl( new SimpleId( "organization" ) );
+        ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
 
         Entity newEntity = new Entity( new SimpleId( "test" ) );
         newEntity.setField( new IntegerField( "counter", 1 ) );
@@ -217,36 +211,36 @@ public class EntityCollectionManagerIT {
 
         Observable<Entity> observable = manager.write( newEntity );
 
-        Entity createReturned = observable.toBlocking().lastOrDefault(null);
+        Entity createReturned = observable.toBlocking().lastOrDefault( null );
 
 
         assertNotNull( "Id was assigned", createReturned.getId() );
 
-        Observable<Entity> loadObservable = manager.load(createReturned.getId());
+        Observable<Entity> loadObservable = manager.load( createReturned.getId() );
 
         Entity loadReturned = loadObservable.toBlocking().lastOrDefault( null );
 
         assertEquals( "Same value", createReturned, loadReturned );
 
 
-        assertEquals("Field value correct", createReturned.getField("counter"), loadReturned.getField("counter"));
+        assertEquals( "Field value correct", createReturned.getField( "counter" ), loadReturned.getField( "counter" ) );
 
 
         //update the field to 2
         createReturned.setField( new IntegerField( "counter", 2 ) );
 
         //wait for the write to complete
-        manager.write( createReturned ).toBlocking().lastOrDefault(null);
+        manager.write( createReturned ).toBlocking().lastOrDefault( null );
 
 
         loadObservable = manager.load( createReturned.getId() );
 
-        loadReturned = loadObservable.toBlocking().lastOrDefault(null);
+        loadReturned = loadObservable.toBlocking().lastOrDefault( null );
 
         assertEquals( "Same value", createReturned, loadReturned );
 
 
-        assertEquals("Field value correct", createReturned.getField("counter"), loadReturned.getField("counter"));
+        assertEquals( "Field value correct", createReturned.getField( "counter" ), loadReturned.getField( "counter" ) );
     }
 
 
@@ -254,30 +248,30 @@ public class EntityCollectionManagerIT {
     public void writeAndLoadScopeClosure() {
 
 
-        ApplicationScope collectionScope1 = new ApplicationScopeImpl(new SimpleId("organization"));
+        ApplicationScope collectionScope1 = new ApplicationScopeImpl( new SimpleId( "organization" ) );
 
 
         Entity newEntity = new Entity( new SimpleId( "test" ) );
 
-        EntityCollectionManager manager = factory.createCollectionManager(collectionScope1);
+        EntityCollectionManager manager = factory.createCollectionManager( collectionScope1 );
 
-        Observable<Entity> observable = manager.write(newEntity);
+        Observable<Entity> observable = manager.write( newEntity );
 
         Entity createReturned = observable.toBlocking().lastOrDefault( null );
 
 
-        assertNotNull("Id was assigned", createReturned.getId());
-        assertNotNull("Version was assigned", createReturned.getVersion());
+        assertNotNull( "Id was assigned", createReturned.getId() );
+        assertNotNull( "Version was assigned", createReturned.getVersion() );
 
 
         Observable<Entity> loadObservable = manager.load( createReturned.getId() );
 
         Entity loadReturned = loadObservable.toBlocking().lastOrDefault( null );
 
-        assertEquals("Same value", createReturned, loadReturned);
+        assertEquals( "Same value", createReturned, loadReturned );
 
 
-        ApplicationScope collectionScope2 = new ApplicationScopeImpl(new SimpleId("organization"));
+        ApplicationScope collectionScope2 = new ApplicationScopeImpl( new SimpleId( "organization" ) );
 
         //now make sure we can't load it from another scope, using the same org
 
@@ -286,9 +280,7 @@ public class EntityCollectionManagerIT {
 
         Entity loaded = manager2.load( createReturned.getId() ).toBlocking().lastOrDefault( null );
 
-        assertNull("CollectionScope works correctly", loaded);
-
-
+        assertNull( "CollectionScope works correctly", loaded );
     }
 
 
@@ -296,34 +288,32 @@ public class EntityCollectionManagerIT {
     public void writeAndGetField() {
 
 
-        ApplicationScope collectionScope1 = new ApplicationScopeImpl(new SimpleId("organization"));
+        ApplicationScope collectionScope1 = new ApplicationScopeImpl( new SimpleId( "organization" ) );
 
         Entity newEntity = new Entity( new SimpleId( "test" ) );
         Field field = new StringField( "testField", "unique", true );
-        newEntity.setField(field);
+        newEntity.setField( field );
 
         EntityCollectionManager manager = factory.createCollectionManager( collectionScope1 );
 
-        Observable<Entity> observable = manager.write(newEntity);
+        Observable<Entity> observable = manager.write( newEntity );
 
         Entity createReturned = observable.toBlocking().lastOrDefault( null );
 
 
         assertNotNull( "Id was assigned", createReturned.getId() );
-        assertNotNull("Version was assigned", createReturned.getVersion());
+        assertNotNull( "Version was assigned", createReturned.getVersion() );
 
         Id id = manager.getIdField( newEntity.getId().getType(), field ).toBlocking().lastOrDefault( null );
         assertNotNull( id );
         assertEquals( newEntity.getId(), id );
 
         Field fieldNull = new StringField( "testFieldNotThere", "uniquely", true );
-        id = manager.getIdField(  newEntity.getId().getType(), fieldNull ).toBlocking().lastOrDefault( null );
+        id = manager.getIdField( newEntity.getId().getType(), fieldNull ).toBlocking().lastOrDefault( null );
         assertNull( id );
     }
 
 
-
-
     @Test
     public void updateVersioning() {
 
@@ -331,10 +321,10 @@ public class EntityCollectionManagerIT {
         Entity origEntity = new Entity( new SimpleId( "testUpdate" ) );
         origEntity.setField( new StringField( "testField", "value" ) );
 
-        ApplicationScope context = new ApplicationScopeImpl(new SimpleId("organization"));
+        ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
 
         EntityCollectionManager manager = factory.createCollectionManager( context );
-        Entity returned = manager.write( origEntity ).toBlocking().lastOrDefault(null);
+        Entity returned = manager.write( origEntity ).toBlocking().lastOrDefault( null );
 
         // note its version
         UUID oldVersion = returned.getVersion();
@@ -345,7 +335,7 @@ public class EntityCollectionManagerIT {
         // partial update entity but we don't have version number
         Entity updateEntity = new Entity( origEntity.getId() );
         updateEntity.setField( new StringField( "addedField", "other value" ) );
-        manager.write( updateEntity ).toBlocking().lastOrDefault(null);
+        manager.write( updateEntity ).toBlocking().lastOrDefault( null );
 
         // get entity now, it must have a new version
         returned = manager.load( origEntity.getId() ).toBlocking().lastOrDefault( null );
@@ -354,14 +344,14 @@ public class EntityCollectionManagerIT {
         assertNotNull( "A new version must be assigned", newVersion );
 
         // new Version should be > old version
-        assertTrue(UUIDComparator.staticCompare(newVersion, oldVersion) > 0);
+        assertTrue( UUIDComparator.staticCompare( newVersion, oldVersion ) > 0 );
     }
 
 
     @Test
     public void writeMultiget() {
 
-        final ApplicationScope context =  new ApplicationScopeImpl( new SimpleId( "organization" ) );
+        final ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
         final EntityCollectionManager manager = factory.createCollectionManager( context );
 
         final int multigetSize = serializationFig.getMaxLoadSize();
@@ -381,10 +371,10 @@ public class EntityCollectionManagerIT {
 
         final EntitySet entitySet = manager.load( entityIds ).toBlocking().lastOrDefault( null );
 
-        assertNotNull(entitySet);
+        assertNotNull( entitySet );
 
         assertEquals( multigetSize, entitySet.size() );
-        assertFalse(entitySet.isEmpty());
+        assertFalse( entitySet.isEmpty() );
 
         /**
          * Validate every element exists
@@ -405,7 +395,7 @@ public class EntityCollectionManagerIT {
     @Test
     public void writeMultigetRepair() {
 
-        final ApplicationScope context =  new ApplicationScopeImpl( new SimpleId( "organization" ) );
+        final ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
         final EntityCollectionManager manager = factory.createCollectionManager( context );
 
         final int multigetSize = serializationFig.getMaxLoadSize();
@@ -444,7 +434,7 @@ public class EntityCollectionManagerIT {
 
             assertEquals( "Same entity returned", expected, returned.getEntity().get() );
 
-            assertTrue((Boolean) returned.getEntity().get().getField("updated").getValue());
+            assertTrue( ( Boolean ) returned.getEntity().get().getField( "updated" ).getValue() );
         }
     }
 
@@ -452,7 +442,7 @@ public class EntityCollectionManagerIT {
     @Test( expected = IllegalArgumentException.class )
     public void readTooLarge() {
 
-        final ApplicationScope context =  new ApplicationScopeImpl( new SimpleId( "organization" ) );
+        final ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
         final EntityCollectionManager manager = factory.createCollectionManager( context );
 
         final int multigetSize = serializationFig.getMaxLoadSize() + 1;
@@ -474,7 +464,7 @@ public class EntityCollectionManagerIT {
     @Test
     public void testGetVersion() {
 
-        ApplicationScope context =  new ApplicationScopeImpl( new SimpleId( "organization" ) );
+        ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
 
 
         final EntityCollectionManager manager = factory.createCollectionManager( context );
@@ -495,7 +485,7 @@ public class EntityCollectionManagerIT {
 
 
         VersionSet results =
-                manager.getLatestVersion( Arrays.asList( created1.getId(), created2.getId() ) ).toBlocking().last();
+            manager.getLatestVersion( Arrays.asList( created1.getId(), created2.getId() ) ).toBlocking().last();
 
 
         final MvccLogEntry version1Log = results.getMaxVersion( created1.getId() );
@@ -515,7 +505,7 @@ public class EntityCollectionManagerIT {
     @Test
     public void testVersionLogWrite() {
 
-        ApplicationScope context =  new ApplicationScopeImpl( new SimpleId( "organization" ) );
+        ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
 
 
         final EntityCollectionManager manager = factory.createCollectionManager( context );
@@ -529,10 +519,10 @@ public class EntityCollectionManagerIT {
 
         final UUID v1Version = v1Created.getVersion();
 
-        final VersionSet resultsV1 = manager.getLatestVersion(Arrays.asList(v1Created.getId())).toBlocking().last();
+        final VersionSet resultsV1 = manager.getLatestVersion( Arrays.asList( v1Created.getId() ) ).toBlocking().last();
 
 
-        final MvccLogEntry version1Log = resultsV1.getMaxVersion(v1Created.getId());
+        final MvccLogEntry version1Log = resultsV1.getMaxVersion( v1Created.getId() );
         assertEquals( v1Created.getId(), version1Log.getEntityId() );
         assertEquals( v1Version, version1Log.getVersion() );
         assertEquals( MvccLogEntry.State.COMPLETE, version1Log.getState() );
@@ -543,7 +533,7 @@ public class EntityCollectionManagerIT {
         final UUID v2Version = v2Created.getVersion();
 
 
-        assertTrue("Newer version in v2", UUIDComparator.staticCompare(v2Version, v1Version) > 0);
+        assertTrue( "Newer version in v2", UUIDComparator.staticCompare( v2Version, v1Version ) > 0 );
 
 
         final VersionSet resultsV2 = manager.getLatestVersion( Arrays.asList( v1Created.getId() ) ).toBlocking().last();
@@ -560,7 +550,7 @@ public class EntityCollectionManagerIT {
     @Test
     public void testVersionLogUpdate() {
 
-        ApplicationScope context =  new ApplicationScopeImpl( new SimpleId( "organization" ) );
+        ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
 
 
         final EntityCollectionManager manager = factory.createCollectionManager( context );
@@ -608,7 +598,7 @@ public class EntityCollectionManagerIT {
     @Test
     public void healthTest() {
 
-        ApplicationScope context =  new ApplicationScopeImpl( new SimpleId( "organization" ) );
+        ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
 
         final EntityCollectionManager manager = factory.createCollectionManager( context );
 
@@ -632,7 +622,7 @@ public class EntityCollectionManagerIT {
         final Entity entity = EntityHelper.generateEntity( setSize );
 
         //now we have one massive, entity, save it and retrieve it.
-        ApplicationScope context =  new ApplicationScopeImpl( new SimpleId( "organization" ) );
+        ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
 
         final EntityCollectionManager manager = factory.createCollectionManager( context );
 
@@ -652,20 +642,21 @@ public class EntityCollectionManagerIT {
         SetConfigTestBypass.setValueByPass( serializationFig, "getMaxEntitySize", currentMaxSize + "" );
     }
 
+
     @Test
     public void invalidNameRepair() throws ConnectionException {
 
         //write an entity with a unique field
-        ApplicationScope context =  new ApplicationScopeImpl( new SimpleId( "organization" ) );
+        ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
 
         Entity newEntity = new Entity( new SimpleId( "test" ) );
 
         //if we add a second field we get a second entity that is the exact same. Is this expected?
-        final IntegerField expectedInteger =  new IntegerField( "count", 5, true );
-       // final StringField expectedString = new StringField( "yes", "fred", true );
+        final IntegerField expectedInteger = new IntegerField( "count", 5, true );
+        // final StringField expectedString = new StringField( "yes", "fred", true );
 
         newEntity.setField( expectedInteger );
-       // newEntity.setField( expectedString );
+        // newEntity.setField( expectedString );
 
         EntityCollectionManager manager = factory.createCollectionManager( context );
 
@@ -677,23 +668,26 @@ public class EntityCollectionManagerIT {
         assertNotNull( "Id was assigned", createReturned.getId() );
         assertNotNull( "Version was assigned", createReturned.getVersion() );
 
-        FieldSet
-            fieldResults = manager.getEntitiesFromFields( newEntity.getId().getType(), Arrays.<Field>asList( expectedInteger ) ).toBlocking().last();
+        FieldSet fieldResults =
+            manager.getEntitiesFromFields( newEntity.getId().getType(), Arrays.<Field>asList( expectedInteger ) )
+                   .toBlocking().last();
 
-        assertEquals(1,fieldResults.size());
+        assertEquals( 1, fieldResults.size() );
 
 
         //verify the entity is correct.
-        assertEquals( "Same value", createReturned, fieldResults.getEntity( expectedInteger ).getEntity().get()); //loadReturned );
+        assertEquals( "Same value", createReturned,
+            fieldResults.getEntity( expectedInteger ).getEntity().get() ); //loadReturned );
 
         //use the entity serializationStrategy to remove the entity data.
 
         //do a mark as one test, and a delete as another
-        entitySerializationStrategy.delete( context,createReturned.getId(),createReturned.getVersion() ).execute();
+        entitySerializationStrategy.delete( context, createReturned.getId(), createReturned.getVersion() ).execute();
 
         //try to load via the unique field, should have triggered repair
-        final FieldSet
-            results = manager.getEntitiesFromFields( newEntity.getId().getType(), Arrays.<Field>asList( expectedInteger ) ).toBlocking().last();
+        final FieldSet results =
+            manager.getEntitiesFromFields( newEntity.getId().getType(), Arrays.<Field>asList( expectedInteger ) )
+                   .toBlocking().last();
 
 
         //verify no entity returned
@@ -701,37 +695,104 @@ public class EntityCollectionManagerIT {
 
         //user the unique serialization to verify it's been deleted from cassandra
 
-        UniqueValueSet uniqueValues = uniqueValueSerializationStrategy.load( context,  newEntity.getId().getType(), createReturned.getFields() );
+        UniqueValueSet uniqueValues =
+            uniqueValueSerializationStrategy.load( context, newEntity.getId().getType(), createReturned.getFields() );
         assertFalse( uniqueValues.iterator().hasNext() );
-
     }
 
 
     @Test
     public void testGetIdField() throws Exception {
 
-        ApplicationScope context =  new ApplicationScopeImpl( new SimpleId( "organization" ) );
+        ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
         EntityCollectionManager manager = factory.createCollectionManager( context );
 
         // create an entity of type "item" with a unique_id field value = 1
 
         Entity entity1 = new Entity( new SimpleId( "item" ) );
-        entity1.setField( new StringField( "unique_id", "1", true ));
+        entity1.setField( new StringField( "unique_id", "1", true ) );
         manager.write( entity1 ).toBlocking().last();
 
-        final Observable<Id> idObs = manager.getIdField("item", new StringField("unique_id", "1"));
-        Id id = idObs.toBlocking().lastOrDefault(null);
-        assertEquals(entity1.getId(), id);
+        final Observable<Id> idObs = manager.getIdField( "item", new StringField( "unique_id", "1" ) );
+        Id id = idObs.toBlocking().lastOrDefault( null );
+        assertEquals( entity1.getId(), id );
 
         // create an entity of type "deleted_item" with a unique_id field value = 1
 
         Entity entity2 = new Entity( new SimpleId( "deleted_item" ) );
-        entity2.setField( new StringField( "unique_id", "1", true ));
+        entity2.setField( new StringField( "unique_id", "1", true ) );
         manager = factory.createCollectionManager( context );
         manager.write( entity2 ).toBlocking().last();
 
-        final Observable<Id> id2Obs = manager.getIdField("deleted_item", new StringField("unique_id", "1"));
-        Id id2 = id2Obs.toBlocking().lastOrDefault(null);
-        assertEquals(entity2.getId(), id2);
+        final Observable<Id> id2Obs = manager.getIdField( "deleted_item", new StringField( "unique_id", "1" ) );
+        Id id2 = id2Obs.toBlocking().lastOrDefault( null );
+        assertEquals( entity2.getId(), id2 );
+    }
+
+
+    @Test
+    public void writeGetVersionsDelete() {
+
+        ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
+
+        Entity entity = new Entity( new SimpleId( "test" ) );
+        entity.setField( new IntegerField( "counter", 0 ) );
+
+        EntityCollectionManager manager = factory.createCollectionManager( context );
+
+        Entity createReturned = manager.write( entity ).toBlocking().lastOrDefault( null );
+
+        assertNotNull( "Id was assigned", createReturned.getId() );
+
+        final int size = 200;
+
+        final Id entityId = createReturned.getId();
+
+        List<UUID> versions = new ArrayList<>( size );
+        versions.add( entity.getVersion() );
+
+        //write new versions
+        for ( int i = 1; i < size; i++ ) {
+            final Entity newEntity = new Entity( entityId );
+
+            final Entity returnedEntity = manager.write( newEntity ).toBlocking().last();
+
+            versions.add( returnedEntity.getVersion() );
+        }
+
+
+        //now get our values, and load the latest version
+
+        final Entity lastVersion = manager.load( entityId ).toBlocking().last();
+
+        //ensure the latest version is correct
+        assertEquals( versions.get( versions.size() - 1 ), lastVersion.getVersion() );
+
+
+        // now ensure all versions are correct
+        final List<MvccLogEntry> entries = manager.getVersions( entityId ).toList().toBlocking().last();
+
+
+        assertEquals( "Same size expected", versions.size(), entries.size() );
+
+        for ( int i = 0; i < versions.size(); i++ ) {
+            assertEquals( versions.get( i ), entries.get( i ).getVersion() );
+        }
+
+
+        //now get all the log versions and delete them all; we do it in 2+ batches to ensure we clean up as expected
+        manager.getVersions( entityId ).buffer( 100 ).flatMap( bufferList -> manager.delete( bufferList ) )
+               .toBlocking().last();
+
+
+        //now load them; there shouldn't be any versions
+        final List<MvccLogEntry> postDeleteEntries = manager.getVersions( entityId ).toList().toBlocking().last();
+
+        assertEquals( "All log entries should be removed", 0, postDeleteEntries.size() );
+
+        final Entity postDeleteLastVersion = manager.load( entityId ).toBlocking().lastOrDefault( null );
+
+        //ensure the entity no longer loads now that every version has been deleted
+        assertNull( "Last version was deleted", postDeleteLastVersion );
     }
 }