You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2014/09/25 01:39:19 UTC

[1/2] git commit: Added task execution framework

Repository: incubator-usergrid
Updated Branches:
  refs/heads/eventsystem c667c18e3 -> 4c72f5c63


Added task execution framework


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/66e508a9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/66e508a9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/66e508a9

Branch: refs/heads/eventsystem
Commit: 66e508a942d33051c95bfcd80e19b87a40812370
Parents: c667c18
Author: Todd Nine <to...@apache.org>
Authored: Wed Sep 24 16:47:24 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Wed Sep 24 16:47:24 2014 -0600

----------------------------------------------------------------------
 .../core/task/NamedTaskExecutorImpl.java        | 162 +++++++++++++
 .../usergrid/persistence/core/task/Task.java    |  37 +++
 .../persistence/core/task/TaskExecutor.java     |  14 ++
 .../core/task/NamedTaskExecutorImplTest.java    | 233 +++++++++++++++++++
 4 files changed, 446 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/66e508a9/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
new file mode 100644
index 0000000..8ba73a0
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
@@ -0,0 +1,162 @@
+package org.apache.usergrid.persistence.core.task;
+
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.annotation.Nullable;
+
+import org.slf4j.Logger;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+
+
+/**
+ * Implementation of the task executor with a unique name and size
+ */
+public class NamedTaskExecutorImpl implements TaskExecutor {
+
+    private static final Logger LOG = org.slf4j.LoggerFactory.getLogger( NamedTaskExecutorImpl.class );
+
+    private final ListeningExecutorService executorService;
+
+
+    /**
+     * @param name The name of this instance of the task executor
+     * @param poolSize The size of the pool.  This is the number of concurrent tasks that can execute at once.
+     * @param queueLength The length of tasks to keep in the queue
+     */
+    public NamedTaskExecutorImpl( final String name, final int poolSize, final int queueLength ) {
+        Preconditions.checkNotNull( name );
+        Preconditions.checkArgument( name.length() > 0, "name must have a length" );
+        Preconditions.checkArgument( poolSize > 0, "poolSize must be > than 0" );
+        Preconditions.checkArgument( queueLength > -1, "queueLength must be 0 or more" );
+
+
+        final BlockingQueue<Runnable> queue =
+                queueLength > 0 ? new ArrayBlockingQueue<Runnable>( queueLength ) : new SynchronousQueue<Runnable>();
+
+        executorService = MoreExecutors.listeningDecorator( new MaxSizeThreadPool( name, poolSize, queue ) );
+    }
+
+
+    @Override
+    public <V, I> void submit( final Task<V, I> task ) {
+
+        final ListenableFuture<V> future;
+
+        try {
+            future = executorService.submit( task );
+
+            /**
+             * Log our success or failures for debugging purposes
+             */
+            Futures.addCallback( future, new TaskFutureCallBack<V, I>( task ) );
+        }
+        catch ( RejectedExecutionException ree ) {
+            task.rejected();
+            return;
+        }
+    }
+
+
+    /**
+     * Callback that logs the outcome of a task and notifies the task itself when it fails.
+     */
+    private static final class TaskFutureCallBack<V, I> implements FutureCallback<V> {
+
+        private final Task<V, I> task;
+
+
+        private TaskFutureCallBack( Task<V, I> task ) {
+            this.task = task;
+        }
+
+        /** Logs the completed task at debug level; the result itself is not used. */
+        @Override
+        public void onSuccess( @Nullable final V result ) {
+            LOG.debug( "Successfully completed task {}", task );
+        }
+
+        /** Logs the failure, then hands the throwable to the task's exceptionThrown hook. */
+        @Override
+        public void onFailure( final Throwable t ) {
+            LOG.error( "Unable to execute task.  Exception is ", t );
+
+            task.exceptionThrown( t );
+        }
+    }
+
+
+    /**
+     * Thread pool that runs up to workerSize tasks concurrently and rejects work once its queue is full
+     */
+    private static final class MaxSizeThreadPool extends ThreadPoolExecutor {
+        public MaxSizeThreadPool( final String name, final int workerSize, BlockingQueue<Runnable> queue ) {
+            // core == max: with a bounded queue the executor only grows past core once the queue is full,
+            // so a core size of 1 would serialize all work until then
+            super( workerSize, workerSize, 30, TimeUnit.SECONDS, queue, new CountingThreadFactory( name ),
+                    new RejectedHandler() ); allowCoreThreadTimeOut( true ); // idle workers still expire after 30s
+        }
+    }
+
+
+    /**
+     * Thread factory that will name and count threads for easier debugging
+     */
+    private static final class CountingThreadFactory implements ThreadFactory {
+
+        private final AtomicLong threadCounter = new AtomicLong();
+
+        private final String name;
+
+
+        private CountingThreadFactory( final String name ) {this.name = name;}
+
+
+        @Override
+        public Thread newThread( final Runnable r ) {
+            final long newValue = threadCounter.incrementAndGet();
+
+            Thread t = new Thread( r, name + "-" + newValue );
+
+            t.setDaemon( true );
+
+            return t;
+        }
+    }
+
+
+    /**
+     * The handler that will handle rejected executions and signal the interface
+     */
+    private static final class RejectedHandler implements RejectedExecutionHandler {
+
+
+        @Override
+        public void rejectedExecution( final Runnable r, final ThreadPoolExecutor executor ) {
+
+            //            ListenableFutureTask<Task<?, ?>> future = ( ListenableFutureTask<Task<?, ?>> ) r;
+            //
+            //            future.
+            //            final Task<?, ?> task = ( Task<?, ?> ) r;
+            LOG.warn( "Audit queue full, rejecting audit task {}", r );
+
+            throw new RejectedExecutionException( "Unable to run task, queue full" );
+            //            LOG.warn( "Audit queue full, rejecting audit task {}", task );
+            //            task.rejected();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/66e508a9/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
new file mode 100644
index 0000000..8b1ed22
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
@@ -0,0 +1,37 @@
+package org.apache.usergrid.persistence.core.task;
+
+
+import java.util.concurrent.Callable;
+
+
+
+/**
+ * The task to execute
+ */
+public interface Task<V, I> extends Callable<V> {
+
+    /**
+     * Get the unique identifier of this task.  This may be used to collapse runnables over a time period in the future
+     *
+     * @return the identifier of this task
+     */
+    I getId();
+
+    /**
+     * Invoked when this task throws an uncaught exception.
+     * @param throwable the exception that was thrown while executing the task
+     */
+    void exceptionThrown(final Throwable throwable);
+
+    /**
+     * Invoked, by the thread attempting to schedule the task, when we weren't able to run this task.
+     * If this task MUST be run immediately, you can invoke the call method from within this event to invoke the
+     * task in the scheduling thread.  Note that this has performance implications to the user.  If you can drop the
+     * request and process later (lazy repair, for instance) do so.
+     *
+     */
+    void rejected();
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/66e508a9/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
new file mode 100644
index 0000000..e60da83
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
@@ -0,0 +1,14 @@
+package org.apache.usergrid.persistence.core.task;
+
+
+/**
+ * An interface for execution of tasks
+ */
+public interface TaskExecutor {
+
+    /**
+     * Submit the task asynchronously
+     * @param task
+     */
+    public <V, I> void submit(Task<V, I> task);
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/66e508a9/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
new file mode 100644
index 0000000..9da9263
--- /dev/null
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
@@ -0,0 +1,233 @@
+package org.apache.usergrid.persistence.core.task;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import junit.framework.TestCase;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+
+
+/**
+ * Tests for the namedtask execution impl
+ */
+public class NamedTaskExecutorImplTest {
+
+
+    @Test
+    public void jobSuccess() throws InterruptedException {
+        final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1, 0 );
+
+        final CountDownLatch exceptionLatch = new CountDownLatch( 0 );
+        final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
+        final CountDownLatch runLatch = new CountDownLatch( 1 );
+
+        final Task<Void, UUID> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {};
+
+        executor.submit( task );
+
+
+        runLatch.await( 1000, TimeUnit.MILLISECONDS );
+
+        assertEquals( 0l, exceptionLatch.getCount() );
+
+        assertEquals( 0l, rejectedLatch.getCount() );
+    }
+
+
+    @Test
+    public void exceptionThrown() throws InterruptedException {
+        final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1, 0 );
+
+        final CountDownLatch exceptionLatch = new CountDownLatch( 1 );
+        final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
+        final CountDownLatch runLatch = new CountDownLatch( 1 );
+
+        final RuntimeException re = new RuntimeException( "throwing exception" );
+
+        final TestTask<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {
+            @Override
+            public Void call() throws Exception {
+                super.call();
+                throw re;
+            }
+        };
+
+        executor.submit( task );
+
+
+        runLatch.await( 1000, TimeUnit.MILLISECONDS );
+        exceptionLatch.await( 1000, TimeUnit.MILLISECONDS );
+
+        assertSame( re, task.exceptions.get( 0 ) );
+
+
+        assertEquals( 0l, rejectedLatch.getCount() );
+    }
+
+
+    @Test
+    public void noCapacity() throws InterruptedException {
+        final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1, 0 );
+
+        final CountDownLatch exceptionLatch = new CountDownLatch( 0 );
+        final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
+        final CountDownLatch runLatch = new CountDownLatch( 1 );
+
+        //single worker and no queue: once the first task is running there is no capacity left
+        final TestTask<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {
+            @Override
+            public Void call() throws Exception {
+                super.call();
+
+                //park this thread so it takes up a task and the next is rejected
+                final Object mutex = new Object();
+
+                synchronized ( mutex ) {
+                    mutex.wait();
+                }
+
+                return null;
+            }
+        };
+
+        executor.submit( task );
+
+
+        runLatch.await( 1000, TimeUnit.MILLISECONDS );
+
+        //now submit the second task
+
+
+        final CountDownLatch secondRejectedLatch = new CountDownLatch( 1 );
+        final CountDownLatch secondExceptionLatch = new CountDownLatch( 0 );
+        final CountDownLatch secondRunLatch = new CountDownLatch( 1 );
+
+        //the second task must use its own latches, otherwise its rejection is never observed
+        final TestTask<Void> testTask = new TestTask<Void>( secondExceptionLatch, secondRejectedLatch, secondRunLatch ) {};
+
+        executor.submit( testTask );
+
+
+        secondRejectedLatch.await( 1000, TimeUnit.MILLISECONDS );
+
+        //the second task must have been rejected, and it must never have run
+        assertEquals( 0l, secondRejectedLatch.getCount() );
+        assertEquals( 1l, secondRunLatch.getCount() );
+        assertEquals( 0l, secondExceptionLatch.getCount() );
+    }
+
+
+    @Test
+    public void noCapacityWithQueue() throws InterruptedException {
+
+        final int threadPoolSize = 1;
+        final int queueSize = 10;
+
+        final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", threadPoolSize, queueSize );
+
+        final CountDownLatch exceptionLatch = new CountDownLatch( 0 );
+        final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
+        final CountDownLatch runLatch = new CountDownLatch( 1 );
+
+        int iterations = threadPoolSize + queueSize;
+
+        for(int i = 0; i < iterations; i ++) {
+
+            final TestTask<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {
+                @Override
+                public Void call() throws Exception {
+                    super.call();
+
+                    //park this thread so it takes up a task and the next is rejected
+                    final Object mutex = new Object();
+
+                    synchronized ( mutex ) {
+                        mutex.wait();
+                    }
+
+                    return null;
+                }
+            };
+            executor.submit( task );
+        }
+
+
+
+        runLatch.await( 1000, TimeUnit.MILLISECONDS );
+
+        //the worker is parked and the queue is full, now submit one task too many
+
+
+        final CountDownLatch secondRejectedLatch = new CountDownLatch( 1 );
+        final CountDownLatch secondExceptionLatch = new CountDownLatch( 0 );
+        final CountDownLatch secondRunLatch = new CountDownLatch( 1 );
+
+        //the overflow task must use its own latches, otherwise its rejection is never observed
+        final TestTask<Void> testTask = new TestTask<Void>( secondExceptionLatch, secondRejectedLatch, secondRunLatch ) {};
+
+        executor.submit( testTask );
+
+
+        secondRejectedLatch.await( 1000, TimeUnit.MILLISECONDS );
+
+        //the overflow task must have been rejected, and it must never have run
+        assertEquals( 0l, secondRejectedLatch.getCount() );
+        assertEquals( 1l, secondRunLatch.getCount() );
+        assertEquals( 0l, secondExceptionLatch.getCount() );
+    }
+
+
+    private static abstract class TestTask<V> implements Task<V, UUID> {
+
+        private final List<Throwable> exceptions;
+        private final CountDownLatch exceptionLatch;
+        private final CountDownLatch rejectedLatch;
+        private final CountDownLatch runLatch;
+
+
+        private TestTask( final CountDownLatch exceptionLatch, final CountDownLatch rejectedLatch,
+                          final CountDownLatch runLatch ) {
+            this.exceptionLatch = exceptionLatch;
+            this.rejectedLatch = rejectedLatch;
+            this.runLatch = runLatch;
+
+            this.exceptions = new ArrayList<>();
+        }
+
+
+        @Override
+        public UUID getId() {
+            return UUIDGenerator.newTimeUUID();
+        }
+
+
+        @Override
+        public void exceptionThrown( final Throwable throwable ) {
+            this.exceptions.add( throwable );
+            exceptionLatch.countDown();
+        }
+
+
+        @Override
+        public void rejected() {
+            rejectedLatch.countDown();
+        }
+
+
+        @Override
+        public V call() throws Exception {
+            runLatch.countDown();
+            return null;
+        }
+    }
+}


[2/2] git commit: Migrated graph over to new task executor

Posted by to...@apache.org.
Migrated graph over to new task executor


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/4c72f5c6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/4c72f5c6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/4c72f5c6

Branch: refs/heads/eventsystem
Commit: 4c72f5c636bae9b16df7578302538c7b4d9600e7
Parents: 66e508a
Author: Todd Nine <to...@apache.org>
Authored: Wed Sep 24 17:39:07 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Wed Sep 24 17:39:07 2014 -0600

----------------------------------------------------------------------
 .../collection/event/EntityVersionRemoved.java  |  26 +++
 .../collection/guice/CollectionModule.java      |  17 +-
 .../serialization/SerializationFig.java         |  32 ++-
 .../core/astyanax/AstyanaxKeyspaceProvider.java |   2 +
 .../persistence/core/guice/CommonModule.java    |   2 +
 .../core/task/NamedTaskExecutorImpl.java        |  14 +-
 .../persistence/core/task/TaskExecutor.java     |   2 +-
 .../usergrid/persistence/graph/GraphFig.java    |   2 +
 .../persistence/graph/guice/GraphModule.java    |  15 ++
 .../shard/impl/ShardGroupCompactionImpl.java    | 202 ++++++++++++-------
 .../impl/shard/ShardGroupCompactionTest.java    |   5 +-
 11 files changed, 227 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4c72f5c6/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionRemoved.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionRemoved.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionRemoved.java
new file mode 100644
index 0000000..dca575d
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionRemoved.java
@@ -0,0 +1,26 @@
+package org.apache.usergrid.persistence.collection.event;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.CollectionScope;
+
+
+/**
+ *
+ * Invoked when an entity version is removed.  Note that this is not a deletion of the entity itself,
+ * only the version itself.
+ *
+ */
+public interface EntityVersionRemoved {
+
+
+    /**
+     * The version specified was removed.
+     * @param scope
+     * @param entityId
+     * @param entityVersion
+     */
+    public void versionRemoved(final CollectionScope scope, final UUID entityId, final UUID entityVersion);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4c72f5c6/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
index 84f69db..306f6e0 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
@@ -36,6 +36,8 @@ import org.apache.usergrid.persistence.collection.serialization.SerializationFig
 import org.apache.usergrid.persistence.collection.serialization.impl.SerializationModule;
 import org.apache.usergrid.persistence.collection.service.UUIDService;
 import org.apache.usergrid.persistence.collection.service.impl.ServiceModule;
+import org.apache.usergrid.persistence.core.task.NamedTaskExecutorImpl;
+import org.apache.usergrid.persistence.core.task.TaskExecutor;
 
 import com.google.inject.AbstractModule;
 import com.google.inject.Inject;
@@ -79,8 +81,7 @@ public class CollectionModule extends AbstractModule {
     @Singleton
     @Inject
     @Write
-
-    public WriteStart write (MvccLogEntrySerializationStrategy logStrategy, UUIDService uuidService) {
+    public WriteStart write (final MvccLogEntrySerializationStrategy logStrategy) {
         final WriteStart writeStart = new WriteStart( logStrategy, MvccEntity.Status.COMPLETE);
 
         return writeStart;
@@ -90,13 +91,21 @@ public class CollectionModule extends AbstractModule {
     @Singleton
     @Inject
     @WriteUpdate
-
-    public WriteStart writeUpdate (MvccLogEntrySerializationStrategy logStrategy, UUIDService uuidService) {
+    public WriteStart writeUpdate (final MvccLogEntrySerializationStrategy logStrategy) {
         final WriteStart writeStart = new WriteStart( logStrategy, MvccEntity.Status.PARTIAL );
 
         return writeStart;
     }
 
+    @Inject
+    @Singleton
+    @Provides
+    public TaskExecutor collectionTaskExecutor(final SerializationFig serializationFig){
+        return new NamedTaskExecutorImpl( "collectiontasks", serializationFig.getTaskPoolThreadSize(), serializationFig.getTaskPoolQueueSize() );
+    }
+
+
+
 
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4c72f5c6/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java
index 81302a6..7e69a19 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java
@@ -15,25 +15,45 @@ public interface SerializationFig extends GuicyFig {
 
     /**
      * Time to live timeout in seconds.
+     *
      * @return Timeout in seconds.
      */
-    @Key( "collection.stage.transient.timeout" )
-    @Default( "5" )
+    @Key("collection.stage.transient.timeout")
+    @Default("5")
     int getTimeout();
 
     /**
      * Number of history items to return for delete.
+     *
      * @return Timeout in seconds.
      */
-    @Key( "collection.delete.history.size" )
-    @Default( "100" )
+    @Key("collection.delete.history.size")
+    @Default("100")
     int getHistorySize();
 
     /**
      * Number of items to buffer.
+     *
      * @return Timeout in seconds.
      */
-    @Key( "collection.buffer.size" )
-    @Default( "10" )
+    @Key("collection.buffer.size")
+    @Default("10")
     int getBufferSize();
+
+
+    /**
+     * The size of threads to have in the task pool
+     */
+    @Key( "collection.task.pool.threadsize" )
+    @Default( "20" )
+    int getTaskPoolThreadSize();
+
+
+
+    /**
+     * The maximum number of tasks that may wait in the task pool's queue
+     */
+    @Key( "collection.task.pool.queuesize" )
+    @Default( "20" )
+    int getTaskPoolQueueSize();
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4c72f5c6/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/AstyanaxKeyspaceProvider.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/AstyanaxKeyspaceProvider.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/AstyanaxKeyspaceProvider.java
index 7caeaeb..8bd5a9f 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/AstyanaxKeyspaceProvider.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/AstyanaxKeyspaceProvider.java
@@ -24,6 +24,7 @@ import java.util.Set;
 
 import com.google.inject.Inject;
 import com.google.inject.Provider;
+import com.google.inject.Singleton;
 import com.netflix.astyanax.AstyanaxConfiguration;
 import com.netflix.astyanax.AstyanaxContext;
 import com.netflix.astyanax.Keyspace;
@@ -41,6 +42,7 @@ import com.netflix.astyanax.thrift.ThriftFamilyFactory;
  *
  * @author tnine
  */
+@Singleton
 public class AstyanaxKeyspaceProvider implements Provider<Keyspace> {
     private final CassandraFig cassandraFig;
     private final CassandraConfig cassandraConfig;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4c72f5c6/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
index a4cc98a..5f461bb 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
@@ -63,4 +63,6 @@ public class CommonModule extends AbstractModule {
 
 
 
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4c72f5c6/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
index 8ba73a0..8184937 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
@@ -53,7 +53,7 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
 
 
     @Override
-    public <V, I> void submit( final Task<V, I> task ) {
+    public <V, I> ListenableFuture<V> submit( final Task<V, I> task ) {
 
         final ListenableFuture<V> future;
 
@@ -67,8 +67,10 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
         }
         catch ( RejectedExecutionException ree ) {
             task.rejected();
-            return;
+            return Futures.immediateCancelledFuture();
         }
+
+        return future;
     }
 
 
@@ -147,16 +149,10 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
 
         @Override
         public void rejectedExecution( final Runnable r, final ThreadPoolExecutor executor ) {
-
-            //            ListenableFutureTask<Task<?, ?>> future = ( ListenableFutureTask<Task<?, ?>> ) r;
-            //
-            //            future.
-            //            final Task<?, ?> task = ( Task<?, ?> ) r;
             LOG.warn( "Audit queue full, rejecting audit task {}", r );
 
             throw new RejectedExecutionException( "Unable to run task, queue full" );
-            //            LOG.warn( "Audit queue full, rejecting audit task {}", task );
-            //            task.rejected();
         }
+
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4c72f5c6/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
index e60da83..b5491bc 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
@@ -10,5 +10,5 @@ public interface TaskExecutor {
      * Submit the task asynchronously
      * @param task
      */
-    public <V, I> void submit(Task<V, I> task);
+    public <V, I> com.google.common.util.concurrent.ListenableFuture<V> submit( Task<V, I> task );
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4c72f5c6/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
index 0a6ecfa..894e74a 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
@@ -90,6 +90,8 @@ public interface GraphFig extends GuicyFig {
     public static final String COUNTER_WRITE_FLUSH_QUEUE_SIZE = "usergrid.graph.shard.counter.queue.size";
 
 
+
+
     @Default("1000")
     @Key(SCAN_PAGE_SIZE)
     int getScanPageSize();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4c72f5c6/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
index f0e954b..608f8ce 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
@@ -24,6 +24,8 @@ import org.safehaus.guicyfig.GuicyFigModule;
 import org.apache.usergrid.persistence.core.consistency.TimeService;
 import org.apache.usergrid.persistence.core.consistency.TimeServiceImpl;
 import org.apache.usergrid.persistence.core.migration.Migration;
+import org.apache.usergrid.persistence.core.task.NamedTaskExecutorImpl;
+import org.apache.usergrid.persistence.core.task.TaskExecutor;
 import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
@@ -62,7 +64,10 @@ import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.Sizeb
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.SizebasedEdgeShardStrategy;
 
 import com.google.inject.AbstractModule;
+import com.google.inject.Inject;
 import com.google.inject.Key;
+import com.google.inject.Provides;
+import com.google.inject.Singleton;
 import com.google.inject.assistedinject.FactoryModuleBuilder;
 import com.google.inject.multibindings.Multibinder;
 
@@ -144,6 +149,16 @@ public class GraphModule extends AbstractModule {
         migrationBinding.addBinding().to( Key.get( EdgeShardSerialization.class ) );
         migrationBinding.addBinding().to( Key.get( NodeShardCounterSerialization.class ) );
     }
+
+
+    /** Provides the shared graph task executor; worker count and queue size are read from GraphFig. */
+    @Singleton
+    @Provides
+    public TaskExecutor graphTaskExecutor(final GraphFig graphFig){
+        return new NamedTaskExecutorImpl( "graphTaskExecutor",  graphFig.getShardAuditWorkerCount(), graphFig.getShardAuditWorkerQueueSize()  );
+    }
+
+
 }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4c72f5c6/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
index 5076424..be7fbe4 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
@@ -30,7 +30,6 @@ import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ThreadFactory;
@@ -46,6 +45,8 @@ import org.slf4j.LoggerFactory;
 import org.apache.usergrid.persistence.core.consistency.TimeService;
 import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.task.Task;
+import org.apache.usergrid.persistence.core.task.TaskExecutor;
 import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.MarkedEdge;
 import org.apache.usergrid.persistence.graph.SearchByEdgeType;
@@ -68,8 +69,6 @@ import com.google.common.hash.PrimitiveSink;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.netflix.astyanax.Keyspace;
@@ -91,7 +90,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
     private static final HashFunction MURMUR_128 = Hashing.murmur3_128();
 
 
-    private final ListeningExecutorService executorService;
+    private final TaskExecutor taskExecutor;
     private final TimeService timeService;
     private final GraphFig graphFig;
     private final NodeShardAllocation nodeShardAllocation;
@@ -110,7 +109,8 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
                                      final NodeShardAllocation nodeShardAllocation,
                                      final ShardedEdgeSerialization shardedEdgeSerialization,
                                      final EdgeColumnFamilies edgeColumnFamilies, final Keyspace keyspace,
-                                     final EdgeShardSerialization edgeShardSerialization ) {
+                                     final EdgeShardSerialization edgeShardSerialization,
+                                     final TaskExecutor taskExecutor ) {
 
         this.timeService = timeService;
         this.graphFig = graphFig;
@@ -124,8 +124,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
         this.shardCompactionTaskTracker = new ShardCompactionTaskTracker();
         this.shardAuditTaskTracker = new ShardAuditTaskTracker();
 
-        executorService = MoreExecutors.listeningDecorator(
-                new MaxSizeThreadPool( graphFig.getShardAuditWorkerCount(), graphFig.getShardAuditWorkerQueueSize() ) );
+        this.taskExecutor = taskExecutor;
     }
 
 
@@ -232,7 +231,8 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
         resultBuilder.withCopiedEdges( edgeCount ).withSourceShards( sourceShards ).withTargetShard( targetShard );
 
         /**
-         * We didn't move anything this pass, mark the shard as compacted.  If we move something, it means that we missed it on the first pass
+         * We didn't move anything this pass, mark the shard as compacted.  If we move something,
+         * it means that we missed it on the first pass
          * or someone is still not writing to the target shard only.
          */
         if ( edgeCount == 0 ) {
@@ -293,91 +293,153 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
          * Try and submit.  During back pressure, we may not be able to submit, that's ok.  Better to drop than to
          * hose the system
          */
-        ListenableFuture<AuditResult> future = executorService.submit( new Callable<AuditResult>() {
+        ListenableFuture<AuditResult> future = taskExecutor.submit( new ShardAuditTask( scope, edgeMeta, group ) );
+
+        /**
+         * Log our success or failures for debugging purposes
+         */
+        Futures.addCallback( future, new FutureCallback<AuditResult>() {
             @Override
-            public AuditResult call() throws Exception {
+            public void onSuccess( @Nullable final AuditResult result ) {
+                LOG.debug( "Successfully completed audit of task {}", result );
+            }
 
 
-                /**
-                 * We don't have a compaction pending.  Run an audit on the shards
-                 */
-                if ( !group.isCompactionPending() ) {
+            @Override
+            public void onFailure( final Throwable t ) {
+                LOG.error( "Unable to perform audit.  Exception is ", t );
+            }
+        } );
 
-                    /**
-                     * Check if we should allocate, we may want to
-                     */
+        return future;
+    }
 
-                    /**
-                     * It's already compacting, don't do anything
-                     */
-                    if ( !shardAuditTaskTracker.canStartTask( scope, edgeMeta, group ) ) {
-                        return AuditResult.CHECKED_NO_OP;
-                    }
 
-                    try {
+    private final class ShardAuditTask implements Task<AuditResult, ShardAuditKey> {
 
-                        final boolean created = nodeShardAllocation.auditShard( scope, group, edgeMeta );
-                        if ( !created ) {
-                            return AuditResult.CHECKED_NO_OP;
-                        }
-                    }
-                    finally {
-                        shardAuditTaskTracker.complete( scope, edgeMeta, group );
-                    }
+        private final ApplicationScope scope;
+        private final DirectedEdgeMeta edgeMeta;
+        private final ShardEntryGroup group;
 
 
-                    return AuditResult.CHECKED_CREATED;
-                }
+        public ShardAuditTask( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta,
+                               final ShardEntryGroup group ) {
+            this.scope = scope;
+            this.edgeMeta = edgeMeta;
+            this.group = group;
+        }
+
+
+        @Override
+        public ShardAuditKey getId() {
+            return new ShardAuditKey( scope, edgeMeta, group );
+        }
+
+        /** Logs a failed audit together with the task id so the failing shard group can be identified. */
+        @Override
+        public void exceptionThrown( final Throwable throwable ) {
+            LOG.error( "Unable to execute audit for shard of {}", getId(), throwable );
+        }
+
+
+        @Override
+        public void rejected() {
+            //ignore, if this happens we don't care, we're saturated, we can check later
+            LOG.error( "Rejected audit for shard of {}", getId() );
+        }
 
-                //check our taskmanager
 
+        @Override
+        public AuditResult call() throws Exception {
+            /**
+             * We don't have a compaction pending.  Run an audit on the shards
+             */
+            if ( !group.isCompactionPending() ) {
 
                 /**
-                 * Do the compaction
+                 * Check if we should allocate, we may want to
                  */
-                if ( group.shouldCompact( timeService.getCurrentTime() ) ) {
-                    /**
-                     * It's already compacting, don't do anything
-                     */
-                    if ( !shardCompactionTaskTracker.canStartTask( scope, edgeMeta, group ) ) {
-                        return AuditResult.COMPACTING;
-                    }
 
-                    /**
-                     * We use a finally b/c we always want to remove the task track
-                     */
-                    try {
-                        CompactionResult result = compact( scope, edgeMeta, group );
-                        LOG.info( "Compaction result for compaction of scope {} with edge meta data of {} and shard group {} is {}", new Object[]{scope, edgeMeta, group, result} );
-                    }
-                    finally {
-                        shardCompactionTaskTracker.complete( scope, edgeMeta, group );
+                /**
+                 * An audit is already in progress for this group, don't do anything
+                 */
+                if ( !shardAuditTaskTracker.canStartTask( scope, edgeMeta, group ) ) {
+                    return AuditResult.CHECKED_NO_OP;
+                }
+
+                try {
+
+                    final boolean created = nodeShardAllocation.auditShard( scope, group, edgeMeta );
+                    if ( !created ) {
+                        return AuditResult.CHECKED_NO_OP;
                     }
-                    return AuditResult.COMPACTED;
+                }
+                finally {
+                    shardAuditTaskTracker.complete( scope, edgeMeta, group );
                 }
 
-                //no op, there's nothing we need to do to this shard
-                return AuditResult.NOT_CHECKED;
-            }
-        } );
 
-        /**
-         * Log our success or failures for debugging purposes
-         */
-        Futures.addCallback( future, new FutureCallback<AuditResult>() {
-            @Override
-            public void onSuccess( @Nullable final AuditResult result ) {
-                LOG.debug( "Successfully completed audit of task {}", result );
+                return AuditResult.CHECKED_CREATED;
             }
 
+            //check our taskmanager
 
-            @Override
-            public void onFailure( final Throwable t ) {
-                LOG.error( "Unable to perform audit.  Exception is ", t );
+
+            /**
+             * Do the compaction
+             */
+            if ( group.shouldCompact( timeService.getCurrentTime() ) ) {
+                /**
+                 * It's already compacting, don't do anything
+                 */
+                if ( !shardCompactionTaskTracker.canStartTask( scope, edgeMeta, group ) ) {
+                    return AuditResult.COMPACTING;
+                }
+
+                /**
+                 * We use a finally b/c we always want to remove the task track
+                 */
+                try {
+                    CompactionResult result = compact( scope, edgeMeta, group );
+                    LOG.info(
+                            "Compaction result for compaction of scope {} with edge meta data of {} and shard group " +
+                                    "{} is {}",
+                            new Object[] { scope, edgeMeta, group, result } );
+                }
+                finally {
+                    shardCompactionTaskTracker.complete( scope, edgeMeta, group );
+                }
+                return AuditResult.COMPACTED;
             }
-        } );
 
-        return future;
+            //no op, there's nothing we need to do to this shard
+            return AuditResult.NOT_CHECKED;
+        }
+    }
+
+
+    private static final class ShardAuditKey {
+        private final ApplicationScope scope;
+        private final DirectedEdgeMeta edgeMeta;
+        private final ShardEntryGroup group;
+
+
+        private ShardAuditKey( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta,
+                               final ShardEntryGroup group ) {
+            this.scope = scope;
+            this.edgeMeta = edgeMeta;
+            this.group = group;
+        }
+
+
+        @Override
+        public String toString() {
+            return "ShardAuditKey{" +
+                    "scope=" + scope +
+                    ", edgeMeta=" + edgeMeta +
+                    ", group=" + group +
+                    '}';
+        }
     }
 
 
@@ -531,7 +593,6 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
     }
 
 
-
     public static final class CompactionResult {
 
         public final long copiedEdges;
@@ -541,7 +602,6 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
         public final Shard compactedShard;
 
 
-
         private CompactionResult( final long copiedEdges, final Shard targetShard, final Set<Shard> sourceShards,
                                   final Set<Shard> removedShards, final Shard compactedShard ) {
             this.copiedEdges = copiedEdges;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4c72f5c6/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java
index 1513e85..9f0792d 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java
@@ -35,6 +35,7 @@ import org.mockito.Matchers;
 import org.apache.usergrid.persistence.core.consistency.TimeService;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
+import org.apache.usergrid.persistence.core.task.TaskExecutor;
 import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.SearchByEdgeType;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardGroupCompactionImpl;
@@ -84,6 +85,8 @@ public class ShardGroupCompactionTest {
 
         final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
 
+        final TaskExecutor taskExecutor = mock(TaskExecutor.class);
+
         final long delta = 10000;
 
         final long createTime = 20000;
@@ -100,7 +103,7 @@ public class ShardGroupCompactionTest {
 
         ShardGroupCompactionImpl compaction =
                 new ShardGroupCompactionImpl( timeService, graphFig, nodeShardAllocation, shardedEdgeSerialization,
-                        edgeColumnFamilies, keyspace, edgeShardSerialization );
+                        edgeColumnFamilies, keyspace, edgeShardSerialization, taskExecutor );
 
 
         DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromSourceNode( createId("source"), "test" );