You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/12/16 15:59:48 UTC

[18/50] incubator-usergrid git commit: Removed queue from NamedTask Executor and made it do one event at a time. Added Rejected tests to Entity Events.

Removed queue from NamedTask Executor and made it do one event at a time. Added Rejected tests to Entity Events.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/a49b4449
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/a49b4449
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/a49b4449

Branch: refs/heads/no-source-in-es
Commit: a49b444961c0fc498b2bee0f396c8ca56b199a31
Parents: e9f876c
Author: grey <gr...@apigee.com>
Authored: Tue Nov 4 12:12:24 2014 -0800
Committer: grey <gr...@apigee.com>
Committed: Tue Nov 4 12:12:24 2014 -0800

----------------------------------------------------------------------
 .../impl/EntityVersionCleanupTaskTest.java      | 129 ++++++-------------
 .../impl/EntityVersionCreatedTaskTest.java      |  46 +++++++
 .../core/task/NamedTaskExecutorImpl.java        | 114 +++++++++++++++-
 .../core/task/NamedTaskExecutorImplTest.java    |  32 +++++
 4 files changed, 228 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a49b4449/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
index 621db20..0e86027 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
@@ -639,168 +639,123 @@ public class EntityVersionCleanupTaskTest {
         latch.await();
     }
 
-
     /**
      * Tests that our task will run in the caller if there's no threads, ensures that the task runs
      */
-    @Ignore("Test is a work in progress")
     @Test(timeout=10000)
