You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2014/09/25 01:39:19 UTC
[1/2] git commit: Added task execution framework
Repository: incubator-usergrid
Updated Branches:
refs/heads/eventsystem c667c18e3 -> 4c72f5c63
Added task execution framework
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/66e508a9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/66e508a9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/66e508a9
Branch: refs/heads/eventsystem
Commit: 66e508a942d33051c95bfcd80e19b87a40812370
Parents: c667c18
Author: Todd Nine <to...@apache.org>
Authored: Wed Sep 24 16:47:24 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Wed Sep 24 16:47:24 2014 -0600
----------------------------------------------------------------------
.../core/task/NamedTaskExecutorImpl.java | 162 +++++++++++++
.../usergrid/persistence/core/task/Task.java | 37 +++
.../persistence/core/task/TaskExecutor.java | 14 ++
.../core/task/NamedTaskExecutorImplTest.java | 233 +++++++++++++++++++
4 files changed, 446 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/66e508a9/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
new file mode 100644
index 0000000..8ba73a0
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
@@ -0,0 +1,162 @@
+package org.apache.usergrid.persistence.core.task;
+
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.annotation.Nullable;
+
+import org.slf4j.Logger;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+
+
+/**
+ * Implementation of the task executor with a unique name and size
+ */
+public class NamedTaskExecutorImpl implements TaskExecutor {
+
+ private static final Logger LOG = org.slf4j.LoggerFactory.getLogger( NamedTaskExecutorImpl.class );
+
+ private final ListeningExecutorService executorService;
+
+
+ /**
+ * @param name The name of this instance of the task executor
+ * @param poolSize The size of the pool. This is the number of concurrent tasks that can execute at once.
+ * @param queueLength The length of tasks to keep in the queue
+ */
+ public NamedTaskExecutorImpl( final String name, final int poolSize, final int queueLength ) {
+ Preconditions.checkNotNull( name );
+ Preconditions.checkArgument( name.length() > 0, "name must have a length" );
+ Preconditions.checkArgument( poolSize > 0, "poolSize must be > than 0" );
+ Preconditions.checkArgument( queueLength > -1, "queueLength must be 0 or more" );
+
+
+ final BlockingQueue<Runnable> queue =
+ queueLength > 0 ? new ArrayBlockingQueue<Runnable>( queueLength ) : new SynchronousQueue<Runnable>();
+
+ executorService = MoreExecutors.listeningDecorator( new MaxSizeThreadPool( name, poolSize, queue ) );
+ }
+
+
+ @Override
+ public <V, I> void submit( final Task<V, I> task ) {
+
+ final ListenableFuture<V> future;
+
+ try {
+ future = executorService.submit( task );
+
+ /**
+ * Log our success or failures for debugging purposes
+ */
+ Futures.addCallback( future, new TaskFutureCallBack<V, I>( task ) );
+ }
+ catch ( RejectedExecutionException ree ) {
+ task.rejected();
+ return;
+ }
+ }
+
+
+ /**
+ * Callback for when the task succeeds or fails.
+ */
+ private static final class TaskFutureCallBack<V, I> implements FutureCallback<V> {
+
+ private final Task<V, I> task;
+
+
+ private TaskFutureCallBack( Task<V, I> task ) {
+ this.task = task;
+ }
+
+
+ @Override
+ public void onSuccess( @Nullable final V result ) {
+ LOG.debug( "Successfully completed task ", task );
+ }
+
+
+ @Override
+ public void onFailure( final Throwable t ) {
+ LOG.error( "Unable to execute task. Exception is ", t );
+
+ task.exceptionThrown( t );
+ }
+ }
+
+
+ /**
+ * Create a thread pool that will reject work if our audit tasks become overwhelmed
+ */
+ private static final class MaxSizeThreadPool extends ThreadPoolExecutor {
+
+ public MaxSizeThreadPool( final String name, final int workerSize, BlockingQueue<Runnable> queue ) {
+
+ super( 1, workerSize, 30, TimeUnit.SECONDS, queue, new CountingThreadFactory( name ),
+ new RejectedHandler() );
+ }
+ }
+
+
+ /**
+ * Thread factory that will name and count threads for easier debugging
+ */
+ private static final class CountingThreadFactory implements ThreadFactory {
+
+ private final AtomicLong threadCounter = new AtomicLong();
+
+ private final String name;
+
+
+ private CountingThreadFactory( final String name ) {this.name = name;}
+
+
+ @Override
+ public Thread newThread( final Runnable r ) {
+ final long newValue = threadCounter.incrementAndGet();
+
+ Thread t = new Thread( r, name + "-" + newValue );
+
+ t.setDaemon( true );
+
+ return t;
+ }
+ }
+
+
+ /**
+ * The handler that will handle rejected executions and signal the interface
+ */
+ private static final class RejectedHandler implements RejectedExecutionHandler {
+
+
+ @Override
+ public void rejectedExecution( final Runnable r, final ThreadPoolExecutor executor ) {
+
+ // ListenableFutureTask<Task<?, ?>> future = ( ListenableFutureTask<Task<?, ?>> ) r;
+ //
+ // future.
+ // final Task<?, ?> task = ( Task<?, ?> ) r;
+ LOG.warn( "Audit queue full, rejecting audit task {}", r );
+
+ throw new RejectedExecutionException( "Unable to run task, queue full" );
+ // LOG.warn( "Audit queue full, rejecting audit task {}", task );
+ // task.rejected();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/66e508a9/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
new file mode 100644
index 0000000..8b1ed22
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
@@ -0,0 +1,37 @@
+package org.apache.usergrid.persistence.core.task;
+
+
+import java.util.concurrent.Callable;
+
+
+
+/**
+ * The task to execute
+ */
+public interface Task<V, I> extends Callable<V> {
+
+ /**
+ * Get the unique identifier of this task. This may be used to collapse runnables over a time period in the future
+ *
+ * @return
+ */
+ I getId();
+
+ /**
+ * Invoked when this task throws an uncaught exception.
+ * @param throwable
+ */
+ void exceptionThrown(final Throwable throwable);
+
+ /**
+ * Invoked when we weren't able to run this task by the thread attempting to schedule the task.
+ * If this task MUST be run immediately, you can invoke the call method from within this event to invoke the
+ * task in the scheduling thread. Note that this has performance implications to the user. If you can drop the
+ * request and process later (lazy repair, for instance) do so.
+ *
+ */
+ void rejected();
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/66e508a9/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
new file mode 100644
index 0000000..e60da83
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
@@ -0,0 +1,14 @@
+package org.apache.usergrid.persistence.core.task;
+
+
+/**
+ * An interface for execution of tasks
+ */
+public interface TaskExecutor {
+
+ /**
+ * Submit the task asynchronously
+ * @param task
+ */
+ public <V, I> void submit(Task<V, I> task);
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/66e508a9/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
new file mode 100644
index 0000000..9da9263
--- /dev/null
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
@@ -0,0 +1,233 @@
+package org.apache.usergrid.persistence.core.task;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import junit.framework.TestCase;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+
+
+/**
+ * Tests for the namedtask execution impl
+ */
+public class NamedTaskExecutorImplTest {
+
+
+ @Test
+ public void jobSuccess() throws InterruptedException {
+ final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1, 0 );
+
+ final CountDownLatch exceptionLatch = new CountDownLatch( 0 );
+ final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
+ final CountDownLatch runLatch = new CountDownLatch( 1 );
+
+ final Task<Void, UUID> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {};
+
+ executor.submit( task );
+
+
+ runLatch.await( 1000, TimeUnit.MILLISECONDS );
+
+ assertEquals( 0l, exceptionLatch.getCount() );
+
+ assertEquals( 0l, rejectedLatch.getCount() );
+ }
+
+
+ @Test
+ public void exceptionThrown() throws InterruptedException {
+ final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1, 0 );
+
+ final CountDownLatch exceptionLatch = new CountDownLatch( 1 );
+ final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
+ final CountDownLatch runLatch = new CountDownLatch( 1 );
+
+ final RuntimeException re = new RuntimeException( "throwing exception" );
+
+ final TestTask<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {
+ @Override
+ public Void call() throws Exception {
+ super.call();
+ throw re;
+ }
+ };
+
+ executor.submit( task );
+
+
+ runLatch.await( 1000, TimeUnit.MILLISECONDS );
+ exceptionLatch.await( 1000, TimeUnit.MILLISECONDS );
+
+ assertSame( re, task.exceptions.get( 0 ) );
+
+
+ assertEquals( 0l, rejectedLatch.getCount() );
+ }
+
+
+ @Test
+ public void noCapacity() throws InterruptedException {
+ final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1, 0 );
+
+ final CountDownLatch exceptionLatch = new CountDownLatch( 0 );
+ final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
+ final CountDownLatch runLatch = new CountDownLatch( 1 );
+
+
+ final TestTask<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {
+ @Override
+ public Void call() throws Exception {
+ super.call();
+
+ //park this thread so it takes up a task and the next is rejected
+ final Object mutex = new Object();
+
+ synchronized ( mutex ) {
+ mutex.wait();
+ }
+
+ return null;
+ }
+ };
+
+ executor.submit( task );
+
+
+ runLatch.await( 1000, TimeUnit.MILLISECONDS );
+
+ //now submit the second task
+
+
+ final CountDownLatch secondRejectedLatch = new CountDownLatch( 1 );
+ final CountDownLatch secondExceptionLatch = new CountDownLatch( 0 );
+ final CountDownLatch secondRunLatch = new CountDownLatch( 1 );
+
+
+ final TestTask<Void> testTask = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {};
+
+ executor.submit( testTask );
+
+
+ secondRejectedLatch.await( 1000, TimeUnit.MILLISECONDS );
+
+ //if we get here we've been rejected, just double check we didn't run
+
+ assertEquals( 1l, secondRunLatch.getCount() );
+ assertEquals( 0l, secondExceptionLatch.getCount() );
+ }
+
+
+ @Test
+ public void noCapacityWithQueue() throws InterruptedException {
+
+ final int threadPoolSize = 1;
+ final int queueSize = 10;
+
+ final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", threadPoolSize, queueSize );
+
+ final CountDownLatch exceptionLatch = new CountDownLatch( 0 );
+ final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
+ final CountDownLatch runLatch = new CountDownLatch( 1 );
+
+ int iterations = threadPoolSize + queueSize;
+
+ for(int i = 0; i < iterations; i ++) {
+
+ final TestTask<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {
+ @Override
+ public Void call() throws Exception {
+ super.call();
+
+ //park this thread so it takes up a task and the next is rejected
+ final Object mutex = new Object();
+
+ synchronized ( mutex ) {
+ mutex.wait();
+ }
+
+ return null;
+ }
+ };
+ executor.submit( task );
+ }
+
+
+
+ runLatch.await( 1000, TimeUnit.MILLISECONDS );
+
+ //now submit the second task
+
+
+ final CountDownLatch secondRejectedLatch = new CountDownLatch( 1 );
+ final CountDownLatch secondExceptionLatch = new CountDownLatch( 0 );
+ final CountDownLatch secondRunLatch = new CountDownLatch( 1 );
+
+
+ final TestTask<Void> testTask = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {};
+
+ executor.submit( testTask );
+
+
+ secondRejectedLatch.await( 1000, TimeUnit.MILLISECONDS );
+
+ //if we get here we've been rejected, just double check we didn't run
+
+ assertEquals( 1l, secondRunLatch.getCount() );
+ assertEquals( 0l, secondExceptionLatch.getCount() );
+ }
+
+
+ private static abstract class TestTask<V> implements Task<V, UUID> {
+
+ private final List<Throwable> exceptions;
+ private final CountDownLatch exceptionLatch;
+ private final CountDownLatch rejectedLatch;
+ private final CountDownLatch runLatch;
+
+
+ private TestTask( final CountDownLatch exceptionLatch, final CountDownLatch rejectedLatch,
+ final CountDownLatch runLatch ) {
+ this.exceptionLatch = exceptionLatch;
+ this.rejectedLatch = rejectedLatch;
+ this.runLatch = runLatch;
+
+ this.exceptions = new ArrayList<>();
+ }
+
+
+ @Override
+ public UUID getId() {
+ return UUIDGenerator.newTimeUUID();
+ }
+
+
+ @Override
+ public void exceptionThrown( final Throwable throwable ) {
+ this.exceptions.add( throwable );
+ exceptionLatch.countDown();
+ }
+
+
+ @Override
+ public void rejected() {
+ rejectedLatch.countDown();
+ }
+
+
+ @Override
+ public V call() throws Exception {
+ runLatch.countDown();
+ return null;
+ }
+ }
+}
[2/2] git commit: Migrated graph over to new task executor
Posted by to...@apache.org.
Migrated graph over to new task executor
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/4c72f5c6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/4c72f5c6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/4c72f5c6
Branch: refs/heads/eventsystem
Commit: 4c72f5c636bae9b16df7578302538c7b4d9600e7
Parents: 66e508a
Author: Todd Nine <to...@apache.org>
Authored: Wed Sep 24 17:39:07 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Wed Sep 24 17:39:07 2014 -0600
----------------------------------------------------------------------
.../collection/event/EntityVersionRemoved.java | 26 +++
.../collection/guice/CollectionModule.java | 17 +-
.../serialization/SerializationFig.java | 32 ++-
.../core/astyanax/AstyanaxKeyspaceProvider.java | 2 +
.../persistence/core/guice/CommonModule.java | 2 +
.../core/task/NamedTaskExecutorImpl.java | 14 +-
.../persistence/core/task/TaskExecutor.java | 2 +-
.../usergrid/persistence/graph/GraphFig.java | 2 +
.../persistence/graph/guice/GraphModule.java | 15 ++
.../shard/impl/ShardGroupCompactionImpl.java | 202 ++++++++++++-------
.../impl/shard/ShardGroupCompactionTest.java | 5 +-
11 files changed, 227 insertions(+), 92 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4c72f5c6/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionRemoved.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionRemoved.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionRemoved.java
new file mode 100644
index 0000000..dca575d
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionRemoved.java
@@ -0,0 +1,26 @@
+package org.apache.usergrid.persistence.collection.event;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.CollectionScope;
+
+
+/**
+ *
+ * Invoked when an entity version is removed. Note that this is not a deletion of the entity itself,
+ * only the version itself.
+ *
+ */
+public interface EntityVersionRemoved {
+
+
+ /**
+ * The version specified was removed.
+ * @param scope
+ * @param entityId
+ * @param entityVersion
+ */
+ public void versionRemoved(final CollectionScope scope, final UUID entityId, final UUID entityVersion);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4c72f5c6/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
index 84f69db..306f6e0 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
@@ -36,6 +36,8 @@ import org.apache.usergrid.persistence.collection.serialization.SerializationFig
import org.apache.usergrid.persistence.collection.serialization.impl.SerializationModule;
import org.apache.usergrid.persistence.collection.service.UUIDService;
import org.apache.usergrid.persistence.collection.service.impl.ServiceModule;
+import org.apache.usergrid.persistence.core.task.NamedTaskExecutorImpl;
+import org.apache.usergrid.persistence.core.task.TaskExecutor;
import com.google.inject.AbstractModule;
import com.google.inject.Inject;
@@ -79,8 +81,7 @@ public class CollectionModule extends AbstractModule {
@Singleton
@Inject
@Write
-
- public WriteStart write (MvccLogEntrySerializationStrategy logStrategy, UUIDService uuidService) {
+ public WriteStart write (final MvccLogEntrySerializationStrategy logStrategy) {
final WriteStart writeStart = new WriteStart( logStrategy, MvccEntity.Status.COMPLETE);
return writeStart;
@@ -90,13 +91,21 @@ public class CollectionModule extends AbstractModule {
@Singleton
@Inject
@WriteUpdate
-
- public WriteStart writeUpdate (MvccLogEntrySerializationStrategy logStrategy, UUIDService uuidService) {
+ public WriteStart writeUpdate (final MvccLogEntrySerializationStrategy logStrategy) {
final WriteStart writeStart = new WriteStart( logStrategy, MvccEntity.Status.PARTIAL );
return writeStart;
}
+ @Inject
+ @Singleton
+ @Provides
+ public TaskExecutor collectionTaskExecutor(final SerializationFig serializationFig){
+ return new NamedTaskExecutorImpl( "collectiontasks", serializationFig.getTaskPoolThreadSize(), serializationFig.getTaskPoolQueueSize() );
+ }
+
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4c72f5c6/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java
index 81302a6..7e69a19 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java
@@ -15,25 +15,45 @@ public interface SerializationFig extends GuicyFig {
/**
* Time to live timeout in seconds.
+ *
* @return Timeout in seconds.
*/
- @Key( "collection.stage.transient.timeout" )
- @Default( "5" )
+ @Key("collection.stage.transient.timeout")
+ @Default("5")
int getTimeout();
/**
* Number of history items to return for delete.
+ *
* @return Timeout in seconds.
*/
- @Key( "collection.delete.history.size" )
- @Default( "100" )
+ @Key("collection.delete.history.size")
+ @Default("100")
int getHistorySize();
/**
* Number of items to buffer.
+ *
* @return Timeout in seconds.
*/
- @Key( "collection.buffer.size" )
- @Default( "10" )
+ @Key("collection.buffer.size")
+ @Default("10")
int getBufferSize();
+
+
+ /**
+ * The number of threads to have in the task pool
+ */
+ @Key( "collection.task.pool.threadsize" )
+ @Default( "20" )
+ int getTaskPoolThreadSize();
+
+
+
+ /**
+ * The size of the task queue to have in the task pool
+ */
+ @Key( "collection.task.pool.queuesize" )
+ @Default( "20" )
+ int getTaskPoolQueueSize();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4c72f5c6/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/AstyanaxKeyspaceProvider.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/AstyanaxKeyspaceProvider.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/AstyanaxKeyspaceProvider.java
index 7caeaeb..8bd5a9f 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/AstyanaxKeyspaceProvider.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/AstyanaxKeyspaceProvider.java
@@ -24,6 +24,7 @@ import java.util.Set;
import com.google.inject.Inject;
import com.google.inject.Provider;
+import com.google.inject.Singleton;
import com.netflix.astyanax.AstyanaxConfiguration;
import com.netflix.astyanax.AstyanaxContext;
import com.netflix.astyanax.Keyspace;
@@ -41,6 +42,7 @@ import com.netflix.astyanax.thrift.ThriftFamilyFactory;
*
* @author tnine
*/
+@Singleton
public class AstyanaxKeyspaceProvider implements Provider<Keyspace> {
private final CassandraFig cassandraFig;
private final CassandraConfig cassandraConfig;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4c72f5c6/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
index a4cc98a..5f461bb 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
@@ -63,4 +63,6 @@ public class CommonModule extends AbstractModule {
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4c72f5c6/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
index 8ba73a0..8184937 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
@@ -53,7 +53,7 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
@Override
- public <V, I> void submit( final Task<V, I> task ) {
+ public <V, I> ListenableFuture<V> submit( final Task<V, I> task ) {
final ListenableFuture<V> future;
@@ -67,8 +67,10 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
}
catch ( RejectedExecutionException ree ) {
task.rejected();
- return;
+ return Futures.immediateCancelledFuture();
}
+
+ return future;
}
@@ -147,16 +149,10 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
@Override
public void rejectedExecution( final Runnable r, final ThreadPoolExecutor executor ) {
-
- // ListenableFutureTask<Task<?, ?>> future = ( ListenableFutureTask<Task<?, ?>> ) r;
- //
- // future.
- // final Task<?, ?> task = ( Task<?, ?> ) r;
LOG.warn( "Audit queue full, rejecting audit task {}", r );
throw new RejectedExecutionException( "Unable to run task, queue full" );
- // LOG.warn( "Audit queue full, rejecting audit task {}", task );
- // task.rejected();
}
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4c72f5c6/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
index e60da83..b5491bc 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
@@ -10,5 +10,5 @@ public interface TaskExecutor {
* Submit the task asynchronously
* @param task
*/
- public <V, I> void submit(Task<V, I> task);
+ public <V, I> com.google.common.util.concurrent.ListenableFuture<V> submit( Task<V, I> task );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4c72f5c6/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
index 0a6ecfa..894e74a 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
@@ -90,6 +90,8 @@ public interface GraphFig extends GuicyFig {
public static final String COUNTER_WRITE_FLUSH_QUEUE_SIZE = "usergrid.graph.shard.counter.queue.size";
+
+
@Default("1000")
@Key(SCAN_PAGE_SIZE)
int getScanPageSize();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4c72f5c6/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
index f0e954b..608f8ce 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
@@ -24,6 +24,8 @@ import org.safehaus.guicyfig.GuicyFigModule;
import org.apache.usergrid.persistence.core.consistency.TimeService;
import org.apache.usergrid.persistence.core.consistency.TimeServiceImpl;
import org.apache.usergrid.persistence.core.migration.Migration;
+import org.apache.usergrid.persistence.core.task.NamedTaskExecutorImpl;
+import org.apache.usergrid.persistence.core.task.TaskExecutor;
import org.apache.usergrid.persistence.graph.GraphFig;
import org.apache.usergrid.persistence.graph.GraphManager;
import org.apache.usergrid.persistence.graph.GraphManagerFactory;
@@ -62,7 +64,10 @@ import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.Sizeb
import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.SizebasedEdgeShardStrategy;
import com.google.inject.AbstractModule;
+import com.google.inject.Inject;
import com.google.inject.Key;
+import com.google.inject.Provides;
+import com.google.inject.Singleton;
import com.google.inject.assistedinject.FactoryModuleBuilder;
import com.google.inject.multibindings.Multibinder;
@@ -144,6 +149,16 @@ public class GraphModule extends AbstractModule {
migrationBinding.addBinding().to( Key.get( EdgeShardSerialization.class ) );
migrationBinding.addBinding().to( Key.get( NodeShardCounterSerialization.class ) );
}
+
+
+ @Inject
+ @Singleton
+ @Provides
+ public TaskExecutor graphTaskExecutor(final GraphFig graphFig){
+ return new NamedTaskExecutorImpl( "graphTaskExecutor", graphFig.getShardAuditWorkerCount(), graphFig.getShardAuditWorkerQueueSize() );
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4c72f5c6/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
index 5076424..be7fbe4 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
@@ -30,7 +30,6 @@ import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
@@ -46,6 +45,8 @@ import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.core.consistency.TimeService;
import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.task.Task;
+import org.apache.usergrid.persistence.core.task.TaskExecutor;
import org.apache.usergrid.persistence.graph.GraphFig;
import org.apache.usergrid.persistence.graph.MarkedEdge;
import org.apache.usergrid.persistence.graph.SearchByEdgeType;
@@ -68,8 +69,6 @@ import com.google.common.hash.PrimitiveSink;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.netflix.astyanax.Keyspace;
@@ -91,7 +90,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
private static final HashFunction MURMUR_128 = Hashing.murmur3_128();
- private final ListeningExecutorService executorService;
+ private final TaskExecutor taskExecutor;
private final TimeService timeService;
private final GraphFig graphFig;
private final NodeShardAllocation nodeShardAllocation;
@@ -110,7 +109,8 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
final NodeShardAllocation nodeShardAllocation,
final ShardedEdgeSerialization shardedEdgeSerialization,
final EdgeColumnFamilies edgeColumnFamilies, final Keyspace keyspace,
- final EdgeShardSerialization edgeShardSerialization ) {
+ final EdgeShardSerialization edgeShardSerialization,
+ final TaskExecutor taskExecutor ) {
this.timeService = timeService;
this.graphFig = graphFig;
@@ -124,8 +124,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
this.shardCompactionTaskTracker = new ShardCompactionTaskTracker();
this.shardAuditTaskTracker = new ShardAuditTaskTracker();
- executorService = MoreExecutors.listeningDecorator(
- new MaxSizeThreadPool( graphFig.getShardAuditWorkerCount(), graphFig.getShardAuditWorkerQueueSize() ) );
+ this.taskExecutor = taskExecutor;
}
@@ -232,7 +231,8 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
resultBuilder.withCopiedEdges( edgeCount ).withSourceShards( sourceShards ).withTargetShard( targetShard );
/**
- * We didn't move anything this pass, mark the shard as compacted. If we move something, it means that we missed it on the first pass
+ * We didn't move anything this pass, mark the shard as compacted. If we move something,
+ * it means that we missed it on the first pass
* or someone is still not writing to the target shard only.
*/
if ( edgeCount == 0 ) {
@@ -293,91 +293,153 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
* Try and submit. During back pressure, we may not be able to submit, that's ok. Better to drop than to
* hose the system
*/
- ListenableFuture<AuditResult> future = executorService.submit( new Callable<AuditResult>() {
+ ListenableFuture<AuditResult> future = taskExecutor.submit( new ShardAuditTask( scope, edgeMeta, group ) );
+
+ /**
+ * Log our success or failures for debugging purposes
+ */
+ Futures.addCallback( future, new FutureCallback<AuditResult>() {
@Override
- public AuditResult call() throws Exception {
+ public void onSuccess( @Nullable final AuditResult result ) {
+ LOG.debug( "Successfully completed audit of task {}", result );
+ }
- /**
- * We don't have a compaction pending. Run an audit on the shards
- */
- if ( !group.isCompactionPending() ) {
+ @Override
+ public void onFailure( final Throwable t ) {
+ LOG.error( "Unable to perform audit. Exception is ", t );
+ }
+ } );
- /**
- * Check if we should allocate, we may want to
- */
+ return future;
+ }
- /**
- * It's already compacting, don't do anything
- */
- if ( !shardAuditTaskTracker.canStartTask( scope, edgeMeta, group ) ) {
- return AuditResult.CHECKED_NO_OP;
- }
- try {
+ private final class ShardAuditTask implements Task<AuditResult, ShardAuditKey> {
- final boolean created = nodeShardAllocation.auditShard( scope, group, edgeMeta );
- if ( !created ) {
- return AuditResult.CHECKED_NO_OP;
- }
- }
- finally {
- shardAuditTaskTracker.complete( scope, edgeMeta, group );
- }
+ private final ApplicationScope scope;
+ private final DirectedEdgeMeta edgeMeta;
+ private final ShardEntryGroup group;
- return AuditResult.CHECKED_CREATED;
- }
+ public ShardAuditTask( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta,
+ final ShardEntryGroup group ) {
+ this.scope = scope;
+ this.edgeMeta = edgeMeta;
+ this.group = group;
+ }
+
+
+ @Override
+ public ShardAuditKey getId() {
+ return new ShardAuditKey( scope, edgeMeta, group );
+ }
+
+ /** Logs a failed audit with the task id; the trailing Throwable is what SLF4J prints as the stack trace, so the id must fill the {} placeholder. */
+ @Override
+ public void exceptionThrown( final Throwable throwable ) {
+ LOG.error( "Unable to execute audit for shard of {}", getId(), throwable );
+ }
+
+
+ @Override
+ public void rejected() {
+ //Rejected because we're saturated: dropping is fine, we can check later. NOTE(review): ERROR level seems loud for an expected drop
+ LOG.error( "Rejected audit for shard of {}", getId() );
+ }
- //check our taskmanager
+ @Override
+ public AuditResult call() throws Exception {
+ /**
+ * We don't have a compaction pending. Run an audit on the shards
+ */
+ if ( !group.isCompactionPending() ) {
/**
- * Do the compaction
+ * Check if we should allocate, we may want to
*/
- if ( group.shouldCompact( timeService.getCurrentTime() ) ) {
- /**
- * It's already compacting, don't do anything
- */
- if ( !shardCompactionTaskTracker.canStartTask( scope, edgeMeta, group ) ) {
- return AuditResult.COMPACTING;
- }
- /**
- * We use a finally b/c we always want to remove the task track
- */
- try {
- CompactionResult result = compact( scope, edgeMeta, group );
- LOG.info( "Compaction result for compaction of scope {} with edge meta data of {} and shard group {} is {}", new Object[]{scope, edgeMeta, group, result} );
- }
- finally {
- shardCompactionTaskTracker.complete( scope, edgeMeta, group );
+ /**
+ * The audit tracker won't let us start (presumably an audit of this group is already running), don't do anything
+ */
+ if ( !shardAuditTaskTracker.canStartTask( scope, edgeMeta, group ) ) {
+ return AuditResult.CHECKED_NO_OP;
+ }
+
+ try {
+
+ final boolean created = nodeShardAllocation.auditShard( scope, group, edgeMeta );
+ if ( !created ) {
+ return AuditResult.CHECKED_NO_OP;
}
- return AuditResult.COMPACTED;
+ }
+ finally {
+ shardAuditTaskTracker.complete( scope, edgeMeta, group );
}
- //no op, there's nothing we need to do to this shard
- return AuditResult.NOT_CHECKED;
- }
- } );
- /**
- * Log our success or failures for debugging purposes
- */
- Futures.addCallback( future, new FutureCallback<AuditResult>() {
- @Override
- public void onSuccess( @Nullable final AuditResult result ) {
- LOG.debug( "Successfully completed audit of task {}", result );
+ return AuditResult.CHECKED_CREATED;
}
+ //check our taskmanager
- @Override
- public void onFailure( final Throwable t ) {
- LOG.error( "Unable to perform audit. Exception is ", t );
+
+ /**
+ * Do the compaction
+ */
+ if ( group.shouldCompact( timeService.getCurrentTime() ) ) {
+ /**
+ * It's already compacting, don't do anything
+ */
+ if ( !shardCompactionTaskTracker.canStartTask( scope, edgeMeta, group ) ) {
+ return AuditResult.COMPACTING;
+ }
+
+ /**
+ * We use a finally b/c we always want to remove the task track
+ */
+ try {
+ CompactionResult result = compact( scope, edgeMeta, group );
+ LOG.info(
+ "Compaction result for compaction of scope {} with edge meta data of {} and shard group " +
+ "{} is {}",
+ new Object[] { scope, edgeMeta, group, result } );
+ }
+ finally {
+ shardCompactionTaskTracker.complete( scope, edgeMeta, group );
+ }
+ return AuditResult.COMPACTED;
}
- } );
- return future;
+ //no op, there's nothing we need to do to this shard
+ return AuditResult.NOT_CHECKED;
+ }
+ }
+
+ /** Id returned by ShardAuditTask.getId(): scope, edge meta and shard group. No equals/hashCode defined - NOTE(review): confirm the executor never compares ids. */
+ private static final class ShardAuditKey {
+ private final ApplicationScope scope;
+ private final DirectedEdgeMeta edgeMeta;
+ private final ShardEntryGroup group;
+
+
+ private ShardAuditKey( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta,
+ final ShardEntryGroup group ) {
+ this.scope = scope;
+ this.edgeMeta = edgeMeta;
+ this.group = group;
+ }
+
+
+ @Override
+ public String toString() {
+ return "ShardAuditKey{" +
+ "scope=" + scope +
+ ", edgeMeta=" + edgeMeta +
+ ", group=" + group +
+ '}';
+ }
}
@@ -531,7 +593,6 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
}
-
public static final class CompactionResult {
public final long copiedEdges;
@@ -541,7 +602,6 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
public final Shard compactedShard;
-
private CompactionResult( final long copiedEdges, final Shard targetShard, final Set<Shard> sourceShards,
final Set<Shard> removedShards, final Shard compactedShard ) {
this.copiedEdges = copiedEdges;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4c72f5c6/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java
index 1513e85..9f0792d 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java
@@ -35,6 +35,7 @@ import org.mockito.Matchers;
import org.apache.usergrid.persistence.core.consistency.TimeService;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
+import org.apache.usergrid.persistence.core.task.TaskExecutor;
import org.apache.usergrid.persistence.graph.GraphFig;
import org.apache.usergrid.persistence.graph.SearchByEdgeType;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardGroupCompactionImpl;
@@ -84,6 +85,8 @@ public class ShardGroupCompactionTest {
final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
+ final TaskExecutor taskExecutor = mock(TaskExecutor.class);
+
final long delta = 10000;
final long createTime = 20000;
@@ -100,7 +103,7 @@ public class ShardGroupCompactionTest {
ShardGroupCompactionImpl compaction =
new ShardGroupCompactionImpl( timeService, graphFig, nodeShardAllocation, shardedEdgeSerialization,
- edgeColumnFamilies, keyspace, edgeShardSerialization );
+ edgeColumnFamilies, keyspace, edgeShardSerialization, taskExecutor );
DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromSourceNode( createId("source"), "test" );