You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/10/01 22:01:05 UTC
[06/19] git commit: Finished testing of the EntityVersionCleanupTask
Finished testing of the EntityVersionCleanupTask
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/d4f80e7c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/d4f80e7c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/d4f80e7c
Branch: refs/heads/two-dot-o-rebuildable-index
Commit: d4f80e7c85093e17081a4210491022b9001a02b7
Parents: 018dbee
Author: Todd Nine <to...@apache.org>
Authored: Sat Sep 27 15:25:09 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Sat Sep 27 17:07:50 2014 -0600
----------------------------------------------------------------------
.../impl/EntityVersionCleanupTask.java | 121 ++--
.../impl/EntityVersionCleanupTaskTest.java | 583 ++++++++++++++++++-
.../persistence/core/task/ImmediateTask.java | 12 +-
.../core/task/NamedTaskExecutorImpl.java | 10 +-
.../usergrid/persistence/core/task/Task.java | 27 +-
.../persistence/core/task/TaskExecutor.java | 7 +-
.../core/task/NamedTaskExecutorImplTest.java | 9 +-
.../impl/shard/ShardGroupCompaction.java | 2 +-
.../shard/impl/ShardGroupCompactionImpl.java | 13 +-
9 files changed, 684 insertions(+), 100 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d4f80e7c/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
index 29ca3ac..3d1483d 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
@@ -1,10 +1,10 @@
package org.apache.usergrid.persistence.collection.impl;
-import java.util.ArrayList;
import java.util.List;
+import java.util.Stack;
import java.util.UUID;
-import java.util.concurrent.ForkJoinTask;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.RecursiveTask;
import org.slf4j.Logger;
@@ -15,10 +15,8 @@ import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
-import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
import org.apache.usergrid.persistence.collection.serialization.impl.LogEntryIterator;
-import org.apache.usergrid.persistence.core.entity.EntityVersion;
import org.apache.usergrid.persistence.core.task.Task;
import org.apache.usergrid.persistence.model.entity.Id;
@@ -27,12 +25,11 @@ import org.apache.usergrid.persistence.model.entity.Id;
* Cleans up previous versions from the specified version. Note that this means the version passed in the io event is
* retained, the range is exclusive.
*/
-public class EntityVersionCleanupTask extends Task<CollectionIoEvent<EntityVersion>, CollectionIoEvent<EntityVersion>> {
+public class EntityVersionCleanupTask extends Task<Void> {
private static final Logger LOG = LoggerFactory.getLogger( EntityVersionCleanupTask.class );
- private final CollectionIoEvent<EntityVersion> collectionIoEvent;
private final List<EntityVersionDeleted> listeners;
private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
@@ -40,29 +37,31 @@ public class EntityVersionCleanupTask extends Task<CollectionIoEvent<EntityVersi
private final SerializationFig serializationFig;
+ private final CollectionScope scope;
+ private final Id entityId;
+ private final UUID version;
- private EntityVersionCleanupTask( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
- final MvccEntitySerializationStrategy entitySerializationStrategy,
- final CollectionIoEvent<EntityVersion> collectionIoEvent,
- final List<EntityVersionDeleted> listeners,
- final SerializationFig serializationFig ) {
- this.collectionIoEvent = collectionIoEvent;
- this.listeners = listeners;
- this.logEntrySerializationStrategy = logEntrySerializationStrategy;
- this.entitySerializationStrategy = entitySerializationStrategy;
- this.serializationFig = serializationFig;
- }
+ public EntityVersionCleanupTask( final SerializationFig serializationFig,
+ final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
+ final MvccEntitySerializationStrategy entitySerializationStrategy,
+ final List<EntityVersionDeleted> listeners, final CollectionScope scope,
+ final Id entityId, final UUID version ) {
- @Override
- public CollectionIoEvent<EntityVersion> getId() {
- return collectionIoEvent;
+ this.serializationFig = serializationFig;
+ this.logEntrySerializationStrategy = logEntrySerializationStrategy;
+ this.entitySerializationStrategy = entitySerializationStrategy;
+ this.listeners = listeners;
+ this.scope = scope;
+ this.entityId = entityId;
+ this.version = version;
}
@Override
public void exceptionThrown( final Throwable throwable ) {
- LOG.error( "Unable to run update task for event {}", collectionIoEvent, throwable );
+ LOG.error( "Unable to run update task for collection {} with entity {} and version {}",
+ new Object[] { scope, entityId, version }, throwable );
}
@@ -81,15 +80,15 @@ public class EntityVersionCleanupTask extends Task<CollectionIoEvent<EntityVersi
@Override
- public CollectionIoEvent<EntityVersion> executeTask() throws Exception {
+ public Void executeTask() throws Exception {
- final CollectionScope scope = collectionIoEvent.getEntityCollection();
- final Id entityId = collectionIoEvent.getEvent().getId();
- final UUID maxVersion = collectionIoEvent.getEvent().getVersion();
+
+ final UUID maxVersion = version;
LogEntryIterator logEntryIterator =
- new LogEntryIterator( logEntrySerializationStrategy, scope, entityId, maxVersion, serializationFig.getHistorySize() );
+ new LogEntryIterator( logEntrySerializationStrategy, scope, entityId, maxVersion,
+ serializationFig.getHistorySize() );
//for every entry, we want to clean it up with listeners
@@ -100,39 +99,73 @@ public class EntityVersionCleanupTask extends Task<CollectionIoEvent<EntityVersi
final UUID version = logEntry.getVersion();
- List<ForkJoinTask<Void>> tasks = new ArrayList<>();
- //execute all the listeners
- for (final EntityVersionDeleted listener : listeners ) {
+ fireEvents();
- tasks.add( new RecursiveTask<Void>() {
- @Override
- protected Void compute() {
- listener.versionDeleted( scope, entityId, version );
- return null;
- }
- }.fork() );
+ //we do multiple invocations on purpose. Our log is our source of versions, only delete from it
+ //after every successful invocation of listeners and entity removal
+ entitySerializationStrategy.delete( scope, entityId, version ).execute();
+ logEntrySerializationStrategy.delete( scope, entityId, version ).execute();
+ }
- }
- //wait for them to complete
+ return null;
+ }
- joinAll(tasks);
- //we do multiple invocations on purpose. Our log is our source of versions, only delete from it
- //after every successful invocation of listeners and entity removal
- entitySerializationStrategy.delete( scope, entityId, version ).execute();
+ private void fireEvents() throws ExecutionException, InterruptedException {
- logEntrySerializationStrategy.delete( scope, entityId, version ).execute();
+ if ( listeners.size() == 0 ) {
+ return;
}
+ //stack to track forked tasks
+ final Stack<RecursiveTask<Void>> tasks = new Stack<>();
+
+
+ //we don't want to fork the final listener, we'll run that in our current thread
+ final int forkedTaskSize = listeners.size() - 1;
+
+
+ //execute all the listeners
+ for ( int i = 0; i < forkedTaskSize; i++ ) {
+
+ final EntityVersionDeleted listener = listeners.get( i );
+
+ final RecursiveTask<Void> task = createTask( listener );
+
+ task.fork();
- return collectionIoEvent;
+ tasks.push( task );
+ }
+
+
+ final RecursiveTask<Void> lastTask = createTask( listeners.get( forkedTaskSize ) );
+
+ lastTask.invoke();
+
+
+ //wait for them to complete
+ while ( !tasks.isEmpty() ) {
+ tasks.pop().get();
+ }
}
+ /**
+ * Return the new task to execute
+ */
+ private RecursiveTask<Void> createTask( final EntityVersionDeleted listener ) {
+ return new RecursiveTask<Void>() {
+ @Override
+ protected Void compute() {
+ listener.versionDeleted( scope, entityId, version );
+ return null;
+ }
+ };
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d4f80e7c/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
index 050ea9e..2df75f1 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
@@ -22,10 +22,34 @@ package org.apache.usergrid.persistence.collection.impl;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Semaphore;
+import org.junit.AfterClass;
import org.junit.Test;
-import static org.junit.Assert.*;
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
+import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
+import org.apache.usergrid.persistence.collection.util.LogEntryMock;
+import org.apache.usergrid.persistence.core.task.NamedTaskExecutorImpl;
+import org.apache.usergrid.persistence.core.task.TaskExecutor;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
/**
@@ -33,8 +57,563 @@ import static org.junit.Assert.*;
*/
public class EntityVersionCleanupTaskTest {
+ private static final TaskExecutor taskExecutor = new NamedTaskExecutorImpl( "test", 4 );
+
+
+ @AfterClass
+ public static void shutdown() {
+ taskExecutor.shutdown();
+ }
+
+
+ @Test
+ public void noListenerOneVersion() throws ExecutionException, InterruptedException, ConnectionException {
+
+
+ final SerializationFig serializationFig = mock( SerializationFig.class );
+
+ when( serializationFig.getHistorySize() ).thenReturn( 10 );
+
+ final MvccEntitySerializationStrategy mvccEntitySerializationStrategy =
+ mock( MvccEntitySerializationStrategy.class );
+
+ final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy =
+ mock( MvccLogEntrySerializationStrategy.class );
+
+
+ //intentionally no events
+ final List<EntityVersionDeleted> listeners = new ArrayList<EntityVersionDeleted>();
+
+ final Id applicationId = new SimpleId( "application" );
+
+
+ final CollectionScope appScope = new CollectionScopeImpl( applicationId, applicationId, "users" );
+
+ final Id entityId = new SimpleId( "user" );
+
+
+ //mock up a single log entry for our first test
+ final LogEntryMock logEntryMock =
+ LogEntryMock.createLogEntryMock( mvccLogEntrySerializationStrategy, appScope, entityId, 2 );
+
+
+ final UUID version = logEntryMock.getEntries().iterator().next().getVersion();
+
+
+ EntityVersionCleanupTask cleanupTask =
+ new EntityVersionCleanupTask( serializationFig, mvccLogEntrySerializationStrategy,
+ mvccEntitySerializationStrategy, listeners, appScope, entityId, version );
+
+ final MutationBatch firstBatch = mock( MutationBatch.class );
+
+
+ //set up returning a mutator
+ when( mvccEntitySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
+ .thenReturn( firstBatch );
+
+
+ final MutationBatch secondBatch = mock( MutationBatch.class );
+
+ when( mvccLogEntrySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
+ .thenReturn( secondBatch );
+
+
+ //start the task
+ taskExecutor.submit( cleanupTask );
+
+ //wait for the task
+ cleanupTask.get();
+
+ //verify it was run
+ verify( firstBatch ).execute();
+
+ verify( secondBatch ).execute();
+ }
+
+
+ /**
+ * Tests the cleanup task on the first version created
+ */
+ @Test
+ public void noListenerNoVersions() throws ExecutionException, InterruptedException, ConnectionException {
+
+
+ final SerializationFig serializationFig = mock( SerializationFig.class );
+
+ when( serializationFig.getHistorySize() ).thenReturn( 10 );
+
+ final MvccEntitySerializationStrategy mvccEntitySerializationStrategy =
+ mock( MvccEntitySerializationStrategy.class );
+
+ final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy =
+ mock( MvccLogEntrySerializationStrategy.class );
+
+
+ //intentionally no events
+ final List<EntityVersionDeleted> listeners = new ArrayList<EntityVersionDeleted>();
+
+ final Id applicationId = new SimpleId( "application" );
+
+
+ final CollectionScope appScope = new CollectionScopeImpl( applicationId, applicationId, "users" );
+
+ final Id entityId = new SimpleId( "user" );
+
+
+ //mock up a single log entry for our first test
+ final LogEntryMock logEntryMock =
+ LogEntryMock.createLogEntryMock( mvccLogEntrySerializationStrategy, appScope, entityId, 1 );
+
+
+ final UUID version = logEntryMock.getEntries().iterator().next().getVersion();
+
+
+ EntityVersionCleanupTask cleanupTask =
+ new EntityVersionCleanupTask( serializationFig, mvccLogEntrySerializationStrategy,
+ mvccEntitySerializationStrategy, listeners, appScope, entityId, version );
+
+ final MutationBatch firstBatch = mock( MutationBatch.class );
+
+
+ //set up returning a mutator
+ when( mvccEntitySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
+ .thenReturn( firstBatch );
+
+
+ final MutationBatch secondBatch = mock( MutationBatch.class );
+
+ when( mvccLogEntrySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
+ .thenReturn( secondBatch );
+
+
+ //start the task
+ taskExecutor.submit( cleanupTask );
+
+ //wait for the task
+ cleanupTask.get();
+
+ //verify it was run
+ verify( firstBatch, never() ).execute();
+
+ verify( secondBatch, never() ).execute();
+ }
+
+
@Test
- public void multiPageTask(){
+ public void singleListenerSingleVersion() throws ExecutionException, InterruptedException, ConnectionException {
+
+
+ final SerializationFig serializationFig = mock( SerializationFig.class );
+
+ when( serializationFig.getHistorySize() ).thenReturn( 10 );
+
+ final MvccEntitySerializationStrategy mvccEntitySerializationStrategy =
+ mock( MvccEntitySerializationStrategy.class );
+
+ final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy =
+ mock( MvccLogEntrySerializationStrategy.class );
+
+
+ //create a latch for the event listener, and add it to the list of events
+ final int sizeToReturn = 1;
+
+ final CountDownLatch latch = new CountDownLatch( sizeToReturn );
+
+ final EntityVersionDeletedTest eventListener = new EntityVersionDeletedTest( latch );
+
+ final List<EntityVersionDeleted> listeners = new ArrayList<EntityVersionDeleted>();
+
+ listeners.add( eventListener );
+
+ final Id applicationId = new SimpleId( "application" );
+
+
+ final CollectionScope appScope = new CollectionScopeImpl( applicationId, applicationId, "users" );
+
+ final Id entityId = new SimpleId( "user" );
+
+
+ //mock up a single log entry for our first test
+ final LogEntryMock logEntryMock = LogEntryMock
+ .createLogEntryMock( mvccLogEntrySerializationStrategy, appScope, entityId, sizeToReturn + 1 );
+
+
+ final UUID version = logEntryMock.getEntries().iterator().next().getVersion();
+
+
+ EntityVersionCleanupTask cleanupTask =
+ new EntityVersionCleanupTask( serializationFig, mvccLogEntrySerializationStrategy,
+ mvccEntitySerializationStrategy, listeners, appScope, entityId, version );
+
+ final MutationBatch firstBatch = mock( MutationBatch.class );
+
+
+ //set up returning a mutator
+ when( mvccEntitySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
+ .thenReturn( firstBatch );
+
+
+ final MutationBatch secondBatch = mock( MutationBatch.class );
+
+ when( mvccLogEntrySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
+ .thenReturn( secondBatch );
+
+
+ //start the task
+ taskExecutor.submit( cleanupTask );
+
+ //wait for the task
+ cleanupTask.get();
+
+ //we deleted the version
+ //verify it was run
+ verify( firstBatch ).execute();
+
+ verify( secondBatch ).execute();
+
+ //the latch was executed
+ latch.await();
+ }
+
+
+ @Test
+ public void multipleListenerMultipleVersions()
+ throws ExecutionException, InterruptedException, ConnectionException {
+
+
+ final SerializationFig serializationFig = mock( SerializationFig.class );
+
+ when( serializationFig.getHistorySize() ).thenReturn( 10 );
+
+ final MvccEntitySerializationStrategy mvccEntitySerializationStrategy =
+ mock( MvccEntitySerializationStrategy.class );
+
+ final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy =
+ mock( MvccLogEntrySerializationStrategy.class );
+
+
+ //create a latch for the event listener, and add it to the list of events
+ final int sizeToReturn = 10;
+
+
+ final CountDownLatch latch = new CountDownLatch( sizeToReturn * 3 );
+
+ final EntityVersionDeletedTest listener1 = new EntityVersionDeletedTest( latch );
+ final EntityVersionDeletedTest listener2 = new EntityVersionDeletedTest( latch );
+ final EntityVersionDeletedTest listener3 = new EntityVersionDeletedTest( latch );
+
+ final List<EntityVersionDeleted> listeners = new ArrayList<EntityVersionDeleted>();
+
+ listeners.add( listener1 );
+ listeners.add( listener2 );
+ listeners.add( listener3 );
+
+ final Id applicationId = new SimpleId( "application" );
+
+
+ final CollectionScope appScope = new CollectionScopeImpl( applicationId, applicationId, "users" );
+
+ final Id entityId = new SimpleId( "user" );
+
+
+ //mock up a single log entry for our first test
+ final LogEntryMock logEntryMock = LogEntryMock
+ .createLogEntryMock( mvccLogEntrySerializationStrategy, appScope, entityId, sizeToReturn + 1 );
+
+
+ final UUID version = logEntryMock.getEntries().iterator().next().getVersion();
+
+
+ EntityVersionCleanupTask cleanupTask =
+ new EntityVersionCleanupTask( serializationFig, mvccLogEntrySerializationStrategy,
+ mvccEntitySerializationStrategy, listeners, appScope, entityId, version );
+
+ final MutationBatch firstBatch = mock( MutationBatch.class );
+
+
+ //set up returning a mutator
+ when( mvccEntitySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
+ .thenReturn( firstBatch );
+
+
+ final MutationBatch secondBatch = mock( MutationBatch.class );
+
+ when( mvccLogEntrySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
+ .thenReturn( secondBatch );
+
+
+ //start the task
+ taskExecutor.submit( cleanupTask );
+
+ //wait for the task
+ cleanupTask.get();
+
+ //we deleted the version
+ //verify we deleted everything
+ verify( firstBatch, times( sizeToReturn ) ).execute();
+
+ verify( secondBatch, times( sizeToReturn ) ).execute();
+
+ //the latch was executed
+ latch.await();
+ }
+
+
+ /**
+ * Tests what happens when our listeners are VERY slow
+ * @throws ExecutionException
+ * @throws InterruptedException
+ * @throws ConnectionException
+ */
+ @Test
+ public void multipleListenerMultipleVersionsNoThreadsToRun()
+ throws ExecutionException, InterruptedException, ConnectionException {
+
+
+ final SerializationFig serializationFig = mock( SerializationFig.class );
+
+ when( serializationFig.getHistorySize() ).thenReturn( 10 );
+
+ final MvccEntitySerializationStrategy mvccEntitySerializationStrategy =
+ mock( MvccEntitySerializationStrategy.class );
+
+ final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy =
+ mock( MvccLogEntrySerializationStrategy.class );
+
+
+ //create a latch for the event listener, and add it to the list of events
+ final int sizeToReturn = 10;
+
+
+ final int listenerCount = 5;
+
+ final CountDownLatch latch = new CountDownLatch( sizeToReturn * listenerCount );
+ final Semaphore waitSemaphore = new Semaphore( 0 );
+
+
+ final SlowListener listener1 = new SlowListener( latch, waitSemaphore );
+ final SlowListener listener2 = new SlowListener( latch, waitSemaphore );
+ final SlowListener listener3 = new SlowListener( latch, waitSemaphore );
+ final SlowListener listener4 = new SlowListener( latch, waitSemaphore );
+ final SlowListener listener5 = new SlowListener( latch, waitSemaphore );
+
+ final List<EntityVersionDeleted> listeners = new ArrayList<>();
+
+ listeners.add( listener1 );
+ listeners.add( listener2 );
+ listeners.add( listener3 );
+ listeners.add( listener4 );
+ listeners.add( listener5 );
+
+ final Id applicationId = new SimpleId( "application" );
+
+
+ final CollectionScope appScope = new CollectionScopeImpl( applicationId, applicationId, "users" );
+
+ final Id entityId = new SimpleId( "user" );
+
+
+ //mock up a single log entry for our first test
+ final LogEntryMock logEntryMock = LogEntryMock
+ .createLogEntryMock( mvccLogEntrySerializationStrategy, appScope, entityId, sizeToReturn + 1 );
+
+
+ final UUID version = logEntryMock.getEntries().iterator().next().getVersion();
+
+
+ EntityVersionCleanupTask cleanupTask =
+ new EntityVersionCleanupTask( serializationFig, mvccLogEntrySerializationStrategy,
+ mvccEntitySerializationStrategy, listeners, appScope, entityId, version );
+
+ final MutationBatch firstBatch = mock( MutationBatch.class );
+
+
+ //set up returning a mutator
+ when( mvccEntitySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
+ .thenReturn( firstBatch );
+
+
+ final MutationBatch secondBatch = mock( MutationBatch.class );
+
+ when( mvccLogEntrySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
+ .thenReturn( secondBatch );
+
+
+ //start the task
+ taskExecutor.submit( cleanupTask );
+
+ /**
+ * While we're not done, release semaphore permits every 200 ms
+ */
+ while(!cleanupTask.isDone()) {
+ Thread.sleep( 200 );
+ waitSemaphore.release( listenerCount );
+ }
+
+ //wait for the task
+ cleanupTask.get();
+
+ //we deleted the version
+ //verify we deleted everything
+ verify( firstBatch, times( sizeToReturn ) ).execute();
+
+ verify( secondBatch, times( sizeToReturn ) ).execute();
+
+ //the latch was executed
+ latch.await();
+ }
+
+
+
+ /**
+ * Tests that our task will run in the caller if there are no threads available, ensuring that the task runs
+ * @throws ExecutionException
+ * @throws InterruptedException
+ * @throws ConnectionException
+ */
+ @Test
+ public void runsWhenRejected()
+ throws ExecutionException, InterruptedException, ConnectionException {
+
+
+ /**
+ * only 1 thread on purpose, we want to saturate the task
+ */
+ final TaskExecutor taskExecutor = new NamedTaskExecutorImpl( "test", 1);
+
+ final SerializationFig serializationFig = mock( SerializationFig.class );
+
+ when( serializationFig.getHistorySize() ).thenReturn( 10 );
+
+ final MvccEntitySerializationStrategy mvccEntitySerializationStrategy =
+ mock( MvccEntitySerializationStrategy.class );
+
+ final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy =
+ mock( MvccLogEntrySerializationStrategy.class );
+
+
+ //create a latch for the event listener, and add it to the list of events
+ final int sizeToReturn = 10;
+
+
+ final int listenerCount = 1;
+
+ final CountDownLatch latch = new CountDownLatch( sizeToReturn * listenerCount );
+ final Semaphore waitSemaphore = new Semaphore( 0 );
+
+
+ final SlowListener listener1 = new SlowListener( latch, waitSemaphore );
+
+ final List<EntityVersionDeleted> listeners = new ArrayList<>();
+
+ listeners.add( listener1 );
+
+ final Id applicationId = new SimpleId( "application" );
+
+
+ final CollectionScope appScope = new CollectionScopeImpl( applicationId, applicationId, "users" );
+
+ final Id entityId = new SimpleId( "user" );
+
+
+ //mock up a single log entry for our first test
+ final LogEntryMock logEntryMock = LogEntryMock
+ .createLogEntryMock( mvccLogEntrySerializationStrategy, appScope, entityId, sizeToReturn + 1 );
+
+
+ final UUID version = logEntryMock.getEntries().iterator().next().getVersion();
+
+
+ EntityVersionCleanupTask firstTask =
+ new EntityVersionCleanupTask( serializationFig, mvccLogEntrySerializationStrategy,
+ mvccEntitySerializationStrategy, listeners, appScope, entityId, version );
+
+ EntityVersionCleanupTask secondTask =
+ new EntityVersionCleanupTask( serializationFig, mvccLogEntrySerializationStrategy,
+ mvccEntitySerializationStrategy, listeners, appScope, entityId, version );
+
+
+ final MutationBatch firstBatch = mock( MutationBatch.class );
+
+
+ //set up returning a mutator
+ when( mvccEntitySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
+ .thenReturn( firstBatch );
+
+
+ final MutationBatch secondBatch = mock( MutationBatch.class );
+
+ when( mvccLogEntrySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
+ .thenReturn( secondBatch );
+
+
+ //start the task
+ taskExecutor.submit( firstTask );
+
+ //now start another task while the slow running task is running
+ taskExecutor.submit( secondTask );
+
+ //get the second task, we shouldn't have been able to queue it, therefore it should just run in process
+ secondTask.get();
+
+ /**
+ * While we're not done, release semaphore permits every 200 ms
+ */
+ while(!firstTask.isDone()) {
+ Thread.sleep( 200 );
+ waitSemaphore.release( listenerCount );
+ }
+
+ //wait for the task
+ firstTask.get();
+
+ //we deleted the version
+ //verify we deleted everything
+ verify( firstBatch, times( sizeToReturn*2 ) ).execute();
+
+ verify( secondBatch, times( sizeToReturn*2 ) ).execute();
+
+ //the latch was executed
+ latch.await();
+ }
+
+
+ private static class EntityVersionDeletedTest implements EntityVersionDeleted {
+ final CountDownLatch invocationLatch;
+
+
+ private EntityVersionDeletedTest( final CountDownLatch invocationLatch ) {
+ this.invocationLatch = invocationLatch;
+ }
+
+
+ @Override
+ public void versionDeleted( final CollectionScope scope, final Id entityId, final UUID entityVersion ) {
+ invocationLatch.countDown();
+ }
+ }
+
+
+ private static class SlowListener extends EntityVersionDeletedTest {
+ final Semaphore blockLatch;
+
+
+ private SlowListener( final CountDownLatch invocationLatch, final Semaphore blockLatch ) {
+ super( invocationLatch );
+ this.blockLatch = blockLatch;
+ }
+
+ @Override
+ public void versionDeleted( final CollectionScope scope, final Id entityId, final UUID entityVersion ) {
+ //wait for unblock to happen before counting down invocation latches
+ try {
+ blockLatch.acquire();
+ }
+ catch ( InterruptedException e ) {
+ throw new RuntimeException( e );
+ }
+ super.versionDeleted( scope, entityId, entityVersion );
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d4f80e7c/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/ImmediateTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/ImmediateTask.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/ImmediateTask.java
index 627e7e8..f6c28a3 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/ImmediateTask.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/ImmediateTask.java
@@ -5,23 +5,17 @@ package org.apache.usergrid.persistence.core.task;
* Does not perform computation, just returns the value passed to it
*
*/
-public class ImmediateTask<V, I> extends Task<V, I> {
+public class ImmediateTask<V> extends Task<V> {
+
- private final I id;
private final V returned;
- protected ImmediateTask( final I id, final V returned ) {
- this.id = id;
+ protected ImmediateTask( final V returned ) {
this.returned = returned;
}
- @Override
- public I getId() {
- return id;
- }
-
@Override
public V executeTask() throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d4f80e7c/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
index 4fc72c8..bb9cac8 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
@@ -55,6 +55,8 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
* @param poolSize The size of the pool. This is the number of concurrent tasks that can execute at once.
*/
public NamedTaskExecutorImpl( final String name, final int poolSize ) {
+
+ //TODO, figure out how to name the fork/join threads in the pool
Preconditions.checkNotNull( name );
Preconditions.checkArgument( name.length() > 0, "name must have a length" );
Preconditions.checkArgument( poolSize > 0, "poolSize must be > than 0" );
@@ -67,7 +69,7 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
@Override
- public <V, I> Task<V, I> submit( final Task<V, I> task ) {
+ public <V> Task<V> submit( final Task<V> task ) {
try {
executorService.submit( task );
@@ -80,6 +82,12 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
}
+ @Override
+ public void shutdown() {
+ executorService.shutdownNow();
+ }
+
+
private final class NamedForkJoinPool extends ForkJoinPool {
private NamedForkJoinPool( final int workerThreadCount ) {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d4f80e7c/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
index 4dccc72..5d35ce4 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
@@ -29,13 +29,7 @@ import java.util.concurrent.RecursiveTask;
/**
* The task to execute
*/
-public abstract class Task<V, I> extends RecursiveTask<V> {
-
- /**
- * Get the unique identifier of this task. This may be used to collapse runnables over a time period in the future
- */
- public abstract I getId();
-
+public abstract class Task<V> extends RecursiveTask<V> {
@Override
protected V compute() {
@@ -49,25 +43,6 @@ public abstract class Task<V, I> extends RecursiveTask<V> {
}
- /**
- * Fork all tasks in the list except the last one. The last will be run in the caller thread
- * The others will wait for join
- * @param tasks
- *
- */
- public <V, T extends ForkJoinTask<V>> List<V> joinAll(List<T> tasks) throws ExecutionException, InterruptedException {
-
- //don't fork the last one
- List<V> results = new ArrayList<>(tasks.size());
-
- for(T task: tasks){
- results.add( task.join() );
- }
-
- return results;
-
- }
-
/**
* Execute the task
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d4f80e7c/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
index a3553c7..c6f5915 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
@@ -13,5 +13,10 @@ public interface TaskExecutor {
* Submit the task asynchronously
* @param task
*/
- public <V, I> Task<V, I > submit( Task<V, I> task );
+ public <V> Task<V > submit( Task<V> task );
+
+ /**
+ * Stop the task executor immediately, without waiting for submitted tasks to run
+ */
+ public void shutdown();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d4f80e7c/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
index f65d5a6..9656b5f 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
@@ -29,7 +29,7 @@ public class NamedTaskExecutorImplTest {
final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
final CountDownLatch runLatch = new CountDownLatch( 1 );
- final Task<Void, UUID> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {};
+ final Task<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {};
executor.submit( task );
@@ -254,7 +254,7 @@ public class NamedTaskExecutorImplTest {
}
- private static abstract class TestTask<V> extends Task<V, UUID> {
+ private static abstract class TestTask<V> extends Task<V> {
protected final List<Throwable> exceptions;
protected final CountDownLatch exceptionLatch;
@@ -272,11 +272,6 @@ public class NamedTaskExecutorImplTest {
}
- @Override
- public UUID getId() {
- return UUIDGenerator.newTimeUUID();
- }
-
@Override
public void exceptionThrown( final Throwable throwable ) {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d4f80e7c/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
index 15acaa8..ed63587 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
@@ -44,7 +44,7 @@ public interface ShardGroupCompaction {
*
* @return A ListenableFuture with the result. Note that some
*/
- public Task<AuditResult, ShardGroupCompactionImpl.ShardAuditKey> evaluateShardGroup(
+ public Task<AuditResult> evaluateShardGroup(
final ApplicationScope scope, final DirectedEdgeMeta edgeMeta, final ShardEntryGroup group );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d4f80e7c/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
index 751b4d9..a84a0f0 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
@@ -273,7 +273,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
@Override
- public Task<AuditResult, ShardAuditKey> evaluateShardGroup( final ApplicationScope scope,
+ public Task<AuditResult> evaluateShardGroup( final ApplicationScope scope,
final DirectedEdgeMeta edgeMeta,
final ShardEntryGroup group ) {
@@ -282,7 +282,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
//don't audit, we didn't hit our chance
if ( repairChance > graphFig.getShardRepairChance() ) {
- return new ImmediateTask<AuditResult, ShardAuditKey>( new ShardAuditKey( scope, edgeMeta, group ), AuditResult.NOT_CHECKED ) {};
+ return new ImmediateTask<AuditResult>( AuditResult.NOT_CHECKED ) {};
}
/**
@@ -295,7 +295,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
}
- private final class ShardAuditTask extends Task<AuditResult, ShardAuditKey> {
+ private final class ShardAuditTask extends Task<AuditResult> {
private final ApplicationScope scope;
private final DirectedEdgeMeta edgeMeta;
@@ -310,11 +310,6 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
}
- @Override
- public ShardAuditKey getId() {
- return new ShardAuditKey( scope, edgeMeta, group );
- }
-
@Override
public void exceptionThrown( final Throwable throwable ) {
@@ -325,7 +320,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
@Override
public void rejected() {
//ignore, if this happens we don't care, we're saturated, we can check later
- LOG.error( "Rejected audit for shard of {}", getId() );
+ LOG.error( "Rejected audit for shard of with scope {}, edgeMeta of {} and group of {}", scope, edgeMeta, group );
}