-    public void runsWhenRejected() 
+    public void singleListenerSingleVersionRejected()
             throws ExecutionException, InterruptedException, ConnectionException {
 
 
-        /**
-         * only 1 thread on purpose, we want to saturate the task
-         */
-        final TaskExecutor taskExecutor = new NamedTaskExecutorImpl( "test", 1, 0 );
+        final TaskExecutor taskExecutor = new NamedTaskExecutorImpl( "test", 0, 0 );
 
         final SerializationFig serializationFig = mock( SerializationFig.class );
 
         when( serializationFig.getBufferSize() ).thenReturn( 10 );
 
-        final MvccEntitySerializationStrategy mvccEntitySerializationStrategy =
+        final MvccEntitySerializationStrategy ess =
                 mock( MvccEntitySerializationStrategy.class );
 
         final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy =
                 mock( MvccLogEntrySerializationStrategy.class );
 
-        final Keyspace keyspace1 = mock( Keyspace.class );
-        final Keyspace keyspace2 = mock( Keyspace.class );
+        final Keyspace keyspace = mock( Keyspace.class );
 
 
         final MutationBatch entityBatch = mock( MutationBatch.class );
         final MutationBatch logBatch = mock( MutationBatch.class );
 
-        when( keyspace1.prepareMutationBatch() )
-            .thenReturn( mock( MutationBatch.class ) ) // don't care what happens to this one
-            .thenReturn( entityBatch )
-            .thenReturn( logBatch );
+        when( keyspace.prepareMutationBatch() )
+                .thenReturn( mock( MutationBatch.class ) ) // don't care what happens to this one
+                .thenReturn( entityBatch )
+                .thenReturn( logBatch );
 
-        when( keyspace2.prepareMutationBatch() )
-            .thenReturn( mock( MutationBatch.class ) ) // don't care what happens to this one
-            .thenReturn( entityBatch )
-            .thenReturn( logBatch );
 
 
         //create a latch for the event listener, and add it to the list of events
-        final int sizeToReturn = 10;
-
-
-        final int listenerCount = 2;
+        final int sizeToReturn = 1;
 
-        final CountDownLatch latch = new CountDownLatch( 
-                sizeToReturn/serializationFig.getBufferSize() * listenerCount );
-        final Semaphore waitSemaphore = new Semaphore( 0 );
+        final CountDownLatch latch = new CountDownLatch( sizeToReturn );
 
+        final EntityVersionDeletedTest eventListener = new EntityVersionDeletedTest( latch );
 
-        final SlowListener slowListener = new SlowListener( latch, waitSemaphore );
-        final EntityVersionDeletedTest runListener = new EntityVersionDeletedTest( latch );
+        final Set<EntityVersionDeleted> listeners = new HashSet<EntityVersionDeleted>();
 
+        listeners.add( eventListener );
 
         final Id applicationId = new SimpleId( "application" );
 
 
-        final CollectionScope appScope = new CollectionScopeImpl( 
+        final CollectionScope appScope = new CollectionScopeImpl(
                 applicationId, applicationId, "users" );
 
         final Id entityId = new SimpleId( "user" );
 
 
         //mock up a single log entry for our first test
-        final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock( 
+        final LogEntryMock logEntryMock = LogEntryMock.createLogEntryMock(
                 mvccLogEntrySerializationStrategy, appScope, entityId, sizeToReturn + 1 );
 
 
         final UUID version = logEntryMock.getEntries().iterator().next().getVersion();
 
+
         final UniqueValueSerializationStrategy uniqueValueSerializationStrategy =
                 mock( UniqueValueSerializationStrategy.class );
 
-
-        EntityVersionCleanupTask firstTask = new EntityVersionCleanupTask( 
-            serializationFig, 
-            mvccLogEntrySerializationStrategy,
-            mvccEntitySerializationStrategy, 
-            uniqueValueSerializationStrategy, 
-            keyspace1,
-            Sets.newSet( (EntityVersionDeleted)runListener ),
-            appScope,  
-            entityId, 
-            version );
-
-
-        //change the listeners to one that is just invoked quickly
-
-
-        EntityVersionCleanupTask secondTask = new EntityVersionCleanupTask( 
-            serializationFig,
-            mvccLogEntrySerializationStrategy,
-            mvccEntitySerializationStrategy,
-            uniqueValueSerializationStrategy, 
-            keyspace2, 
-            Sets.newSet( (EntityVersionDeleted)runListener ),
-            appScope,
-            entityId, 
-            version );
-
+        EntityVersionCleanupTask cleanupTask =
+                new EntityVersionCleanupTask( serializationFig,
+                        mvccLogEntrySerializationStrategy,
+                        ess,
+                        uniqueValueSerializationStrategy,
+                        keyspace,
+                        listeners,
+                        appScope,
+                        entityId,
+                        version
+                );
 
         final MutationBatch batch = mock( MutationBatch.class );
 
 
         //set up returning a mutator
-        when( mvccEntitySerializationStrategy
-                .delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
+        when(ess.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
                 .thenReturn( batch );
 
 
-
         when( mvccLogEntrySerializationStrategy
                 .delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
                 .thenReturn( batch );
 
 
-        //start the task
-        ListenableFuture<Void> future1 = taskExecutor.submit( firstTask );
+        final List<MvccEntity> mel = new ArrayList<MvccEntity>();
 
-        //now start another task while the slow running task is running
-        ListenableFuture<Void> future2 = taskExecutor.submit( secondTask );
+        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(),
+                MvccEntity.Status.DELETED, Optional.fromNullable((Entity)null)) );
 
-        //get the second task, we shouldn't have been able to queue it, 
-        // therefore it should just run in process
-        future2.get();
+        mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(),
+                MvccEntity.Status.DELETED, Optional.fromNullable((Entity)null)) );
 
-        /**
-         * While we're not done, release latches every 200 ms
-         */
-        while ( !future1.isDone() ) {
-            Thread.sleep( 200 );
-            waitSemaphore.release( listenerCount );
-        }
+        when( ess.load( same( appScope ), same( entityId ), any(UUID.class), any(Integer.class) ) )
+                .thenReturn(mel.iterator() );
 
-        //wait for the task
-        future1.get();
 
-        //we deleted the version
-        //verify we deleted everything
+        //start the task
+        ListenableFuture<Void> future = taskExecutor.submit( cleanupTask );
 
+        //wait for the task
+        future.get();
 
         //we deleted the version
-        //verify we deleted everything
-        verify( logBatch, times( sizeToReturn* 2 ) ).mergeShallow( any( MutationBatch.class ) );
-
-        verify( entityBatch, times( sizeToReturn * 2) ).mergeShallow( any( MutationBatch.class ) );
-
-
-        verify( logBatch, times(2) ).execute();
-
-        verify( entityBatch, times(2) ).execute();
+        //verify it was run
+        verify( entityBatch ).execute();
 
+        verify( logBatch ).execute();
 
         //the latch was executed
         latch.await();
     }
 
-
     private static class EntityVersionDeletedTest implements EntityVersionDeleted {
         final CountDownLatch invocationLatch;
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a49b4449/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTaskTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTaskTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTaskTest.java
index 9d72665..24ea280 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTaskTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTaskTest.java
@@ -200,6 +200,52 @@ public class EntityVersionCreatedTaskTest {
         verify( listeners ).iterator();
     }
 
+    @Test(timeout=10000)
+    public void oneListenerRejected()
+            throws ExecutionException, InterruptedException, ConnectionException {
+
+        // create a latch for the event listener, and add it to the list of events
+
+        final TaskExecutor taskExecutor = new NamedTaskExecutorImpl( "test", 0, 0 );
+
+        final int sizeToReturn = 1;
+
+        final CountDownLatch latch = new CountDownLatch( sizeToReturn );
+
+        final EntityVersionCreatedTest eventListener = new EntityVersionCreatedTest(latch);
+
+        final Set<EntityVersionCreated> listeners = mock( Set.class );
+        final Iterator<EntityVersionCreated> helper = mock(Iterator.class);
+
+        when ( listeners.size()).thenReturn( 1 );
+        when ( listeners.iterator()).thenReturn( helper );
+        when ( helper.next() ).thenReturn( eventListener );
+
+        final Id applicationId = new SimpleId( "application" );
+
+        final CollectionScope appScope = new CollectionScopeImpl(
+                applicationId, applicationId, "users" );
+
+        final Id entityId = new SimpleId( "user" );
+        final Entity entity = new Entity( entityId );
+
+        // start the task
+
+        EntityVersionCreatedTask entityVersionCreatedTask =
+                new EntityVersionCreatedTask( appScope, listeners, entity);
+
+        ListenableFuture<Void> future = taskExecutor.submit( entityVersionCreatedTask );
+
+        // wait for the task
+        future.get();
+
+        //mocked listener makes sure that the task is called
+        verify( listeners ).size();
+        verify( listeners ).iterator();
+        verify( helper ).next();
+
+    }
+
     private static class EntityVersionCreatedTest implements EntityVersionCreated {
         final CountDownLatch invocationLatch;
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a49b4449/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
index b18687a..a022c08 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
@@ -1,16 +1,24 @@
 package org.apache.usergrid.persistence.core.task;
 
 
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.ForkJoinWorkerThread;
+import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 
 import javax.annotation.Nullable;
@@ -36,7 +44,6 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
 
     private final String name;
     private final int poolSize;
-    private final int queueLength;
 
 
     /**
@@ -47,17 +54,24 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
     public NamedTaskExecutorImpl( final String name, final int poolSize, final int queueLength ) {
         Preconditions.checkNotNull( name );
         Preconditions.checkArgument( name.length() > 0, "name must have a length" );
-        Preconditions.checkArgument( poolSize > 0, "poolSize must be > than 0" );
+        Preconditions.checkArgument( poolSize > -1, "poolSize must be > than -1" );
         Preconditions.checkArgument( queueLength > -1, "queueLength must be 0 or more" );
 
         this.name = name;
         this.poolSize = poolSize;
-        this.queueLength = queueLength;
 
-        final BlockingQueue<Runnable> queue =
-                queueLength > 0 ? new ArrayBlockingQueue<Runnable>( queueLength ) : new SynchronousQueue<Runnable>();
+        //The user has chosen to disable asynchronous execution, to create an executor service that will reject all requests
+        if(poolSize == 0){
+            executorService = MoreExecutors.listeningDecorator( new RejectingExecutorService());
+        }
+
+        //queue executions as normal
+        else {
+            final BlockingQueue<Runnable> queue =
+                    queueLength > 0 ? new ArrayBlockingQueue<Runnable>( queueLength ) : new SynchronousQueue<Runnable>();
 
-        executorService = MoreExecutors.listeningDecorator( new MaxSizeThreadPool( queue ) );
+            executorService = MoreExecutors.listeningDecorator( new MaxSizeThreadPool( queue ) );
+        }
     }
 
 
@@ -164,4 +178,92 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
         }
 
     }
+
+
+    /**
+     * Executor implementation that simply rejects all incoming tasks
+     */
+    private static final class RejectingExecutorService implements ExecutorService{
+
+        @Override
+        public void shutdown() {
+
+        }
+
+
+        @Override
+        public List<Runnable> shutdownNow() {
+            return Collections.EMPTY_LIST;
+        }
+
+
+        @Override
+        public boolean isShutdown() {
+            return false;
+        }
+
+
+        @Override
+        public boolean isTerminated() {
+            return false;
+        }
+
+
+        @Override
+        public boolean awaitTermination( final long timeout, final TimeUnit unit ) throws InterruptedException {
+            return false;
+        }
+
+
+        @Override
+        public <T> Future<T> submit( final Callable<T> task ) {
+            throw new RejectedExecutionException("No Asynchronous tasks allowed");
+        }
+
+
+        @Override
+        public <T> Future<T> submit( final Runnable task, final T result ) {
+            throw new RejectedExecutionException("No Asynchronous tasks allowed");
+        }
+
+
+        @Override
+        public Future<?> submit( final Runnable task ) {
+            throw new RejectedExecutionException("No Asynchronous tasks allowed");
+        }
+
+
+        @Override
+        public <T> List<Future<T>> invokeAll( final Collection<? extends Callable<T>> tasks )
+                throws InterruptedException {
+            throw new RejectedExecutionException("No Asynchronous tasks allowed");
+        }
+
+
+        @Override
+        public <T> List<Future<T>> invokeAll( final Collection<? extends Callable<T>> tasks, final long timeout,
+                                              final TimeUnit unit ) throws InterruptedException {
+            throw new RejectedExecutionException("No Asynchronous tasks allowed");
+        }
+
+
+        @Override
+        public <T> T invokeAny( final Collection<? extends Callable<T>> tasks )
+                throws InterruptedException, ExecutionException {
+            throw new RejectedExecutionException("No Asynchronous tasks allowed");
+        }
+
+
+        @Override
+        public <T> T invokeAny( final Collection<? extends Callable<T>> tasks, final long timeout, final TimeUnit unit )
+                throws InterruptedException, ExecutionException, TimeoutException {
+            throw new RejectedExecutionException("No Asynchronous tasks allowed");
+        }
+
+
+        @Override
+        public void execute( final Runnable command ) {
+            throw new RejectedExecutionException("No Asynchronous tasks allowed");
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a49b4449/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
index 34f57e5..65189f1 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
@@ -185,6 +185,38 @@ public class NamedTaskExecutorImplTest {
     }
 
 
+    @Test
+    public void rejectingTaskExecutor() throws InterruptedException {
+
+        final int threadPoolSize = 0;
+        final int queueSize = 0;
+
+        final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", threadPoolSize, queueSize );
+
+        final CountDownLatch exceptionLatch = new CountDownLatch( 0 );
+        final CountDownLatch rejectedLatch = new CountDownLatch( 1 );
+        final CountDownLatch runLatch = new CountDownLatch( 0 );
+
+
+        //now submit the second task
+
+
+
+        final TestTask<Void> testTask = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {};
+
+        executor.submit( testTask );
+
+
+        //should be immediately rejected
+        rejectedLatch.await( 1000, TimeUnit.MILLISECONDS );
+
+        //if we get here we've been rejected, just double check we didn't run
+
+        assertEquals( 0l, exceptionLatch.getCount() );
+        assertEquals( 0l, runLatch.getCount() );
+    }
+
+
     private static abstract class TestTask<V> implements Task<V> {
 
         private final List<Throwable> exceptions;