You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/10/01 22:01:06 UTC

[07/19] git commit: Revert "Converted tasks to ForkJoin tasks to allow for parallel execution."

Revert "Converted tasks to ForkJoin tasks to allow for parallel execution."

This reverts commit dc3f448c946219c44e3dbfadb294fea0b8048343.

The ForkJoinPool does not have the following characteristics

Ability to name a pool
Ability to set queue size for rejection

As a result, it is not well suited for our asynchronous I/O tasks, and I'm moving back to ExecutorService

Conflicts:
	stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
	stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/ImmediateTask.java
	stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
	stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
	stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
	stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
	stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
	stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/af3726b8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/af3726b8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/af3726b8

Branch: refs/heads/two-dot-o-rebuildable-index
Commit: af3726b894051fde9ce63c8db1db0996aa4f2992
Parents: d4f80e7
Author: Todd Nine <to...@apache.org>
Authored: Sat Sep 27 17:58:24 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Sat Sep 27 17:58:24 2014 -0600

----------------------------------------------------------------------
 .../collection/guice/CollectionModule.java      |   2 +-
 .../persistence/core/task/ImmediateTask.java    |  36 ------
 .../core/task/NamedTaskExecutorImpl.java        | 126 +++++++++++++------
 .../usergrid/persistence/core/task/Task.java    |  53 +++-----
 .../persistence/core/task/TaskExecutor.java     |   3 +-
 .../core/task/NamedTaskExecutorImplTest.java    | 116 ++++-------------
 .../persistence/graph/guice/GraphModule.java    |   2 +-
 .../impl/shard/ShardGroupCompaction.java        |   7 +-
 .../shard/impl/ShardGroupCompactionImpl.java    |  51 ++++++--
 9 files changed, 180 insertions(+), 216 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/af3726b8/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
