You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2014/10/01 18:32:17 UTC

[13/15] git commit: Refactored listener to take buffers of versions for efficiency

Refactored listener to take buffers of versions for efficiency

Refactored task to use RX for observable streams


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/8cadba29
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/8cadba29
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/8cadba29

Branch: refs/heads/two-dot-o
Commit: 8cadba29c80766bc8fe12755ac02cca2f741f077
Parents: 1a41624
Author: Todd Nine <to...@apache.org>
Authored: Wed Oct 1 10:30:28 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Wed Oct 1 10:30:28 2014 -0600

----------------------------------------------------------------------
 .../collection/event/EntityVersionDeleted.java  |   5 +-
 .../impl/EntityVersionCleanupTask.java          |  85 ++++---
 .../serialization/impl/LogEntryIterator.java    |   9 +-
 .../impl/EntityVersionCleanupTaskTest.java      | 230 ++++++++++++-------
 4 files changed, 213 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8cadba29/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionDeleted.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionDeleted.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionDeleted.java
index 3b76e84..ff7d960 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionDeleted.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionDeleted.java
@@ -1,6 +1,7 @@
 package org.apache.usergrid.persistence.collection.event;
 
 
+import java.util.List;
 import java.util.UUID;
 
 import org.apache.usergrid.persistence.collection.CollectionScope;
