You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2014/10/01 18:32:17 UTC
[13/15] git commit: Refactored listener to take buffers of versions for efficiency
Refactored listener to take buffers of versions for efficiency
Refactored task to use RX for observable streams
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/8cadba29
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/8cadba29
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/8cadba29
Branch: refs/heads/two-dot-o
Commit: 8cadba29c80766bc8fe12755ac02cca2f741f077
Parents: 1a41624
Author: Todd Nine <to...@apache.org>
Authored: Wed Oct 1 10:30:28 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Wed Oct 1 10:30:28 2014 -0600
----------------------------------------------------------------------
.../collection/event/EntityVersionDeleted.java | 5 +-
.../impl/EntityVersionCleanupTask.java | 85 ++++---
.../serialization/impl/LogEntryIterator.java | 9 +-
.../impl/EntityVersionCleanupTaskTest.java | 230 ++++++++++++-------
4 files changed, 213 insertions(+), 116 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8cadba29/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionDeleted.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionDeleted.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionDeleted.java
index 3b76e84..ff7d960 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionDeleted.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionDeleted.java
@@ -1,6 +1,7 @@
package org.apache.usergrid.persistence.collection.event;
+import java.util.List;
import java.util.UUID;
import org.apache.usergrid.persistence.collection.CollectionScope;
@@ -21,8 +22,8 @@ public interface EntityVersionDeleted {
*
* @param scope The scope of the entity
* @param entityId The entity Id that was removed
- * @param entityVersion The version that was removed
+ * @param entityVersions The versions that are to be removed
*/
- public void versionDeleted(final CollectionScope scope, final Id entityId, final UUID entityVersion);
+ public void versionDeleted(final CollectionScope scope, final Id entityId, final List<UUID> entityVersions);
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8cadba29/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
index d7ece40..2d30d36 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
@@ -1,9 +1,9 @@
package org.apache.usergrid.persistence.collection.impl;
+import java.util.Iterator;
import java.util.List;
import java.util.UUID;
-import java.util.concurrent.ExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -15,9 +15,14 @@ import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerialization
import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
import org.apache.usergrid.persistence.collection.serialization.impl.LogEntryIterator;
+import org.apache.usergrid.persistence.core.rx.ObservableIterator;
import org.apache.usergrid.persistence.core.task.Task;
import org.apache.usergrid.persistence.model.entity.Id;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;
@@ -37,6 +42,7 @@ public class EntityVersionCleanupTask implements Task<Void> {
private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
private final MvccEntitySerializationStrategy entitySerializationStrategy;
+ private final Keyspace keyspace;
private final SerializationFig serializationFig;
@@ -48,12 +54,13 @@ public class EntityVersionCleanupTask implements Task<Void> {
public EntityVersionCleanupTask( final SerializationFig serializationFig,
final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
final MvccEntitySerializationStrategy entitySerializationStrategy,
- final List<EntityVersionDeleted> listeners, final CollectionScope scope,
- final Id entityId, final UUID version ) {
+ final Keyspace keyspace, final List<EntityVersionDeleted> listeners,
+ final CollectionScope scope, final Id entityId, final UUID version ) {
this.serializationFig = serializationFig;
this.logEntrySerializationStrategy = logEntrySerializationStrategy;
this.entitySerializationStrategy = entitySerializationStrategy;
+ this.keyspace = keyspace;
this.listeners = listeners;
this.scope = scope;
this.entityId = entityId;
@@ -91,36 +98,67 @@ public class EntityVersionCleanupTask implements Task<Void> {
final UUID maxVersion = version;
- LogEntryIterator logEntryIterator =
- new LogEntryIterator( logEntrySerializationStrategy, scope, entityId, maxVersion,
- serializationFig.getHistorySize() );
+ Observable<MvccLogEntry> versions = Observable.create( new ObservableIterator( "versionIterators" ) {
+ @Override
+ protected Iterator getIterator() {
+ return new LogEntryIterator( logEntrySerializationStrategy, scope, entityId, maxVersion,
+ serializationFig.getBufferSize() );
+ }
+ } );
- //for every entry, we want to clean it up with listeners
+ //get the uuid from the version
+ versions.map( new Func1<MvccLogEntry, UUID>() {
+ @Override
+ public UUID call( final MvccLogEntry mvccLogEntry ) {
+ return mvccLogEntry.getVersion();
+ }
+ } )
+ //buffer our versions
+ .buffer( serializationFig.getBufferSize() )
+ //for each buffer set, delete all of them
+ .doOnNext( new Action1<List<UUID>>() {
+ @Override
+ public void call( final List<UUID> versions ) {
- while ( logEntryIterator.hasNext() ) {
+ //Fire all the listeners
+ fireEvents( versions );
- final MvccLogEntry logEntry = logEntryIterator.next();
+ MutationBatch entityBatch = keyspace.prepareMutationBatch();
+ MutationBatch logBatch = keyspace.prepareMutationBatch();
+ for ( UUID version : versions ) {
+ final MutationBatch entityDelete = entitySerializationStrategy.delete( scope, entityId, version );
- final UUID version = logEntry.getVersion();
+ entityBatch.mergeShallow( entityDelete );
+ final MutationBatch logDelete = logEntrySerializationStrategy.delete( scope, entityId, version );
- fireEvents();
+ logBatch.mergeShallow( logDelete );
+ }
- //we do multiple invocations on purpose. Our log is our source of versions, only delete from it
- //after every successful invocation of listeners and entity removal
- entitySerializationStrategy.delete( scope, entityId, version ).execute();
- logEntrySerializationStrategy.delete( scope, entityId, version ).execute();
- }
+ try {
+ entityBatch.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to delete entities in cleanup", e );
+ }
+ try {
+ logBatch.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to delete entities from the log", e );
+ }
+ }
+ } ).count().toBlocking().last();
return null;
}
- private void fireEvents() throws ExecutionException, InterruptedException {
+ private void fireEvents( final List<UUID> versions ) {
final int listenerSize = listeners.size();
@@ -129,7 +167,7 @@ public class EntityVersionCleanupTask implements Task<Void> {
}
if ( listenerSize == 1 ) {
- listeners.get( 0 ).versionDeleted( scope, entityId, version );
+ listeners.get( 0 ).versionDeleted( scope, entityId, versions );
return;
}
@@ -146,7 +184,7 @@ public class EntityVersionCleanupTask implements Task<Void> {
return entityVersionDeletedObservable.doOnNext( new Action1<EntityVersionDeleted>() {
@Override
public void call( final EntityVersionDeleted listener ) {
- listener.versionDeleted( scope, entityId, version );
+ listener.versionDeleted( scope, entityId, versions );
}
} );
}
@@ -154,15 +192,6 @@ public class EntityVersionCleanupTask implements Task<Void> {
LOG.debug( "Finished firing {} listeners", listenerSize );
}
-
-
- private static interface ListenerRunner {
-
- /**
- * Run the listeners
- */
- public void runListeners();
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8cadba29/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java
index 53eb6e3..d87f850 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java
@@ -11,12 +11,12 @@ import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerialization
import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
import org.apache.usergrid.persistence.model.entity.Id;
+import com.google.common.base.Preconditions;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
/**
* Iterator that will iterate over all versions of the entity in the log that are < the specified maxVersion
- *
*/
public class LogEntryIterator implements Iterator<MvccLogEntry> {
@@ -32,16 +32,19 @@ public class LogEntryIterator implements Iterator<MvccLogEntry> {
/**
- *
* @param logEntrySerializationStrategy The serialization strategy to get the log entries
* @param scope The scope of the entity
* @param entityId The id of the entity
- * @param maxVersion The max version of the entity. Iterator will iterate from max to min starting with the version < max
+ * @param maxVersion The max version of the entity. Iterator will iterate from max to min starting with the version
+ * < max
* @param pageSize The fetch size to get when querying the serialization strategy
*/
public LogEntryIterator( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
final CollectionScope scope, final Id entityId, final UUID maxVersion,
final int pageSize ) {
+
+ Preconditions.checkArgument( pageSize > 0, "pageSize must be > 0" );
+
this.logEntrySerializationStrategy = logEntrySerializationStrategy;
this.scope = scope;
this.entityId = entityId;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8cadba29/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
index a34b4f8..1fce6e2 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
@@ -28,7 +28,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import org.junit.AfterClass;
-import org.junit.Ignore;
import org.junit.Test;
import org.apache.usergrid.persistence.collection.CollectionScope;
@@ -43,6 +42,7 @@ import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.entity.SimpleId;
import com.google.common.util.concurrent.ListenableFuture;
+import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
@@ -69,13 +69,13 @@ public class EntityVersionCleanupTaskTest {
}
- @Test
+ @Test(timeout=10000)
public void noListenerOneVersion() throws ExecutionException, InterruptedException, ConnectionException {
final SerializationFig serializationFig = mock( SerializationFig.class );
- when( serializationFig.getHistorySize() ).thenReturn( 10 );
+ when( serializationFig.getBufferSize() ).thenReturn( 10 );
final MvccEntitySerializationStrategy mvccEntitySerializationStrategy =
mock( MvccEntitySerializationStrategy.class );
@@ -83,6 +83,15 @@ public class EntityVersionCleanupTaskTest {
final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy =
mock( MvccLogEntrySerializationStrategy.class );
+ final Keyspace keyspace = mock( Keyspace.class );
+
+ final MutationBatch entityBatch = mock( MutationBatch.class );
+
+ final MutationBatch logBatch = mock( MutationBatch.class );
+
+ when( keyspace.prepareMutationBatch() ).thenReturn( entityBatch ).thenReturn( logBatch );
+
+
//intentionally no events
final List<EntityVersionDeleted> listeners = new ArrayList<EntityVersionDeleted>();
@@ -105,20 +114,18 @@ public class EntityVersionCleanupTaskTest {
EntityVersionCleanupTask cleanupTask =
new EntityVersionCleanupTask( serializationFig, mvccLogEntrySerializationStrategy,
- mvccEntitySerializationStrategy, listeners, appScope, entityId, version );
+ mvccEntitySerializationStrategy, keyspace, listeners, appScope, entityId, version );
- final MutationBatch firstBatch = mock( MutationBatch.class );
+ final MutationBatch newBatch = mock( MutationBatch.class );
//set up returning a mutator
when( mvccEntitySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
- .thenReturn( firstBatch );
-
+ .thenReturn( newBatch );
- final MutationBatch secondBatch = mock( MutationBatch.class );
when( mvccLogEntrySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
- .thenReturn( secondBatch );
+ .thenReturn( newBatch );
//start the task
@@ -128,22 +135,22 @@ public class EntityVersionCleanupTaskTest {
future.get();
//verify it was run
- verify( firstBatch ).execute();
+ verify( entityBatch ).execute();
- verify( secondBatch ).execute();
+ verify( logBatch ).execute();
}
/**
* Tests the cleanup task on the first version created
*/
- @Test
+ @Test(timeout=10000)
public void noListenerNoVersions() throws ExecutionException, InterruptedException, ConnectionException {
final SerializationFig serializationFig = mock( SerializationFig.class );
- when( serializationFig.getHistorySize() ).thenReturn( 10 );
+ when( serializationFig.getBufferSize() ).thenReturn( 10 );
final MvccEntitySerializationStrategy mvccEntitySerializationStrategy =
mock( MvccEntitySerializationStrategy.class );
@@ -151,6 +158,15 @@ public class EntityVersionCleanupTaskTest {
final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy =
mock( MvccLogEntrySerializationStrategy.class );
+ final Keyspace keyspace = mock( Keyspace.class );
+
+
+ final MutationBatch entityBatch = mock( MutationBatch.class );
+ final MutationBatch logBatch = mock( MutationBatch.class );
+
+ when( keyspace.prepareMutationBatch() ).thenReturn( entityBatch ).thenReturn( logBatch );
+
+
//intentionally no events
final List<EntityVersionDeleted> listeners = new ArrayList<EntityVersionDeleted>();
@@ -173,20 +189,18 @@ public class EntityVersionCleanupTaskTest {
EntityVersionCleanupTask cleanupTask =
new EntityVersionCleanupTask( serializationFig, mvccLogEntrySerializationStrategy,
- mvccEntitySerializationStrategy, listeners, appScope, entityId, version );
+ mvccEntitySerializationStrategy, keyspace, listeners, appScope, entityId, version );
- final MutationBatch firstBatch = mock( MutationBatch.class );
+ final MutationBatch batch = mock( MutationBatch.class );
//set up returning a mutator
when( mvccEntitySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
- .thenReturn( firstBatch );
-
+ .thenReturn( batch );
- final MutationBatch secondBatch = mock( MutationBatch.class );
when( mvccLogEntrySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
- .thenReturn( secondBatch );
+ .thenReturn( batch );
//start the task
@@ -196,19 +210,19 @@ public class EntityVersionCleanupTaskTest {
future.get();
//verify it was run
- verify( firstBatch, never() ).execute();
+ verify( entityBatch, never() ).execute();
- verify( secondBatch, never() ).execute();
+ verify( logBatch, never() ).execute();
}
- @Test
+ @Test(timeout=10000)
public void singleListenerSingleVersion() throws ExecutionException, InterruptedException, ConnectionException {
final SerializationFig serializationFig = mock( SerializationFig.class );
- when( serializationFig.getHistorySize() ).thenReturn( 10 );
+ when( serializationFig.getBufferSize() ).thenReturn( 10 );
final MvccEntitySerializationStrategy mvccEntitySerializationStrategy =
mock( MvccEntitySerializationStrategy.class );
@@ -216,6 +230,15 @@ public class EntityVersionCleanupTaskTest {
final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy =
mock( MvccLogEntrySerializationStrategy.class );
+ final Keyspace keyspace = mock( Keyspace.class );
+
+
+ final MutationBatch entityBatch = mock( MutationBatch.class );
+ final MutationBatch logBatch = mock( MutationBatch.class );
+
+ when( keyspace.prepareMutationBatch() ).thenReturn( entityBatch ).thenReturn( logBatch );
+
+
//create a latch for the event listener, and add it to the list of events
final int sizeToReturn = 1;
@@ -246,20 +269,18 @@ public class EntityVersionCleanupTaskTest {
EntityVersionCleanupTask cleanupTask =
new EntityVersionCleanupTask( serializationFig, mvccLogEntrySerializationStrategy,
- mvccEntitySerializationStrategy, listeners, appScope, entityId, version );
+ mvccEntitySerializationStrategy, keyspace, listeners, appScope, entityId, version );
- final MutationBatch firstBatch = mock( MutationBatch.class );
+ final MutationBatch batch = mock( MutationBatch.class );
//set up returning a mutator
when( mvccEntitySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
- .thenReturn( firstBatch );
-
+ .thenReturn( batch );
- final MutationBatch secondBatch = mock( MutationBatch.class );
when( mvccLogEntrySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
- .thenReturn( secondBatch );
+ .thenReturn( batch );
//start the task
@@ -270,23 +291,23 @@ public class EntityVersionCleanupTaskTest {
//we deleted the version
//verify it was run
- verify( firstBatch ).execute();
+ verify( entityBatch ).execute();
- verify( secondBatch ).execute();
+ verify( logBatch ).execute();
//the latch was executed
latch.await();
}
- @Test
+ @Test(timeout=10000)
public void multipleListenerMultipleVersions()
throws ExecutionException, InterruptedException, ConnectionException {
final SerializationFig serializationFig = mock( SerializationFig.class );
- when( serializationFig.getHistorySize() ).thenReturn( 10 );
+ when( serializationFig.getBufferSize() ).thenReturn( 10 );
final MvccEntitySerializationStrategy mvccEntitySerializationStrategy =
mock( MvccEntitySerializationStrategy.class );
@@ -294,12 +315,22 @@ public class EntityVersionCleanupTaskTest {
final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy =
mock( MvccLogEntrySerializationStrategy.class );
+ final Keyspace keyspace = mock( Keyspace.class );
+
+
+
+ final MutationBatch entityBatch = mock( MutationBatch.class );
+ final MutationBatch logBatch = mock( MutationBatch.class );
+
+ when( keyspace.prepareMutationBatch() ).thenReturn( entityBatch ).thenReturn( logBatch );
+
+
//create a latch for the event listener, and add it to the list of events
final int sizeToReturn = 10;
- final CountDownLatch latch = new CountDownLatch( sizeToReturn * 3 );
+ final CountDownLatch latch = new CountDownLatch( sizeToReturn/serializationFig.getBufferSize() * 3 );
final EntityVersionDeletedTest listener1 = new EntityVersionDeletedTest( latch );
final EntityVersionDeletedTest listener2 = new EntityVersionDeletedTest( latch );
@@ -329,20 +360,18 @@ public class EntityVersionCleanupTaskTest {
EntityVersionCleanupTask cleanupTask =
new EntityVersionCleanupTask( serializationFig, mvccLogEntrySerializationStrategy,
- mvccEntitySerializationStrategy, listeners, appScope, entityId, version );
+ mvccEntitySerializationStrategy, keyspace, listeners, appScope, entityId, version );
- final MutationBatch firstBatch = mock( MutationBatch.class );
+ final MutationBatch batch = mock( MutationBatch.class );
//set up returning a mutator
when( mvccEntitySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
- .thenReturn( firstBatch );
+ .thenReturn( batch );
- final MutationBatch secondBatch = mock( MutationBatch.class );
-
when( mvccLogEntrySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
- .thenReturn( secondBatch );
+ .thenReturn( batch );
//start the task
@@ -353,9 +382,15 @@ public class EntityVersionCleanupTaskTest {
//we deleted the version
//verify we deleted everything
- verify( firstBatch, times( sizeToReturn ) ).execute();
+ verify( entityBatch, times( sizeToReturn ) ).mergeShallow( any( MutationBatch.class ) );
+
+ verify( logBatch, times( sizeToReturn ) ).mergeShallow( any( MutationBatch.class ) );
+
+
- verify( secondBatch, times( sizeToReturn ) ).execute();
+ verify( logBatch ).execute();
+
+ verify( entityBatch ).execute();
//the latch was executed
latch.await();
@@ -364,18 +399,15 @@ public class EntityVersionCleanupTaskTest {
/**
* Tests what happens when our listeners are VERY slow
- * @throws ExecutionException
- * @throws InterruptedException
- * @throws ConnectionException
*/
- @Test
+ @Test(timeout=10000)
public void multipleListenerMultipleVersionsNoThreadsToRun()
throws ExecutionException, InterruptedException, ConnectionException {
final SerializationFig serializationFig = mock( SerializationFig.class );
- when( serializationFig.getHistorySize() ).thenReturn( 10 );
+ when( serializationFig.getBufferSize() ).thenReturn( 10 );
final MvccEntitySerializationStrategy mvccEntitySerializationStrategy =
mock( MvccEntitySerializationStrategy.class );
@@ -383,6 +415,17 @@ public class EntityVersionCleanupTaskTest {
final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy =
mock( MvccLogEntrySerializationStrategy.class );
+ final Keyspace keyspace = mock( Keyspace.class );
+
+
+
+ final MutationBatch entityBatch = mock( MutationBatch.class );
+ final MutationBatch logBatch = mock( MutationBatch.class );
+
+ when( keyspace.prepareMutationBatch() ).thenReturn( entityBatch ).thenReturn( logBatch );
+
+
+
//create a latch for the event listener, and add it to the list of events
final int sizeToReturn = 10;
@@ -390,7 +433,7 @@ public class EntityVersionCleanupTaskTest {
final int listenerCount = 5;
- final CountDownLatch latch = new CountDownLatch( sizeToReturn * listenerCount );
+ final CountDownLatch latch = new CountDownLatch( sizeToReturn/serializationFig.getBufferSize() * listenerCount );
final Semaphore waitSemaphore = new Semaphore( 0 );
@@ -426,20 +469,18 @@ public class EntityVersionCleanupTaskTest {
EntityVersionCleanupTask cleanupTask =
new EntityVersionCleanupTask( serializationFig, mvccLogEntrySerializationStrategy,
- mvccEntitySerializationStrategy, listeners, appScope, entityId, version );
+ mvccEntitySerializationStrategy, keyspace, listeners, appScope, entityId, version );
- final MutationBatch firstBatch = mock( MutationBatch.class );
+ final MutationBatch batch = mock( MutationBatch.class );
//set up returning a mutator
when( mvccEntitySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
- .thenReturn( firstBatch );
-
+ .thenReturn( batch );
- final MutationBatch secondBatch = mock( MutationBatch.class );
when( mvccLogEntrySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
- .thenReturn( secondBatch );
+ .thenReturn( batch );
//start the task
@@ -448,7 +489,7 @@ public class EntityVersionCleanupTaskTest {
/**
* While we're not done, release latches every 200 ms
*/
- while(!future.isDone()) {
+ while ( !future.isDone() ) {
Thread.sleep( 200 );
waitSemaphore.release( listenerCount );
}
@@ -456,37 +497,41 @@ public class EntityVersionCleanupTaskTest {
//wait for the task
future.get();
+
+
//we deleted the version
//verify we deleted everything
- verify( firstBatch, times( sizeToReturn ) ).execute();
+ verify( logBatch, times( sizeToReturn ) ).mergeShallow( any( MutationBatch.class ) );
+
+ verify( entityBatch, times( sizeToReturn ) ).mergeShallow( any( MutationBatch.class ) );
+
+
+ verify( logBatch ).execute();
+
+ verify( entityBatch ).execute();
+
- verify( secondBatch, times( sizeToReturn ) ).execute();
//the latch was executed
latch.await();
}
-
/**
* Tests that our task will run in the caller if there are no threads available, ensuring that the task still runs
- * @throws ExecutionException
- * @throws InterruptedException
- * @throws ConnectionException
*/
- @Test
- public void runsWhenRejected()
- throws ExecutionException, InterruptedException, ConnectionException {
+ @Test(timeout=10000)
+ public void runsWhenRejected() throws ExecutionException, InterruptedException, ConnectionException {
/**
* only 1 thread on purpose, we want to saturate the task
*/
- final TaskExecutor taskExecutor = new NamedTaskExecutorImpl( "test", 1, 0);
+ final TaskExecutor taskExecutor = new NamedTaskExecutorImpl( "test", 1, 0 );
final SerializationFig serializationFig = mock( SerializationFig.class );
- when( serializationFig.getHistorySize() ).thenReturn( 10 );
+ when( serializationFig.getBufferSize() ).thenReturn( 10 );
final MvccEntitySerializationStrategy mvccEntitySerializationStrategy =
mock( MvccEntitySerializationStrategy.class );
@@ -494,6 +539,16 @@ public class EntityVersionCleanupTaskTest {
final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy =
mock( MvccLogEntrySerializationStrategy.class );
+ final Keyspace keyspace1 = mock( Keyspace.class );
+ final Keyspace keyspace2 = mock( Keyspace.class );
+
+
+ final MutationBatch entityBatch = mock( MutationBatch.class );
+ final MutationBatch logBatch = mock( MutationBatch.class );
+
+ when( keyspace1.prepareMutationBatch() ).thenReturn( entityBatch ).thenReturn( logBatch );
+ when( keyspace2.prepareMutationBatch() ).thenReturn( entityBatch ).thenReturn( logBatch );
+
//create a latch for the event listener, and add it to the list of events
final int sizeToReturn = 10;
@@ -501,7 +556,7 @@ public class EntityVersionCleanupTaskTest {
final int listenerCount = 2;
- final CountDownLatch latch = new CountDownLatch( sizeToReturn * listenerCount );
+ final CountDownLatch latch = new CountDownLatch( sizeToReturn/serializationFig.getBufferSize() * listenerCount );
final Semaphore waitSemaphore = new Semaphore( 0 );
@@ -509,7 +564,6 @@ public class EntityVersionCleanupTaskTest {
final EntityVersionDeletedTest runListener = new EntityVersionDeletedTest( latch );
-
final Id applicationId = new SimpleId( "application" );
@@ -528,37 +582,37 @@ public class EntityVersionCleanupTaskTest {
EntityVersionCleanupTask firstTask =
new EntityVersionCleanupTask( serializationFig, mvccLogEntrySerializationStrategy,
- mvccEntitySerializationStrategy, Arrays.<EntityVersionDeleted>asList(slowListener), appScope, entityId, version );
-
+ mvccEntitySerializationStrategy, keyspace1, Arrays.<EntityVersionDeleted>asList( slowListener ),
+ appScope, entityId, version );
//change the listeners to one that is just invoked quickly
EntityVersionCleanupTask secondTask =
- new EntityVersionCleanupTask( serializationFig, mvccLogEntrySerializationStrategy,
- mvccEntitySerializationStrategy, Arrays.<EntityVersionDeleted>asList(runListener), appScope, entityId, version );
+ new EntityVersionCleanupTask( serializationFig, mvccLogEntrySerializationStrategy,
+ mvccEntitySerializationStrategy, keyspace2, Arrays.<EntityVersionDeleted>asList( runListener ),
+ appScope, entityId, version );
- final MutationBatch firstBatch = mock( MutationBatch.class );
+ final MutationBatch batch = mock( MutationBatch.class );
//set up returning a mutator
when( mvccEntitySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
- .thenReturn( firstBatch );
+ .thenReturn( batch );
- final MutationBatch secondBatch = mock( MutationBatch.class );
when( mvccLogEntrySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
- .thenReturn( secondBatch );
+ .thenReturn( batch );
//start the task
- ListenableFuture<Void> future1 = taskExecutor.submit( firstTask );
+ ListenableFuture<Void> future1 = taskExecutor.submit( firstTask );
//now start another task while the slow running task is running
- ListenableFuture<Void> future2 = taskExecutor.submit( secondTask );
+ ListenableFuture<Void> future2 = taskExecutor.submit( secondTask );
//get the second task, we shouldn't have been able to queue it, therefore it should just run in process
future2.get();
@@ -566,7 +620,7 @@ public class EntityVersionCleanupTaskTest {
/**
* While we're not done, release latches every 200 ms
*/
- while(!future1.isDone()) {
+ while ( !future1.isDone() ) {
Thread.sleep( 200 );
waitSemaphore.release( listenerCount );
}
@@ -576,9 +630,19 @@ public class EntityVersionCleanupTaskTest {
//we deleted the version
//verify we deleted everything
- verify( firstBatch, times( sizeToReturn*2 ) ).execute();
- verify( secondBatch, times( sizeToReturn*2 ) ).execute();
+
+ //we deleted the version
+ //verify we deleted everything
+ verify( logBatch, times( sizeToReturn* 2 ) ).mergeShallow( any( MutationBatch.class ) );
+
+ verify( entityBatch, times( sizeToReturn * 2) ).mergeShallow( any( MutationBatch.class ) );
+
+
+ verify( logBatch, times(2) ).execute();
+
+ verify( entityBatch, times(2) ).execute();
+
//the latch was executed
latch.await();
@@ -595,7 +659,7 @@ public class EntityVersionCleanupTaskTest {
@Override
- public void versionDeleted( final CollectionScope scope, final Id entityId, final UUID entityVersion ) {
+ public void versionDeleted( final CollectionScope scope, final Id entityId, final List<UUID> entityVersion ) {
invocationLatch.countDown();
}
}
@@ -612,7 +676,7 @@ public class EntityVersionCleanupTaskTest {
@Override
- public void versionDeleted( final CollectionScope scope, final Id entityId, final UUID entityVersion ) {
+ public void versionDeleted( final CollectionScope scope, final Id entityId, final List<UUID> entityVersion ) {
//wait for unblock to happen before counting down invocation latches
try {
blockLatch.acquire();