index a6230e1..306f6e0 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
@@ -101,7 +101,7 @@ public class CollectionModule extends AbstractModule {
     @Singleton
     @Provides
     public TaskExecutor collectionTaskExecutor(final SerializationFig serializationFig){
-        return new NamedTaskExecutorImpl( "collectiontasks", serializationFig.getTaskPoolThreadSize() );
+        return new NamedTaskExecutorImpl( "collectiontasks", serializationFig.getTaskPoolThreadSize(), serializationFig.getTaskPoolQueueSize() );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/af3726b8/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/ImmediateTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/ImmediateTask.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/ImmediateTask.java
deleted file mode 100644
index f6c28a3..0000000
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/ImmediateTask.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package org.apache.usergrid.persistence.core.task;
-
-
-/**
- * Does not perform computation, just returns the value passed to it
- *
- */
-public class ImmediateTask<V> extends Task<V> {
-
-
-    private final V returned;
-
-
-    protected ImmediateTask( final V returned ) {
-        this.returned = returned;
-    }
-
-
-
-    @Override
-    public V executeTask() throws Exception {
-        return returned;
-    }
-
-
-    @Override
-    public void exceptionThrown( final Throwable throwable ) {
-          //no op
-    }
-
-
-    @Override
-    public void rejected() {
-        //no op
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/af3726b8/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
index bb9cac8..d4cc915 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
@@ -1,21 +1,3 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
 package org.apache.usergrid.persistence.core.task;
 
 
@@ -24,6 +6,7 @@ import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.ForkJoinWorkerThread;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -35,6 +18,10 @@ import org.slf4j.Logger;
 
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 
 
 /**
@@ -44,64 +31,131 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
 
     private static final Logger LOG = org.slf4j.LoggerFactory.getLogger( NamedTaskExecutorImpl.class );
 
-    private final NamedForkJoinPool executorService;
+    private final ListeningExecutorService executorService;
 
     private final String name;
     private final int poolSize;
+    private final int queueLength;
 
 
     /**
      * @param name The name of this instance of the task executor
      * @param poolSize The size of the pool.  This is the number of concurrent tasks that can execute at once.
+     * @param queueLength The length of tasks to keep in the queue
      */
-    public NamedTaskExecutorImpl( final String name, final int poolSize ) {
-
-        //TODO, figure out how to name the fork/join threads in the pool
+    public NamedTaskExecutorImpl( final String name, final int poolSize, final int queueLength ) {
         Preconditions.checkNotNull( name );
         Preconditions.checkArgument( name.length() > 0, "name must have a length" );
         Preconditions.checkArgument( poolSize > 0, "poolSize must be > than 0" );
+        Preconditions.checkArgument( queueLength > -1, "queueLength must be 0 or more" );
 
         this.name = name;
         this.poolSize = poolSize;
+        this.queueLength = queueLength;
+
+        final BlockingQueue<Runnable> queue =
+                queueLength > 0 ? new ArrayBlockingQueue<Runnable>( queueLength ) : new SynchronousQueue<Runnable>();
 
-        this.executorService = new NamedForkJoinPool( poolSize );
+        executorService = MoreExecutors.listeningDecorator( new MaxSizeThreadPool( queue ) );
     }
 
 
     @Override
-    public <V> Task<V> submit( final Task<V> task ) {
+    public <V, I> ListenableFuture<V> submit( final Task<V, I> task ) {
+
+        final ListenableFuture<V> future;
 
         try {
-            executorService.submit( task );
+            future = executorService.submit( task );
+
+            /**
+             * Log our success or failures for debugging purposes
+             */
+            Futures.addCallback( future, new TaskFutureCallBack<V, I>( task ) );
         }
         catch ( RejectedExecutionException ree ) {
             task.rejected();
+            return Futures.immediateCancelledFuture();
         }
 
-        return task;
+        return future;
     }
 
 
-    @Override
-    public void shutdown() {
-        executorService.shutdownNow();
+    /**
+     * Callback for when the task succeeds or fails.
+     */
+    private static final class TaskFutureCallBack<V, I> implements FutureCallback<V> {
+
+        private final Task<V, I> task;
+
+
+        private TaskFutureCallBack( Task<V, I> task ) {
+            this.task = task;
+        }
+
+
+        @Override
+        public void onSuccess( @Nullable final V result ) {
+            LOG.debug( "Successfully completed task ", task );
+        }
+
+
+        @Override
+        public void onFailure( final Throwable t ) {
+            LOG.error( "Unable to execute task.  Exception is ", t );
+
+            task.exceptionThrown( t );
+        }
     }
 
 
-    private final class NamedForkJoinPool extends ForkJoinPool {
+    /**
+     * Create a thread pool that will reject work if our audit tasks become overwhelmed
+     */
+    private final class MaxSizeThreadPool extends ThreadPoolExecutor {
+
+        public MaxSizeThreadPool( BlockingQueue<Runnable> queue ) {
 
-        private NamedForkJoinPool( final int workerThreadCount ) {
-            //TODO, verify the scheduler at the end
-            super( workerThreadCount, defaultForkJoinWorkerThreadFactory, new TaskExceptionHandler(), true );
+            super( 1, poolSize, 30, TimeUnit.SECONDS, queue, new CountingThreadFactory( ),
+                    new RejectedHandler() );
         }
     }
 
 
-    private final class TaskExceptionHandler implements Thread.UncaughtExceptionHandler {
+    /**
+     * Thread factory that will name and count threads for easier debugging
+     */
+    private final class CountingThreadFactory implements ThreadFactory {
+
+        private final AtomicLong threadCounter = new AtomicLong();
+
 
         @Override
-        public void uncaughtException( final Thread t, final Throwable e ) {
-            LOG.error( "Uncaught exception on thread {} was {}", t, e );
+        public Thread newThread( final Runnable r ) {
+            final long newValue = threadCounter.incrementAndGet();
+
+            Thread t = new Thread( r, name + "-" + newValue );
+
+            t.setDaemon( true );
+
+            return t;
         }
     }
+
+
+    /**
+     * The handler that will handle rejected executions and signal the interface
+     */
+    private final class RejectedHandler implements RejectedExecutionHandler {
+
+
+        @Override
+        public void rejectedExecution( final Runnable r, final ThreadPoolExecutor executor ) {
+            LOG.warn( "{} task queue full, rejecting task {}", name, r );
+
+            throw new RejectedExecutionException( "Unable to run task, queue full" );
+        }
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/af3726b8/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
index 5d35ce4..9a0b857 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
@@ -1,35 +1,19 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
 package org.apache.usergrid.persistence.core.task;
 
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ForkJoinTask;
 import java.util.concurrent.RecursiveTask;
 
 
 /**
  * The task to execute
  */
-public abstract class Task<V> extends RecursiveTask<V> {
+public interface Task<V, I> extends Callable<V> {
+
+    /**
+     * Get the unique identifier of this task.  This may be used to collapse runnables over a time period in the future
+     */
+    public abstract I getId();
+
 
     @Override
     protected V compute() {
@@ -43,22 +27,21 @@ public abstract class Task<V> extends RecursiveTask<V> {
     }
 
 
-
-    /**
-     * Execute the task
-     */
-    public abstract V executeTask() throws Exception;
-
     /**
      * Invoked when this task throws an uncaught exception.
+     * @param throwable
      */
-    public abstract void exceptionThrown( final Throwable throwable );
+    void exceptionThrown(final Throwable throwable);
 
     /**
-     * Invoked when we weren't able to run this task by the the thread attempting to schedule the task. If this task
-     * MUST be run immediately, you can invoke the call method from within this event to invoke the task in the
-     * scheduling thread.  Note that this has performance implications to the user.  If you can drop the request and
-     * process later (lazy repair for instance ) do so.
+     * Invoked when we weren't able to run this task by the the thread attempting to schedule the task.
+     * If this task MUST be run immediately, you can invoke the call method from within this event to invoke the
+     * task in the scheduling thread.  Note that this has performance implications to the user.  If you can drop the
+     * request and process later (lazy repair for instance ) do so.
+     *
      */
-    public abstract void rejected();
+    void rejected();
+
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/af3726b8/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
index c6f5915..aaa9d60 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
@@ -13,10 +13,11 @@ public interface TaskExecutor {
      * Submit the task asynchronously
      * @param task
      */
-    public <V> Task<V > submit( Task<V> task );
+    public <V> ListenableFuture<V> submit( Task<V> task );
 
     /**
      * Stop the task executor without waiting for scheduled threads to run
      */
     public void shutdown();
+    
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/af3726b8/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
index 9656b5f..fa63eee 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
@@ -23,13 +23,13 @@ public class NamedTaskExecutorImplTest {
 
     @Test
     public void jobSuccess() throws InterruptedException {
-        final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1 );
+        final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1, 0 );
 
         final CountDownLatch exceptionLatch = new CountDownLatch( 0 );
         final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
         final CountDownLatch runLatch = new CountDownLatch( 1 );
 
-        final Task<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {};
+        final Task<Void, UUID> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {};
 
         executor.submit( task );
 
@@ -44,7 +44,7 @@ public class NamedTaskExecutorImplTest {
 
     @Test
     public void exceptionThrown() throws InterruptedException {
-        final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1 );
+        final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1, 0 );
 
         final CountDownLatch exceptionLatch = new CountDownLatch( 1 );
         final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
@@ -53,10 +53,9 @@ public class NamedTaskExecutorImplTest {
         final RuntimeException re = new RuntimeException( "throwing exception" );
 
         final TestTask<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {
-
-
             @Override
-            public Void executeTask() {
+            public Void call() throws Exception {
+                super.call();
                 throw re;
             }
         };
@@ -76,7 +75,7 @@ public class NamedTaskExecutorImplTest {
 
     @Test
     public void noCapacity() throws InterruptedException {
-        final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1 );
+        final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1, 0 );
 
         final CountDownLatch exceptionLatch = new CountDownLatch( 0 );
         final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
@@ -85,8 +84,8 @@ public class NamedTaskExecutorImplTest {
 
         final TestTask<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {
             @Override
-            public Void executeTask() throws Exception {
-                super.executeTask();
+            public Void call() throws Exception {
+                super.call();
 
                 //park this thread so it takes up a task and the next is rejected
                 final Object mutex = new Object();
@@ -130,22 +129,22 @@ public class NamedTaskExecutorImplTest {
     public void noCapacityWithQueue() throws InterruptedException {
 
         final int threadPoolSize = 1;
-       
+        final int queueSize = 10;
 
-        final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", threadPoolSize );
+        final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", threadPoolSize, queueSize );
 
         final CountDownLatch exceptionLatch = new CountDownLatch( 0 );
         final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
         final CountDownLatch runLatch = new CountDownLatch( 1 );
 
-        int iterations = threadPoolSize ;
+        int iterations = threadPoolSize + queueSize;
 
-        for ( int i = 0; i < iterations; i++ ) {
+        for(int i = 0; i < iterations; i ++) {
 
             final TestTask<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {
                 @Override
-                public Void executeTask() throws Exception {
-                    super.executeTask();
+                public Void call() throws Exception {
+                    super.call();
 
                     //park this thread so it takes up a task and the next is rejected
                     final Object mutex = new Object();
@@ -161,6 +160,7 @@ public class NamedTaskExecutorImplTest {
         }
 
 
+
         runLatch.await( 1000, TimeUnit.MILLISECONDS );
 
         //now submit the second task
@@ -185,81 +185,12 @@ public class NamedTaskExecutorImplTest {
     }
 
 
-    @Test
-    public void jobTreeResult() throws InterruptedException {
-
-        final int threadPoolSize = 4;
-       
-
-        final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", threadPoolSize );
-
-        final CountDownLatch exceptionLatch = new CountDownLatch( 0 );
-        final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
-
-        //accomodates for splitting the job 1->2->4 and joining
-        final CountDownLatch runLatch = new CountDownLatch( 7 );
-
-
-        TestRecursiveTask task = new TestRecursiveTask( exceptionLatch, rejectedLatch, runLatch, 1, 3 );
+    private static abstract class TestTask<V> implements Task<V, UUID> {
 
-         executor.submit( task );
-
-
-        //compute our result
-        Integer result = task.join();
-
-        //result should be 1+2*2+3*4
-        final int expected = 4*3;
-
-        assertEquals(expected, result.intValue());
-
-        //just to check our latches
-        runLatch.await( 1000, TimeUnit.MILLISECONDS );
-
-        //now submit the second task
-
-
-    }
-
-
-    private static class TestRecursiveTask extends TestTask<Integer> {
-
-        private final int depth;
-        private final int maxDepth;
-
-        private TestRecursiveTask( final CountDownLatch exceptionLatch, final CountDownLatch rejectedLatch,
-                                   final CountDownLatch runLatch, final int depth, final int maxDepth ) {
-            super( exceptionLatch, rejectedLatch, runLatch );
-            this.depth = depth;
-            this.maxDepth = maxDepth;
-        }
-
-
-        @Override
-        public Integer executeTask() throws Exception {
-
-            if(depth == maxDepth ){
-                return depth;
-            }
-
-            TestRecursiveTask left = new TestRecursiveTask(exceptionLatch, rejectedLatch, runLatch, depth+1, maxDepth  );
-
-            TestRecursiveTask right = new TestRecursiveTask(exceptionLatch, rejectedLatch, runLatch, depth+1, maxDepth  );
-
-            //run our left in another thread
-            left.fork();
-
-            return right.compute() + left.join();
-        }
-    }
-
-
-    private static abstract class TestTask<V> extends Task<V> {
-
-        protected final List<Throwable> exceptions;
-        protected final CountDownLatch exceptionLatch;
-        protected final CountDownLatch rejectedLatch;
-        protected final CountDownLatch runLatch;
+        private final List<Throwable> exceptions;
+        private final CountDownLatch exceptionLatch;
+        private final CountDownLatch rejectedLatch;
+        private final CountDownLatch runLatch;
 
 
         private TestTask( final CountDownLatch exceptionLatch, final CountDownLatch rejectedLatch,
@@ -272,6 +203,11 @@ public class NamedTaskExecutorImplTest {
         }
 
 
+        @Override
+        public UUID getId() {
+            return UUIDGenerator.newTimeUUID();
+        }
+
 
         @Override
         public void exceptionThrown( final Throwable throwable ) {
@@ -287,7 +223,7 @@ public class NamedTaskExecutorImplTest {
 
 
         @Override
-        public V executeTask() throws Exception {
+        public V call() throws Exception {
             runLatch.countDown();
             return null;
         }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/af3726b8/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
index a3978fc..608f8ce 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
@@ -155,7 +155,7 @@ public class GraphModule extends AbstractModule {
     @Singleton
     @Provides
     public TaskExecutor graphTaskExecutor(final GraphFig graphFig){
-        return new NamedTaskExecutorImpl( "graphTaskExecutor",  graphFig.getShardAuditWorkerCount()  );
+        return new NamedTaskExecutorImpl( "graphTaskExecutor",  graphFig.getShardAuditWorkerCount(), graphFig.getShardAuditWorkerQueueSize()  );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/af3726b8/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
index ed63587..4fe1a63 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
@@ -27,8 +27,6 @@ import java.util.HashSet;
 import java.util.Set;
 
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.task.Task;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardGroupCompactionImpl;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
@@ -44,8 +42,9 @@ public interface ShardGroupCompaction {
      *
      * @return A ListenableFuture with the result.  Note that some
      */
-    public Task<AuditResult> evaluateShardGroup(
-            final ApplicationScope scope, final DirectedEdgeMeta edgeMeta, final ShardEntryGroup group );
+    public ListenableFuture<AuditResult> evaluateShardGroup( final ApplicationScope scope,
+                                                             final DirectedEdgeMeta edgeMeta,
+                                                             final ShardEntryGroup group );
 
 
     public enum AuditResult {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/af3726b8/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
index a84a0f0..be7fbe4 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
@@ -37,13 +37,14 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
+import javax.annotation.Nullable;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.core.consistency.TimeService;
 import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.task.ImmediateTask;
 import org.apache.usergrid.persistence.core.task.Task;
 import org.apache.usergrid.persistence.core.task.TaskExecutor;
 import org.apache.usergrid.persistence.graph.GraphFig;
@@ -65,6 +66,9 @@ import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hasher;
 import com.google.common.hash.Hashing;
 import com.google.common.hash.PrimitiveSink;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.netflix.astyanax.Keyspace;
@@ -273,29 +277,45 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
 
 
     @Override
-    public Task<AuditResult> evaluateShardGroup( final ApplicationScope scope,
-                                                                final DirectedEdgeMeta edgeMeta,
-                                                                final ShardEntryGroup group ) {
+    public ListenableFuture<AuditResult> evaluateShardGroup( final ApplicationScope scope,
+                                                             final DirectedEdgeMeta edgeMeta,
+                                                             final ShardEntryGroup group ) {
 
         final double repairChance = random.nextDouble();
 
 
         //don't audit, we didn't hit our chance
         if ( repairChance > graphFig.getShardRepairChance() ) {
-            return new ImmediateTask<AuditResult>(  AuditResult.NOT_CHECKED ) {};
+            return Futures.immediateFuture( AuditResult.NOT_CHECKED );
         }
 
         /**
          * Try and submit.  During back pressure, we may not be able to submit, that's ok.  Better to drop than to
          * hose the system
          */
-       return taskExecutor.submit( new ShardAuditTask( scope, edgeMeta, group ) );
+        ListenableFuture<AuditResult> future = taskExecutor.submit( new ShardAuditTask( scope, edgeMeta, group ) );
+
+        /**
+         * Log our success or failures for debugging purposes
+         */
+        Futures.addCallback( future, new FutureCallback<AuditResult>() {
+            @Override
+            public void onSuccess( @Nullable final AuditResult result ) {
+                LOG.debug( "Successfully completed audit of task {}", result );
+            }
 
 
+            @Override
+            public void onFailure( final Throwable t ) {
+                LOG.error( "Unable to perform audit.  Exception is ", t );
+            }
+        } );
+
+        return future;
     }
 
 
-    private final class ShardAuditTask extends Task<AuditResult> {
+    private final class ShardAuditTask implements Task<AuditResult, ShardAuditKey> {
 
         private final ApplicationScope scope;
         private final DirectedEdgeMeta edgeMeta;
@@ -310,6 +330,11 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
         }
 
 
+        @Override
+        public ShardAuditKey getId() {
+            return new ShardAuditKey( scope, edgeMeta, group );
+        }
+
 
         @Override
         public void exceptionThrown( final Throwable throwable ) {
@@ -320,12 +345,12 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
         @Override
         public void rejected() {
             //ignore, if this happens we don't care, we're saturated, we can check later
-            LOG.error( "Rejected audit for shard of with scope {}, edgeMeta of {} and group of {}", scope, edgeMeta, group );
+            LOG.error( "Rejected audit for shard of {}", getId() );
         }
 
 
         @Override
-        public AuditResult executeTask() throws Exception {
+        public AuditResult call() throws Exception {
             /**
              * We don't have a compaction pending.  Run an audit on the shards
              */
@@ -376,8 +401,10 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
                  */
                 try {
                     CompactionResult result = compact( scope, edgeMeta, group );
-                    LOG.info( "Compaction result for compaction of scope {} with edge meta data of {} and shard group "
-                                    + "{} is {}", new Object[] { scope, edgeMeta, group, result } );
+                    LOG.info(
+                            "Compaction result for compaction of scope {} with edge meta data of {} and shard group " +
+                                    "{} is {}",
+                            new Object[] { scope, edgeMeta, group, result } );
                 }
                 finally {
                     shardCompactionTaskTracker.complete( scope, edgeMeta, group );
@@ -391,7 +418,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
     }
 
 
-    public static final class ShardAuditKey {
+    private static final class ShardAuditKey {
         private final ApplicationScope scope;
         private final DirectedEdgeMeta edgeMeta;
         private final ShardEntryGroup group;