You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/10/01 22:01:03 UTC

[04/19] git commit: Converted tasks to ForkJoin tasks to allow for parallel execution.

Converted tasks to ForkJoin tasks to allow for parallel execution.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/dc3f448c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/dc3f448c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/dc3f448c

Branch: refs/heads/two-dot-o-rebuildable-index
Commit: dc3f448c946219c44e3dbfadb294fea0b8048343
Parents: c0b78b9
Author: Todd Nine <to...@apache.org>
Authored: Thu Sep 25 14:56:23 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Thu Sep 25 14:56:23 2014 -0600

----------------------------------------------------------------------
 .../collection/guice/CollectionModule.java      |   2 +-
 .../impl/EntityVersionCleanupTask.java          |   8 +-
 .../persistence/core/task/ImmediateTask.java    |  42 +++++++
 .../core/task/NamedTaskExecutorImpl.java        |  74 ++++++++-----
 .../usergrid/persistence/core/task/Task.java    |  44 +++++---
 .../persistence/core/task/TaskExecutor.java     |   2 +-
 .../core/task/NamedTaskExecutorImplTest.java    | 111 +++++++++++++++----
 .../persistence/graph/guice/GraphModule.java    |   2 +-
 .../impl/shard/ShardGroupCompaction.java        |   7 +-
 .../shard/impl/ShardGroupCompactionImpl.java    |  44 ++------
 10 files changed, 227 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dc3f448c/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
