You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/12/16 15:59:48 UTC
[18/50] incubator-usergrid git commit: Removed queue from NamedTask
Executor and made it do one event at a time. Added Rejected tests to Entity
Events.
Removed queue from NamedTask Executor and made it do one event at a time. Added Rejected tests to Entity Events.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/a49b4449
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/a49b4449
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/a49b4449
Branch: refs/heads/no-source-in-es
Commit: a49b444961c0fc498b2bee0f396c8ca56b199a31
Parents: e9f876c
Author: grey <gr...@apigee.com>
Authored: Tue Nov 4 12:12:24 2014 -0800
Committer: grey <gr...@apigee.com>
Committed: Tue Nov 4 12:12:24 2014 -0800
----------------------------------------------------------------------
.../impl/EntityVersionCleanupTaskTest.java | 129 ++++++-------------
.../impl/EntityVersionCreatedTaskTest.java | 46 +++++++
.../core/task/NamedTaskExecutorImpl.java | 114 +++++++++++++++-
.../core/task/NamedTaskExecutorImplTest.java | 32 +++++
4 files changed, 228 insertions(+), 93 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a49b4449/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
index 621db20..0e86027 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
@@ -639,168 +639,123 @@ public class EntityVersionCleanupTaskTest {
latch.await();
}
-
/**
* Tests that our task will run in the caller if there's no threads, ensures that the task runs
*/
- @Ignore("Test is a work in progress")
@Test(timeout=10000)
- public void runsWhenRejected()
+ public void singleListenerSingleVersionRejected()
throws ExecutionException, InterruptedException, ConnectionException {
- /**
- * only 1 thread on purpose, we want to saturate the task
- */
- final TaskExecutor taskExecutor = new NamedTaskExecutorImpl( "test", 1, 0 );
+ final TaskExecutor taskExecutor = new NamedTaskExecutorImpl( "test", 0, 0 );
final SerializationFig serializationFig = mock( SerializationFig.class );
when( serializationFig.getBufferSize() ).thenReturn( 10 );
- final MvccEntitySerializationStrategy mvccEntitySerializationStrategy =
+ final MvccEntitySerializationStrategy ess =
mock( MvccEntitySerializationStrategy.class );
final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy =
mock( MvccLogEntrySerializationStrategy.class );
- final Keyspace keyspace1 = mock( Keyspace.class );
- final Keyspace keyspace2 = mock( Keyspace.class );
+ final Keyspace keyspace = mock( Keyspace.class );
final MutationBatch entityBatch = mock( MutationBatch.class );
final MutationBatch logBatch = mock( MutationBatch.class );
- when( keyspace1.prepareMutationBatch() )
- .thenReturn( mock( MutationBatch.class ) ) // don't care what happens to this one
- .thenReturn( entityBatch )
- .thenReturn( logBatch );
+ when( keyspace.prepareMutationBatch() )
+ .thenReturn( mock( MutationBatch.class ) ) // don't care what happens to this one
+ .thenReturn( entityBatch )
+ .thenReturn( logBatch );
- when( keyspace2.prepareMutationBatch() )
- .thenReturn( mock( MutationBatch.class ) ) // don't care what happens to this one
- .thenReturn( entityBatch )
- .thenReturn( logBatch );
//create a latch for the event listener, and add it to the list of events
- final int sizeToReturn = 10;
-
-
- final int listenerCount = 2;
+ final int sizeToReturn = 1;
- final CountDownLatch latch = new CountDownLatch(
- sizeToReturn/serializationFig.getBufferSize() * listenerCount );
- final Semaphore waitSemaphore = new Semaphore( 0 );
+ final CountDownLatch latch = new CountDownLatch( sizeToReturn );
+ final EntityVersionDeletedTest eventListener = new EntityVersionDeletedTest( latch );
- final SlowListener slowListener = new SlowListener( latch, waitSemaphore );
- final EntityVersionDeletedTest runListener = new EntityVersionDeletedTest( latch );
+ final Set<EntityVersionDeleted> listeners = new HashSet<EntityVersionDeleted>();
+ listeners.add( eventListener );
final Id applicationId = new SimpleId( "application" );
- final CollectionScope appScope = new CollectionScopeImpl(
+ final CollectionScope appScope = new CollectionScopeImpl(
applicationId, applicationId, "users" );
final Id entityId = new SimpleId( "user" );
//mock up a single log entry for our first test
- final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock(
+ final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock(
mvccLogEntrySerializationStrategy, appScope, entityId, sizeToReturn + 1 );
final UUID version = logEntryMock.getEntries().iterator().next().getVersion();
+
final UniqueValueSerializationStrategy uniqueValueSerializationStrategy =
mock( UniqueValueSerializationStrategy.class );
-
- EntityVersionCleanupTask firstTask = new EntityVersionCleanupTask(
- serializationFig,
- mvccLogEntrySerializationStrategy,
- mvccEntitySerializationStrategy,
- uniqueValueSerializationStrategy,
- keyspace1,
- Sets.newSet( (EntityVersionDeleted)runListener ),
- appScope,
- entityId,
- version );
-
-
- //change the listeners to one that is just invoked quickly
-
-
- EntityVersionCleanupTask secondTask = new EntityVersionCleanupTask(
- serializationFig,
- mvccLogEntrySerializationStrategy,
- mvccEntitySerializationStrategy,
- uniqueValueSerializationStrategy,
- keyspace2,
- Sets.newSet( (EntityVersionDeleted)runListener ),
- appScope,
- entityId,
- version );
-
+ EntityVersionCleanupTask cleanupTask =
+ new EntityVersionCleanupTask( serializationFig,
+ mvccLogEntrySerializationStrategy,
+ ess,
+ uniqueValueSerializationStrategy,
+ keyspace,
+ listeners,
+ appScope,
+ entityId,
+ version
+ );
final MutationBatch batch = mock( MutationBatch.class );
//set up returning a mutator
- when( mvccEntitySerializationStrategy
- .delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
+ when(ess.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
.thenReturn( batch );
-
when( mvccLogEntrySerializationStrategy
.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
.thenReturn( batch );
- //start the task
- ListenableFuture<Void> future1 = taskExecutor.submit( firstTask );
+ final List<MvccEntity> mel = new ArrayList<MvccEntity>();
- //now start another task while the slow running task is running
- ListenableFuture<Void> future2 = taskExecutor.submit( secondTask );
+ mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(),
+ MvccEntity.Status.DELETED, Optional.fromNullable((Entity)null)) );
- //get the second task, we shouldn't have been able to queue it,
- // therefore it should just run in process
- future2.get();
+ mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(),
+ MvccEntity.Status.DELETED, Optional.fromNullable((Entity)null)) );
- /**
- * While we're not done, release latches every 200 ms
- */
- while ( !future1.isDone() ) {
- Thread.sleep( 200 );
- waitSemaphore.release( listenerCount );
- }
+ when( ess.load( same( appScope ), same( entityId ), any(UUID.class), any(Integer.class) ) )
+ .thenReturn(mel.iterator() );
- //wait for the task
- future1.get();
- //we deleted the version
- //verify we deleted everything
+ //start the task
+ ListenableFuture<Void> future = taskExecutor.submit( cleanupTask );
+ //wait for the task
+ future.get();
//we deleted the version
- //verify we deleted everything
- verify( logBatch, times( sizeToReturn* 2 ) ).mergeShallow( any( MutationBatch.class ) );
-
- verify( entityBatch, times( sizeToReturn * 2) ).mergeShallow( any( MutationBatch.class ) );
-
-
- verify( logBatch, times(2) ).execute();
-
- verify( entityBatch, times(2) ).execute();
+ //verify it was run
+ verify( entityBatch ).execute();
+ verify( logBatch ).execute();
//the latch was executed
latch.await();
}
-
private static class EntityVersionDeletedTest implements EntityVersionDeleted {
final CountDownLatch invocationLatch;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a49b4449/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTaskTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTaskTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTaskTest.java
index 9d72665..24ea280 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTaskTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTaskTest.java
@@ -200,6 +200,52 @@ public class EntityVersionCreatedTaskTest {
verify( listeners ).iterator();
}
+    /**
+     * Submits a created-version task to an executor built with a pool size of 0 and checks that the task still
+     * consulted its listener set. Presumably the rejected task falls back to running in the caller - confirm
+     * against EntityVersionCreatedTask.
+     */
+    @Test(timeout=10000)
+    public void oneListenerRejected()
+        throws ExecutionException, InterruptedException, ConnectionException {
+
+        // the entity whose creation is announced, scoped to the "users" collection
+        final Id applicationId = new SimpleId( "application" );
+        final CollectionScope appScope = new CollectionScopeImpl( applicationId, applicationId, "users" );
+        final Entity entity = new Entity( new SimpleId( "user" ) );
+
+        // one real listener, handed out through a mocked set so the task's lookups can be verified
+        final CountDownLatch latch = new CountDownLatch( 1 );
+        final EntityVersionCreatedTest createdListener = new EntityVersionCreatedTest( latch );
+
+        final Iterator<EntityVersionCreated> listenerIterator = mock( Iterator.class );
+        when( listenerIterator.next() ).thenReturn( createdListener );
+
+        final Set<EntityVersionCreated> listeners = mock( Set.class );
+        when( listeners.size() ).thenReturn( 1 );
+        when( listeners.iterator() ).thenReturn( listenerIterator );
+
+        // pool size 0 selects the executor that rejects every submission
+        final TaskExecutor taskExecutor = new NamedTaskExecutorImpl( "test", 0, 0 );
+
+        // start the task and wait for it
+        final EntityVersionCreatedTask entityVersionCreatedTask =
+            new EntityVersionCreatedTask( appScope, listeners, entity );
+        taskExecutor.submit( entityVersionCreatedTask ).get();
+
+        // the mocked set and iterator record that the task looked up its listener
+        verify( listeners ).size();
+        verify( listeners ).iterator();
+        verify( listenerIterator ).next();
+    }
+
private static class EntityVersionCreatedTest implements EntityVersionCreated {
final CountDownLatch invocationLatch;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a49b4449/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
index b18687a..a022c08 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
@@ -1,16 +1,24 @@
package org.apache.usergrid.persistence.core.task;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
+import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
@@ -36,7 +44,6 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
private final String name;
private final int poolSize;
- private final int queueLength;
/**
@@ -47,17 +54,24 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
public NamedTaskExecutorImpl( final String name, final int poolSize, final int queueLength ) {
Preconditions.checkNotNull( name );
Preconditions.checkArgument( name.length() > 0, "name must have a length" );
- Preconditions.checkArgument( poolSize > 0, "poolSize must be > than 0" );
+ Preconditions.checkArgument( poolSize > -1, "poolSize must be > than -1" );
Preconditions.checkArgument( queueLength > -1, "queueLength must be 0 or more" );
this.name = name;
this.poolSize = poolSize;
- this.queueLength = queueLength;
- final BlockingQueue<Runnable> queue =
- queueLength > 0 ? new ArrayBlockingQueue<Runnable>( queueLength ) : new SynchronousQueue<Runnable>();
+ //The user has chosen to disable asynchronous execution, so create an executor service that rejects all requests
+ if(poolSize == 0){
+ executorService = MoreExecutors.listeningDecorator( new RejectingExecutorService());
+ }
+
+ //queue executions as normal
+ else {
+ final BlockingQueue<Runnable> queue =
+ queueLength > 0 ? new ArrayBlockingQueue<Runnable>( queueLength ) : new SynchronousQueue<Runnable>();
- executorService = MoreExecutors.listeningDecorator( new MaxSizeThreadPool( queue ) );
+ executorService = MoreExecutors.listeningDecorator( new MaxSizeThreadPool( queue ) );
+ }
}
@@ -164,4 +178,92 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
}
}
+
+
+    /**
+     * Executor implementation that simply rejects all incoming tasks. Every submit, invoke and execute call
+     * throws {@link RejectedExecutionException}; the constructor installs it when poolSize is 0.
+     */
+    private static final class RejectingExecutorService implements ExecutorService{
+
+        @Override
+        public void shutdown() {
+            //nothing to release, this class holds no threads or queue
+        }
+
+
+        @Override
+        public List<Runnable> shutdownNow() {
+            //nothing is ever accepted, so nothing can be pending
+            return Collections.emptyList();
+        }
+
+
+        //NOTE(review): stays false even after shutdown() has been called; confirm no caller polls this
+        @Override
+        public boolean isShutdown() {
+            return false;
+        }
+
+
+        @Override
+        public boolean isTerminated() {
+            return false;
+        }
+
+
+        @Override
+        public boolean awaitTermination( final long timeout, final TimeUnit unit ) throws InterruptedException {
+            return false;
+        }
+
+
+        @Override
+        public <T> Future<T> submit( final Callable<T> task ) {
+            throw new RejectedExecutionException("No Asynchronous tasks allowed");
+        }
+
+
+        @Override
+        public <T> Future<T> submit( final Runnable task, final T result ) {
+            throw new RejectedExecutionException("No Asynchronous tasks allowed");
+        }
+
+
+        @Override
+        public Future<?> submit( final Runnable task ) {
+            throw new RejectedExecutionException("No Asynchronous tasks allowed");
+        }
+
+
+        @Override
+        public <T> List<Future<T>> invokeAll( final Collection<? extends Callable<T>> tasks )
+                throws InterruptedException {
+            throw new RejectedExecutionException("No Asynchronous tasks allowed");
+        }
+
+
+        @Override
+        public <T> List<Future<T>> invokeAll( final Collection<? extends Callable<T>> tasks, final long timeout,
+                                              final TimeUnit unit ) throws InterruptedException {
+            throw new RejectedExecutionException("No Asynchronous tasks allowed");
+        }
+
+
+        @Override
+        public <T> T invokeAny( final Collection<? extends Callable<T>> tasks )
+                throws InterruptedException, ExecutionException {
+            throw new RejectedExecutionException("No Asynchronous tasks allowed");
+        }
+
+
+        @Override
+        public <T> T invokeAny( final Collection<? extends Callable<T>> tasks, final long timeout, final TimeUnit unit )
+                throws InterruptedException, ExecutionException, TimeoutException {
+            throw new RejectedExecutionException("No Asynchronous tasks allowed");
+        }
+
+
+        @Override
+        public void execute( final Runnable command ) {
+            throw new RejectedExecutionException("No Asynchronous tasks allowed");
+        }
+    }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a49b4449/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
index 34f57e5..65189f1 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
@@ -185,6 +185,38 @@ public class NamedTaskExecutorImplTest {
}
+    /**
+     * With a pool size of 0 the executor must reject the submitted task.
+     */
+    @Test
+    public void rejectingTaskExecutor() throws InterruptedException {
+
+        final int threadPoolSize = 0;
+        final int queueSize = 0;
+
+        final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", threadPoolSize, queueSize );
+
+        final CountDownLatch exceptionLatch = new CountDownLatch( 0 );
+        final CountDownLatch rejectedLatch = new CountDownLatch( 1 );
+        final CountDownLatch runLatch = new CountDownLatch( 0 );
+
+        //submit the only task of this test
+        final TestTask<Void> testTask = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {};
+
+        executor.submit( testTask );
+
+        //should be immediately rejected
+        rejectedLatch.await( 1000, TimeUnit.MILLISECONDS );
+
+        //await() reports a timeout only through its return value, so check the latch really reached zero
+        //(presumably TestTask counts it down when rejected - confirm)
+        assertEquals( 0l, rejectedLatch.getCount() );
+
+        //NOTE(review): both latches start at 0, so these two checks cannot fail; confirm the intent
+        assertEquals( 0l, exceptionLatch.getCount() );
+        assertEquals( 0l, runLatch.getCount() );
+    }
+
+
private static abstract class TestTask<V> implements Task<V> {
private final List<Throwable> exceptions;