You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/10/01 22:01:03 UTC
[04/19] git commit: Converted tasks to ForkJoin tasks to allow for
parallel execution.
Converted tasks to ForkJoin tasks to allow for parallel execution.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/dc3f448c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/dc3f448c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/dc3f448c
Branch: refs/heads/two-dot-o-rebuildable-index
Commit: dc3f448c946219c44e3dbfadb294fea0b8048343
Parents: c0b78b9
Author: Todd Nine <to...@apache.org>
Authored: Thu Sep 25 14:56:23 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Thu Sep 25 14:56:23 2014 -0600
----------------------------------------------------------------------
.../collection/guice/CollectionModule.java | 2 +-
.../impl/EntityVersionCleanupTask.java | 8 +-
.../persistence/core/task/ImmediateTask.java | 42 +++++++
.../core/task/NamedTaskExecutorImpl.java | 74 ++++++++-----
.../usergrid/persistence/core/task/Task.java | 44 +++++---
.../persistence/core/task/TaskExecutor.java | 2 +-
.../core/task/NamedTaskExecutorImplTest.java | 111 +++++++++++++++----
.../persistence/graph/guice/GraphModule.java | 2 +-
.../impl/shard/ShardGroupCompaction.java | 7 +-
.../shard/impl/ShardGroupCompactionImpl.java | 44 ++------
10 files changed, 227 insertions(+), 109 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dc3f448c/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
index 306f6e0..a6230e1 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
@@ -101,7 +101,7 @@ public class CollectionModule extends AbstractModule {
@Singleton
@Provides
public TaskExecutor collectionTaskExecutor(final SerializationFig serializationFig){
- return new NamedTaskExecutorImpl( "collectiontasks", serializationFig.getTaskPoolThreadSize(), serializationFig.getTaskPoolQueueSize() );
+ return new NamedTaskExecutorImpl( "collectiontasks", serializationFig.getTaskPoolThreadSize() );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dc3f448c/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
index 11e2da9..85afce6 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
@@ -22,7 +22,7 @@ import org.apache.usergrid.persistence.model.entity.Id;
* Cleans up previous versions from the specified version. Note that this means the version passed in the io event is
* retained, the range is exclusive.
*/
-class EntityVersionCleanupTask implements Task<CollectionIoEvent<EntityVersion>, CollectionIoEvent<EntityVersion>> {
+class EntityVersionCleanupTask extends Task<CollectionIoEvent<EntityVersion>, CollectionIoEvent<EntityVersion>> {
private static final Logger LOG = LoggerFactory.getLogger( EntityVersionCleanupTask.class );
@@ -63,7 +63,7 @@ class EntityVersionCleanupTask implements Task<CollectionIoEvent<EntityVersion>,
// so we'll run it in our current thread
try {
- call();
+ executeTask();
}
catch ( Exception e ) {
throw new RuntimeException( "Exception thrown in call task", e );
@@ -72,7 +72,7 @@ class EntityVersionCleanupTask implements Task<CollectionIoEvent<EntityVersion>,
@Override
- public CollectionIoEvent<EntityVersion> call() throws Exception {
+ public CollectionIoEvent<EntityVersion> executeTask() throws Exception {
final CollectionScope scope = collectionIoEvent.getEntityCollection();
final Id entityId = collectionIoEvent.getEvent().getId();
@@ -102,8 +102,6 @@ class EntityVersionCleanupTask implements Task<CollectionIoEvent<EntityVersion>,
entitySerializationStrategy.delete( scope, entityId, currentVersion ).execute();
logEntrySerializationStrategy.delete( scope, entityId, currentVersion ).execute();
-
-
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dc3f448c/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/ImmediateTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/ImmediateTask.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/ImmediateTask.java
new file mode 100644
index 0000000..627e7e8
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/ImmediateTask.java
@@ -0,0 +1,42 @@
+package org.apache.usergrid.persistence.core.task;
+
+
+/**
+ * Does not perform computation, just returns the value passed to it
+ *
+ */
+public class ImmediateTask<V, I> extends Task<V, I> {
+
+ private final I id;
+ private final V returned;
+
+
+ protected ImmediateTask( final I id, final V returned ) {
+ this.id = id;
+ this.returned = returned;
+ }
+
+
+ @Override
+ public I getId() {
+ return id;
+ }
+
+
+ @Override
+ public V executeTask() throws Exception {
+ return returned;
+ }
+
+
+ @Override
+ public void exceptionThrown( final Throwable throwable ) {
+ //no op
+ }
+
+
+ @Override
+ public void rejected() {
+ //no op
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dc3f448c/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
index 40ee5cf..aba8cd5 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
@@ -1,11 +1,11 @@
package org.apache.usergrid.persistence.core.task;
-import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -17,10 +17,6 @@ import org.slf4j.Logger;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
/**
@@ -30,54 +26,45 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
private static final Logger LOG = org.slf4j.LoggerFactory.getLogger( NamedTaskExecutorImpl.class );
- private final ListeningExecutorService executorService;
+ private final NamedForkJoinPool executorService;
private final String name;
private final int poolSize;
- private final int queueLength;
/**
* @param name The name of this instance of the task executor
* @param poolSize The size of the pool. This is the number of concurrent tasks that can execute at once.
- * @param queueLength The length of tasks to keep in the queue
*/
- public NamedTaskExecutorImpl( final String name, final int poolSize, final int queueLength ) {
+ public NamedTaskExecutorImpl( final String name, final int poolSize) {
Preconditions.checkNotNull( name );
Preconditions.checkArgument( name.length() > 0, "name must have a length" );
Preconditions.checkArgument( poolSize > 0, "poolSize must be > than 0" );
- Preconditions.checkArgument( queueLength > -1, "queueLength must be 0 or more" );
this.name = name;
this.poolSize = poolSize;
- this.queueLength = queueLength;
- final BlockingQueue<Runnable> queue =
- queueLength > 0 ? new ArrayBlockingQueue<Runnable>( queueLength ) : new SynchronousQueue<Runnable>();
+// final BlockingQueue<Runnable> queue =
+// queueLength > 0 ? new ArrayBlockingQueue<Runnable>( queueLength ) : new SynchronousQueue<Runnable>();
+//
+// executorService = MoreExecutors.listeningDecorator( new MaxSizeThreadPool( queue ) );
- executorService = MoreExecutors.listeningDecorator( new MaxSizeThreadPool( queue ) );
+ this.executorService = new NamedForkJoinPool(poolSize);
}
@Override
- public <V, I> ListenableFuture<V> submit( final Task<V, I> task ) {
-
- final ListenableFuture<V> future;
+ public <V, I> Task<V, I> submit( final Task<V, I> task ) {
try {
- future = executorService.submit( task );
-
- /**
- * Log our success or failures for debugging purposes
- */
- Futures.addCallback( future, new TaskFutureCallBack<V, I>( task ) );
+ executorService.submit( task );
}
catch ( RejectedExecutionException ree ) {
task.rejected();
- return Futures.immediateCancelledFuture();
}
- return future;
+ return task;
+
}
@@ -109,6 +96,41 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
}
+ private final class NamedForkJoinPool extends ForkJoinPool{
+
+ private NamedForkJoinPool( final int workerThreadCount ) {
+ //TODO, verify the scheduler at the end
+ super( workerThreadCount, defaultForkJoinWorkerThreadFactory, new TaskExceptionHandler(), true );
+ }
+
+
+
+ }
+
+ private final class TaskExceptionHandler implements Thread.UncaughtExceptionHandler{
+
+ @Override
+ public void uncaughtException( final Thread t, final Throwable e ) {
+ LOG.error( "Uncaught exception on thread {} was {}", t, e );
+ }
+ }
+
+
+
+
+ private final class NamedWorkerThread extends ForkJoinWorkerThread{
+
+ /**
+ * Creates a ForkJoinWorkerThread operating in the given pool.
+ *
+ * @param pool the pool this thread works in
+ *
+ * @throws NullPointerException if pool is null
+ */
+ protected NamedWorkerThread(final String name, final ForkJoinPool pool ) {
+ super( pool );
+ }
+ }
/**
* Create a thread pool that will reject work if our audit tasks become overwhelmed
*/
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dc3f448c/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
index 518b461..eb04c2c 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
@@ -1,37 +1,47 @@
package org.apache.usergrid.persistence.core.task;
-import java.util.concurrent.Callable;
-
+import java.util.concurrent.RecursiveTask;
/**
* The task to execute
*/
-public interface Task<V, I> extends Callable<V> {
+public abstract class Task<V, I> extends RecursiveTask<V> {
/**
* Get the unique identifier of this task. This may be used to collapse runnables over a time period in the future
- *
- * @return
*/
- I getId();
+ public abstract I getId();
+
+
+ @Override
+ protected V compute() {
+ try {
+ return executeTask();
+ }
+ catch ( Exception e ) {
+ exceptionThrown( e );
+ throw new RuntimeException( e );
+ }
+ }
+
/**
- * Invoked when this task throws an uncaught exception.
- * @param throwable
+ * Execute the task
*/
- void exceptionThrown(final Throwable throwable);
+ public abstract V executeTask() throws Exception;
/**
- * Invoked when we weren't able to run this task by the the thread attempting to schedule the task.
- * If this task MUST be run immediately, you can invoke the call method from within this event to invoke the
- * task in the scheduling thread. Note that this has performance implications to the user. If you can drop the
- * request and process later (lazy repair for instance ) do so.
- *
+ * Invoked when this task throws an uncaught exception.
*/
- void rejected();
-
-
+ public abstract void exceptionThrown( final Throwable throwable );
+ /**
+ * Invoked when we weren't able to run this task by the the thread attempting to schedule the task. If this task
+ * MUST be run immediately, you can invoke the call method from within this event to invoke the task in the
+ * scheduling thread. Note that this has performance implications to the user. If you can drop the request and
+ * process later (lazy repair for instance ) do so.
+ */
+ public abstract void rejected();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dc3f448c/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
index 619ac14..a3553c7 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
@@ -13,5 +13,5 @@ public interface TaskExecutor {
* Submit the task asynchronously
* @param task
*/
- public <V, I> ListenableFuture<V> submit( Task<V, I> task );
+ public <V, I> Task<V, I > submit( Task<V, I> task );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dc3f448c/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
index 9da9263..f65d5a6 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
@@ -11,8 +11,6 @@ import org.junit.Test;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
-import junit.framework.TestCase;
-
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
@@ -25,7 +23,7 @@ public class NamedTaskExecutorImplTest {
@Test
public void jobSuccess() throws InterruptedException {
- final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1, 0 );
+ final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1 );
final CountDownLatch exceptionLatch = new CountDownLatch( 0 );
final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
@@ -46,7 +44,7 @@ public class NamedTaskExecutorImplTest {
@Test
public void exceptionThrown() throws InterruptedException {
- final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1, 0 );
+ final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1 );
final CountDownLatch exceptionLatch = new CountDownLatch( 1 );
final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
@@ -55,9 +53,10 @@ public class NamedTaskExecutorImplTest {
final RuntimeException re = new RuntimeException( "throwing exception" );
final TestTask<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {
+
+
@Override
- public Void call() throws Exception {
- super.call();
+ public Void executeTask() {
throw re;
}
};
@@ -77,7 +76,7 @@ public class NamedTaskExecutorImplTest {
@Test
public void noCapacity() throws InterruptedException {
- final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1, 0 );
+ final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1 );
final CountDownLatch exceptionLatch = new CountDownLatch( 0 );
final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
@@ -86,8 +85,8 @@ public class NamedTaskExecutorImplTest {
final TestTask<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {
@Override
- public Void call() throws Exception {
- super.call();
+ public Void executeTask() throws Exception {
+ super.executeTask();
//park this thread so it takes up a task and the next is rejected
final Object mutex = new Object();
@@ -131,22 +130,22 @@ public class NamedTaskExecutorImplTest {
public void noCapacityWithQueue() throws InterruptedException {
final int threadPoolSize = 1;
- final int queueSize = 10;
+
- final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", threadPoolSize, queueSize );
+ final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", threadPoolSize );
final CountDownLatch exceptionLatch = new CountDownLatch( 0 );
final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
final CountDownLatch runLatch = new CountDownLatch( 1 );
- int iterations = threadPoolSize + queueSize;
+ int iterations = threadPoolSize ;
- for(int i = 0; i < iterations; i ++) {
+ for ( int i = 0; i < iterations; i++ ) {
final TestTask<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {
@Override
- public Void call() throws Exception {
- super.call();
+ public Void executeTask() throws Exception {
+ super.executeTask();
//park this thread so it takes up a task and the next is rejected
final Object mutex = new Object();
@@ -162,7 +161,6 @@ public class NamedTaskExecutorImplTest {
}
-
runLatch.await( 1000, TimeUnit.MILLISECONDS );
//now submit the second task
@@ -187,12 +185,81 @@ public class NamedTaskExecutorImplTest {
}
- private static abstract class TestTask<V> implements Task<V, UUID> {
+ @Test
+ public void jobTreeResult() throws InterruptedException {
+
+ final int threadPoolSize = 4;
+
+
+ final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", threadPoolSize );
+
+ final CountDownLatch exceptionLatch = new CountDownLatch( 0 );
+ final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
+
+ //accomodates for splitting the job 1->2->4 and joining
+ final CountDownLatch runLatch = new CountDownLatch( 7 );
+
+
+ TestRecursiveTask task = new TestRecursiveTask( exceptionLatch, rejectedLatch, runLatch, 1, 3 );
+
+ executor.submit( task );
+
+
+ //compute our result
+ Integer result = task.join();
+
+ //result should be 1+2*2+3*4
+ final int expected = 4*3;
+
+ assertEquals(expected, result.intValue());
+
+ //just to check our latches
+ runLatch.await( 1000, TimeUnit.MILLISECONDS );
+
+ //now submit the second task
+
+
+ }
+
+
+ private static class TestRecursiveTask extends TestTask<Integer> {
+
+ private final int depth;
+ private final int maxDepth;
+
+ private TestRecursiveTask( final CountDownLatch exceptionLatch, final CountDownLatch rejectedLatch,
+ final CountDownLatch runLatch, final int depth, final int maxDepth ) {
+ super( exceptionLatch, rejectedLatch, runLatch );
+ this.depth = depth;
+ this.maxDepth = maxDepth;
+ }
+
+
+ @Override
+ public Integer executeTask() throws Exception {
+
+ if(depth == maxDepth ){
+ return depth;
+ }
+
+ TestRecursiveTask left = new TestRecursiveTask(exceptionLatch, rejectedLatch, runLatch, depth+1, maxDepth );
+
+ TestRecursiveTask right = new TestRecursiveTask(exceptionLatch, rejectedLatch, runLatch, depth+1, maxDepth );
+
+ //run our left in another thread
+ left.fork();
+
+ return right.compute() + left.join();
+ }
+ }
+
+
+ private static abstract class TestTask<V> extends Task<V, UUID> {
- private final List<Throwable> exceptions;
- private final CountDownLatch exceptionLatch;
- private final CountDownLatch rejectedLatch;
- private final CountDownLatch runLatch;
+ protected final List<Throwable> exceptions;
+ protected final CountDownLatch exceptionLatch;
+ protected final CountDownLatch rejectedLatch;
+ protected final CountDownLatch runLatch;
private TestTask( final CountDownLatch exceptionLatch, final CountDownLatch rejectedLatch,
@@ -225,7 +292,7 @@ public class NamedTaskExecutorImplTest {
@Override
- public V call() throws Exception {
+ public V executeTask() throws Exception {
runLatch.countDown();
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dc3f448c/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
index 608f8ce..a3978fc 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
@@ -155,7 +155,7 @@ public class GraphModule extends AbstractModule {
@Singleton
@Provides
public TaskExecutor graphTaskExecutor(final GraphFig graphFig){
- return new NamedTaskExecutorImpl( "graphTaskExecutor", graphFig.getShardAuditWorkerCount(), graphFig.getShardAuditWorkerQueueSize() );
+ return new NamedTaskExecutorImpl( "graphTaskExecutor", graphFig.getShardAuditWorkerCount() );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dc3f448c/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
index 4fe1a63..15acaa8 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
@@ -27,6 +27,8 @@ import java.util.HashSet;
import java.util.Set;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.task.Task;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardGroupCompactionImpl;
import com.google.common.util.concurrent.ListenableFuture;
@@ -42,9 +44,8 @@ public interface ShardGroupCompaction {
*
* @return A ListenableFuture with the result. Note that some
*/
- public ListenableFuture<AuditResult> evaluateShardGroup( final ApplicationScope scope,
- final DirectedEdgeMeta edgeMeta,
- final ShardEntryGroup group );
+ public Task<AuditResult, ShardGroupCompactionImpl.ShardAuditKey> evaluateShardGroup(
+ final ApplicationScope scope, final DirectedEdgeMeta edgeMeta, final ShardEntryGroup group );
public enum AuditResult {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dc3f448c/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
index be7fbe4..751b4d9 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
@@ -37,14 +37,13 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import javax.annotation.Nullable;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.core.consistency.TimeService;
import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.task.ImmediateTask;
import org.apache.usergrid.persistence.core.task.Task;
import org.apache.usergrid.persistence.core.task.TaskExecutor;
import org.apache.usergrid.persistence.graph.GraphFig;
@@ -66,9 +65,6 @@ import com.google.common.hash.HashFunction;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
import com.google.common.hash.PrimitiveSink;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.netflix.astyanax.Keyspace;
@@ -277,45 +273,29 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
@Override
- public ListenableFuture<AuditResult> evaluateShardGroup( final ApplicationScope scope,
- final DirectedEdgeMeta edgeMeta,
- final ShardEntryGroup group ) {
+ public Task<AuditResult, ShardAuditKey> evaluateShardGroup( final ApplicationScope scope,
+ final DirectedEdgeMeta edgeMeta,
+ final ShardEntryGroup group ) {
final double repairChance = random.nextDouble();
//don't audit, we didn't hit our chance
if ( repairChance > graphFig.getShardRepairChance() ) {
- return Futures.immediateFuture( AuditResult.NOT_CHECKED );
+ return new ImmediateTask<AuditResult, ShardAuditKey>( new ShardAuditKey( scope, edgeMeta, group ), AuditResult.NOT_CHECKED ) {};
}
/**
* Try and submit. During back pressure, we may not be able to submit, that's ok. Better to drop than to
* hose the system
*/
- ListenableFuture<AuditResult> future = taskExecutor.submit( new ShardAuditTask( scope, edgeMeta, group ) );
+ return taskExecutor.submit( new ShardAuditTask( scope, edgeMeta, group ) );
- /**
- * Log our success or failures for debugging purposes
- */
- Futures.addCallback( future, new FutureCallback<AuditResult>() {
- @Override
- public void onSuccess( @Nullable final AuditResult result ) {
- LOG.debug( "Successfully completed audit of task {}", result );
- }
-
-
- @Override
- public void onFailure( final Throwable t ) {
- LOG.error( "Unable to perform audit. Exception is ", t );
- }
- } );
- return future;
}
- private final class ShardAuditTask implements Task<AuditResult, ShardAuditKey> {
+ private final class ShardAuditTask extends Task<AuditResult, ShardAuditKey> {
private final ApplicationScope scope;
private final DirectedEdgeMeta edgeMeta;
@@ -350,7 +330,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
@Override
- public AuditResult call() throws Exception {
+ public AuditResult executeTask() throws Exception {
/**
* We don't have a compaction pending. Run an audit on the shards
*/
@@ -401,10 +381,8 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
*/
try {
CompactionResult result = compact( scope, edgeMeta, group );
- LOG.info(
- "Compaction result for compaction of scope {} with edge meta data of {} and shard group " +
- "{} is {}",
- new Object[] { scope, edgeMeta, group, result } );
+ LOG.info( "Compaction result for compaction of scope {} with edge meta data of {} and shard group "
+ + "{} is {}", new Object[] { scope, edgeMeta, group, result } );
}
finally {
shardCompactionTaskTracker.complete( scope, edgeMeta, group );
@@ -418,7 +396,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
}
- private static final class ShardAuditKey {
+ public static final class ShardAuditKey {
private final ApplicationScope scope;
private final DirectedEdgeMeta edgeMeta;
private final ShardEntryGroup group;