@@ -21,8 +22,8 @@ public interface EntityVersionDeleted {
      *
      * @param scope The scope of the entity
      * @param entityId The entity Id that was removed
-     * @param entityVersion The version that was removed
+     * @param entityVersions The versions that are to be removed
      */
-    public void versionDeleted(final CollectionScope scope, final Id entityId, final UUID entityVersion);
+    public void versionDeleted(final CollectionScope scope, final Id entityId, final List<UUID> entityVersions);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8cadba29/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
index d7ece40..2d30d36 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
@@ -1,9 +1,9 @@
 package org.apache.usergrid.persistence.collection.impl;
 
 
+import java.util.Iterator;
 import java.util.List;
 import java.util.UUID;
-import java.util.concurrent.ExecutionException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -15,9 +15,14 @@ import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerialization
 import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
 import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
 import org.apache.usergrid.persistence.collection.serialization.impl.LogEntryIterator;
+import org.apache.usergrid.persistence.core.rx.ObservableIterator;
 import org.apache.usergrid.persistence.core.task.Task;
 import org.apache.usergrid.persistence.model.entity.Id;
 
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
 import rx.Observable;
 import rx.functions.Action1;
 import rx.functions.Func1;
@@ -37,6 +42,7 @@ public class EntityVersionCleanupTask implements Task<Void> {
 
     private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
     private final MvccEntitySerializationStrategy entitySerializationStrategy;
+    private final Keyspace keyspace;
 
     private final SerializationFig serializationFig;
 
@@ -48,12 +54,13 @@ public class EntityVersionCleanupTask implements Task<Void> {
     public EntityVersionCleanupTask( final SerializationFig serializationFig,
                                      final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
                                      final MvccEntitySerializationStrategy entitySerializationStrategy,
-                                     final List<EntityVersionDeleted> listeners, final CollectionScope scope,
-                                     final Id entityId, final UUID version ) {
+                                     final Keyspace keyspace, final List<EntityVersionDeleted> listeners,
+                                     final CollectionScope scope, final Id entityId, final UUID version ) {
 
         this.serializationFig = serializationFig;
         this.logEntrySerializationStrategy = logEntrySerializationStrategy;
         this.entitySerializationStrategy = entitySerializationStrategy;
+        this.keyspace = keyspace;
         this.listeners = listeners;
         this.scope = scope;
         this.entityId = entityId;
@@ -91,36 +98,67 @@ public class EntityVersionCleanupTask implements Task<Void> {
         final UUID maxVersion = version;
 
 
-        LogEntryIterator logEntryIterator =
-                new LogEntryIterator( logEntrySerializationStrategy, scope, entityId, maxVersion,
-                        serializationFig.getHistorySize() );
+        Observable<MvccLogEntry> versions = Observable.create( new ObservableIterator( "versionIterators" ) {
+            @Override
+            protected Iterator getIterator() {
+                return new LogEntryIterator( logEntrySerializationStrategy, scope, entityId, maxVersion,
+                        serializationFig.getBufferSize() );
+            }
+        } );
 
 
-        //for every entry, we want to clean it up with listeners
+        //get the uuid from the version
+        versions.map( new Func1<MvccLogEntry, UUID>() {
+            @Override
+            public UUID call( final MvccLogEntry mvccLogEntry ) {
+                return mvccLogEntry.getVersion();
+            }
+        } )
+                //buffer our versions
+         .buffer( serializationFig.getBufferSize() )
+         //for each buffer set, delete all of them
+         .doOnNext( new Action1<List<UUID>>() {
+            @Override
+            public void call( final List<UUID> versions ) {
 
-        while ( logEntryIterator.hasNext() ) {
+                //Fire all the listeners
+                fireEvents( versions );
 
-            final MvccLogEntry logEntry = logEntryIterator.next();
+                MutationBatch entityBatch = keyspace.prepareMutationBatch();
+                MutationBatch logBatch = keyspace.prepareMutationBatch();
 
+                for ( UUID version : versions ) {
+                    final MutationBatch entityDelete = entitySerializationStrategy.delete( scope, entityId, version );
 
-            final UUID version = logEntry.getVersion();
+                    entityBatch.mergeShallow( entityDelete );
 
+                    final MutationBatch logDelete = logEntrySerializationStrategy.delete( scope, entityId, version );
 
-            fireEvents();
+                    logBatch.mergeShallow( logDelete );
+                }
 
-            //we do multiple invocations on purpose.  Our log is our source of versions, only delete from it
-            //after every successful invocation of listeners and entity removal
-            entitySerializationStrategy.delete( scope, entityId, version ).execute();
 
-            logEntrySerializationStrategy.delete( scope, entityId, version ).execute();
-        }
+                try {
+                    entityBatch.execute();
+                }
+                catch ( ConnectionException e ) {
+                    throw new RuntimeException( "Unable to delete entities in cleanup", e );
+                }
 
+                try {
+                    logBatch.execute();
+                }
+                catch ( ConnectionException e ) {
+                    throw new RuntimeException( "Unable to delete entities from the log", e );
+                }
+            }
+        } ).count().toBlocking().last();
 
         return null;
     }
 
 
-    private void fireEvents() throws ExecutionException, InterruptedException {
+    private void fireEvents( final List<UUID> versions ) {
 
         final int listenerSize = listeners.size();
 
@@ -129,7 +167,7 @@ public class EntityVersionCleanupTask implements Task<Void> {
         }
 
         if ( listenerSize == 1 ) {
-            listeners.get( 0 ).versionDeleted( scope, entityId, version );
+            listeners.get( 0 ).versionDeleted( scope, entityId, versions );
             return;
         }
 
@@ -146,7 +184,7 @@ public class EntityVersionCleanupTask implements Task<Void> {
                           return entityVersionDeletedObservable.doOnNext( new Action1<EntityVersionDeleted>() {
                               @Override
                               public void call( final EntityVersionDeleted listener ) {
-                                  listener.versionDeleted( scope, entityId, version );
+                                  listener.versionDeleted( scope, entityId, versions );
                               }
                           } );
                       }
@@ -154,15 +192,6 @@ public class EntityVersionCleanupTask implements Task<Void> {
 
         LOG.debug( "Finished firing {} listeners", listenerSize );
     }
-
-
-    private static interface ListenerRunner {
-
-        /**
-         * Run the listeners
-         */
-        public void runListeners();
-    }
 }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8cadba29/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java
index 53eb6e3..d87f850 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java
@@ -11,12 +11,12 @@ import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerialization
 import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
 import org.apache.usergrid.persistence.model.entity.Id;
 
+import com.google.common.base.Preconditions;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
 
 /**
  * Iterator that will iterate all versions of the entity from the log from < the specified maxVersion
- *
  */
 public class LogEntryIterator implements Iterator<MvccLogEntry> {
 
@@ -32,16 +32,19 @@ public class LogEntryIterator implements Iterator<MvccLogEntry> {
 
 
     /**
-     *
      * @param logEntrySerializationStrategy The serialization strategy to get the log entries
      * @param scope The scope of the entity
      * @param entityId The id of the entity
-     * @param maxVersion The max version of the entity.  Iterator will iterate from max to min starting with the version < max
+     * @param maxVersion The max version of the entity.  Iterator will iterate from max to min starting with the version
+     * < max
      * @param pageSize The fetch size to get when querying the serialization strategy
      */
     public LogEntryIterator( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
                              final CollectionScope scope, final Id entityId, final UUID maxVersion,
                              final int pageSize ) {
+
+        Preconditions.checkArgument( pageSize > 0, "pageSize must be > 0" );
+
         this.logEntrySerializationStrategy = logEntrySerializationStrategy;
         this.scope = scope;
         this.entityId = entityId;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8cadba29/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
index a34b4f8..1fce6e2 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
@@ -28,7 +28,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Semaphore;
 
 import org.junit.AfterClass;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import org.apache.usergrid.persistence.collection.CollectionScope;
@@ -43,6 +42,7 @@ import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
 
 import com.google.common.util.concurrent.ListenableFuture;
+import com.netflix.astyanax.Keyspace;
 import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
@@ -69,13 +69,13 @@ public class EntityVersionCleanupTaskTest {
     }
 
 
-    @Test
+    @Test(timeout=10000)
     public void noListenerOneVersion() throws ExecutionException, InterruptedException, ConnectionException {
 
 
         final SerializationFig serializationFig = mock( SerializationFig.class );
 
-        when( serializationFig.getHistorySize() ).thenReturn( 10 );
+        when( serializationFig.getBufferSize() ).thenReturn( 10 );
 
         final MvccEntitySerializationStrategy mvccEntitySerializationStrategy =
                 mock( MvccEntitySerializationStrategy.class );
@@ -83,6 +83,15 @@ public class EntityVersionCleanupTaskTest {
         final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy =
                 mock( MvccLogEntrySerializationStrategy.class );
 
+        final Keyspace keyspace = mock( Keyspace.class );
+
+        final MutationBatch entityBatch = mock( MutationBatch.class );
+
+        final MutationBatch logBatch = mock( MutationBatch.class );
+
+        when( keyspace.prepareMutationBatch() ).thenReturn( entityBatch ).thenReturn( logBatch );
+
+
 
         //intentionally no events
         final List<EntityVersionDeleted> listeners = new ArrayList<EntityVersionDeleted>();
@@ -105,20 +114,18 @@ public class EntityVersionCleanupTaskTest {
 
         EntityVersionCleanupTask cleanupTask =
                 new EntityVersionCleanupTask( serializationFig, mvccLogEntrySerializationStrategy,
-                        mvccEntitySerializationStrategy, listeners, appScope, entityId, version );
+                        mvccEntitySerializationStrategy, keyspace, listeners, appScope, entityId, version );
 
-        final MutationBatch firstBatch = mock( MutationBatch.class );
+        final MutationBatch newBatch = mock( MutationBatch.class );
 
 
         //set up returning a mutator
         when( mvccEntitySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
-                .thenReturn( firstBatch );
-
+                .thenReturn( newBatch );
 
-        final MutationBatch secondBatch = mock( MutationBatch.class );
 
         when( mvccLogEntrySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
-                .thenReturn( secondBatch );
+                .thenReturn( newBatch );
 
 
         //start the task
@@ -128,22 +135,22 @@ public class EntityVersionCleanupTaskTest {
         future.get();
 
         //verify it was run
-        verify( firstBatch ).execute();
+        verify( entityBatch ).execute();
 
-        verify( secondBatch ).execute();
+        verify( logBatch ).execute();
     }
 
 
     /**
      * Tests the cleanup task on the first version ceated
      */
-    @Test
+    @Test(timeout=10000)
     public void noListenerNoVersions() throws ExecutionException, InterruptedException, ConnectionException {
 
 
         final SerializationFig serializationFig = mock( SerializationFig.class );
 
-        when( serializationFig.getHistorySize() ).thenReturn( 10 );
+        when( serializationFig.getBufferSize() ).thenReturn( 10 );
 
         final MvccEntitySerializationStrategy mvccEntitySerializationStrategy =
                 mock( MvccEntitySerializationStrategy.class );
@@ -151,6 +158,15 @@ public class EntityVersionCleanupTaskTest {
         final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy =
                 mock( MvccLogEntrySerializationStrategy.class );
 
+        final Keyspace keyspace = mock( Keyspace.class );
+
+
+        final MutationBatch entityBatch = mock( MutationBatch.class );
+        final MutationBatch logBatch = mock( MutationBatch.class );
+
+        when( keyspace.prepareMutationBatch() ).thenReturn( entityBatch ).thenReturn( logBatch );
+
+
 
         //intentionally no events
         final List<EntityVersionDeleted> listeners = new ArrayList<EntityVersionDeleted>();
@@ -173,20 +189,18 @@ public class EntityVersionCleanupTaskTest {
 
         EntityVersionCleanupTask cleanupTask =
                 new EntityVersionCleanupTask( serializationFig, mvccLogEntrySerializationStrategy,
-                        mvccEntitySerializationStrategy, listeners, appScope, entityId, version );
+                        mvccEntitySerializationStrategy, keyspace, listeners, appScope, entityId, version );
 
-        final MutationBatch firstBatch = mock( MutationBatch.class );
+        final MutationBatch batch = mock( MutationBatch.class );
 
 
         //set up returning a mutator
         when( mvccEntitySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
-                .thenReturn( firstBatch );
-
+                .thenReturn( batch );
 
-        final MutationBatch secondBatch = mock( MutationBatch.class );
 
         when( mvccLogEntrySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
-                .thenReturn( secondBatch );
+                .thenReturn( batch );
 
 
         //start the task
@@ -196,19 +210,19 @@ public class EntityVersionCleanupTaskTest {
         future.get();
 
         //verify it was run
-        verify( firstBatch, never() ).execute();
+        verify( entityBatch, never() ).execute();
 
-        verify( secondBatch, never() ).execute();
+        verify( logBatch, never() ).execute();
     }
 
 
-    @Test
+    @Test(timeout=10000)
     public void singleListenerSingleVersion() throws ExecutionException, InterruptedException, ConnectionException {
 
 
         final SerializationFig serializationFig = mock( SerializationFig.class );
 
-        when( serializationFig.getHistorySize() ).thenReturn( 10 );
+        when( serializationFig.getBufferSize() ).thenReturn( 10 );
 
         final MvccEntitySerializationStrategy mvccEntitySerializationStrategy =
                 mock( MvccEntitySerializationStrategy.class );
@@ -216,6 +230,15 @@ public class EntityVersionCleanupTaskTest {
         final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy =
                 mock( MvccLogEntrySerializationStrategy.class );
 
+        final Keyspace keyspace = mock( Keyspace.class );
+
+
+        final MutationBatch entityBatch = mock( MutationBatch.class );
+        final MutationBatch logBatch = mock( MutationBatch.class );
+
+        when( keyspace.prepareMutationBatch() ).thenReturn( entityBatch ).thenReturn( logBatch );
+
+
 
         //create a latch for the event listener, and add it to the list of events
         final int sizeToReturn = 1;
@@ -246,20 +269,18 @@ public class EntityVersionCleanupTaskTest {
 
         EntityVersionCleanupTask cleanupTask =
                 new EntityVersionCleanupTask( serializationFig, mvccLogEntrySerializationStrategy,
-                        mvccEntitySerializationStrategy, listeners, appScope, entityId, version );
+                        mvccEntitySerializationStrategy, keyspace, listeners, appScope, entityId, version );
 
-        final MutationBatch firstBatch = mock( MutationBatch.class );
+        final MutationBatch batch = mock( MutationBatch.class );
 
 
         //set up returning a mutator
         when( mvccEntitySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
-                .thenReturn( firstBatch );
-
+                .thenReturn( batch );
 
-        final MutationBatch secondBatch = mock( MutationBatch.class );
 
         when( mvccLogEntrySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
-                .thenReturn( secondBatch );
+                .thenReturn( batch );
 
 
         //start the task
@@ -270,23 +291,23 @@ public class EntityVersionCleanupTaskTest {
 
         //we deleted the version
         //verify it was run
-        verify( firstBatch ).execute();
+        verify( entityBatch ).execute();
 
-        verify( secondBatch ).execute();
+        verify( logBatch ).execute();
 
         //the latch was executed
         latch.await();
     }
 
 
-    @Test
+    @Test(timeout=10000)
     public void multipleListenerMultipleVersions()
             throws ExecutionException, InterruptedException, ConnectionException {
 
 
         final SerializationFig serializationFig = mock( SerializationFig.class );
 
-        when( serializationFig.getHistorySize() ).thenReturn( 10 );
+        when( serializationFig.getBufferSize() ).thenReturn( 10 );
 
         final MvccEntitySerializationStrategy mvccEntitySerializationStrategy =
                 mock( MvccEntitySerializationStrategy.class );
@@ -294,12 +315,22 @@ public class EntityVersionCleanupTaskTest {
         final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy =
                 mock( MvccLogEntrySerializationStrategy.class );
 
+        final Keyspace keyspace = mock( Keyspace.class );
+
+
+
+        final MutationBatch entityBatch = mock( MutationBatch.class );
+        final MutationBatch logBatch = mock( MutationBatch.class );
+
+        when( keyspace.prepareMutationBatch() ).thenReturn( entityBatch ).thenReturn( logBatch );
+
+
 
         //create a latch for the event listener, and add it to the list of events
         final int sizeToReturn = 10;
 
 
-        final CountDownLatch latch = new CountDownLatch( sizeToReturn * 3 );
+        final CountDownLatch latch = new CountDownLatch( sizeToReturn/serializationFig.getBufferSize() * 3 );
 
         final EntityVersionDeletedTest listener1 = new EntityVersionDeletedTest( latch );
         final EntityVersionDeletedTest listener2 = new EntityVersionDeletedTest( latch );
@@ -329,20 +360,18 @@ public class EntityVersionCleanupTaskTest {
 
         EntityVersionCleanupTask cleanupTask =
                 new EntityVersionCleanupTask( serializationFig, mvccLogEntrySerializationStrategy,
-                        mvccEntitySerializationStrategy, listeners, appScope, entityId, version );
+                        mvccEntitySerializationStrategy, keyspace, listeners, appScope, entityId, version );
 
-        final MutationBatch firstBatch = mock( MutationBatch.class );
+        final MutationBatch batch = mock( MutationBatch.class );
 
 
         //set up returning a mutator
         when( mvccEntitySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
-                .thenReturn( firstBatch );
+                .thenReturn( batch );
 
 
-        final MutationBatch secondBatch = mock( MutationBatch.class );
-
         when( mvccLogEntrySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
-                .thenReturn( secondBatch );
+                .thenReturn( batch );
 
 
         //start the task
@@ -353,9 +382,15 @@ public class EntityVersionCleanupTaskTest {
 
         //we deleted the version
         //verify we deleted everything
-        verify( firstBatch, times( sizeToReturn ) ).execute();
+        verify( entityBatch, times( sizeToReturn ) ).mergeShallow( any( MutationBatch.class ) );
+
+        verify( logBatch, times( sizeToReturn ) ).mergeShallow( any( MutationBatch.class ) );
+
+
 
-        verify( secondBatch, times( sizeToReturn ) ).execute();
+        verify( logBatch ).execute();
+
+        verify( entityBatch ).execute();
 
         //the latch was executed
         latch.await();
@@ -364,18 +399,15 @@ public class EntityVersionCleanupTaskTest {
 
     /**
      * Tests what happens when our listeners are VERY slow
-     * @throws ExecutionException
-     * @throws InterruptedException
-     * @throws ConnectionException
      */
-    @Test
+    @Test(timeout=10000)
     public void multipleListenerMultipleVersionsNoThreadsToRun()
             throws ExecutionException, InterruptedException, ConnectionException {
 
 
         final SerializationFig serializationFig = mock( SerializationFig.class );
 
-        when( serializationFig.getHistorySize() ).thenReturn( 10 );
+        when( serializationFig.getBufferSize() ).thenReturn( 10 );
 
         final MvccEntitySerializationStrategy mvccEntitySerializationStrategy =
                 mock( MvccEntitySerializationStrategy.class );
@@ -383,6 +415,17 @@ public class EntityVersionCleanupTaskTest {
         final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy =
                 mock( MvccLogEntrySerializationStrategy.class );
 
+        final Keyspace keyspace = mock( Keyspace.class );
+
+
+
+        final MutationBatch entityBatch = mock( MutationBatch.class );
+        final MutationBatch logBatch = mock( MutationBatch.class );
+
+        when( keyspace.prepareMutationBatch() ).thenReturn( entityBatch ).thenReturn( logBatch );
+
+
+
 
         //create a latch for the event listener, and add it to the list of events
         final int sizeToReturn = 10;
@@ -390,7 +433,7 @@ public class EntityVersionCleanupTaskTest {
 
         final int listenerCount = 5;
 
-        final CountDownLatch latch = new CountDownLatch( sizeToReturn * listenerCount );
+        final CountDownLatch latch = new CountDownLatch( sizeToReturn/serializationFig.getBufferSize() * listenerCount );
         final Semaphore waitSemaphore = new Semaphore( 0 );
 
 
@@ -426,20 +469,18 @@ public class EntityVersionCleanupTaskTest {
 
         EntityVersionCleanupTask cleanupTask =
                 new EntityVersionCleanupTask( serializationFig, mvccLogEntrySerializationStrategy,
-                        mvccEntitySerializationStrategy, listeners, appScope, entityId, version );
+                        mvccEntitySerializationStrategy, keyspace, listeners, appScope, entityId, version );
 
-        final MutationBatch firstBatch = mock( MutationBatch.class );
+        final MutationBatch batch = mock( MutationBatch.class );
 
 
         //set up returning a mutator
         when( mvccEntitySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
-                .thenReturn( firstBatch );
-
+                .thenReturn( batch );
 
-        final MutationBatch secondBatch = mock( MutationBatch.class );
 
         when( mvccLogEntrySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
-                .thenReturn( secondBatch );
+                .thenReturn( batch );
 
 
         //start the task
@@ -448,7 +489,7 @@ public class EntityVersionCleanupTaskTest {
         /**
          * While we're not done, release latches every 200 ms
          */
-        while(!future.isDone()) {
+        while ( !future.isDone() ) {
             Thread.sleep( 200 );
             waitSemaphore.release( listenerCount );
         }
@@ -456,37 +497,41 @@ public class EntityVersionCleanupTaskTest {
         //wait for the task
         future.get();
 
+
+
         //we deleted the version
         //verify we deleted everything
-        verify( firstBatch, times( sizeToReturn ) ).execute();
+        verify( logBatch, times( sizeToReturn ) ).mergeShallow( any( MutationBatch.class ) );
+
+        verify( entityBatch, times( sizeToReturn ) ).mergeShallow( any( MutationBatch.class ) );
+
+
+        verify( logBatch ).execute();
+
+        verify( entityBatch ).execute();
+
 
-        verify( secondBatch, times( sizeToReturn ) ).execute();
 
         //the latch was executed
         latch.await();
     }
 
 
-
     /**
      * Tests that our task will run in the caller if there's no threads, ensures that the task runs
-     * @throws ExecutionException
-     * @throws InterruptedException
-     * @throws ConnectionException
      */
-    @Test
-    public void runsWhenRejected()
-            throws ExecutionException, InterruptedException, ConnectionException {
+    @Test(timeout=10000)
+    public void runsWhenRejected() throws ExecutionException, InterruptedException, ConnectionException {
 
 
         /**
          * only 1 thread on purpose, we want to saturate the task
          */
-        final TaskExecutor taskExecutor = new NamedTaskExecutorImpl( "test", 1, 0);
+        final TaskExecutor taskExecutor = new NamedTaskExecutorImpl( "test", 1, 0 );
 
         final SerializationFig serializationFig = mock( SerializationFig.class );
 
-        when( serializationFig.getHistorySize() ).thenReturn( 10 );
+        when( serializationFig.getBufferSize() ).thenReturn( 10 );
 
         final MvccEntitySerializationStrategy mvccEntitySerializationStrategy =
                 mock( MvccEntitySerializationStrategy.class );
@@ -494,6 +539,16 @@ public class EntityVersionCleanupTaskTest {
         final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy =
                 mock( MvccLogEntrySerializationStrategy.class );
 
+        final Keyspace keyspace1 = mock( Keyspace.class );
+        final Keyspace keyspace2 = mock( Keyspace.class );
+
+
+        final MutationBatch entityBatch = mock( MutationBatch.class );
+        final MutationBatch logBatch = mock( MutationBatch.class );
+
+        when( keyspace1.prepareMutationBatch() ).thenReturn( entityBatch ).thenReturn( logBatch );
+        when( keyspace2.prepareMutationBatch() ).thenReturn( entityBatch ).thenReturn( logBatch );
+
 
         //create a latch for the event listener, and add it to the list of events
         final int sizeToReturn = 10;
@@ -501,7 +556,7 @@ public class EntityVersionCleanupTaskTest {
 
         final int listenerCount = 2;
 
-        final CountDownLatch latch = new CountDownLatch( sizeToReturn * listenerCount );
+        final CountDownLatch latch = new CountDownLatch( sizeToReturn/serializationFig.getBufferSize() * listenerCount );
         final Semaphore waitSemaphore = new Semaphore( 0 );
 
 
@@ -509,7 +564,6 @@ public class EntityVersionCleanupTaskTest {
         final EntityVersionDeletedTest runListener = new EntityVersionDeletedTest( latch );
 
 
-
         final Id applicationId = new SimpleId( "application" );
 
 
@@ -528,37 +582,37 @@ public class EntityVersionCleanupTaskTest {
 
         EntityVersionCleanupTask firstTask =
                 new EntityVersionCleanupTask( serializationFig, mvccLogEntrySerializationStrategy,
-                        mvccEntitySerializationStrategy, Arrays.<EntityVersionDeleted>asList(slowListener), appScope, entityId, version );
-
+                        mvccEntitySerializationStrategy, keyspace1, Arrays.<EntityVersionDeleted>asList( slowListener ),
+                        appScope, entityId, version );
 
 
         //change the listeners to one that is just invoked quickly
 
 
         EntityVersionCleanupTask secondTask =
-                      new EntityVersionCleanupTask( serializationFig, mvccLogEntrySerializationStrategy,
-                              mvccEntitySerializationStrategy, Arrays.<EntityVersionDeleted>asList(runListener), appScope, entityId, version );
+                new EntityVersionCleanupTask( serializationFig, mvccLogEntrySerializationStrategy,
+                        mvccEntitySerializationStrategy, keyspace2, Arrays.<EntityVersionDeleted>asList( runListener ),
+                        appScope, entityId, version );
 
 
-        final MutationBatch firstBatch = mock( MutationBatch.class );
+        final MutationBatch batch = mock( MutationBatch.class );
 
 
         //set up returning a mutator
         when( mvccEntitySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
-                .thenReturn( firstBatch );
+                .thenReturn( batch );
 
 
-        final MutationBatch secondBatch = mock( MutationBatch.class );
 
         when( mvccLogEntrySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
-                .thenReturn( secondBatch );
+                .thenReturn( batch );
 
 
         //start the task
-        ListenableFuture<Void> future1 =  taskExecutor.submit( firstTask );
+        ListenableFuture<Void> future1 = taskExecutor.submit( firstTask );
 
         //now start another task while the slow running task is running
-        ListenableFuture<Void> future2 =  taskExecutor.submit( secondTask );
+        ListenableFuture<Void> future2 = taskExecutor.submit( secondTask );
 
         //get the second task, we shouldn't have been able to queue it, therefore it should just run in process
         future2.get();
@@ -566,7 +620,7 @@ public class EntityVersionCleanupTaskTest {
         /**
          * While we're not done, release latches every 200 ms
          */
-        while(!future1.isDone()) {
+        while ( !future1.isDone() ) {
             Thread.sleep( 200 );
             waitSemaphore.release( listenerCount );
         }
@@ -576,9 +630,19 @@ public class EntityVersionCleanupTaskTest {
 
         //we deleted the version
         //verify we deleted everything
-        verify( firstBatch, times( sizeToReturn*2 ) ).execute();
 
-        verify( secondBatch, times( sizeToReturn*2 ) ).execute();
+
+        //we deleted the version
+        //verify we deleted everything
+        verify( logBatch, times( sizeToReturn* 2 ) ).mergeShallow( any( MutationBatch.class ) );
+
+        verify( entityBatch, times( sizeToReturn * 2) ).mergeShallow( any( MutationBatch.class ) );
+
+
+        verify( logBatch, times(2) ).execute();
+
+        verify( entityBatch, times(2) ).execute();
+
 
         //the latch was executed
         latch.await();
@@ -595,7 +659,7 @@ public class EntityVersionCleanupTaskTest {
 
 
         @Override
-        public void versionDeleted( final CollectionScope scope, final Id entityId, final UUID entityVersion ) {
+        public void versionDeleted( final CollectionScope scope, final Id entityId, final List<UUID> entityVersion ) {
             invocationLatch.countDown();
         }
     }
@@ -612,7 +676,7 @@ public class EntityVersionCleanupTaskTest {
 
 
         @Override
-        public void versionDeleted( final CollectionScope scope, final Id entityId, final UUID entityVersion ) {
+        public void versionDeleted( final CollectionScope scope, final Id entityId, final List<UUID> entityVersion ) {
             //wait for unblock to happen before counting down invocation latches
             try {
                 blockLatch.acquire();