index 306f6e0..a6230e1 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
@@ -101,7 +101,7 @@ public class CollectionModule extends AbstractModule {
     @Singleton
     @Provides
     public TaskExecutor collectionTaskExecutor(final SerializationFig serializationFig){
-        return new NamedTaskExecutorImpl( "collectiontasks", serializationFig.getTaskPoolThreadSize(), serializationFig.getTaskPoolQueueSize() );
+        return new NamedTaskExecutorImpl( "collectiontasks", serializationFig.getTaskPoolThreadSize() );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dc3f448c/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
index 11e2da9..85afce6 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
@@ -22,7 +22,7 @@ import org.apache.usergrid.persistence.model.entity.Id;
  * Cleans up previous versions from the specified version. Note that this means the version passed in the io event is
  * retained, the range is exclusive.
  */
-class EntityVersionCleanupTask implements Task<CollectionIoEvent<EntityVersion>, CollectionIoEvent<EntityVersion>> {
+class EntityVersionCleanupTask extends Task<CollectionIoEvent<EntityVersion>, CollectionIoEvent<EntityVersion>> {
 
     private static final Logger LOG = LoggerFactory.getLogger( EntityVersionCleanupTask.class );
 
@@ -63,7 +63,7 @@ class EntityVersionCleanupTask implements Task<CollectionIoEvent<EntityVersion>,
         // so we'll run it in our current thread
 
         try {
-            call();
+            executeTask();
         }
         catch ( Exception e ) {
             throw new RuntimeException( "Exception thrown in call task", e );
@@ -72,7 +72,7 @@ class EntityVersionCleanupTask implements Task<CollectionIoEvent<EntityVersion>,
 
 
     @Override
-    public CollectionIoEvent<EntityVersion> call() throws Exception {
+    public CollectionIoEvent<EntityVersion> executeTask() throws Exception {
 
         final CollectionScope scope = collectionIoEvent.getEntityCollection();
         final Id entityId = collectionIoEvent.getEvent().getId();
@@ -102,8 +102,6 @@ class EntityVersionCleanupTask implements Task<CollectionIoEvent<EntityVersion>,
             entitySerializationStrategy.delete( scope, entityId, currentVersion ).execute();
 
             logEntrySerializationStrategy.delete( scope, entityId, currentVersion ).execute();
-
-
         }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dc3f448c/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/ImmediateTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/ImmediateTask.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/ImmediateTask.java
new file mode 100644
index 0000000..627e7e8
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/ImmediateTask.java
@@ -0,0 +1,42 @@
+package org.apache.usergrid.persistence.core.task;
+
+
+/**
+ * Does not perform computation, just returns the value passed to it
+ *
+ */
+public class ImmediateTask<V, I> extends Task<V, I> {
+
+    private final I id;
+    private final V returned;
+
+
+    protected ImmediateTask( final I id, final V returned ) {
+        this.id = id;
+        this.returned = returned;
+    }
+
+
+    @Override
+    public I getId() {
+        return id;
+    }
+
+
+    @Override
+    public V executeTask() throws Exception {
+        return returned;
+    }
+
+
+    @Override
+    public void exceptionThrown( final Throwable throwable ) {
+          //no op
+    }
+
+
+    @Override
+    public void rejected() {
+        //no op
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dc3f448c/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
index 40ee5cf..aba8cd5 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
@@ -1,11 +1,11 @@
 package org.apache.usergrid.persistence.core.task;
 
 
-import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinWorkerThread;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -17,10 +17,6 @@ import org.slf4j.Logger;
 
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
 
 
 /**
@@ -30,54 +26,45 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
 
     private static final Logger LOG = org.slf4j.LoggerFactory.getLogger( NamedTaskExecutorImpl.class );
 
-    private final ListeningExecutorService executorService;
+    private final NamedForkJoinPool executorService;
 
     private final String name;
     private final int poolSize;
-    private final int queueLength;
 
 
     /**
      * @param name The name of this instance of the task executor
      * @param poolSize The size of the pool.  This is the number of concurrent tasks that can execute at once.
-     * @param queueLength The length of tasks to keep in the queue
      */
-    public NamedTaskExecutorImpl( final String name, final int poolSize, final int queueLength ) {
+    public NamedTaskExecutorImpl( final String name, final int poolSize) {
         Preconditions.checkNotNull( name );
         Preconditions.checkArgument( name.length() > 0, "name must have a length" );
         Preconditions.checkArgument( poolSize > 0, "poolSize must be > than 0" );
-        Preconditions.checkArgument( queueLength > -1, "queueLength must be 0 or more" );
 
         this.name = name;
         this.poolSize = poolSize;
-        this.queueLength = queueLength;
 
-        final BlockingQueue<Runnable> queue =
-                queueLength > 0 ? new ArrayBlockingQueue<Runnable>( queueLength ) : new SynchronousQueue<Runnable>();
+//        final BlockingQueue<Runnable> queue =
+//                queueLength > 0 ? new ArrayBlockingQueue<Runnable>( queueLength ) : new SynchronousQueue<Runnable>();
+//
+//        executorService = MoreExecutors.listeningDecorator( new MaxSizeThreadPool( queue ) );
 
-        executorService = MoreExecutors.listeningDecorator( new MaxSizeThreadPool( queue ) );
+       this.executorService =  new NamedForkJoinPool(poolSize);
     }
 
 
     @Override
-    public <V, I> ListenableFuture<V> submit( final Task<V, I> task ) {
-
-        final ListenableFuture<V> future;
+    public <V, I> Task<V, I> submit( final Task<V, I> task ) {
 
         try {
-            future = executorService.submit( task );
-
-            /**
-             * Log our success or failures for debugging purposes
-             */
-            Futures.addCallback( future, new TaskFutureCallBack<V, I>( task ) );
+           executorService.submit( task );
         }
         catch ( RejectedExecutionException ree ) {
             task.rejected();
-            return Futures.immediateCancelledFuture();
         }
 
-        return future;
+        return task;
+
     }
 
 
@@ -109,6 +96,41 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
     }
 
 
+    private final class NamedForkJoinPool extends ForkJoinPool{
+
+        private NamedForkJoinPool( final int workerThreadCount ) {
+            //TODO, verify the scheduler at the end
+            super( workerThreadCount, defaultForkJoinWorkerThreadFactory, new TaskExceptionHandler(), true );
+        }
+
+
+
+    }
+
+    private final class TaskExceptionHandler implements Thread.UncaughtExceptionHandler{
+
+        @Override
+        public void uncaughtException( final Thread t, final Throwable e ) {
+            LOG.error( "Uncaught exception on thread {} was {}", t, e );
+        }
+    }
+
+
+
+
+    private final class NamedWorkerThread extends ForkJoinWorkerThread{
+
+        /**
+         * Creates a ForkJoinWorkerThread operating in the given pool.
+         *
+         * @param pool the pool this thread works in
+         *
+         * @throws NullPointerException if pool is null
+         */
+        protected NamedWorkerThread(final String name,  final ForkJoinPool pool ) {
+            super( pool );
+        }
+    }
     /**
      * Create a thread pool that will reject work if our audit tasks become overwhelmed
      */

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dc3f448c/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
index 518b461..eb04c2c 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
@@ -1,37 +1,47 @@
 package org.apache.usergrid.persistence.core.task;
 
 
-import java.util.concurrent.Callable;
-
+import java.util.concurrent.RecursiveTask;
 
 
 /**
  * The task to execute
  */
-public interface Task<V, I> extends Callable<V> {
+public abstract class Task<V, I> extends RecursiveTask<V> {
 
     /**
      * Get the unique identifier of this task.  This may be used to collapse runnables over a time period in the future
-     *
-     * @return
      */
-    I getId();
+    public abstract I getId();
+
+
+    @Override
+    protected V compute() {
+        try {
+            return executeTask();
+        }
+        catch ( Exception e ) {
+            exceptionThrown( e );
+            throw new RuntimeException( e );
+        }
+    }
+
 
     /**
-     * Invoked when this task throws an uncaught exception.
-     * @param throwable
+     * Execute the task
      */
-    void exceptionThrown(final Throwable throwable);
+    public abstract V executeTask() throws Exception;
 
     /**
-     * Invoked when we weren't able to run this task by the the thread attempting to schedule the task.
-     * If this task MUST be run immediately, you can invoke the call method from within this event to invoke the
-     * task in the scheduling thread.  Note that this has performance implications to the user.  If you can drop the
-     * request and process later (lazy repair for instance ) do so.
-     *
+     * Invoked when this task throws an uncaught exception.
      */
-    void rejected();
-
-
+    public abstract void exceptionThrown( final Throwable throwable );
 
+    /**
+     * Invoked by the thread attempting to schedule this task when the task could not be run. If this task
+     * MUST be run immediately, you can invoke executeTask() from within this event to run the task in the
+     * scheduling thread.  Note that this has performance implications for the user.  If you can drop the request and
+     * process later (lazy repair, for instance) do so.
+     */
+    public abstract void rejected();
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dc3f448c/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
index 619ac14..a3553c7 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
@@ -13,5 +13,5 @@ public interface TaskExecutor {
      * Submit the task asynchronously
      * @param task
      */
-    public <V, I> ListenableFuture<V> submit( Task<V, I> task );
+    public <V, I> Task<V, I > submit( Task<V, I> task );
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dc3f448c/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
index 9da9263..f65d5a6 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
@@ -11,8 +11,6 @@ import org.junit.Test;
 
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 
-import junit.framework.TestCase;
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertSame;
 
@@ -25,7 +23,7 @@ public class NamedTaskExecutorImplTest {
 
     @Test
     public void jobSuccess() throws InterruptedException {
-        final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1, 0 );
+        final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1 );
 
         final CountDownLatch exceptionLatch = new CountDownLatch( 0 );
         final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
@@ -46,7 +44,7 @@ public class NamedTaskExecutorImplTest {
 
     @Test
     public void exceptionThrown() throws InterruptedException {
-        final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1, 0 );
+        final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1 );
 
         final CountDownLatch exceptionLatch = new CountDownLatch( 1 );
         final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
@@ -55,9 +53,10 @@ public class NamedTaskExecutorImplTest {
         final RuntimeException re = new RuntimeException( "throwing exception" );
 
         final TestTask<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {
+
+
             @Override
-            public Void call() throws Exception {
-                super.call();
+            public Void executeTask() {
                 throw re;
             }
         };
@@ -77,7 +76,7 @@ public class NamedTaskExecutorImplTest {
 
     @Test
     public void noCapacity() throws InterruptedException {
-        final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1, 0 );
+        final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1 );
 
         final CountDownLatch exceptionLatch = new CountDownLatch( 0 );
         final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
@@ -86,8 +85,8 @@ public class NamedTaskExecutorImplTest {
 
         final TestTask<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {
             @Override
-            public Void call() throws Exception {
-                super.call();
+            public Void executeTask() throws Exception {
+                super.executeTask();
 
                 //park this thread so it takes up a task and the next is rejected
                 final Object mutex = new Object();
@@ -131,22 +130,22 @@ public class NamedTaskExecutorImplTest {
     public void noCapacityWithQueue() throws InterruptedException {
 
         final int threadPoolSize = 1;
-        final int queueSize = 10;
+       
 
-        final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", threadPoolSize, queueSize );
+        final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", threadPoolSize );
 
         final CountDownLatch exceptionLatch = new CountDownLatch( 0 );
         final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
         final CountDownLatch runLatch = new CountDownLatch( 1 );
 
-        int iterations = threadPoolSize + queueSize;
+        int iterations = threadPoolSize ;
 
-        for(int i = 0; i < iterations; i ++) {
+        for ( int i = 0; i < iterations; i++ ) {
 
             final TestTask<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {
                 @Override
-                public Void call() throws Exception {
-                    super.call();
+                public Void executeTask() throws Exception {
+                    super.executeTask();
 
                     //park this thread so it takes up a task and the next is rejected
                     final Object mutex = new Object();
@@ -162,7 +161,6 @@ public class NamedTaskExecutorImplTest {
         }
 
 
-
         runLatch.await( 1000, TimeUnit.MILLISECONDS );
 
         //now submit the second task
@@ -187,12 +185,81 @@ public class NamedTaskExecutorImplTest {
     }
 
 
-    private static abstract class TestTask<V> implements Task<V, UUID> {
+    @Test
+    public void jobTreeResult() throws InterruptedException {
+
+        final int threadPoolSize = 4;
+       
+
+        final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", threadPoolSize );
+
+        final CountDownLatch exceptionLatch = new CountDownLatch( 0 );
+        final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
+
+        //accommodates splitting the job 1->2->4 and joining
+        final CountDownLatch runLatch = new CountDownLatch( 7 );
+
+
+        TestRecursiveTask task = new TestRecursiveTask( exceptionLatch, rejectedLatch, runLatch, 1, 3 );
+
+         executor.submit( task );
+
+
+        //compute our result
+        Integer result = task.join();
+
+        //only the 4 leaf tasks (depth 3) return a value and parents sum their children, so the result should be 3*4
+        final int expected = 4*3;
+
+        assertEquals(expected, result.intValue());
+
+        //just to check our latches
+        runLatch.await( 1000, TimeUnit.MILLISECONDS );
+
+        //now submit the second task
+
+
+    }
+
+
+    private static class TestRecursiveTask extends TestTask<Integer> {
+
+        private final int depth;
+        private final int maxDepth;
+
+        private TestRecursiveTask( final CountDownLatch exceptionLatch, final CountDownLatch rejectedLatch,
+                                   final CountDownLatch runLatch, final int depth, final int maxDepth ) {
+            super( exceptionLatch, rejectedLatch, runLatch );
+            this.depth = depth;
+            this.maxDepth = maxDepth;
+        }
+
+
+        @Override
+        public Integer executeTask() throws Exception {
+
+            if(depth == maxDepth ){
+                return depth;
+            }
+
+            TestRecursiveTask left = new TestRecursiveTask(exceptionLatch, rejectedLatch, runLatch, depth+1, maxDepth  );
+
+            TestRecursiveTask right = new TestRecursiveTask(exceptionLatch, rejectedLatch, runLatch, depth+1, maxDepth  );
+
+            //run our left in another thread
+            left.fork();
+
+            return right.compute() + left.join();
+        }
+    }
+
+
+    private static abstract class TestTask<V> extends Task<V, UUID> {
 
-        private final List<Throwable> exceptions;
-        private final CountDownLatch exceptionLatch;
-        private final CountDownLatch rejectedLatch;
-        private final CountDownLatch runLatch;
+        protected final List<Throwable> exceptions;
+        protected final CountDownLatch exceptionLatch;
+        protected final CountDownLatch rejectedLatch;
+        protected final CountDownLatch runLatch;
 
 
         private TestTask( final CountDownLatch exceptionLatch, final CountDownLatch rejectedLatch,
@@ -225,7 +292,7 @@ public class NamedTaskExecutorImplTest {
 
 
         @Override
-        public V call() throws Exception {
+        public V executeTask() throws Exception {
             runLatch.countDown();
             return null;
         }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dc3f448c/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
index 608f8ce..a3978fc 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
@@ -155,7 +155,7 @@ public class GraphModule extends AbstractModule {
     @Singleton
     @Provides
     public TaskExecutor graphTaskExecutor(final GraphFig graphFig){
-        return new NamedTaskExecutorImpl( "graphTaskExecutor",  graphFig.getShardAuditWorkerCount(), graphFig.getShardAuditWorkerQueueSize()  );
+        return new NamedTaskExecutorImpl( "graphTaskExecutor",  graphFig.getShardAuditWorkerCount()  );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dc3f448c/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
index 4fe1a63..15acaa8 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
@@ -27,6 +27,8 @@ import java.util.HashSet;
 import java.util.Set;
 
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.task.Task;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardGroupCompactionImpl;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
@@ -42,9 +44,8 @@ public interface ShardGroupCompaction {
      *
      * @return A ListenableFuture with the result.  Note that some
      */
-    public ListenableFuture<AuditResult> evaluateShardGroup( final ApplicationScope scope,
-                                                             final DirectedEdgeMeta edgeMeta,
-                                                             final ShardEntryGroup group );
+    public Task<AuditResult, ShardGroupCompactionImpl.ShardAuditKey> evaluateShardGroup(
+            final ApplicationScope scope, final DirectedEdgeMeta edgeMeta, final ShardEntryGroup group );
 
 
     public enum AuditResult {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dc3f448c/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
index be7fbe4..751b4d9 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
@@ -37,14 +37,13 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
-import javax.annotation.Nullable;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.core.consistency.TimeService;
 import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.task.ImmediateTask;
 import org.apache.usergrid.persistence.core.task.Task;
 import org.apache.usergrid.persistence.core.task.TaskExecutor;
 import org.apache.usergrid.persistence.graph.GraphFig;
@@ -66,9 +65,6 @@ import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hasher;
 import com.google.common.hash.Hashing;
 import com.google.common.hash.PrimitiveSink;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.netflix.astyanax.Keyspace;
@@ -277,45 +273,29 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
 
 
     @Override
-    public ListenableFuture<AuditResult> evaluateShardGroup( final ApplicationScope scope,
-                                                             final DirectedEdgeMeta edgeMeta,
-                                                             final ShardEntryGroup group ) {
+    public Task<AuditResult, ShardAuditKey> evaluateShardGroup( final ApplicationScope scope,
+                                                                final DirectedEdgeMeta edgeMeta,
+                                                                final ShardEntryGroup group ) {
 
         final double repairChance = random.nextDouble();
 
 
         //don't audit, we didn't hit our chance
         if ( repairChance > graphFig.getShardRepairChance() ) {
-            return Futures.immediateFuture( AuditResult.NOT_CHECKED );
+            return new ImmediateTask<AuditResult, ShardAuditKey>(  new ShardAuditKey( scope, edgeMeta, group ), AuditResult.NOT_CHECKED ) {};
         }
 
         /**
          * Try and submit.  During back pressure, we may not be able to submit, that's ok.  Better to drop than to
          * hose the system
          */
-        ListenableFuture<AuditResult> future = taskExecutor.submit( new ShardAuditTask( scope, edgeMeta, group ) );
+       return taskExecutor.submit( new ShardAuditTask( scope, edgeMeta, group ) );
 
-        /**
-         * Log our success or failures for debugging purposes
-         */
-        Futures.addCallback( future, new FutureCallback<AuditResult>() {
-            @Override
-            public void onSuccess( @Nullable final AuditResult result ) {
-                LOG.debug( "Successfully completed audit of task {}", result );
-            }
-
-
-            @Override
-            public void onFailure( final Throwable t ) {
-                LOG.error( "Unable to perform audit.  Exception is ", t );
-            }
-        } );
 
-        return future;
     }
 
 
-    private final class ShardAuditTask implements Task<AuditResult, ShardAuditKey> {
+    private final class ShardAuditTask extends Task<AuditResult, ShardAuditKey> {
 
         private final ApplicationScope scope;
         private final DirectedEdgeMeta edgeMeta;
@@ -350,7 +330,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
 
 
         @Override
-        public AuditResult call() throws Exception {
+        public AuditResult executeTask() throws Exception {
             /**
              * We don't have a compaction pending.  Run an audit on the shards
              */
@@ -401,10 +381,8 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
                  */
                 try {
                     CompactionResult result = compact( scope, edgeMeta, group );
-                    LOG.info(
-                            "Compaction result for compaction of scope {} with edge meta data of {} and shard group " +
-                                    "{} is {}",
-                            new Object[] { scope, edgeMeta, group, result } );
+                    LOG.info( "Compaction result for compaction of scope {} with edge meta data of {} and shard group "
+                                    + "{} is {}", new Object[] { scope, edgeMeta, group, result } );
                 }
                 finally {
                     shardCompactionTaskTracker.complete( scope, edgeMeta, group );
@@ -418,7 +396,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
     }
 
 
-    private static final class ShardAuditKey {
+    public static final class ShardAuditKey {
         private final ApplicationScope scope;
         private final DirectedEdgeMeta edgeMeta;
         private final ShardEntryGroup group;