You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2014/10/01 18:32:05 UTC
[01/15] git commit: Added task execution framework
Repository: incubator-usergrid
Updated Branches:
refs/heads/two-dot-o abbd76eb4 -> 2678eae9e
Added task execution framework
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/66e508a9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/66e508a9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/66e508a9
Branch: refs/heads/two-dot-o
Commit: 66e508a942d33051c95bfcd80e19b87a40812370
Parents: c667c18
Author: Todd Nine <to...@apache.org>
Authored: Wed Sep 24 16:47:24 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Wed Sep 24 16:47:24 2014 -0600
----------------------------------------------------------------------
.../core/task/NamedTaskExecutorImpl.java | 162 +++++++++++++
.../usergrid/persistence/core/task/Task.java | 37 +++
.../persistence/core/task/TaskExecutor.java | 14 ++
.../core/task/NamedTaskExecutorImplTest.java | 233 +++++++++++++++++++
4 files changed, 446 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/66e508a9/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
new file mode 100644
index 0000000..8ba73a0
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
@@ -0,0 +1,162 @@
+package org.apache.usergrid.persistence.core.task;
+
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.annotation.Nullable;
+
+import org.slf4j.Logger;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+
+
+/**
+ * Implementation of the task executor with a unique name and size
+ */
+public class NamedTaskExecutorImpl implements TaskExecutor {
+
+ private static final Logger LOG = org.slf4j.LoggerFactory.getLogger( NamedTaskExecutorImpl.class );
+
+ private final ListeningExecutorService executorService;
+
+
+ /**
+ * @param name The name of this instance of the task executor
+ * @param poolSize The size of the pool. This is the number of concurrent tasks that can execute at once.
+ * @param queueLength The length of tasks to keep in the queue
+ */
+ public NamedTaskExecutorImpl( final String name, final int poolSize, final int queueLength ) {
+ Preconditions.checkNotNull( name );
+ Preconditions.checkArgument( name.length() > 0, "name must have a length" );
+ Preconditions.checkArgument( poolSize > 0, "poolSize must be > than 0" );
+ Preconditions.checkArgument( queueLength > -1, "queueLength must be 0 or more" );
+
+
+ final BlockingQueue<Runnable> queue =
+ queueLength > 0 ? new ArrayBlockingQueue<Runnable>( queueLength ) : new SynchronousQueue<Runnable>();
+
+ executorService = MoreExecutors.listeningDecorator( new MaxSizeThreadPool( name, poolSize, queue ) );
+ }
+
+
+ @Override
+ public <V, I> void submit( final Task<V, I> task ) {
+
+ final ListenableFuture<V> future;
+
+ try {
+ future = executorService.submit( task );
+
+ /**
+ * Log our success or failures for debugging purposes
+ */
+ Futures.addCallback( future, new TaskFutureCallBack<V, I>( task ) );
+ }
+ catch ( RejectedExecutionException ree ) {
+ task.rejected();
+ return;
+ }
+ }
+
+
+ /**
+ * Callback for when the task succeeds or fails.
+ */
+ private static final class TaskFutureCallBack<V, I> implements FutureCallback<V> {
+
+ private final Task<V, I> task;
+
+
+ private TaskFutureCallBack( Task<V, I> task ) {
+ this.task = task;
+ }
+
+
+ @Override
+ public void onSuccess( @Nullable final V result ) {
+ LOG.debug( "Successfully completed task ", task );
+ }
+
+
+ @Override
+ public void onFailure( final Throwable t ) {
+ LOG.error( "Unable to execute task. Exception is ", t );
+
+ task.exceptionThrown( t );
+ }
+ }
+
+
+ /**
+ * Create a thread pool that will reject work if our audit tasks become overwhelmed
+ */
+ private static final class MaxSizeThreadPool extends ThreadPoolExecutor {
+
+ public MaxSizeThreadPool( final String name, final int workerSize, BlockingQueue<Runnable> queue ) {
+
+ super( 1, workerSize, 30, TimeUnit.SECONDS, queue, new CountingThreadFactory( name ),
+ new RejectedHandler() );
+ }
+ }
+
+
+ /**
+ * Thread factory that will name and count threads for easier debugging
+ */
+ private static final class CountingThreadFactory implements ThreadFactory {
+
+ private final AtomicLong threadCounter = new AtomicLong();
+
+ private final String name;
+
+
+ private CountingThreadFactory( final String name ) {this.name = name;}
+
+
+ @Override
+ public Thread newThread( final Runnable r ) {
+ final long newValue = threadCounter.incrementAndGet();
+
+ Thread t = new Thread( r, name + "-" + newValue );
+
+ t.setDaemon( true );
+
+ return t;
+ }
+ }
+
+
+ /**
+ * The handler that will handle rejected executions and signal the interface
+ */
+ private static final class RejectedHandler implements RejectedExecutionHandler {
+
+
+ @Override
+ public void rejectedExecution( final Runnable r, final ThreadPoolExecutor executor ) {
+
+ // ListenableFutureTask<Task<?, ?>> future = ( ListenableFutureTask<Task<?, ?>> ) r;
+ //
+ // future.
+ // final Task<?, ?> task = ( Task<?, ?> ) r;
+ LOG.warn( "Audit queue full, rejecting audit task {}", r );
+
+ throw new RejectedExecutionException( "Unable to run task, queue full" );
+ // LOG.warn( "Audit queue full, rejecting audit task {}", task );
+ // task.rejected();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/66e508a9/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
new file mode 100644
index 0000000..8b1ed22
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
@@ -0,0 +1,37 @@
+package org.apache.usergrid.persistence.core.task;
+
+
+import java.util.concurrent.Callable;
+
+
+
+/**
+ * The task to execute
+ */
+public interface Task<V, I> extends Callable<V> {
+
+ /**
+ * Get the unique identifier of this task. This may be used to collapse runnables over a time period in the future
+ *
+ * @return
+ */
+ I getId();
+
+ /**
+ * Invoked when this task throws an uncaught exception.
+ * @param throwable
+ */
+ void exceptionThrown(final Throwable throwable);
+
+ /**
+ * Invoked when we weren't able to run this task by the the thread attempting to schedule the task.
+ * If this task MUST be run immediately, you can invoke the call method from within this event to invoke the
+ * task in the scheduling thread. Note that this has performance implications to the user. If you can drop the
+ * request and process later (lazy repair for instanc\\\\\\\\\\\\\\\\\\\\\\hjn ) do so.
+ *
+ */
+ void rejected();
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/66e508a9/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
new file mode 100644
index 0000000..e60da83
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
@@ -0,0 +1,14 @@
+package org.apache.usergrid.persistence.core.task;
+
+
+/**
+ * An interface for execution of tasks
+ */
+public interface TaskExecutor {
+
+ /**
+ * Submit the task asynchronously
+ * @param task
+ */
+ public <V, I> void submit(Task<V, I> task);
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/66e508a9/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
new file mode 100644
index 0000000..9da9263
--- /dev/null
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
@@ -0,0 +1,233 @@
+package org.apache.usergrid.persistence.core.task;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import junit.framework.TestCase;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+
+
+/**
+ * Tests for the namedtask execution impl
+ */
+public class NamedTaskExecutorImplTest {
+
+
+ @Test
+ public void jobSuccess() throws InterruptedException {
+ final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1, 0 );
+
+ final CountDownLatch exceptionLatch = new CountDownLatch( 0 );
+ final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
+ final CountDownLatch runLatch = new CountDownLatch( 1 );
+
+ final Task<Void, UUID> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {};
+
+ executor.submit( task );
+
+
+ runLatch.await( 1000, TimeUnit.MILLISECONDS );
+
+ assertEquals( 0l, exceptionLatch.getCount() );
+
+ assertEquals( 0l, rejectedLatch.getCount() );
+ }
+
+
+ @Test
+ public void exceptionThrown() throws InterruptedException {
+ final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1, 0 );
+
+ final CountDownLatch exceptionLatch = new CountDownLatch( 1 );
+ final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
+ final CountDownLatch runLatch = new CountDownLatch( 1 );
+
+ final RuntimeException re = new RuntimeException( "throwing exception" );
+
+ final TestTask<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {
+ @Override
+ public Void call() throws Exception {
+ super.call();
+ throw re;
+ }
+ };
+
+ executor.submit( task );
+
+
+ runLatch.await( 1000, TimeUnit.MILLISECONDS );
+ exceptionLatch.await( 1000, TimeUnit.MILLISECONDS );
+
+ assertSame( re, task.exceptions.get( 0 ) );
+
+
+ assertEquals( 0l, rejectedLatch.getCount() );
+ }
+
+
+ @Test
+ public void noCapacity() throws InterruptedException {
+ final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1, 0 );
+
+ final CountDownLatch exceptionLatch = new CountDownLatch( 0 );
+ final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
+ final CountDownLatch runLatch = new CountDownLatch( 1 );
+
+
+ final TestTask<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {
+ @Override
+ public Void call() throws Exception {
+ super.call();
+
+ //park this thread so it takes up a task and the next is rejected
+ final Object mutex = new Object();
+
+ synchronized ( mutex ) {
+ mutex.wait();
+ }
+
+ return null;
+ }
+ };
+
+ executor.submit( task );
+
+
+ runLatch.await( 1000, TimeUnit.MILLISECONDS );
+
+ //now submit the second task
+
+
+ final CountDownLatch secondRejectedLatch = new CountDownLatch( 1 );
+ final CountDownLatch secondExceptionLatch = new CountDownLatch( 0 );
+ final CountDownLatch secondRunLatch = new CountDownLatch( 1 );
+
+
+ final TestTask<Void> testTask = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {};
+
+ executor.submit( testTask );
+
+
+ secondRejectedLatch.await( 1000, TimeUnit.MILLISECONDS );
+
+ //if we get here we've been rejected, just double check we didn't run
+
+ assertEquals( 1l, secondRunLatch.getCount() );
+ assertEquals( 0l, secondExceptionLatch.getCount() );
+ }
+
+
+ @Test
+ public void noCapacityWithQueue() throws InterruptedException {
+
+ final int threadPoolSize = 1;
+ final int queueSize = 10;
+
+ final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", threadPoolSize, queueSize );
+
+ final CountDownLatch exceptionLatch = new CountDownLatch( 0 );
+ final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
+ final CountDownLatch runLatch = new CountDownLatch( 1 );
+
+ int iterations = threadPoolSize + queueSize;
+
+ for(int i = 0; i < iterations; i ++) {
+
+ final TestTask<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {
+ @Override
+ public Void call() throws Exception {
+ super.call();
+
+ //park this thread so it takes up a task and the next is rejected
+ final Object mutex = new Object();
+
+ synchronized ( mutex ) {
+ mutex.wait();
+ }
+
+ return null;
+ }
+ };
+ executor.submit( task );
+ }
+
+
+
+ runLatch.await( 1000, TimeUnit.MILLISECONDS );
+
+ //now submit the second task
+
+
+ final CountDownLatch secondRejectedLatch = new CountDownLatch( 1 );
+ final CountDownLatch secondExceptionLatch = new CountDownLatch( 0 );
+ final CountDownLatch secondRunLatch = new CountDownLatch( 1 );
+
+
+ final TestTask<Void> testTask = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {};
+
+ executor.submit( testTask );
+
+
+ secondRejectedLatch.await( 1000, TimeUnit.MILLISECONDS );
+
+ //if we get here we've been rejected, just double check we didn't run
+
+ assertEquals( 1l, secondRunLatch.getCount() );
+ assertEquals( 0l, secondExceptionLatch.getCount() );
+ }
+
+
+ private static abstract class TestTask<V> implements Task<V, UUID> {
+
+ private final List<Throwable> exceptions;
+ private final CountDownLatch exceptionLatch;
+ private final CountDownLatch rejectedLatch;
+ private final CountDownLatch runLatch;
+
+
+ private TestTask( final CountDownLatch exceptionLatch, final CountDownLatch rejectedLatch,
+ final CountDownLatch runLatch ) {
+ this.exceptionLatch = exceptionLatch;
+ this.rejectedLatch = rejectedLatch;
+ this.runLatch = runLatch;
+
+ this.exceptions = new ArrayList<>();
+ }
+
+
+ @Override
+ public UUID getId() {
+ return UUIDGenerator.newTimeUUID();
+ }
+
+
+ @Override
+ public void exceptionThrown( final Throwable throwable ) {
+ this.exceptions.add( throwable );
+ exceptionLatch.countDown();
+ }
+
+
+ @Override
+ public void rejected() {
+ rejectedLatch.countDown();
+ }
+
+
+ @Override
+ public V call() throws Exception {
+ runLatch.countDown();
+ return null;
+ }
+ }
+}
[07/15] git commit: Revert "Converted tasks to ForkJoin tasks to
allow for parallel execution."
Posted by to...@apache.org.
Revert "Converted tasks to ForkJoin tasks to allow for parallel execution."
This reverts commit dc3f448c946219c44e3dbfadb294fea0b8048343.
The ForkJoinPool does not have the following characteristics
Ability to name a pool
Ability to set queue size for rejection
As a result, it is not well suited for our asynchronous I/O tasks, and I'm moving back to ExecutorService
Conflicts:
stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/ImmediateTask.java
stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/af3726b8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/af3726b8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/af3726b8
Branch: refs/heads/two-dot-o
Commit: af3726b894051fde9ce63c8db1db0996aa4f2992
Parents: d4f80e7
Author: Todd Nine <to...@apache.org>
Authored: Sat Sep 27 17:58:24 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Sat Sep 27 17:58:24 2014 -0600
----------------------------------------------------------------------
.../collection/guice/CollectionModule.java | 2 +-
.../persistence/core/task/ImmediateTask.java | 36 ------
.../core/task/NamedTaskExecutorImpl.java | 126 +++++++++++++------
.../usergrid/persistence/core/task/Task.java | 53 +++-----
.../persistence/core/task/TaskExecutor.java | 3 +-
.../core/task/NamedTaskExecutorImplTest.java | 116 ++++-------------
.../persistence/graph/guice/GraphModule.java | 2 +-
.../impl/shard/ShardGroupCompaction.java | 7 +-
.../shard/impl/ShardGroupCompactionImpl.java | 51 ++++++--
9 files changed, 180 insertions(+), 216 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/af3726b8/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
index a6230e1..306f6e0 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
@@ -101,7 +101,7 @@ public class CollectionModule extends AbstractModule {
@Singleton
@Provides
public TaskExecutor collectionTaskExecutor(final SerializationFig serializationFig){
- return new NamedTaskExecutorImpl( "collectiontasks", serializationFig.getTaskPoolThreadSize() );
+ return new NamedTaskExecutorImpl( "collectiontasks", serializationFig.getTaskPoolThreadSize(), serializationFig.getTaskPoolQueueSize() );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/af3726b8/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/ImmediateTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/ImmediateTask.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/ImmediateTask.java
deleted file mode 100644
index f6c28a3..0000000
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/ImmediateTask.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package org.apache.usergrid.persistence.core.task;
-
-
-/**
- * Does not perform computation, just returns the value passed to it
- *
- */
-public class ImmediateTask<V> extends Task<V> {
-
-
- private final V returned;
-
-
- protected ImmediateTask( final V returned ) {
- this.returned = returned;
- }
-
-
-
- @Override
- public V executeTask() throws Exception {
- return returned;
- }
-
-
- @Override
- public void exceptionThrown( final Throwable throwable ) {
- //no op
- }
-
-
- @Override
- public void rejected() {
- //no op
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/af3726b8/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
index bb9cac8..d4cc915 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
@@ -1,21 +1,3 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
package org.apache.usergrid.persistence.core.task;
@@ -24,6 +6,7 @@ import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -35,6 +18,10 @@ import org.slf4j.Logger;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
/**
@@ -44,64 +31,131 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
private static final Logger LOG = org.slf4j.LoggerFactory.getLogger( NamedTaskExecutorImpl.class );
- private final NamedForkJoinPool executorService;
+ private final ListeningExecutorService executorService;
private final String name;
private final int poolSize;
+ private final int queueLength;
/**
* @param name The name of this instance of the task executor
* @param poolSize The size of the pool. This is the number of concurrent tasks that can execute at once.
+ * @param queueLength The length of tasks to keep in the queue
*/
- public NamedTaskExecutorImpl( final String name, final int poolSize ) {
-
- //TODO, figure out how to name the fork/join threads in the pool
+ public NamedTaskExecutorImpl( final String name, final int poolSize, final int queueLength ) {
Preconditions.checkNotNull( name );
Preconditions.checkArgument( name.length() > 0, "name must have a length" );
Preconditions.checkArgument( poolSize > 0, "poolSize must be > than 0" );
+ Preconditions.checkArgument( queueLength > -1, "queueLength must be 0 or more" );
this.name = name;
this.poolSize = poolSize;
+ this.queueLength = queueLength;
+
+ final BlockingQueue<Runnable> queue =
+ queueLength > 0 ? new ArrayBlockingQueue<Runnable>( queueLength ) : new SynchronousQueue<Runnable>();
- this.executorService = new NamedForkJoinPool( poolSize );
+ executorService = MoreExecutors.listeningDecorator( new MaxSizeThreadPool( queue ) );
}
@Override
- public <V> Task<V> submit( final Task<V> task ) {
+ public <V, I> ListenableFuture<V> submit( final Task<V, I> task ) {
+
+ final ListenableFuture<V> future;
try {
- executorService.submit( task );
+ future = executorService.submit( task );
+
+ /**
+ * Log our success or failures for debugging purposes
+ */
+ Futures.addCallback( future, new TaskFutureCallBack<V, I>( task ) );
}
catch ( RejectedExecutionException ree ) {
task.rejected();
+ return Futures.immediateCancelledFuture();
}
- return task;
+ return future;
}
- @Override
- public void shutdown() {
- executorService.shutdownNow();
+ /**
+ * Callback for when the task succeeds or fails.
+ */
+ private static final class TaskFutureCallBack<V, I> implements FutureCallback<V> {
+
+ private final Task<V, I> task;
+
+
+ private TaskFutureCallBack( Task<V, I> task ) {
+ this.task = task;
+ }
+
+
+ @Override
+ public void onSuccess( @Nullable final V result ) {
+ LOG.debug( "Successfully completed task ", task );
+ }
+
+
+ @Override
+ public void onFailure( final Throwable t ) {
+ LOG.error( "Unable to execute task. Exception is ", t );
+
+ task.exceptionThrown( t );
+ }
}
- private final class NamedForkJoinPool extends ForkJoinPool {
+ /**
+ * Create a thread pool that will reject work if our audit tasks become overwhelmed
+ */
+ private final class MaxSizeThreadPool extends ThreadPoolExecutor {
+
+ public MaxSizeThreadPool( BlockingQueue<Runnable> queue ) {
- private NamedForkJoinPool( final int workerThreadCount ) {
- //TODO, verify the scheduler at the end
- super( workerThreadCount, defaultForkJoinWorkerThreadFactory, new TaskExceptionHandler(), true );
+ super( 1, poolSize, 30, TimeUnit.SECONDS, queue, new CountingThreadFactory( ),
+ new RejectedHandler() );
}
}
- private final class TaskExceptionHandler implements Thread.UncaughtExceptionHandler {
+ /**
+ * Thread factory that will name and count threads for easier debugging
+ */
+ private final class CountingThreadFactory implements ThreadFactory {
+
+ private final AtomicLong threadCounter = new AtomicLong();
+
@Override
- public void uncaughtException( final Thread t, final Throwable e ) {
- LOG.error( "Uncaught exception on thread {} was {}", t, e );
+ public Thread newThread( final Runnable r ) {
+ final long newValue = threadCounter.incrementAndGet();
+
+ Thread t = new Thread( r, name + "-" + newValue );
+
+ t.setDaemon( true );
+
+ return t;
}
}
+
+
+ /**
+ * The handler that will handle rejected executions and signal the interface
+ */
+ private final class RejectedHandler implements RejectedExecutionHandler {
+
+
+ @Override
+ public void rejectedExecution( final Runnable r, final ThreadPoolExecutor executor ) {
+ LOG.warn( "{} task queue full, rejecting task {}", name, r );
+
+ throw new RejectedExecutionException( "Unable to run task, queue full" );
+ }
+
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/af3726b8/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
index 5d35ce4..9a0b857 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
@@ -1,35 +1,19 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
package org.apache.usergrid.persistence.core.task;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
/**
* The task to execute
*/
-public abstract class Task<V> extends RecursiveTask<V> {
+public interface Task<V, I> extends Callable<V> {
+
+ /**
+ * Get the unique identifier of this task. This may be used to collapse runnables over a time period in the future
+ */
+ public abstract I getId();
+
@Override
protected V compute() {
@@ -43,22 +27,21 @@ public abstract class Task<V> extends RecursiveTask<V> {
}
-
- /**
- * Execute the task
- */
- public abstract V executeTask() throws Exception;
-
/**
* Invoked when this task throws an uncaught exception.
+ * @param throwable
*/
- public abstract void exceptionThrown( final Throwable throwable );
+ void exceptionThrown(final Throwable throwable);
/**
- * Invoked when we weren't able to run this task by the the thread attempting to schedule the task. If this task
- * MUST be run immediately, you can invoke the call method from within this event to invoke the task in the
- * scheduling thread. Note that this has performance implications to the user. If you can drop the request and
- * process later (lazy repair for instance ) do so.
+ * Invoked when we weren't able to run this task by the the thread attempting to schedule the task.
+ * If this task MUST be run immediately, you can invoke the call method from within this event to invoke the
+ * task in the scheduling thread. Note that this has performance implications to the user. If you can drop the
+ * request and process later (lazy repair for instance ) do so.
+ *
*/
- public abstract void rejected();
+ void rejected();
+
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/af3726b8/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
index c6f5915..aaa9d60 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
@@ -13,10 +13,11 @@ public interface TaskExecutor {
* Submit the task asynchronously
* @param task
*/
- public <V> Task<V > submit( Task<V> task );
+ public <V> ListenableFuture<V> submit( Task<V> task );
/**
* Stop the task executor without waiting for scheduled threads to run
*/
public void shutdown();
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/af3726b8/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
index 9656b5f..fa63eee 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
@@ -23,13 +23,13 @@ public class NamedTaskExecutorImplTest {
@Test
public void jobSuccess() throws InterruptedException {
- final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1 );
+ final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1, 0 );
final CountDownLatch exceptionLatch = new CountDownLatch( 0 );
final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
final CountDownLatch runLatch = new CountDownLatch( 1 );
- final Task<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {};
+ final Task<Void, UUID> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {};
executor.submit( task );
@@ -44,7 +44,7 @@ public class NamedTaskExecutorImplTest {
@Test
public void exceptionThrown() throws InterruptedException {
- final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1 );
+ final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1, 0 );
final CountDownLatch exceptionLatch = new CountDownLatch( 1 );
final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
@@ -53,10 +53,9 @@ public class NamedTaskExecutorImplTest {
final RuntimeException re = new RuntimeException( "throwing exception" );
final TestTask<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {
-
-
@Override
- public Void executeTask() {
+ public Void call() throws Exception {
+ super.call();
throw re;
}
};
@@ -76,7 +75,7 @@ public class NamedTaskExecutorImplTest {
@Test
public void noCapacity() throws InterruptedException {
- final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1 );
+ final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1, 0 );
final CountDownLatch exceptionLatch = new CountDownLatch( 0 );
final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
@@ -85,8 +84,8 @@ public class NamedTaskExecutorImplTest {
final TestTask<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {
@Override
- public Void executeTask() throws Exception {
- super.executeTask();
+ public Void call() throws Exception {
+ super.call();
//park this thread so it takes up a task and the next is rejected
final Object mutex = new Object();
@@ -130,22 +129,22 @@ public class NamedTaskExecutorImplTest {
public void noCapacityWithQueue() throws InterruptedException {
final int threadPoolSize = 1;
-
+ final int queueSize = 10;
- final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", threadPoolSize );
+ final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", threadPoolSize, queueSize );
final CountDownLatch exceptionLatch = new CountDownLatch( 0 );
final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
final CountDownLatch runLatch = new CountDownLatch( 1 );
- int iterations = threadPoolSize ;
+ int iterations = threadPoolSize + queueSize;
- for ( int i = 0; i < iterations; i++ ) {
+ for(int i = 0; i < iterations; i ++) {
final TestTask<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {
@Override
- public Void executeTask() throws Exception {
- super.executeTask();
+ public Void call() throws Exception {
+ super.call();
//park this thread so it takes up a task and the next is rejected
final Object mutex = new Object();
@@ -161,6 +160,7 @@ public class NamedTaskExecutorImplTest {
}
+
runLatch.await( 1000, TimeUnit.MILLISECONDS );
//now submit the second task
@@ -185,81 +185,12 @@ public class NamedTaskExecutorImplTest {
}
- @Test
- public void jobTreeResult() throws InterruptedException {
-
- final int threadPoolSize = 4;
-
-
- final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", threadPoolSize );
-
- final CountDownLatch exceptionLatch = new CountDownLatch( 0 );
- final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
-
- //accomodates for splitting the job 1->2->4 and joining
- final CountDownLatch runLatch = new CountDownLatch( 7 );
-
-
- TestRecursiveTask task = new TestRecursiveTask( exceptionLatch, rejectedLatch, runLatch, 1, 3 );
+ private static abstract class TestTask<V> implements Task<V, UUID> {
- executor.submit( task );
-
-
- //compute our result
- Integer result = task.join();
-
- //result should be 1+2*2+3*4
- final int expected = 4*3;
-
- assertEquals(expected, result.intValue());
-
- //just to check our latches
- runLatch.await( 1000, TimeUnit.MILLISECONDS );
-
- //now submit the second task
-
-
- }
-
-
- private static class TestRecursiveTask extends TestTask<Integer> {
-
- private final int depth;
- private final int maxDepth;
-
- private TestRecursiveTask( final CountDownLatch exceptionLatch, final CountDownLatch rejectedLatch,
- final CountDownLatch runLatch, final int depth, final int maxDepth ) {
- super( exceptionLatch, rejectedLatch, runLatch );
- this.depth = depth;
- this.maxDepth = maxDepth;
- }
-
-
- @Override
- public Integer executeTask() throws Exception {
-
- if(depth == maxDepth ){
- return depth;
- }
-
- TestRecursiveTask left = new TestRecursiveTask(exceptionLatch, rejectedLatch, runLatch, depth+1, maxDepth );
-
- TestRecursiveTask right = new TestRecursiveTask(exceptionLatch, rejectedLatch, runLatch, depth+1, maxDepth );
-
- //run our left in another thread
- left.fork();
-
- return right.compute() + left.join();
- }
- }
-
-
- private static abstract class TestTask<V> extends Task<V> {
-
- protected final List<Throwable> exceptions;
- protected final CountDownLatch exceptionLatch;
- protected final CountDownLatch rejectedLatch;
- protected final CountDownLatch runLatch;
+ private final List<Throwable> exceptions;
+ private final CountDownLatch exceptionLatch;
+ private final CountDownLatch rejectedLatch;
+ private final CountDownLatch runLatch;
private TestTask( final CountDownLatch exceptionLatch, final CountDownLatch rejectedLatch,
@@ -272,6 +203,11 @@ public class NamedTaskExecutorImplTest {
}
+ @Override
+ public UUID getId() {
+ return UUIDGenerator.newTimeUUID();
+ }
+
@Override
public void exceptionThrown( final Throwable throwable ) {
@@ -287,7 +223,7 @@ public class NamedTaskExecutorImplTest {
@Override
- public V executeTask() throws Exception {
+ public V call() throws Exception {
runLatch.countDown();
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/af3726b8/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
index a3978fc..608f8ce 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
@@ -155,7 +155,7 @@ public class GraphModule extends AbstractModule {
@Singleton
@Provides
public TaskExecutor graphTaskExecutor(final GraphFig graphFig){
- return new NamedTaskExecutorImpl( "graphTaskExecutor", graphFig.getShardAuditWorkerCount() );
+ return new NamedTaskExecutorImpl( "graphTaskExecutor", graphFig.getShardAuditWorkerCount(), graphFig.getShardAuditWorkerQueueSize() );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/af3726b8/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
index ed63587..4fe1a63 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
@@ -27,8 +27,6 @@ import java.util.HashSet;
import java.util.Set;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.task.Task;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardGroupCompactionImpl;
import com.google.common.util.concurrent.ListenableFuture;
@@ -44,8 +42,9 @@ public interface ShardGroupCompaction {
*
* @return A ListenableFuture with the result. Note that some
*/
- public Task<AuditResult> evaluateShardGroup(
- final ApplicationScope scope, final DirectedEdgeMeta edgeMeta, final ShardEntryGroup group );
+ public ListenableFuture<AuditResult> evaluateShardGroup( final ApplicationScope scope,
+ final DirectedEdgeMeta edgeMeta,
+ final ShardEntryGroup group );
public enum AuditResult {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/af3726b8/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
index a84a0f0..be7fbe4 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
@@ -37,13 +37,14 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import javax.annotation.Nullable;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.core.consistency.TimeService;
import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.task.ImmediateTask;
import org.apache.usergrid.persistence.core.task.Task;
import org.apache.usergrid.persistence.core.task.TaskExecutor;
import org.apache.usergrid.persistence.graph.GraphFig;
@@ -65,6 +66,9 @@ import com.google.common.hash.HashFunction;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
import com.google.common.hash.PrimitiveSink;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.netflix.astyanax.Keyspace;
@@ -273,29 +277,45 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
@Override
- public Task<AuditResult> evaluateShardGroup( final ApplicationScope scope,
- final DirectedEdgeMeta edgeMeta,
- final ShardEntryGroup group ) {
+ public ListenableFuture<AuditResult> evaluateShardGroup( final ApplicationScope scope,
+ final DirectedEdgeMeta edgeMeta,
+ final ShardEntryGroup group ) {
final double repairChance = random.nextDouble();
//don't audit, we didn't hit our chance
if ( repairChance > graphFig.getShardRepairChance() ) {
- return new ImmediateTask<AuditResult>( AuditResult.NOT_CHECKED ) {};
+ return Futures.immediateFuture( AuditResult.NOT_CHECKED );
}
/**
* Try and submit. During back pressure, we may not be able to submit, that's ok. Better to drop than to
* hose the system
*/
- return taskExecutor.submit( new ShardAuditTask( scope, edgeMeta, group ) );
+ ListenableFuture<AuditResult> future = taskExecutor.submit( new ShardAuditTask( scope, edgeMeta, group ) );
+
+ /**
+ * Log our success or failures for debugging purposes
+ */
+ Futures.addCallback( future, new FutureCallback<AuditResult>() {
+ @Override
+ public void onSuccess( @Nullable final AuditResult result ) {
+ LOG.debug( "Successfully completed audit of task {}", result );
+ }
+ @Override
+ public void onFailure( final Throwable t ) {
+ LOG.error( "Unable to perform audit. Exception is ", t );
+ }
+ } );
+
+ return future;
}
- private final class ShardAuditTask extends Task<AuditResult> {
+ private final class ShardAuditTask implements Task<AuditResult, ShardAuditKey> {
private final ApplicationScope scope;
private final DirectedEdgeMeta edgeMeta;
@@ -310,6 +330,11 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
}
+ @Override
+ public ShardAuditKey getId() {
+ return new ShardAuditKey( scope, edgeMeta, group );
+ }
+
@Override
public void exceptionThrown( final Throwable throwable ) {
@@ -320,12 +345,12 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
@Override
public void rejected() {
//ignore, if this happens we don't care, we're saturated, we can check later
- LOG.error( "Rejected audit for shard of with scope {}, edgeMeta of {} and group of {}", scope, edgeMeta, group );
+ LOG.error( "Rejected audit for shard of {}", getId() );
}
@Override
- public AuditResult executeTask() throws Exception {
+ public AuditResult call() throws Exception {
/**
* We don't have a compaction pending. Run an audit on the shards
*/
@@ -376,8 +401,10 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
*/
try {
CompactionResult result = compact( scope, edgeMeta, group );
- LOG.info( "Compaction result for compaction of scope {} with edge meta data of {} and shard group "
- + "{} is {}", new Object[] { scope, edgeMeta, group, result } );
+ LOG.info(
+ "Compaction result for compaction of scope {} with edge meta data of {} and shard group " +
+ "{} is {}",
+ new Object[] { scope, edgeMeta, group, result } );
}
finally {
shardCompactionTaskTracker.complete( scope, edgeMeta, group );
@@ -391,7 +418,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
}
- public static final class ShardAuditKey {
+ private static final class ShardAuditKey {
private final ApplicationScope scope;
private final DirectedEdgeMeta edgeMeta;
private final ShardEntryGroup group;
[06/15] git commit: Finished testing of the EntityVersionCleanupTask
Posted by to...@apache.org.
Finished testing of the EntityVersionCleanupTask
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/d4f80e7c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/d4f80e7c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/d4f80e7c
Branch: refs/heads/two-dot-o
Commit: d4f80e7c85093e17081a4210491022b9001a02b7
Parents: 018dbee
Author: Todd Nine <to...@apache.org>
Authored: Sat Sep 27 15:25:09 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Sat Sep 27 17:07:50 2014 -0600
----------------------------------------------------------------------
.../impl/EntityVersionCleanupTask.java | 121 ++--
.../impl/EntityVersionCleanupTaskTest.java | 583 ++++++++++++++++++-
.../persistence/core/task/ImmediateTask.java | 12 +-
.../core/task/NamedTaskExecutorImpl.java | 10 +-
.../usergrid/persistence/core/task/Task.java | 27 +-
.../persistence/core/task/TaskExecutor.java | 7 +-
.../core/task/NamedTaskExecutorImplTest.java | 9 +-
.../impl/shard/ShardGroupCompaction.java | 2 +-
.../shard/impl/ShardGroupCompactionImpl.java | 13 +-
9 files changed, 684 insertions(+), 100 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d4f80e7c/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
index 29ca3ac..3d1483d 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
@@ -1,10 +1,10 @@
package org.apache.usergrid.persistence.collection.impl;
-import java.util.ArrayList;
import java.util.List;
+import java.util.Stack;
import java.util.UUID;
-import java.util.concurrent.ForkJoinTask;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.RecursiveTask;
import org.slf4j.Logger;
@@ -15,10 +15,8 @@ import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
-import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
import org.apache.usergrid.persistence.collection.serialization.impl.LogEntryIterator;
-import org.apache.usergrid.persistence.core.entity.EntityVersion;
import org.apache.usergrid.persistence.core.task.Task;
import org.apache.usergrid.persistence.model.entity.Id;
@@ -27,12 +25,11 @@ import org.apache.usergrid.persistence.model.entity.Id;
* Cleans up previous versions from the specified version. Note that this means the version passed in the io event is
* retained, the range is exclusive.
*/
-public class EntityVersionCleanupTask extends Task<CollectionIoEvent<EntityVersion>, CollectionIoEvent<EntityVersion>> {
+public class EntityVersionCleanupTask extends Task<Void> {
private static final Logger LOG = LoggerFactory.getLogger( EntityVersionCleanupTask.class );
- private final CollectionIoEvent<EntityVersion> collectionIoEvent;
private final List<EntityVersionDeleted> listeners;
private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
@@ -40,29 +37,31 @@ public class EntityVersionCleanupTask extends Task<CollectionIoEvent<EntityVersi
private final SerializationFig serializationFig;
+ private final CollectionScope scope;
+ private final Id entityId;
+ private final UUID version;
- private EntityVersionCleanupTask( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
- final MvccEntitySerializationStrategy entitySerializationStrategy,
- final CollectionIoEvent<EntityVersion> collectionIoEvent,
- final List<EntityVersionDeleted> listeners,
- final SerializationFig serializationFig ) {
- this.collectionIoEvent = collectionIoEvent;
- this.listeners = listeners;
- this.logEntrySerializationStrategy = logEntrySerializationStrategy;
- this.entitySerializationStrategy = entitySerializationStrategy;
- this.serializationFig = serializationFig;
- }
+ public EntityVersionCleanupTask( final SerializationFig serializationFig,
+ final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
+ final MvccEntitySerializationStrategy entitySerializationStrategy,
+ final List<EntityVersionDeleted> listeners, final CollectionScope scope,
+ final Id entityId, final UUID version ) {
- @Override
- public CollectionIoEvent<EntityVersion> getId() {
- return collectionIoEvent;
+ this.serializationFig = serializationFig;
+ this.logEntrySerializationStrategy = logEntrySerializationStrategy;
+ this.entitySerializationStrategy = entitySerializationStrategy;
+ this.listeners = listeners;
+ this.scope = scope;
+ this.entityId = entityId;
+ this.version = version;
}
@Override
public void exceptionThrown( final Throwable throwable ) {
- LOG.error( "Unable to run update task for event {}", collectionIoEvent, throwable );
+ LOG.error( "Unable to run update task for collection {} with entity {} and version {}",
+ new Object[] { scope, entityId, version }, throwable );
}
@@ -81,15 +80,15 @@ public class EntityVersionCleanupTask extends Task<CollectionIoEvent<EntityVersi
@Override
- public CollectionIoEvent<EntityVersion> executeTask() throws Exception {
+ public Void executeTask() throws Exception {
- final CollectionScope scope = collectionIoEvent.getEntityCollection();
- final Id entityId = collectionIoEvent.getEvent().getId();
- final UUID maxVersion = collectionIoEvent.getEvent().getVersion();
+
+ final UUID maxVersion = version;
LogEntryIterator logEntryIterator =
- new LogEntryIterator( logEntrySerializationStrategy, scope, entityId, maxVersion, serializationFig.getHistorySize() );
+ new LogEntryIterator( logEntrySerializationStrategy, scope, entityId, maxVersion,
+ serializationFig.getHistorySize() );
//for every entry, we want to clean it up with listeners
@@ -100,39 +99,73 @@ public class EntityVersionCleanupTask extends Task<CollectionIoEvent<EntityVersi
final UUID version = logEntry.getVersion();
- List<ForkJoinTask<Void>> tasks = new ArrayList<>();
- //execute all the listeners
- for (final EntityVersionDeleted listener : listeners ) {
+ fireEvents();
- tasks.add( new RecursiveTask<Void>() {
- @Override
- protected Void compute() {
- listener.versionDeleted( scope, entityId, version );
- return null;
- }
- }.fork() );
+ //we do multiple invocations on purpose. Our log is our source of versions, only delete from it
+ //after every successful invocation of listeners and entity removal
+ entitySerializationStrategy.delete( scope, entityId, version ).execute();
+ logEntrySerializationStrategy.delete( scope, entityId, version ).execute();
+ }
- }
- //wait for them to complete
+ return null;
+ }
- joinAll(tasks);
- //we do multiple invocations on purpose. Our log is our source of versions, only delete from it
- //after every successful invocation of listeners and entity removal
- entitySerializationStrategy.delete( scope, entityId, version ).execute();
+ private void fireEvents() throws ExecutionException, InterruptedException {
- logEntrySerializationStrategy.delete( scope, entityId, version ).execute();
+ if ( listeners.size() == 0 ) {
+ return;
}
+ //stack to track forked tasks
+ final Stack<RecursiveTask<Void>> tasks = new Stack<>();
+
+
+ //we don't want to fork the final listener, we'll run that in our current thread
+ final int forkedTaskSize = listeners.size() - 1;
+
+
+ //execute all the listeners
+ for ( int i = 0; i < forkedTaskSize; i++ ) {
+
+ final EntityVersionDeleted listener = listeners.get( i );
+
+ final RecursiveTask<Void> task = createTask( listener );
+
+ task.fork();
- return collectionIoEvent;
+ tasks.push( task );
+ }
+
+
+ final RecursiveTask<Void> lastTask = createTask( listeners.get( forkedTaskSize ) );
+
+ lastTask.invoke();
+
+
+ //wait for them to complete
+ while ( !tasks.isEmpty() ) {
+ tasks.pop().get();
+ }
}
+ /**
+ * Return the new task to execute
+ */
+ private RecursiveTask<Void> createTask( final EntityVersionDeleted listener ) {
+ return new RecursiveTask<Void>() {
+ @Override
+ protected Void compute() {
+ listener.versionDeleted( scope, entityId, version );
+ return null;
+ }
+ };
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d4f80e7c/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
index 050ea9e..2df75f1 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
@@ -22,10 +22,34 @@ package org.apache.usergrid.persistence.collection.impl;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Semaphore;
+import org.junit.AfterClass;
import org.junit.Test;
-import static org.junit.Assert.*;
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
+import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
+import org.apache.usergrid.persistence.collection.util.LogEntryMock;
+import org.apache.usergrid.persistence.core.task.NamedTaskExecutorImpl;
+import org.apache.usergrid.persistence.core.task.TaskExecutor;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
/**
@@ -33,8 +57,563 @@ import static org.junit.Assert.*;
*/
public class EntityVersionCleanupTaskTest {
+ private static final TaskExecutor taskExecutor = new NamedTaskExecutorImpl( "test", 4 );
+
+
+ @AfterClass
+ public static void shutdown() {
+ taskExecutor.shutdown();
+ }
+
+
+ @Test
+ public void noListenerOneVersion() throws ExecutionException, InterruptedException, ConnectionException {
+
+
+ final SerializationFig serializationFig = mock( SerializationFig.class );
+
+ when( serializationFig.getHistorySize() ).thenReturn( 10 );
+
+ final MvccEntitySerializationStrategy mvccEntitySerializationStrategy =
+ mock( MvccEntitySerializationStrategy.class );
+
+ final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy =
+ mock( MvccLogEntrySerializationStrategy.class );
+
+
+ //intentionally no events
+ final List<EntityVersionDeleted> listeners = new ArrayList<EntityVersionDeleted>();
+
+ final Id applicationId = new SimpleId( "application" );
+
+
+ final CollectionScope appScope = new CollectionScopeImpl( applicationId, applicationId, "users" );
+
+ final Id entityId = new SimpleId( "user" );
+
+
+ //mock up a single log entry for our first test
+ final LogEntryMock logEntryMock =
+ LogEntryMock.createLogEntryMock( mvccLogEntrySerializationStrategy, appScope, entityId, 2 );
+
+
+ final UUID version = logEntryMock.getEntries().iterator().next().getVersion();
+
+
+ EntityVersionCleanupTask cleanupTask =
+ new EntityVersionCleanupTask( serializationFig, mvccLogEntrySerializationStrategy,
+ mvccEntitySerializationStrategy, listeners, appScope, entityId, version );
+
+ final MutationBatch firstBatch = mock( MutationBatch.class );
+
+
+ //set up returning a mutator
+ when( mvccEntitySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
+ .thenReturn( firstBatch );
+
+
+ final MutationBatch secondBatch = mock( MutationBatch.class );
+
+ when( mvccLogEntrySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
+ .thenReturn( secondBatch );
+
+
+ //start the task
+ taskExecutor.submit( cleanupTask );
+
+ //wait for the task
+ cleanupTask.get();
+
+ //verify it was run
+ verify( firstBatch ).execute();
+
+ verify( secondBatch ).execute();
+ }
+
+
+ /**
+ * Tests the cleanup task on the first version ceated
+ */
+ @Test
+ public void noListenerNoVersions() throws ExecutionException, InterruptedException, ConnectionException {
+
+
+ final SerializationFig serializationFig = mock( SerializationFig.class );
+
+ when( serializationFig.getHistorySize() ).thenReturn( 10 );
+
+ final MvccEntitySerializationStrategy mvccEntitySerializationStrategy =
+ mock( MvccEntitySerializationStrategy.class );
+
+ final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy =
+ mock( MvccLogEntrySerializationStrategy.class );
+
+
+ //intentionally no events
+ final List<EntityVersionDeleted> listeners = new ArrayList<EntityVersionDeleted>();
+
+ final Id applicationId = new SimpleId( "application" );
+
+
+ final CollectionScope appScope = new CollectionScopeImpl( applicationId, applicationId, "users" );
+
+ final Id entityId = new SimpleId( "user" );
+
+
+ //mock up a single log entry for our first test
+ final LogEntryMock logEntryMock =
+ LogEntryMock.createLogEntryMock( mvccLogEntrySerializationStrategy, appScope, entityId, 1 );
+
+
+ final UUID version = logEntryMock.getEntries().iterator().next().getVersion();
+
+
+ EntityVersionCleanupTask cleanupTask =
+ new EntityVersionCleanupTask( serializationFig, mvccLogEntrySerializationStrategy,
+ mvccEntitySerializationStrategy, listeners, appScope, entityId, version );
+
+ final MutationBatch firstBatch = mock( MutationBatch.class );
+
+
+ //set up returning a mutator
+ when( mvccEntitySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
+ .thenReturn( firstBatch );
+
+
+ final MutationBatch secondBatch = mock( MutationBatch.class );
+
+ when( mvccLogEntrySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
+ .thenReturn( secondBatch );
+
+
+ //start the task
+ taskExecutor.submit( cleanupTask );
+
+ //wait for the task
+ cleanupTask.get();
+
+ //verify it was run
+ verify( firstBatch, never() ).execute();
+
+ verify( secondBatch, never() ).execute();
+ }
+
+
@Test
- public void multiPageTask(){
+ public void singleListenerSingleVersion() throws ExecutionException, InterruptedException, ConnectionException {
+
+
+ final SerializationFig serializationFig = mock( SerializationFig.class );
+
+ when( serializationFig.getHistorySize() ).thenReturn( 10 );
+
+ final MvccEntitySerializationStrategy mvccEntitySerializationStrategy =
+ mock( MvccEntitySerializationStrategy.class );
+
+ final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy =
+ mock( MvccLogEntrySerializationStrategy.class );
+
+
+ //create a latch for the event listener, and add it to the list of events
+ final int sizeToReturn = 1;
+
+ final CountDownLatch latch = new CountDownLatch( sizeToReturn );
+
+ final EntityVersionDeletedTest eventListener = new EntityVersionDeletedTest( latch );
+
+ final List<EntityVersionDeleted> listeners = new ArrayList<EntityVersionDeleted>();
+
+ listeners.add( eventListener );
+
+ final Id applicationId = new SimpleId( "application" );
+
+
+ final CollectionScope appScope = new CollectionScopeImpl( applicationId, applicationId, "users" );
+
+ final Id entityId = new SimpleId( "user" );
+
+
+ //mock up a single log entry for our first test
+ final LogEntryMock logEntryMock = LogEntryMock
+ .createLogEntryMock( mvccLogEntrySerializationStrategy, appScope, entityId, sizeToReturn + 1 );
+
+
+ final UUID version = logEntryMock.getEntries().iterator().next().getVersion();
+
+
+ EntityVersionCleanupTask cleanupTask =
+ new EntityVersionCleanupTask( serializationFig, mvccLogEntrySerializationStrategy,
+ mvccEntitySerializationStrategy, listeners, appScope, entityId, version );
+
+ final MutationBatch firstBatch = mock( MutationBatch.class );
+
+
+ //set up returning a mutator
+ when( mvccEntitySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
+ .thenReturn( firstBatch );
+
+
+ final MutationBatch secondBatch = mock( MutationBatch.class );
+
+ when( mvccLogEntrySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
+ .thenReturn( secondBatch );
+
+
+ //start the task
+ taskExecutor.submit( cleanupTask );
+
+ //wait for the task
+ cleanupTask.get();
+
+ //we deleted the version
+ //verify it was run
+ verify( firstBatch ).execute();
+
+ verify( secondBatch ).execute();
+
+ //the latch was executed
+ latch.await();
+ }
+
+
+ @Test
+ public void multipleListenerMultipleVersions()
+ throws ExecutionException, InterruptedException, ConnectionException {
+
+
+ final SerializationFig serializationFig = mock( SerializationFig.class );
+
+ when( serializationFig.getHistorySize() ).thenReturn( 10 );
+
+ final MvccEntitySerializationStrategy mvccEntitySerializationStrategy =
+ mock( MvccEntitySerializationStrategy.class );
+
+ final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy =
+ mock( MvccLogEntrySerializationStrategy.class );
+
+
+ //create a latch for the event listener, and add it to the list of events
+ final int sizeToReturn = 10;
+
+
+ final CountDownLatch latch = new CountDownLatch( sizeToReturn * 3 );
+
+ final EntityVersionDeletedTest listener1 = new EntityVersionDeletedTest( latch );
+ final EntityVersionDeletedTest listener2 = new EntityVersionDeletedTest( latch );
+ final EntityVersionDeletedTest listener3 = new EntityVersionDeletedTest( latch );
+
+ final List<EntityVersionDeleted> listeners = new ArrayList<EntityVersionDeleted>();
+
+ listeners.add( listener1 );
+ listeners.add( listener2 );
+ listeners.add( listener3 );
+
+ final Id applicationId = new SimpleId( "application" );
+
+
+ final CollectionScope appScope = new CollectionScopeImpl( applicationId, applicationId, "users" );
+
+ final Id entityId = new SimpleId( "user" );
+
+
+ //mock up a single log entry for our first test
+ final LogEntryMock logEntryMock = LogEntryMock
+ .createLogEntryMock( mvccLogEntrySerializationStrategy, appScope, entityId, sizeToReturn + 1 );
+
+
+ final UUID version = logEntryMock.getEntries().iterator().next().getVersion();
+
+
+ EntityVersionCleanupTask cleanupTask =
+ new EntityVersionCleanupTask( serializationFig, mvccLogEntrySerializationStrategy,
+ mvccEntitySerializationStrategy, listeners, appScope, entityId, version );
+
+ final MutationBatch firstBatch = mock( MutationBatch.class );
+
+
+ //set up returning a mutator
+ when( mvccEntitySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
+ .thenReturn( firstBatch );
+
+
+ final MutationBatch secondBatch = mock( MutationBatch.class );
+
+ when( mvccLogEntrySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
+ .thenReturn( secondBatch );
+
+
+ //start the task
+ taskExecutor.submit( cleanupTask );
+
+ //wait for the task
+ cleanupTask.get();
+
+ //we deleted the version
+ //verify we deleted everything
+ verify( firstBatch, times( sizeToReturn ) ).execute();
+
+ verify( secondBatch, times( sizeToReturn ) ).execute();
+
+ //the latch was executed
+ latch.await();
+ }
+
+
+ /**
+ * Tests what happens when our listeners are VERY slow
+ * @throws ExecutionException
+ * @throws InterruptedException
+ * @throws ConnectionException
+ */
+ @Test
+ public void multipleListenerMultipleVersionsNoThreadsToRun()
+ throws ExecutionException, InterruptedException, ConnectionException {
+
+
+ final SerializationFig serializationFig = mock( SerializationFig.class );
+
+ when( serializationFig.getHistorySize() ).thenReturn( 10 );
+
+ final MvccEntitySerializationStrategy mvccEntitySerializationStrategy =
+ mock( MvccEntitySerializationStrategy.class );
+
+ final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy =
+ mock( MvccLogEntrySerializationStrategy.class );
+
+
+ //create a latch for the event listener, and add it to the list of events
+ final int sizeToReturn = 10;
+
+
+ final int listenerCount = 5;
+
+ final CountDownLatch latch = new CountDownLatch( sizeToReturn * listenerCount );
+ final Semaphore waitSemaphore = new Semaphore( 0 );
+
+
+ final SlowListener listener1 = new SlowListener( latch, waitSemaphore );
+ final SlowListener listener2 = new SlowListener( latch, waitSemaphore );
+ final SlowListener listener3 = new SlowListener( latch, waitSemaphore );
+ final SlowListener listener4 = new SlowListener( latch, waitSemaphore );
+ final SlowListener listener5 = new SlowListener( latch, waitSemaphore );
+
+ final List<EntityVersionDeleted> listeners = new ArrayList<>();
+
+ listeners.add( listener1 );
+ listeners.add( listener2 );
+ listeners.add( listener3 );
+ listeners.add( listener4 );
+ listeners.add( listener5 );
+
+ final Id applicationId = new SimpleId( "application" );
+
+
+ final CollectionScope appScope = new CollectionScopeImpl( applicationId, applicationId, "users" );
+
+ final Id entityId = new SimpleId( "user" );
+
+
+ //mock up a single log entry for our first test
+ final LogEntryMock logEntryMock = LogEntryMock
+ .createLogEntryMock( mvccLogEntrySerializationStrategy, appScope, entityId, sizeToReturn + 1 );
+
+
+ final UUID version = logEntryMock.getEntries().iterator().next().getVersion();
+
+
+ EntityVersionCleanupTask cleanupTask =
+ new EntityVersionCleanupTask( serializationFig, mvccLogEntrySerializationStrategy,
+ mvccEntitySerializationStrategy, listeners, appScope, entityId, version );
+
+ final MutationBatch firstBatch = mock( MutationBatch.class );
+
+
+ //set up returning a mutator
+ when( mvccEntitySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
+ .thenReturn( firstBatch );
+
+
+ final MutationBatch secondBatch = mock( MutationBatch.class );
+
+ when( mvccLogEntrySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
+ .thenReturn( secondBatch );
+
+
+ //start the task
+ taskExecutor.submit( cleanupTask );
+
+ /**
+ * While we're not done, release latches every 200 ms
+ */
+ while(!cleanupTask.isDone()) {
+ Thread.sleep( 200 );
+ waitSemaphore.release( listenerCount );
+ }
+
+ //wait for the task
+ cleanupTask.get();
+
+ //we deleted the version
+ //verify we deleted everything
+ verify( firstBatch, times( sizeToReturn ) ).execute();
+
+ verify( secondBatch, times( sizeToReturn ) ).execute();
+
+ //the latch was executed
+ latch.await();
+ }
+
+
+
+ /**
+ * Tests that our task will run in the caller if there's no threads, ensures that the task runs
+ * @throws ExecutionException
+ * @throws InterruptedException
+ * @throws ConnectionException
+ */
+ @Test
+ public void runsWhenRejected()
+ throws ExecutionException, InterruptedException, ConnectionException {
+
+
+ /**
+ * only 1 thread on purpose, we want to saturate the task
+ */
+ final TaskExecutor taskExecutor = new NamedTaskExecutorImpl( "test", 1);
+
+ final SerializationFig serializationFig = mock( SerializationFig.class );
+
+ when( serializationFig.getHistorySize() ).thenReturn( 10 );
+
+ final MvccEntitySerializationStrategy mvccEntitySerializationStrategy =
+ mock( MvccEntitySerializationStrategy.class );
+
+ final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy =
+ mock( MvccLogEntrySerializationStrategy.class );
+
+
+ //create a latch for the event listener, and add it to the list of events
+ final int sizeToReturn = 10;
+
+
+ final int listenerCount = 1;
+
+ final CountDownLatch latch = new CountDownLatch( sizeToReturn * listenerCount );
+ final Semaphore waitSemaphore = new Semaphore( 0 );
+
+
+ final SlowListener listener1 = new SlowListener( latch, waitSemaphore );
+
+ final List<EntityVersionDeleted> listeners = new ArrayList<>();
+
+ listeners.add( listener1 );
+
+ final Id applicationId = new SimpleId( "application" );
+
+
+ final CollectionScope appScope = new CollectionScopeImpl( applicationId, applicationId, "users" );
+
+ final Id entityId = new SimpleId( "user" );
+
+
+ //mock up a single log entry for our first test
+ final LogEntryMock logEntryMock = LogEntryMock
+ .createLogEntryMock( mvccLogEntrySerializationStrategy, appScope, entityId, sizeToReturn + 1 );
+
+
+ final UUID version = logEntryMock.getEntries().iterator().next().getVersion();
+
+
+ EntityVersionCleanupTask firstTask =
+ new EntityVersionCleanupTask( serializationFig, mvccLogEntrySerializationStrategy,
+ mvccEntitySerializationStrategy, listeners, appScope, entityId, version );
+
+ EntityVersionCleanupTask secondTask =
+ new EntityVersionCleanupTask( serializationFig, mvccLogEntrySerializationStrategy,
+ mvccEntitySerializationStrategy, listeners, appScope, entityId, version );
+
+
+ final MutationBatch firstBatch = mock( MutationBatch.class );
+
+
+ //set up returning a mutator
+ when( mvccEntitySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
+ .thenReturn( firstBatch );
+
+
+ final MutationBatch secondBatch = mock( MutationBatch.class );
+
+ when( mvccLogEntrySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
+ .thenReturn( secondBatch );
+
+
+ //start the task
+ taskExecutor.submit( firstTask );
+
+ //now start another task while the slow running task is running
+ taskExecutor.submit( secondTask );
+
+ //get the second task, we shouldn't have been able to queue it, therefore it should just run in process
+ secondTask.get();
+
+ /**
+ * While we're not done, release latches every 200 ms
+ */
+ while(!firstTask.isDone()) {
+ Thread.sleep( 200 );
+ waitSemaphore.release( listenerCount );
+ }
+
+ //wait for the task
+ firstTask.get();
+
+ //we deleted the version
+ //verify we deleted everything
+ verify( firstBatch, times( sizeToReturn*2 ) ).execute();
+
+ verify( secondBatch, times( sizeToReturn*2 ) ).execute();
+
+ //the latch was executed
+ latch.await();
+ }
+
+
+ private static class EntityVersionDeletedTest implements EntityVersionDeleted {
+ final CountDownLatch invocationLatch;
+
+
+ private EntityVersionDeletedTest( final CountDownLatch invocationLatch ) {
+ this.invocationLatch = invocationLatch;
+ }
+
+
+ @Override
+ public void versionDeleted( final CollectionScope scope, final Id entityId, final UUID entityVersion ) {
+ invocationLatch.countDown();
+ }
+ }
+
+
+ private static class SlowListener extends EntityVersionDeletedTest {
+ final Semaphore blockLatch;
+
+
+ private SlowListener( final CountDownLatch invocationLatch, final Semaphore blockLatch ) {
+ super( invocationLatch );
+ this.blockLatch = blockLatch;
+ }
+
+ @Override
+ public void versionDeleted( final CollectionScope scope, final Id entityId, final UUID entityVersion ) {
+ //wait for unblock to happen before counting down invocation latches
+ try {
+ blockLatch.acquire();
+ }
+ catch ( InterruptedException e ) {
+ throw new RuntimeException( e );
+ }
+ super.versionDeleted( scope, entityId, entityVersion );
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d4f80e7c/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/ImmediateTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/ImmediateTask.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/ImmediateTask.java
index 627e7e8..f6c28a3 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/ImmediateTask.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/ImmediateTask.java
@@ -5,23 +5,17 @@ package org.apache.usergrid.persistence.core.task;
* Does not perform computation, just returns the value passed to it
*
*/
-public class ImmediateTask<V, I> extends Task<V, I> {
+public class ImmediateTask<V> extends Task<V> {
+
- private final I id;
private final V returned;
- protected ImmediateTask( final I id, final V returned ) {
- this.id = id;
+ protected ImmediateTask( final V returned ) {
this.returned = returned;
}
- @Override
- public I getId() {
- return id;
- }
-
@Override
public V executeTask() throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d4f80e7c/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
index 4fc72c8..bb9cac8 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
@@ -55,6 +55,8 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
* @param poolSize The size of the pool. This is the number of concurrent tasks that can execute at once.
*/
public NamedTaskExecutorImpl( final String name, final int poolSize ) {
+
+ //TODO, figure out how to name the fork/join threads in the pool
Preconditions.checkNotNull( name );
Preconditions.checkArgument( name.length() > 0, "name must have a length" );
Preconditions.checkArgument( poolSize > 0, "poolSize must be > than 0" );
@@ -67,7 +69,7 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
@Override
- public <V, I> Task<V, I> submit( final Task<V, I> task ) {
+ public <V> Task<V> submit( final Task<V> task ) {
try {
executorService.submit( task );
@@ -80,6 +82,12 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
}
+ @Override
+ public void shutdown() {
+ executorService.shutdownNow();
+ }
+
+
private final class NamedForkJoinPool extends ForkJoinPool {
private NamedForkJoinPool( final int workerThreadCount ) {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d4f80e7c/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
index 4dccc72..5d35ce4 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
@@ -29,13 +29,7 @@ import java.util.concurrent.RecursiveTask;
/**
* The task to execute
*/
-public abstract class Task<V, I> extends RecursiveTask<V> {
-
- /**
- * Get the unique identifier of this task. This may be used to collapse runnables over a time period in the future
- */
- public abstract I getId();
-
+public abstract class Task<V> extends RecursiveTask<V> {
@Override
protected V compute() {
@@ -49,25 +43,6 @@ public abstract class Task<V, I> extends RecursiveTask<V> {
}
- /**
- * Fork all tasks in the list except the last one. The last will be run in the caller thread
- * The others will wait for join
- * @param tasks
- *
- */
- public <V, T extends ForkJoinTask<V>> List<V> joinAll(List<T> tasks) throws ExecutionException, InterruptedException {
-
- //don't fork the last one
- List<V> results = new ArrayList<>(tasks.size());
-
- for(T task: tasks){
- results.add( task.join() );
- }
-
- return results;
-
- }
-
/**
* Execute the task
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d4f80e7c/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
index a3553c7..c6f5915 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
@@ -13,5 +13,10 @@ public interface TaskExecutor {
* Submit the task asynchronously
* @param task
*/
- public <V, I> Task<V, I > submit( Task<V, I> task );
+ public <V> Task<V > submit( Task<V> task );
+
+ /**
+ * Stop the task executor without waiting for scheduled threads to run
+ */
+ public void shutdown();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d4f80e7c/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
index f65d5a6..9656b5f 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
@@ -29,7 +29,7 @@ public class NamedTaskExecutorImplTest {
final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
final CountDownLatch runLatch = new CountDownLatch( 1 );
- final Task<Void, UUID> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {};
+ final Task<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {};
executor.submit( task );
@@ -254,7 +254,7 @@ public class NamedTaskExecutorImplTest {
}
- private static abstract class TestTask<V> extends Task<V, UUID> {
+ private static abstract class TestTask<V> extends Task<V> {
protected final List<Throwable> exceptions;
protected final CountDownLatch exceptionLatch;
@@ -272,11 +272,6 @@ public class NamedTaskExecutorImplTest {
}
- @Override
- public UUID getId() {
- return UUIDGenerator.newTimeUUID();
- }
-
@Override
public void exceptionThrown( final Throwable throwable ) {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d4f80e7c/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
index 15acaa8..ed63587 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
@@ -44,7 +44,7 @@ public interface ShardGroupCompaction {
*
* @return A ListenableFuture with the result. Note that some
*/
- public Task<AuditResult, ShardGroupCompactionImpl.ShardAuditKey> evaluateShardGroup(
+ public Task<AuditResult> evaluateShardGroup(
final ApplicationScope scope, final DirectedEdgeMeta edgeMeta, final ShardEntryGroup group );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d4f80e7c/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
index 751b4d9..a84a0f0 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
@@ -273,7 +273,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
@Override
- public Task<AuditResult, ShardAuditKey> evaluateShardGroup( final ApplicationScope scope,
+ public Task<AuditResult> evaluateShardGroup( final ApplicationScope scope,
final DirectedEdgeMeta edgeMeta,
final ShardEntryGroup group ) {
@@ -282,7 +282,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
//don't audit, we didn't hit our chance
if ( repairChance > graphFig.getShardRepairChance() ) {
- return new ImmediateTask<AuditResult, ShardAuditKey>( new ShardAuditKey( scope, edgeMeta, group ), AuditResult.NOT_CHECKED ) {};
+ return new ImmediateTask<AuditResult>( AuditResult.NOT_CHECKED ) {};
}
/**
@@ -295,7 +295,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
}
- private final class ShardAuditTask extends Task<AuditResult, ShardAuditKey> {
+ private final class ShardAuditTask extends Task<AuditResult> {
private final ApplicationScope scope;
private final DirectedEdgeMeta edgeMeta;
@@ -310,11 +310,6 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
}
- @Override
- public ShardAuditKey getId() {
- return new ShardAuditKey( scope, edgeMeta, group );
- }
-
@Override
public void exceptionThrown( final Throwable throwable ) {
@@ -325,7 +320,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
@Override
public void rejected() {
//ignore, if this happens we don't care, we're saturated, we can check later
- LOG.error( "Rejected audit for shard of {}", getId() );
+ LOG.error( "Rejected audit for shard of with scope {}, edgeMeta of {} and group of {}", scope, edgeMeta, group );
}
[02/15] git commit: Migrated graph over to new task executor
Posted by to...@apache.org.
Migrated graph over to new task executor
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/4c72f5c6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/4c72f5c6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/4c72f5c6
Branch: refs/heads/two-dot-o
Commit: 4c72f5c636bae9b16df7578302538c7b4d9600e7
Parents: 66e508a
Author: Todd Nine <to...@apache.org>
Authored: Wed Sep 24 17:39:07 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Wed Sep 24 17:39:07 2014 -0600
----------------------------------------------------------------------
.../collection/event/EntityVersionRemoved.java | 26 +++
.../collection/guice/CollectionModule.java | 17 +-
.../serialization/SerializationFig.java | 32 ++-
.../core/astyanax/AstyanaxKeyspaceProvider.java | 2 +
.../persistence/core/guice/CommonModule.java | 2 +
.../core/task/NamedTaskExecutorImpl.java | 14 +-
.../persistence/core/task/TaskExecutor.java | 2 +-
.../usergrid/persistence/graph/GraphFig.java | 2 +
.../persistence/graph/guice/GraphModule.java | 15 ++
.../shard/impl/ShardGroupCompactionImpl.java | 202 ++++++++++++-------
.../impl/shard/ShardGroupCompactionTest.java | 5 +-
11 files changed, 227 insertions(+), 92 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4c72f5c6/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionRemoved.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionRemoved.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionRemoved.java
new file mode 100644
index 0000000..dca575d
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionRemoved.java
@@ -0,0 +1,26 @@
+package org.apache.usergrid.persistence.collection.event;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.CollectionScope;
+
+
+/**
+ *
+ * Invoked when an entity version is removed. Note that this is not a deletion of the entity itself,
+ * only the version itself.
+ *
+ */
+public interface EntityVersionRemoved {
+
+
+ /**
+ * The version specified was removed.
+ * @param scope
+ * @param entityId
+ * @param entityVersion
+ */
+ public void versionRemoved(final CollectionScope scope, final UUID entityId, final UUID entityVersion);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4c72f5c6/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
index 84f69db..306f6e0 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
@@ -36,6 +36,8 @@ import org.apache.usergrid.persistence.collection.serialization.SerializationFig
import org.apache.usergrid.persistence.collection.serialization.impl.SerializationModule;
import org.apache.usergrid.persistence.collection.service.UUIDService;
import org.apache.usergrid.persistence.collection.service.impl.ServiceModule;
+import org.apache.usergrid.persistence.core.task.NamedTaskExecutorImpl;
+import org.apache.usergrid.persistence.core.task.TaskExecutor;
import com.google.inject.AbstractModule;
import com.google.inject.Inject;
@@ -79,8 +81,7 @@ public class CollectionModule extends AbstractModule {
@Singleton
@Inject
@Write
-
- public WriteStart write (MvccLogEntrySerializationStrategy logStrategy, UUIDService uuidService) {
+ public WriteStart write (final MvccLogEntrySerializationStrategy logStrategy) {
final WriteStart writeStart = new WriteStart( logStrategy, MvccEntity.Status.COMPLETE);
return writeStart;
@@ -90,13 +91,21 @@ public class CollectionModule extends AbstractModule {
@Singleton
@Inject
@WriteUpdate
-
- public WriteStart writeUpdate (MvccLogEntrySerializationStrategy logStrategy, UUIDService uuidService) {
+ public WriteStart writeUpdate (final MvccLogEntrySerializationStrategy logStrategy) {
final WriteStart writeStart = new WriteStart( logStrategy, MvccEntity.Status.PARTIAL );
return writeStart;
}
+ @Inject
+ @Singleton
+ @Provides
+ public TaskExecutor collectionTaskExecutor(final SerializationFig serializationFig){
+ return new NamedTaskExecutorImpl( "collectiontasks", serializationFig.getTaskPoolThreadSize(), serializationFig.getTaskPoolQueueSize() );
+ }
+
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4c72f5c6/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java
index 81302a6..7e69a19 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java
@@ -15,25 +15,45 @@ public interface SerializationFig extends GuicyFig {
/**
* Time to live timeout in seconds.
+ *
* @return Timeout in seconds.
*/
- @Key( "collection.stage.transient.timeout" )
- @Default( "5" )
+ @Key("collection.stage.transient.timeout")
+ @Default("5")
int getTimeout();
/**
* Number of history items to return for delete.
+ *
* @return Timeout in seconds.
*/
- @Key( "collection.delete.history.size" )
- @Default( "100" )
+ @Key("collection.delete.history.size")
+ @Default("100")
int getHistorySize();
/**
* Number of items to buffer.
+ *
* @return Timeout in seconds.
*/
- @Key( "collection.buffer.size" )
- @Default( "10" )
+ @Key("collection.buffer.size")
+ @Default("10")
int getBufferSize();
+
+
+ /**
+ * The size of threads to have in the task pool
+ */
+ @Key( "collection.task.pool.threadsize" )
+ @Default( "20" )
+ int getTaskPoolThreadSize();
+
+
+
+ /**
+ * The size of threads to have in the task pool
+ */
+ @Key( "collection.task.pool.queuesize" )
+ @Default( "20" )
+ int getTaskPoolQueueSize();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4c72f5c6/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/AstyanaxKeyspaceProvider.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/AstyanaxKeyspaceProvider.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/AstyanaxKeyspaceProvider.java
index 7caeaeb..8bd5a9f 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/AstyanaxKeyspaceProvider.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/AstyanaxKeyspaceProvider.java
@@ -24,6 +24,7 @@ import java.util.Set;
import com.google.inject.Inject;
import com.google.inject.Provider;
+import com.google.inject.Singleton;
import com.netflix.astyanax.AstyanaxConfiguration;
import com.netflix.astyanax.AstyanaxContext;
import com.netflix.astyanax.Keyspace;
@@ -41,6 +42,7 @@ import com.netflix.astyanax.thrift.ThriftFamilyFactory;
*
* @author tnine
*/
+@Singleton
public class AstyanaxKeyspaceProvider implements Provider<Keyspace> {
private final CassandraFig cassandraFig;
private final CassandraConfig cassandraConfig;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4c72f5c6/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
index a4cc98a..5f461bb 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
@@ -63,4 +63,6 @@ public class CommonModule extends AbstractModule {
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4c72f5c6/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
index 8ba73a0..8184937 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
@@ -53,7 +53,7 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
@Override
- public <V, I> void submit( final Task<V, I> task ) {
+ public <V, I> ListenableFuture<V> submit( final Task<V, I> task ) {
final ListenableFuture<V> future;
@@ -67,8 +67,10 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
}
catch ( RejectedExecutionException ree ) {
task.rejected();
- return;
+ return Futures.immediateCancelledFuture();
}
+
+ return future;
}
@@ -147,16 +149,10 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
@Override
public void rejectedExecution( final Runnable r, final ThreadPoolExecutor executor ) {
-
- // ListenableFutureTask<Task<?, ?>> future = ( ListenableFutureTask<Task<?, ?>> ) r;
- //
- // future.
- // final Task<?, ?> task = ( Task<?, ?> ) r;
LOG.warn( "Audit queue full, rejecting audit task {}", r );
throw new RejectedExecutionException( "Unable to run task, queue full" );
- // LOG.warn( "Audit queue full, rejecting audit task {}", task );
- // task.rejected();
}
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4c72f5c6/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
index e60da83..b5491bc 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
@@ -10,5 +10,5 @@ public interface TaskExecutor {
* Submit the task asynchronously
* @param task
*/
- public <V, I> void submit(Task<V, I> task);
+ public <V, I> com.google.common.util.concurrent.ListenableFuture<V> submit( Task<V, I> task );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4c72f5c6/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
index 0a6ecfa..894e74a 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
@@ -90,6 +90,8 @@ public interface GraphFig extends GuicyFig {
public static final String COUNTER_WRITE_FLUSH_QUEUE_SIZE = "usergrid.graph.shard.counter.queue.size";
+
+
@Default("1000")
@Key(SCAN_PAGE_SIZE)
int getScanPageSize();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4c72f5c6/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
index f0e954b..608f8ce 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
@@ -24,6 +24,8 @@ import org.safehaus.guicyfig.GuicyFigModule;
import org.apache.usergrid.persistence.core.consistency.TimeService;
import org.apache.usergrid.persistence.core.consistency.TimeServiceImpl;
import org.apache.usergrid.persistence.core.migration.Migration;
+import org.apache.usergrid.persistence.core.task.NamedTaskExecutorImpl;
+import org.apache.usergrid.persistence.core.task.TaskExecutor;
import org.apache.usergrid.persistence.graph.GraphFig;
import org.apache.usergrid.persistence.graph.GraphManager;
import org.apache.usergrid.persistence.graph.GraphManagerFactory;
@@ -62,7 +64,10 @@ import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.Sizeb
import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.SizebasedEdgeShardStrategy;
import com.google.inject.AbstractModule;
+import com.google.inject.Inject;
import com.google.inject.Key;
+import com.google.inject.Provides;
+import com.google.inject.Singleton;
import com.google.inject.assistedinject.FactoryModuleBuilder;
import com.google.inject.multibindings.Multibinder;
@@ -144,6 +149,16 @@ public class GraphModule extends AbstractModule {
migrationBinding.addBinding().to( Key.get( EdgeShardSerialization.class ) );
migrationBinding.addBinding().to( Key.get( NodeShardCounterSerialization.class ) );
}
+
+
+ @Inject
+ @Singleton
+ @Provides
+ public TaskExecutor graphTaskExecutor(final GraphFig graphFig){
+ return new NamedTaskExecutorImpl( "graphTaskExecutor", graphFig.getShardAuditWorkerCount(), graphFig.getShardAuditWorkerQueueSize() );
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4c72f5c6/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
index 5076424..be7fbe4 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
@@ -30,7 +30,6 @@ import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
@@ -46,6 +45,8 @@ import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.core.consistency.TimeService;
import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.task.Task;
+import org.apache.usergrid.persistence.core.task.TaskExecutor;
import org.apache.usergrid.persistence.graph.GraphFig;
import org.apache.usergrid.persistence.graph.MarkedEdge;
import org.apache.usergrid.persistence.graph.SearchByEdgeType;
@@ -68,8 +69,6 @@ import com.google.common.hash.PrimitiveSink;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.netflix.astyanax.Keyspace;
@@ -91,7 +90,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
private static final HashFunction MURMUR_128 = Hashing.murmur3_128();
- private final ListeningExecutorService executorService;
+ private final TaskExecutor taskExecutor;
private final TimeService timeService;
private final GraphFig graphFig;
private final NodeShardAllocation nodeShardAllocation;
@@ -110,7 +109,8 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
final NodeShardAllocation nodeShardAllocation,
final ShardedEdgeSerialization shardedEdgeSerialization,
final EdgeColumnFamilies edgeColumnFamilies, final Keyspace keyspace,
- final EdgeShardSerialization edgeShardSerialization ) {
+ final EdgeShardSerialization edgeShardSerialization,
+ final TaskExecutor taskExecutor ) {
this.timeService = timeService;
this.graphFig = graphFig;
@@ -124,8 +124,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
this.shardCompactionTaskTracker = new ShardCompactionTaskTracker();
this.shardAuditTaskTracker = new ShardAuditTaskTracker();
- executorService = MoreExecutors.listeningDecorator(
- new MaxSizeThreadPool( graphFig.getShardAuditWorkerCount(), graphFig.getShardAuditWorkerQueueSize() ) );
+ this.taskExecutor = taskExecutor;
}
@@ -232,7 +231,8 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
resultBuilder.withCopiedEdges( edgeCount ).withSourceShards( sourceShards ).withTargetShard( targetShard );
/**
- * We didn't move anything this pass, mark the shard as compacted. If we move something, it means that we missed it on the first pass
+ * We didn't move anything this pass, mark the shard as compacted. If we move something,
+ * it means that we missed it on the first pass
* or someone is still not writing to the target shard only.
*/
if ( edgeCount == 0 ) {
@@ -293,91 +293,153 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
* Try and submit. During back pressure, we may not be able to submit, that's ok. Better to drop than to
* hose the system
*/
- ListenableFuture<AuditResult> future = executorService.submit( new Callable<AuditResult>() {
+ ListenableFuture<AuditResult> future = taskExecutor.submit( new ShardAuditTask( scope, edgeMeta, group ) );
+
+ /**
+ * Log our success or failures for debugging purposes
+ */
+ Futures.addCallback( future, new FutureCallback<AuditResult>() {
@Override
- public AuditResult call() throws Exception {
+ public void onSuccess( @Nullable final AuditResult result ) {
+ LOG.debug( "Successfully completed audit of task {}", result );
+ }
- /**
- * We don't have a compaction pending. Run an audit on the shards
- */
- if ( !group.isCompactionPending() ) {
+ @Override
+ public void onFailure( final Throwable t ) {
+ LOG.error( "Unable to perform audit. Exception is ", t );
+ }
+ } );
- /**
- * Check if we should allocate, we may want to
- */
+ return future;
+ }
- /**
- * It's already compacting, don't do anything
- */
- if ( !shardAuditTaskTracker.canStartTask( scope, edgeMeta, group ) ) {
- return AuditResult.CHECKED_NO_OP;
- }
- try {
+ private final class ShardAuditTask implements Task<AuditResult, ShardAuditKey> {
- final boolean created = nodeShardAllocation.auditShard( scope, group, edgeMeta );
- if ( !created ) {
- return AuditResult.CHECKED_NO_OP;
- }
- }
- finally {
- shardAuditTaskTracker.complete( scope, edgeMeta, group );
- }
+ private final ApplicationScope scope;
+ private final DirectedEdgeMeta edgeMeta;
+ private final ShardEntryGroup group;
- return AuditResult.CHECKED_CREATED;
- }
+ public ShardAuditTask( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta,
+ final ShardEntryGroup group ) {
+ this.scope = scope;
+ this.edgeMeta = edgeMeta;
+ this.group = group;
+ }
+
+
+ @Override
+ public ShardAuditKey getId() {
+ return new ShardAuditKey( scope, edgeMeta, group );
+ }
+
+
+ @Override
+ public void exceptionThrown( final Throwable throwable ) {
+ LOG.error( "Unable to execute audit for shard of {}", throwable );
+ }
+
+
+ @Override
+ public void rejected() {
+ //ignore, if this happens we don't care, we're saturated, we can check later
+ LOG.error( "Rejected audit for shard of {}", getId() );
+ }
- //check our taskmanager
+ @Override
+ public AuditResult call() throws Exception {
+ /**
+ * We don't have a compaction pending. Run an audit on the shards
+ */
+ if ( !group.isCompactionPending() ) {
/**
- * Do the compaction
+ * Check if we should allocate, we may want to
*/
- if ( group.shouldCompact( timeService.getCurrentTime() ) ) {
- /**
- * It's already compacting, don't do anything
- */
- if ( !shardCompactionTaskTracker.canStartTask( scope, edgeMeta, group ) ) {
- return AuditResult.COMPACTING;
- }
- /**
- * We use a finally b/c we always want to remove the task track
- */
- try {
- CompactionResult result = compact( scope, edgeMeta, group );
- LOG.info( "Compaction result for compaction of scope {} with edge meta data of {} and shard group {} is {}", new Object[]{scope, edgeMeta, group, result} );
- }
- finally {
- shardCompactionTaskTracker.complete( scope, edgeMeta, group );
+ /**
+ * It's already compacting, don't do anything
+ */
+ if ( !shardAuditTaskTracker.canStartTask( scope, edgeMeta, group ) ) {
+ return AuditResult.CHECKED_NO_OP;
+ }
+
+ try {
+
+ final boolean created = nodeShardAllocation.auditShard( scope, group, edgeMeta );
+ if ( !created ) {
+ return AuditResult.CHECKED_NO_OP;
}
- return AuditResult.COMPACTED;
+ }
+ finally {
+ shardAuditTaskTracker.complete( scope, edgeMeta, group );
}
- //no op, there's nothing we need to do to this shard
- return AuditResult.NOT_CHECKED;
- }
- } );
- /**
- * Log our success or failures for debugging purposes
- */
- Futures.addCallback( future, new FutureCallback<AuditResult>() {
- @Override
- public void onSuccess( @Nullable final AuditResult result ) {
- LOG.debug( "Successfully completed audit of task {}", result );
+ return AuditResult.CHECKED_CREATED;
}
+ //check our taskmanager
- @Override
- public void onFailure( final Throwable t ) {
- LOG.error( "Unable to perform audit. Exception is ", t );
+
+ /**
+ * Do the compaction
+ */
+ if ( group.shouldCompact( timeService.getCurrentTime() ) ) {
+ /**
+ * It's already compacting, don't do anything
+ */
+ if ( !shardCompactionTaskTracker.canStartTask( scope, edgeMeta, group ) ) {
+ return AuditResult.COMPACTING;
+ }
+
+ /**
+ * We use a finally b/c we always want to remove the task track
+ */
+ try {
+ CompactionResult result = compact( scope, edgeMeta, group );
+ LOG.info(
+ "Compaction result for compaction of scope {} with edge meta data of {} and shard group " +
+ "{} is {}",
+ new Object[] { scope, edgeMeta, group, result } );
+ }
+ finally {
+ shardCompactionTaskTracker.complete( scope, edgeMeta, group );
+ }
+ return AuditResult.COMPACTED;
}
- } );
- return future;
+ //no op, there's nothing we need to do to this shard
+ return AuditResult.NOT_CHECKED;
+ }
+ }
+
+
+ private static final class ShardAuditKey {
+ private final ApplicationScope scope;
+ private final DirectedEdgeMeta edgeMeta;
+ private final ShardEntryGroup group;
+
+
+ private ShardAuditKey( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta,
+ final ShardEntryGroup group ) {
+ this.scope = scope;
+ this.edgeMeta = edgeMeta;
+ this.group = group;
+ }
+
+
+ @Override
+ public String toString() {
+ return "ShardAuditKey{" +
+ "scope=" + scope +
+ ", edgeMeta=" + edgeMeta +
+ ", group=" + group +
+ '}';
+ }
}
@@ -531,7 +593,6 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
}
-
public static final class CompactionResult {
public final long copiedEdges;
@@ -541,7 +602,6 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
public final Shard compactedShard;
-
private CompactionResult( final long copiedEdges, final Shard targetShard, final Set<Shard> sourceShards,
final Set<Shard> removedShards, final Shard compactedShard ) {
this.copiedEdges = copiedEdges;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4c72f5c6/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java
index 1513e85..9f0792d 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java
@@ -35,6 +35,7 @@ import org.mockito.Matchers;
import org.apache.usergrid.persistence.core.consistency.TimeService;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
+import org.apache.usergrid.persistence.core.task.TaskExecutor;
import org.apache.usergrid.persistence.graph.GraphFig;
import org.apache.usergrid.persistence.graph.SearchByEdgeType;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardGroupCompactionImpl;
@@ -84,6 +85,8 @@ public class ShardGroupCompactionTest {
final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
+ final TaskExecutor taskExecutor = mock(TaskExecutor.class);
+
final long delta = 10000;
final long createTime = 20000;
@@ -100,7 +103,7 @@ public class ShardGroupCompactionTest {
ShardGroupCompactionImpl compaction =
new ShardGroupCompactionImpl( timeService, graphFig, nodeShardAllocation, shardedEdgeSerialization,
- edgeColumnFamilies, keyspace, edgeShardSerialization );
+ edgeColumnFamilies, keyspace, edgeShardSerialization, taskExecutor );
DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromSourceNode( createId("source"), "test" );
[05/15] git commit: Added tests for the log entry iterator
Posted by to...@apache.org.
Added tests for the log entry iterator
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/018dbeeb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/018dbeeb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/018dbeeb
Branch: refs/heads/two-dot-o
Commit: 018dbeeb8ad46bf536132120f78736fa2b0dd367
Parents: dc3f448
Author: Todd Nine <to...@apache.org>
Authored: Fri Sep 26 11:21:45 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Fri Sep 26 16:21:18 2014 -0600
----------------------------------------------------------------------
.../impl/EntityVersionCleanupTask.java | 52 +++++--
.../serialization/impl/LogEntryIterator.java | 111 ++++++++++++++
.../serialization/impl/VersionIterator.java | 111 --------------
.../impl/EntityVersionCleanupTaskTest.java | 40 +++++
.../impl/LogEntryIteratorTest.java | 131 ++++++++++++++++
.../collection/util/LogEntryMock.java | 152 +++++++++++++++++++
.../core/task/NamedTaskExecutorImpl.java | 131 +++-------------
.../usergrid/persistence/core/task/Task.java | 42 +++++
8 files changed, 539 insertions(+), 231 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/018dbeeb/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
index 85afce6..29ca3ac 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
@@ -1,8 +1,11 @@
package org.apache.usergrid.persistence.collection.impl;
+import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.ForkJoinTask;
+import java.util.concurrent.RecursiveTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -11,8 +14,10 @@ import org.apache.usergrid.persistence.collection.CollectionScope;
import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
-import org.apache.usergrid.persistence.collection.serialization.impl.VersionIterator;
+import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
+import org.apache.usergrid.persistence.collection.serialization.impl.LogEntryIterator;
import org.apache.usergrid.persistence.core.entity.EntityVersion;
import org.apache.usergrid.persistence.core.task.Task;
import org.apache.usergrid.persistence.model.entity.Id;
@@ -22,7 +27,7 @@ import org.apache.usergrid.persistence.model.entity.Id;
* Cleans up previous versions from the specified version. Note that this means the version passed in the io event is
* retained, the range is exclusive.
*/
-class EntityVersionCleanupTask extends Task<CollectionIoEvent<EntityVersion>, CollectionIoEvent<EntityVersion>> {
+public class EntityVersionCleanupTask extends Task<CollectionIoEvent<EntityVersion>, CollectionIoEvent<EntityVersion>> {
private static final Logger LOG = LoggerFactory.getLogger( EntityVersionCleanupTask.class );
@@ -33,15 +38,19 @@ class EntityVersionCleanupTask extends Task<CollectionIoEvent<EntityVersion>, Co
private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
private final MvccEntitySerializationStrategy entitySerializationStrategy;
+ private final SerializationFig serializationFig;
+
private EntityVersionCleanupTask( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
final MvccEntitySerializationStrategy entitySerializationStrategy,
final CollectionIoEvent<EntityVersion> collectionIoEvent,
- final List<EntityVersionDeleted> listeners ) {
+ final List<EntityVersionDeleted> listeners,
+ final SerializationFig serializationFig ) {
this.collectionIoEvent = collectionIoEvent;
this.listeners = listeners;
this.logEntrySerializationStrategy = logEntrySerializationStrategy;
this.entitySerializationStrategy = entitySerializationStrategy;
+ this.serializationFig = serializationFig;
}
@@ -79,34 +88,51 @@ class EntityVersionCleanupTask extends Task<CollectionIoEvent<EntityVersion>, Co
final UUID maxVersion = collectionIoEvent.getEvent().getVersion();
- VersionIterator versionIterator =
- new VersionIterator( logEntrySerializationStrategy, scope, entityId, maxVersion, 1000 );
-
+ LogEntryIterator logEntryIterator =
+ new LogEntryIterator( logEntrySerializationStrategy, scope, entityId, maxVersion, serializationFig.getHistorySize() );
- UUID currentVersion = null;
//for every entry, we want to clean it up with listeners
- while ( versionIterator.hasNext() ) {
+ while ( logEntryIterator.hasNext() ) {
+
+ final MvccLogEntry logEntry = logEntryIterator.next();
- currentVersion = versionIterator.next();
+
+ final UUID version = logEntry.getVersion();
+ List<ForkJoinTask<Void>> tasks = new ArrayList<>();
//execute all the listeners
- for ( EntityVersionDeleted listener : listeners ) {
- listener.versionDeleted( scope, entityId, currentVersion );
+ for (final EntityVersionDeleted listener : listeners ) {
+
+ tasks.add( new RecursiveTask<Void>() {
+ @Override
+ protected Void compute() {
+ listener.versionDeleted( scope, entityId, version );
+ return null;
+ }
+ }.fork() );
+
+
}
+ //wait for them to complete
+
+ joinAll(tasks);
+
//we do multiple invocations on purpose. Our log is our source of versions, only delete from it
//after every successful invocation of listeners and entity removal
- entitySerializationStrategy.delete( scope, entityId, currentVersion ).execute();
+ entitySerializationStrategy.delete( scope, entityId, version ).execute();
- logEntrySerializationStrategy.delete( scope, entityId, currentVersion ).execute();
+ logEntrySerializationStrategy.delete( scope, entityId, version ).execute();
}
return collectionIoEvent;
}
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/018dbeeb/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java
new file mode 100644
index 0000000..53eb6e3
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java
@@ -0,0 +1,111 @@
+package org.apache.usergrid.persistence.collection.serialization.impl;
+
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+
+/**
+ * Iterator that will iterate all versions of the entity from the log from < the specified maxVersion
+ *
+ */
+public class LogEntryIterator implements Iterator<MvccLogEntry> {
+
+
+ private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
+ private final CollectionScope scope;
+ private final Id entityId;
+ private final int pageSize;
+
+
+ private Iterator<MvccLogEntry> elementItr;
+ private UUID nextStart;
+
+
+ /**
+ *
+ * @param logEntrySerializationStrategy The serialization strategy to get the log entries
+ * @param scope The scope of the entity
+ * @param entityId The id of the entity
+ * @param maxVersion The max version of the entity. Iterator will iterate from max to min starting with the version < max
+ * @param pageSize The fetch size to get when querying the serialization strategy
+ */
+ public LogEntryIterator( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
+ final CollectionScope scope, final Id entityId, final UUID maxVersion,
+ final int pageSize ) {
+ this.logEntrySerializationStrategy = logEntrySerializationStrategy;
+ this.scope = scope;
+ this.entityId = entityId;
+ this.nextStart = maxVersion;
+ this.pageSize = pageSize;
+ }
+
+
+ @Override
+ public boolean hasNext() {
+ if ( elementItr == null || !elementItr.hasNext() && nextStart != null ) {
+ try {
+ advance();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to query cassandra", e );
+ }
+ }
+
+ return elementItr.hasNext();
+ }
+
+
+ @Override
+ public MvccLogEntry next() {
+ if ( !hasNext() ) {
+ throw new NoSuchElementException( "No more elements exist" );
+ }
+
+ return elementItr.next();
+ }
+
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException( "Remove is unsupported" );
+ }
+
+
+ /**
+ * Advance our iterator
+ */
+ public void advance() throws ConnectionException {
+
+ final int requestedSize = pageSize + 1;
+
+ //loop through even entry that's < this one and remove it
+ List<MvccLogEntry> results = logEntrySerializationStrategy.load( scope, entityId, nextStart, requestedSize );
+
+ //we always remove the first version if it's equal since it's returned
+ if ( results.size() > 0 && results.get( 0 ).getVersion().equals( nextStart ) ) {
+ results.remove( 0 );
+ }
+
+
+ //we have results, set our next start
+ if ( results.size() == pageSize ) {
+ nextStart = results.get( results.size() - 1 ).getVersion();
+ }
+ //nothing left to do
+ else {
+ nextStart = null;
+ }
+
+ elementItr = results.iterator();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/018dbeeb/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/VersionIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/VersionIterator.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/VersionIterator.java
deleted file mode 100644
index 323f12d..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/VersionIterator.java
+++ /dev/null
@@ -1,111 +0,0 @@
-package org.apache.usergrid.persistence.collection.serialization.impl;
-
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-import java.util.UUID;
-
-import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
-import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-
-
-/**
- * Iterator that will iterate all versions of the entity from the log from < the specified maxVersion
- *
- */
-public class VersionIterator implements Iterator<UUID> {
-
-
- private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
- private final CollectionScope scope;
- private final Id entityId;
- private final int pageSize;
-
-
- private Iterator<MvccLogEntry> elementItr;
- private UUID nextStart;
-
-
- /**
- *
- * @param logEntrySerializationStrategy The serialization strategy to get the log entries
- * @param scope The scope of the entity
- * @param entityId The id of the entity
- * @param maxVersion The max version of the entity. Iterator will iterate from max to min starting with the version < max
- * @param pageSize The fetch size to get when querying the serialization strategy
- */
- public VersionIterator( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
- final CollectionScope scope, final Id entityId, final UUID maxVersion,
- final int pageSize ) {
- this.logEntrySerializationStrategy = logEntrySerializationStrategy;
- this.scope = scope;
- this.entityId = entityId;
- this.nextStart = maxVersion;
- this.pageSize = pageSize;
- }
-
-
- @Override
- public boolean hasNext() {
- if ( elementItr == null || !elementItr.hasNext() && nextStart != null ) {
- try {
- advance();
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( "Unable to query cassandra", e );
- }
- }
-
- return elementItr.hasNext();
- }
-
-
- @Override
- public UUID next() {
- if ( !hasNext() ) {
- throw new NoSuchElementException( "No more elements exist" );
- }
-
- return elementItr.next().getVersion();
- }
-
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException( "Remove is unsupported" );
- }
-
-
- /**
- * Advance our iterator
- */
- public void advance() throws ConnectionException {
-
- final int requestedSize = pageSize + 1;
-
- //loop through even entry that's < this one and remove it
- List<MvccLogEntry> results = logEntrySerializationStrategy.load( scope, entityId, nextStart, requestedSize );
-
- //we always remove the first version if it's equal since it's returned
- if ( results.size() > 0 && results.get( 0 ).getVersion().equals( nextStart ) ) {
- results.remove( 0 );
- }
-
-
- //we have results, set our next start
- if ( results.size() == requestedSize ) {
- nextStart = results.get( results.size() - 1 ).getVersion();
- }
- //nothing left to do
- else {
- nextStart = null;
- }
-
- elementItr = results.iterator();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/018dbeeb/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
new file mode 100644
index 0000000..050ea9e
--- /dev/null
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.collection.impl;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+
+/**
+ * Cleanup task tests
+ */
+public class EntityVersionCleanupTaskTest {
+
+ @Test
+ public void multiPageTask(){
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/018dbeeb/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIteratorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIteratorTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIteratorTest.java
new file mode 100644
index 0000000..9ee284b
--- /dev/null
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIteratorTest.java
@@ -0,0 +1,131 @@
+package org.apache.usergrid.persistence.collection.serialization.impl;
+
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.UUID;
+
+import org.junit.Test;
+
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
+import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
+import org.apache.usergrid.persistence.collection.util.LogEntryMock;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Tests iterator paging
+ */
+public class LogEntryIteratorTest {
+
+
+ @Test
+ public void empty() throws ConnectionException {
+
+ final MvccLogEntrySerializationStrategy logEntrySerializationStrategy =
+ mock( MvccLogEntrySerializationStrategy.class );
+
+ final CollectionScope scope =
+ new CollectionScopeImpl( new SimpleId( "application" ), new SimpleId( "owner" ), "entities" );
+
+ final Id entityId = new SimpleId( "entity" );
+
+ final int pageSize = 100;
+
+
+ //set the start version, it should be discarded
+ UUID start = UUIDGenerator.newTimeUUID();
+
+ when( logEntrySerializationStrategy.load( same( scope ), same( entityId ), same( start ), same( pageSize ) ) )
+ .thenReturn( new ArrayList<MvccLogEntry>() );
+
+
+ //now iterate we should get everything
+ LogEntryIterator itr = new LogEntryIterator( logEntrySerializationStrategy, scope, entityId, start, pageSize );
+
+
+ assertFalse( itr.hasNext() );
+ }
+
+
+ @Test
+ public void partialLastPage() throws ConnectionException {
+
+
+ final int pageSize = 10;
+ final int totalPages = 3;
+ final int lastPageSize = pageSize / 2;
+
+ //have one half page
+
+ pageElements( pageSize, totalPages, lastPageSize );
+ }
+
+
+ @Test
+ public void emptyLastPage() throws ConnectionException {
+
+
+ final int pageSize = 10;
+ final int totalPages = 3;
+ final int lastPageSize = 0;
+
+ //have one half page
+
+ pageElements( pageSize, totalPages, lastPageSize );
+ }
+
+
+ public void pageElements( final int pageSize, final int totalPages, final int lastPageSize )
+ throws ConnectionException {
+
+ final MvccLogEntrySerializationStrategy logEntrySerializationStrategy =
+ mock( MvccLogEntrySerializationStrategy.class );
+
+ final CollectionScope scope =
+ new CollectionScopeImpl( new SimpleId( "application" ), new SimpleId( "owner" ), "entities" );
+
+ final Id entityId = new SimpleId( "entity" );
+
+
+ //have one half page
+ final int toGenerate = pageSize * totalPages + lastPageSize;
+
+
+ final LogEntryMock mockResults =
+ LogEntryMock.createLogEntryMock( logEntrySerializationStrategy, scope, entityId, toGenerate );
+
+ Iterator<MvccLogEntry> expectedEntries = mockResults.getEntries().iterator();
+
+ //this element should be skipped
+ UUID start = expectedEntries.next().getVersion();
+
+ //now iterate we should get everything
+ LogEntryIterator itr = new LogEntryIterator( logEntrySerializationStrategy, scope, entityId, start, pageSize );
+
+
+ while ( expectedEntries.hasNext() && itr.hasNext() ) {
+ final MvccLogEntry expected = expectedEntries.next();
+
+ final MvccLogEntry returned = itr.next();
+
+ assertEquals( expected, returned );
+ }
+
+
+ assertFalse( itr.hasNext() );
+ assertFalse( expectedEntries.hasNext() );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/018dbeeb/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/util/LogEntryMock.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/util/LogEntryMock.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/util/LogEntryMock.java
new file mode 100644
index 0000000..a25bc94
--- /dev/null
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/util/LogEntryMock.java
@@ -0,0 +1,152 @@
+package org.apache.usergrid.persistence.collection.util;/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.TreeMap;
+import java.util.UUID;
+
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
+import org.apache.usergrid.persistence.collection.mvcc.entity.Stage;
+import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import com.fasterxml.uuid.UUIDComparator;
+import com.google.common.base.Preconditions;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Utility for constructing representative log entries for mock serialziation from high to low
+ */
+public class LogEntryMock {
+
+
+ private final TreeMap<UUID, MvccLogEntry> entries = new TreeMap<>(ReversedUUIDComparator.INSTANCE);
+
+ private final Id entityId;
+
+
+ /**
+ * Create a mock list of versions of the specified size
+ *
+ * @param entityId The entity Id to use
+ * @param size The size to use
+ */
+ private LogEntryMock(final Id entityId, final int size ) {
+
+ this.entityId = entityId;
+
+ for ( int i = 0; i < size; i++ ) {
+
+ final UUID version = UUIDGenerator.newTimeUUID();
+
+ entries.put( version,
+ new MvccLogEntryImpl( entityId, version, Stage.ACTIVE, MvccLogEntry.State.COMPLETE ) );
+ }
+ }
+
+
+ /**
+ * Init the mock with the given data structure
+ * @param logEntrySerializationStrategy The strategy to moc
+ * @param scope
+ * @throws ConnectionException
+ */
+ private void initMock( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy, final CollectionScope scope )
+
+ throws ConnectionException {
+
+ //wire up the mocks
+ when(logEntrySerializationStrategy.load( same( scope ), same( entityId ), any(UUID.class), any(Integer.class) )).thenAnswer( new Answer<List<MvccLogEntry>>() {
+
+
+ @Override
+ public List<MvccLogEntry> answer( final InvocationOnMock invocation ) throws Throwable {
+ final UUID startVersion = ( UUID ) invocation.getArguments()[2];
+ final int count = (Integer)invocation.getArguments()[3];
+
+ final List<MvccLogEntry> results = new ArrayList<>( count );
+
+ final Iterator<MvccLogEntry> itr = entries.tailMap( startVersion, true ).values().iterator();
+
+ for(int i = 0; i < count && itr.hasNext(); i ++){
+ results.add( itr.next() );
+ }
+
+
+ return results;
+ }
+ } );
+ }
+
+
+ /**
+ * Get the entries (ordered from high to low) this mock contains
+ * @return
+ */
+ public Collection<MvccLogEntry> getEntries(){
+ return entries.values();
+ }
+
+ /**
+ *
+ * @param logEntrySerializationStrategy The mock to use
+ * @param scope The scope to use
+ * @param entityId The entityId to use
+ * @param size The number of entries to mock
+ * @throws ConnectionException
+ */
+ public static LogEntryMock createLogEntryMock(final MvccLogEntrySerializationStrategy logEntrySerializationStrategy, final CollectionScope scope,final Id entityId, final int size )
+
+ throws ConnectionException {
+ LogEntryMock mock = new LogEntryMock( entityId, size );
+ mock.initMock( logEntrySerializationStrategy, scope );
+
+ return mock;
+ }
+
+
+ private static final class ReversedUUIDComparator implements Comparator<UUID> {
+
+ public static final ReversedUUIDComparator INSTANCE = new ReversedUUIDComparator();
+
+
+ @Override
+ public int compare( final UUID o1, final UUID o2 ) {
+ return UUIDComparator.staticCompare( o1, o2 ) * -1;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/018dbeeb/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
index aba8cd5..4fc72c8 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.usergrid.persistence.core.task;
@@ -36,7 +54,7 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
* @param name The name of this instance of the task executor
* @param poolSize The size of the pool. This is the number of concurrent tasks that can execute at once.
*/
- public NamedTaskExecutorImpl( final String name, final int poolSize) {
+ public NamedTaskExecutorImpl( final String name, final int poolSize ) {
Preconditions.checkNotNull( name );
Preconditions.checkArgument( name.length() > 0, "name must have a length" );
Preconditions.checkArgument( poolSize > 0, "poolSize must be > than 0" );
@@ -44,12 +62,7 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
this.name = name;
this.poolSize = poolSize;
-// final BlockingQueue<Runnable> queue =
-// queueLength > 0 ? new ArrayBlockingQueue<Runnable>( queueLength ) : new SynchronousQueue<Runnable>();
-//
-// executorService = MoreExecutors.listeningDecorator( new MaxSizeThreadPool( queue ) );
-
- this.executorService = new NamedForkJoinPool(poolSize);
+ this.executorService = new NamedForkJoinPool( poolSize );
}
@@ -57,126 +70,30 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
public <V, I> Task<V, I> submit( final Task<V, I> task ) {
try {
- executorService.submit( task );
+ executorService.submit( task );
}
catch ( RejectedExecutionException ree ) {
task.rejected();
}
return task;
-
}
- /**
- * Callback for when the task succeeds or fails.
- */
- private static final class TaskFutureCallBack<V, I> implements FutureCallback<V> {
-
- private final Task<V, I> task;
-
-
- private TaskFutureCallBack( Task<V, I> task ) {
- this.task = task;
- }
-
-
- @Override
- public void onSuccess( @Nullable final V result ) {
- LOG.debug( "Successfully completed task ", task );
- }
-
-
- @Override
- public void onFailure( final Throwable t ) {
- LOG.error( "Unable to execute task. Exception is ", t );
-
- task.exceptionThrown( t );
- }
- }
-
-
- private final class NamedForkJoinPool extends ForkJoinPool{
+ private final class NamedForkJoinPool extends ForkJoinPool {
private NamedForkJoinPool( final int workerThreadCount ) {
//TODO, verify the scheduler at the end
super( workerThreadCount, defaultForkJoinWorkerThreadFactory, new TaskExceptionHandler(), true );
}
-
-
-
}
- private final class TaskExceptionHandler implements Thread.UncaughtExceptionHandler{
+
+ private final class TaskExceptionHandler implements Thread.UncaughtExceptionHandler {
@Override
public void uncaughtException( final Thread t, final Throwable e ) {
LOG.error( "Uncaught exception on thread {} was {}", t, e );
}
}
-
-
-
-
- private final class NamedWorkerThread extends ForkJoinWorkerThread{
-
- /**
- * Creates a ForkJoinWorkerThread operating in the given pool.
- *
- * @param pool the pool this thread works in
- *
- * @throws NullPointerException if pool is null
- */
- protected NamedWorkerThread(final String name, final ForkJoinPool pool ) {
- super( pool );
- }
- }
- /**
- * Create a thread pool that will reject work if our audit tasks become overwhelmed
- */
- private final class MaxSizeThreadPool extends ThreadPoolExecutor {
-
- public MaxSizeThreadPool( BlockingQueue<Runnable> queue ) {
-
- super( 1, poolSize, 30, TimeUnit.SECONDS, queue, new CountingThreadFactory( ),
- new RejectedHandler() );
- }
- }
-
-
- /**
- * Thread factory that will name and count threads for easier debugging
- */
- private final class CountingThreadFactory implements ThreadFactory {
-
- private final AtomicLong threadCounter = new AtomicLong();
-
-
- @Override
- public Thread newThread( final Runnable r ) {
- final long newValue = threadCounter.incrementAndGet();
-
- Thread t = new Thread( r, name + "-" + newValue );
-
- t.setDaemon( true );
-
- return t;
- }
- }
-
-
- /**
- * The handler that will handle rejected executions and signal the interface
- */
- private final class RejectedHandler implements RejectedExecutionHandler {
-
-
- @Override
- public void rejectedExecution( final Runnable r, final ThreadPoolExecutor executor ) {
- LOG.warn( "{} task queue full, rejecting task {}", name, r );
-
- throw new RejectedExecutionException( "Unable to run task, queue full" );
- }
-
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/018dbeeb/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
index eb04c2c..4dccc72 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
@@ -1,6 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.usergrid.persistence.core.task;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
@@ -28,6 +50,26 @@ public abstract class Task<V, I> extends RecursiveTask<V> {
/**
+ * Fork all tasks in the list except the last one. The last will be run in the caller thread
+ * The others will wait for join
+ * @param tasks
+ *
+ */
+ public <V, T extends ForkJoinTask<V>> List<V> joinAll(List<T> tasks) throws ExecutionException, InterruptedException {
+
+ //don't fork the last one
+ List<V> results = new ArrayList<>(tasks.size());
+
+ for(T task: tasks){
+ results.add( task.join() );
+ }
+
+ return results;
+
+ }
+
+
+ /**
* Execute the task
*/
public abstract V executeTask() throws Exception;
[10/15] git commit: Removed stress tests that should not be part of
the build
Posted by to...@apache.org.
Removed stress tests that should not be part of the build
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/324a333b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/324a333b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/324a333b
Branch: refs/heads/two-dot-o
Commit: 324a333badf673108dd2e59971f178f9af204d03
Parents: 9c63ce6
Author: Todd Nine <to...@apache.org>
Authored: Mon Sep 29 16:00:02 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Mon Sep 29 16:00:02 2014 -0600
----------------------------------------------------------------------
.../persistence/collection/EntityCollectionManagerStressTest.java | 2 ++
.../apache/usergrid/persistence/graph/GraphManagerStressTest.java | 1 +
2 files changed, 3 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/324a333b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerStressTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerStressTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerStressTest.java
index 62a42ec..3f5b071 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerStressTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerStressTest.java
@@ -23,6 +23,7 @@ import java.util.Set;
import org.jukito.UseModules;
import org.junit.ClassRule;
+import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -50,6 +51,7 @@ import static org.junit.Assert.assertNotNull;
@RunWith(ITRunner.class)
@UseModules(TestCollectionModule.class)
+@Ignore("Stress test should not be run in embedded mode")
public class EntityCollectionManagerStressTest {
private static final Logger log = LoggerFactory.getLogger(
EntityCollectionManagerStressTest.class );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/324a333b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerStressTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerStressTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerStressTest.java
index a570b7c..aa2d027 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerStressTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerStressTest.java
@@ -58,6 +58,7 @@ import static org.mockito.Mockito.when;
@RunWith(ITRunner.class)
@UseModules(TestGraphModule.class)
+@Ignore("Stress test should not be run in embedded mode")
public class GraphManagerStressTest {
private static final Logger log = LoggerFactory.getLogger( GraphManagerStressTest.class );
[08/15] git commit: Finished updating back to executor pool
Posted by to...@apache.org.
Finished updating back to executor pool
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/10604788
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/10604788
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/10604788
Branch: refs/heads/two-dot-o
Commit: 1060478811e928c45b15ff5b96cd4b9888671d77
Parents: af3726b
Author: Todd Nine <to...@apache.org>
Authored: Mon Sep 29 09:20:26 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Mon Sep 29 09:20:26 2014 -0600
----------------------------------------------------------------------
.../impl/EntityVersionCleanupTask.java | 81 ++++++++++----------
.../impl/EntityVersionCleanupTaskTest.java | 55 +++++++------
.../core/task/NamedTaskExecutorImpl.java | 20 +++--
.../usergrid/persistence/core/task/Task.java | 22 +-----
.../core/task/NamedTaskExecutorImplTest.java | 12 +--
.../shard/impl/ShardGroupCompactionImpl.java | 40 ++--------
6 files changed, 96 insertions(+), 134 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/10604788/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
index 3d1483d..d7ece40 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
@@ -2,10 +2,8 @@ package org.apache.usergrid.persistence.collection.impl;
import java.util.List;
-import java.util.Stack;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.RecursiveTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -20,12 +18,17 @@ import org.apache.usergrid.persistence.collection.serialization.impl.LogEntryIte
import org.apache.usergrid.persistence.core.task.Task;
import org.apache.usergrid.persistence.model.entity.Id;
+import rx.Observable;
+import rx.functions.Action1;
+import rx.functions.Func1;
+import rx.schedulers.Schedulers;
+
/**
* Cleans up previous versions from the specified version. Note that this means the version passed in the io event is
* retained, the range is exclusive.
*/
-public class EntityVersionCleanupTask extends Task<Void> {
+public class EntityVersionCleanupTask implements Task<Void> {
private static final Logger LOG = LoggerFactory.getLogger( EntityVersionCleanupTask.class );
@@ -66,21 +69,23 @@ public class EntityVersionCleanupTask extends Task<Void> {
@Override
- public void rejected() {
+ public Void rejected() {
//Our task was rejected meaning our queue was full. We need this operation to run,
// so we'll run it in our current thread
try {
- executeTask();
+ call();
}
catch ( Exception e ) {
throw new RuntimeException( "Exception thrown in call task", e );
}
+
+ return null;
}
@Override
- public Void executeTask() throws Exception {
+ public Void call() throws Exception {
final UUID maxVersion = version;
@@ -117,54 +122,46 @@ public class EntityVersionCleanupTask extends Task<Void> {
private void fireEvents() throws ExecutionException, InterruptedException {
- if ( listeners.size() == 0 ) {
+ final int listenerSize = listeners.size();
+
+ if ( listenerSize == 0 ) {
return;
}
- //stack to track forked tasks
- final Stack<RecursiveTask<Void>> tasks = new Stack<>();
-
-
- //we don't want to fork the final listener, we'll run that in our current thread
- final int forkedTaskSize = listeners.size() - 1;
-
-
- //execute all the listeners
- for ( int i = 0; i < forkedTaskSize; i++ ) {
-
- final EntityVersionDeleted listener = listeners.get( i );
-
- final RecursiveTask<Void> task = createTask( listener );
-
- task.fork();
-
- tasks.push( task );
+ if ( listenerSize == 1 ) {
+ listeners.get( 0 ).versionDeleted( scope, entityId, version );
+ return;
}
+ LOG.debug( "Started firing {} listeners", listenerSize );
- final RecursiveTask<Void> lastTask = createTask( listeners.get( forkedTaskSize ) );
+ //if we have more than 1, run them on the rx scheduler for a max of 8 operations at a time
+ Observable.from( listeners )
+ .parallel( new Func1<Observable<EntityVersionDeleted>, Observable<EntityVersionDeleted>>() {
- lastTask.invoke();
+ @Override
+ public Observable<EntityVersionDeleted> call(
+ final Observable<EntityVersionDeleted> entityVersionDeletedObservable ) {
+ return entityVersionDeletedObservable.doOnNext( new Action1<EntityVersionDeleted>() {
+ @Override
+ public void call( final EntityVersionDeleted listener ) {
+ listener.versionDeleted( scope, entityId, version );
+ }
+ } );
+ }
+ }, Schedulers.io() ).toBlocking().last();
- //wait for them to complete
- while ( !tasks.isEmpty() ) {
- tasks.pop().get();
- }
+ LOG.debug( "Finished firing {} listeners", listenerSize );
}
- /**
- * Return the new task to execute
- */
- private RecursiveTask<Void> createTask( final EntityVersionDeleted listener ) {
- return new RecursiveTask<Void>() {
- @Override
- protected Void compute() {
- listener.versionDeleted( scope, entityId, version );
- return null;
- }
- };
+ private static interface ListenerRunner {
+
+ /**
+ * Run the listeners
+ */
+ public void runListeners();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/10604788/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
index 2df75f1..a34b4f8 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
@@ -20,6 +20,7 @@ package org.apache.usergrid.persistence.collection.impl;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
@@ -27,6 +28,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import org.junit.AfterClass;
+import org.junit.Ignore;
import org.junit.Test;
import org.apache.usergrid.persistence.collection.CollectionScope;
@@ -40,6 +42,7 @@ import org.apache.usergrid.persistence.core.task.TaskExecutor;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.entity.SimpleId;
+import com.google.common.util.concurrent.ListenableFuture;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
@@ -57,7 +60,7 @@ import static org.mockito.Mockito.when;
*/
public class EntityVersionCleanupTaskTest {
- private static final TaskExecutor taskExecutor = new NamedTaskExecutorImpl( "test", 4 );
+ private static final TaskExecutor taskExecutor = new NamedTaskExecutorImpl( "test", 4, 0 );
@AfterClass
@@ -119,10 +122,10 @@ public class EntityVersionCleanupTaskTest {
//start the task
- taskExecutor.submit( cleanupTask );
+ ListenableFuture<Void> future = taskExecutor.submit( cleanupTask );
//wait for the task
- cleanupTask.get();
+ future.get();
//verify it was run
verify( firstBatch ).execute();
@@ -187,10 +190,10 @@ public class EntityVersionCleanupTaskTest {
//start the task
- taskExecutor.submit( cleanupTask );
+ ListenableFuture<Void> future = taskExecutor.submit( cleanupTask );
//wait for the task
- cleanupTask.get();
+ future.get();
//verify it was run
verify( firstBatch, never() ).execute();
@@ -260,10 +263,10 @@ public class EntityVersionCleanupTaskTest {
//start the task
- taskExecutor.submit( cleanupTask );
+ ListenableFuture<Void> future = taskExecutor.submit( cleanupTask );
//wait for the task
- cleanupTask.get();
+ future.get();
//we deleted the version
//verify it was run
@@ -343,10 +346,10 @@ public class EntityVersionCleanupTaskTest {
//start the task
- taskExecutor.submit( cleanupTask );
+ ListenableFuture<Void> future = taskExecutor.submit( cleanupTask );
//wait for the task
- cleanupTask.get();
+ future.get();
//we deleted the version
//verify we deleted everything
@@ -440,18 +443,18 @@ public class EntityVersionCleanupTaskTest {
//start the task
- taskExecutor.submit( cleanupTask );
+ ListenableFuture<Void> future = taskExecutor.submit( cleanupTask );
/**
* While we're not done, release latches every 200 ms
*/
- while(!cleanupTask.isDone()) {
+ while(!future.isDone()) {
Thread.sleep( 200 );
waitSemaphore.release( listenerCount );
}
//wait for the task
- cleanupTask.get();
+ future.get();
//we deleted the version
//verify we deleted everything
@@ -479,7 +482,7 @@ public class EntityVersionCleanupTaskTest {
/**
* only 1 thread on purpose, we want to saturate the task
*/
- final TaskExecutor taskExecutor = new NamedTaskExecutorImpl( "test", 1);
+ final TaskExecutor taskExecutor = new NamedTaskExecutorImpl( "test", 1, 0);
final SerializationFig serializationFig = mock( SerializationFig.class );
@@ -496,17 +499,16 @@ public class EntityVersionCleanupTaskTest {
final int sizeToReturn = 10;
- final int listenerCount = 1;
+ final int listenerCount = 2;
final CountDownLatch latch = new CountDownLatch( sizeToReturn * listenerCount );
final Semaphore waitSemaphore = new Semaphore( 0 );
- final SlowListener listener1 = new SlowListener( latch, waitSemaphore );
+ final SlowListener slowListener = new SlowListener( latch, waitSemaphore );
+ final EntityVersionDeletedTest runListener = new EntityVersionDeletedTest( latch );
- final List<EntityVersionDeleted> listeners = new ArrayList<>();
- listeners.add( listener1 );
final Id applicationId = new SimpleId( "application" );
@@ -526,11 +528,16 @@ public class EntityVersionCleanupTaskTest {
EntityVersionCleanupTask firstTask =
new EntityVersionCleanupTask( serializationFig, mvccLogEntrySerializationStrategy,
- mvccEntitySerializationStrategy, listeners, appScope, entityId, version );
+ mvccEntitySerializationStrategy, Arrays.<EntityVersionDeleted>asList(slowListener), appScope, entityId, version );
+
+
+
+ //change the listeners to one that is just invoked quickly
+
EntityVersionCleanupTask secondTask =
new EntityVersionCleanupTask( serializationFig, mvccLogEntrySerializationStrategy,
- mvccEntitySerializationStrategy, listeners, appScope, entityId, version );
+ mvccEntitySerializationStrategy, Arrays.<EntityVersionDeleted>asList(runListener), appScope, entityId, version );
final MutationBatch firstBatch = mock( MutationBatch.class );
@@ -548,24 +555,24 @@ public class EntityVersionCleanupTaskTest {
//start the task
- taskExecutor.submit( firstTask );
+ ListenableFuture<Void> future1 = taskExecutor.submit( firstTask );
//now start another task while the slow running task is running
- taskExecutor.submit( secondTask );
+ ListenableFuture<Void> future2 = taskExecutor.submit( secondTask );
//get the second task, we shouldn't have been able to queue it, therefore it should just run in process
- secondTask.get();
+ future2.get();
/**
* While we're not done, release latches every 200 ms
*/
- while(!firstTask.isDone()) {
+ while(!future1.isDone()) {
Thread.sleep( 200 );
waitSemaphore.release( listenerCount );
}
//wait for the task
- firstTask.get();
+ future1.get();
//we deleted the version
//verify we deleted everything
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/10604788/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
index d4cc915..b18687a 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
@@ -1,6 +1,7 @@
package org.apache.usergrid.persistence.core.task;
+import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
@@ -61,7 +62,7 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
@Override
- public <V, I> ListenableFuture<V> submit( final Task<V, I> task ) {
+ public <V> ListenableFuture<V> submit( final Task<V> task ) {
final ListenableFuture<V> future;
@@ -71,26 +72,31 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
/**
* Log our success or failures for debugging purposes
*/
- Futures.addCallback( future, new TaskFutureCallBack<V, I>( task ) );
+ Futures.addCallback( future, new TaskFutureCallBack<V>( task ) );
}
catch ( RejectedExecutionException ree ) {
- task.rejected();
- return Futures.immediateCancelledFuture();
+ return Futures.immediateFuture( task.rejected());
}
return future;
}
+ @Override
+ public void shutdown() {
+ this.executorService.shutdownNow();
+ }
+
+
/**
* Callback for when the task succeeds or fails.
*/
- private static final class TaskFutureCallBack<V, I> implements FutureCallback<V> {
+ private static final class TaskFutureCallBack<V> implements FutureCallback<V> {
- private final Task<V, I> task;
+ private final Task<V> task;
- private TaskFutureCallBack( Task<V, I> task ) {
+ private TaskFutureCallBack( Task<V> task ) {
this.task = task;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/10604788/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
index 9a0b857..5890627 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
@@ -1,30 +1,14 @@
package org.apache.usergrid.persistence.core.task;
+import java.util.concurrent.Callable;
import java.util.concurrent.RecursiveTask;
/**
* The task to execute
*/
-public interface Task<V, I> extends Callable<V> {
-
- /**
- * Get the unique identifier of this task. This may be used to collapse runnables over a time period in the future
- */
- public abstract I getId();
-
-
- @Override
- protected V compute() {
- try {
- return executeTask();
- }
- catch ( Exception e ) {
- exceptionThrown( e );
- throw new RuntimeException( e );
- }
- }
+public interface Task<V> extends Callable<V> {
/**
@@ -40,7 +24,7 @@ public interface Task<V, I> extends Callable<V> {
* request and process later (lazy repair for instance ) do so.
*
*/
- void rejected();
+ V rejected();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/10604788/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
index fa63eee..34f57e5 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
@@ -29,7 +29,7 @@ public class NamedTaskExecutorImplTest {
final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
final CountDownLatch runLatch = new CountDownLatch( 1 );
- final Task<Void, UUID> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {};
+ final Task<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {};
executor.submit( task );
@@ -185,7 +185,7 @@ public class NamedTaskExecutorImplTest {
}
- private static abstract class TestTask<V> implements Task<V, UUID> {
+ private static abstract class TestTask<V> implements Task<V> {
private final List<Throwable> exceptions;
private final CountDownLatch exceptionLatch;
@@ -203,11 +203,6 @@ public class NamedTaskExecutorImplTest {
}
- @Override
- public UUID getId() {
- return UUIDGenerator.newTimeUUID();
- }
-
@Override
public void exceptionThrown( final Throwable throwable ) {
@@ -217,8 +212,9 @@ public class NamedTaskExecutorImplTest {
@Override
- public void rejected() {
+ public V rejected() {
rejectedLatch.countDown();
+ return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/10604788/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
index be7fbe4..6cd98a7 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
@@ -315,7 +315,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
}
- private final class ShardAuditTask implements Task<AuditResult, ShardAuditKey> {
+ private final class ShardAuditTask implements Task<AuditResult> {
private final ApplicationScope scope;
private final DirectedEdgeMeta edgeMeta;
@@ -329,23 +329,18 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
this.group = group;
}
-
- @Override
- public ShardAuditKey getId() {
- return new ShardAuditKey( scope, edgeMeta, group );
- }
-
-
- @Override
+ @Override
public void exceptionThrown( final Throwable throwable ) {
LOG.error( "Unable to execute audit for shard of {}", throwable );
}
@Override
- public void rejected() {
+ public AuditResult rejected() {
//ignore, if this happens we don't care, we're saturated, we can check later
- LOG.error( "Rejected audit for shard of {}", getId() );
+ LOG.error( "Rejected audit for shard of scope {} edge, meta {} and group {}", scope, edgeMeta, group );
+
+ return AuditResult.NOT_CHECKED;
}
@@ -418,29 +413,6 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
}
- private static final class ShardAuditKey {
- private final ApplicationScope scope;
- private final DirectedEdgeMeta edgeMeta;
- private final ShardEntryGroup group;
-
-
- private ShardAuditKey( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta,
- final ShardEntryGroup group ) {
- this.scope = scope;
- this.edgeMeta = edgeMeta;
- this.group = group;
- }
-
-
- @Override
- public String toString() {
- return "ShardAuditKey{" +
- "scope=" + scope +
- ", edgeMeta=" + edgeMeta +
- ", group=" + group +
- '}';
- }
- }
/**
[14/15] git commit: Merge branch 'two-dot-o' of
https://git-wip-us.apache.org/repos/asf/incubator-usergrid into eventsystem
Posted by to...@apache.org.
Merge branch 'two-dot-o' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into eventsystem
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/d85b97c4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/d85b97c4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/d85b97c4
Branch: refs/heads/two-dot-o
Commit: d85b97c423f31afda0e846a9071e03efe0505d26
Parents: 8cadba2 abbd76e
Author: Todd Nine <to...@apache.org>
Authored: Wed Oct 1 10:30:54 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Wed Oct 1 10:30:54 2014 -0600
----------------------------------------------------------------------
.../usergrid/mq/cassandra/QueueManagerImpl.java | 18 +-
.../mq/cassandra/io/AbstractSearch.java | 199 ++++++++++++-------
.../mq/cassandra/io/ConsumerTransaction.java | 19 +-
.../services/notifications/QueueListener.java | 4 +-
4 files changed, 153 insertions(+), 87 deletions(-)
----------------------------------------------------------------------
[12/15] git commit: Merge branch 'two-dot-o' into eventsystem
Posted by to...@apache.org.
Merge branch 'two-dot-o' into eventsystem
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/1a416245
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/1a416245
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/1a416245
Branch: refs/heads/two-dot-o
Commit: 1a4162459ae8c31a489937d8a5d2f76f5ca6c2e2
Parents: 429b12b d6fd7dd
Author: Todd Nine <to...@apache.org>
Authored: Tue Sep 30 10:25:44 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Tue Sep 30 10:25:44 2014 -0600
----------------------------------------------------------------------
.../corepersistence/CpRelationManager.java | 6 ++--
.../index/impl/EsEntityIndexImpl.java | 13 ++++++++
.../persistence/index/impl/EsQueryVistor.java | 3 +-
.../java/org/apache/usergrid/rest/BasicIT.java | 34 ++++++++++++++------
.../notifications/ApplicationQueueManager.java | 2 +-
.../services/notifications/QueueListener.java | 6 ++--
6 files changed, 48 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
[03/15] git commit: Added events and task cleanup
Posted by to...@apache.org.
Added events and task cleanup
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/c0b78b9f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/c0b78b9f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/c0b78b9f
Branch: refs/heads/two-dot-o
Commit: c0b78b9fab07cbc2f5975d89664ff8a523b98b9a
Parents: 4c72f5c
Author: Todd Nine <to...@apache.org>
Authored: Thu Sep 25 12:09:11 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Thu Sep 25 12:09:11 2014 -0600
----------------------------------------------------------------------
.../collection/EntityCollectionManager.java | 12 +-
.../collection/event/EntityDeleted.java | 25 ++++
.../collection/event/EntityVersionCreated.java | 25 ++++
.../collection/event/EntityVersionDeleted.java | 28 +++++
.../collection/event/EntityVersionRemoved.java | 26 -----
.../impl/EntityCollectionManagerImpl.java | 12 ++
.../impl/EntityVersionCleanupTask.java | 115 +++++++++++++++++++
.../serialization/impl/VersionIterator.java | 111 ++++++++++++++++++
.../core/task/NamedTaskExecutorImpl.java | 26 +++--
.../usergrid/persistence/core/task/Task.java | 2 +-
.../persistence/core/task/TaskExecutor.java | 5 +-
.../persistence/graph/event/EdgeDeleted.java | 8 ++
12 files changed, 349 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c0b78b9f/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
index 84bcea6..ee3a5d1 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
@@ -27,14 +27,14 @@ import rx.Observable;
/**
*
- *
- * @author: tnine
+ * The operations for performing changes on an entity
*
*/
public interface EntityCollectionManager {
/**
- * Write the entity in the entity collection.
+ * Write the entity in the entity collection. This is an entire entity, it's contents will
+ * completely overwrite the previous values, if it exists.
*
* @param entity The entity to update
*/
@@ -52,10 +52,10 @@ public interface EntityCollectionManager {
public Observable<Entity> load( Id entityId );
- //TODO add partial update
-
/**
- * Takes the change and reloads an entity with all changes applied.
+ * Takes the change and reloads an entity with all changes applied in this entity applied.
+ * The resulting entity from calling load will be the previous version of this entity + the entity
+ * in this object applied to it.
* @param entity
* @return
*/
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c0b78b9f/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityDeleted.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityDeleted.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityDeleted.java
new file mode 100644
index 0000000..1c075f1
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityDeleted.java
@@ -0,0 +1,25 @@
+package org.apache.usergrid.persistence.collection.event;
+
+
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+/**
+ *
+ * Invoked when an entity is deleted. The delete log entry is not removed until all instances of this listener has completed.
+ * If any listener fails with an exception, the entity will not be removed.
+ *
+ */
+public interface EntityDeleted {
+
+
+ /**
+ * The event fired when an entity is deleted
+ *
+ * @param scope The scope of the entity
+ * @param entityId The id of the entity
+ */
+ public void deleted( final CollectionScope scope, final Id entityId);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c0b78b9f/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionCreated.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionCreated.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionCreated.java
new file mode 100644
index 0000000..9d7761c
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionCreated.java
@@ -0,0 +1,25 @@
+package org.apache.usergrid.persistence.collection.event;
+
+
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.model.entity.Entity;
+
+
+/**
+ *
+ * Invoked after a new version of an entity has been created. The entity should be a complete
+ * view of the entity.
+ *
+ */
+public interface EntityVersionCreated {
+
+
+ /**
+ * The new version of the entity. Note that this should be a fully merged view of the entity.
+ * In the case of partial updates, the passed entity should be fully merged with it's previous entries
+ * @param scope The scope of the entity
+ * @param entity The fully loaded and merged entity
+ */
+ public void versionCreated( final CollectionScope scope, final Entity entity );
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c0b78b9f/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionDeleted.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionDeleted.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionDeleted.java
new file mode 100644
index 0000000..3b76e84
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionDeleted.java
@@ -0,0 +1,28 @@
+package org.apache.usergrid.persistence.collection.event;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+/**
+ *
+ * Invoked when an entity version is removed. Note that this is not a deletion of the entity itself,
+ * only the version itself.
+ *
+ */
+public interface EntityVersionDeleted {
+
+
+ /**
+ * The version specified was removed.
+ *
+ * @param scope The scope of the entity
+ * @param entityId The entity Id that was removed
+ * @param entityVersion The version that was removed
+ */
+ public void versionDeleted(final CollectionScope scope, final Id entityId, final UUID entityVersion);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c0b78b9f/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionRemoved.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionRemoved.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionRemoved.java
deleted file mode 100644
index dca575d..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionRemoved.java
+++ /dev/null
@@ -1,26 +0,0 @@
-package org.apache.usergrid.persistence.collection.event;
-
-
-import java.util.UUID;
-
-import org.apache.usergrid.persistence.collection.CollectionScope;
-
-
-/**
- *
- * Invoked when an entity version is removed. Note that this is not a deletion of the entity itself,
- * only the version itself.
- *
- */
-public interface EntityVersionRemoved {
-
-
- /**
- * The version specified was removed.
- * @param scope
- * @param entityId
- * @param entityVersion
- */
- public void versionRemoved(final CollectionScope scope, final UUID entityId, final UUID entityVersion);
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c0b78b9f/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index 96a7ed2..a6fc31b 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@ -19,11 +19,16 @@
package org.apache.usergrid.persistence.collection.impl;
+import java.util.List;
+import java.util.UUID;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.collection.CollectionScope;
import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.collection.event.EntityVersionCreated;
+import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
import org.apache.usergrid.persistence.collection.guice.Write;
import org.apache.usergrid.persistence.collection.guice.WriteUpdate;
import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
@@ -38,6 +43,8 @@ import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteOptimist
import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteStart;
import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteUniqueVerify;
import org.apache.usergrid.persistence.collection.service.UUIDService;
+import org.apache.usergrid.persistence.core.task.Task;
+import org.apache.usergrid.persistence.core.task.TaskExecutor;
import org.apache.usergrid.persistence.core.util.ValidationUtils;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
@@ -83,6 +90,8 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
private final MarkStart markStart;
private final MarkCommit markCommit;
+ private final TaskExecutor taskExecutor;
+
@Inject
public EntityCollectionManagerImpl( final UUIDService uuidService, @Write final WriteStart writeStart,
@WriteUpdate final WriteStart writeUpdate,
@@ -90,6 +99,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
final WriteOptimisticVerify writeOptimisticVerify,
final WriteCommit writeCommit, final RollbackAction rollback, final Load load,
final MarkStart markStart, final MarkCommit markCommit,
+ final TaskExecutor taskExecutor,
@Assisted final CollectionScope collectionScope) {
Preconditions.checkNotNull( uuidService, "uuidService must be defined" );
@@ -109,6 +119,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
this.uuidService = uuidService;
this.collectionScope = collectionScope;
+ this.taskExecutor = taskExecutor;
}
@@ -240,4 +251,5 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c0b78b9f/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
new file mode 100644
index 0000000..11e2da9
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
@@ -0,0 +1,115 @@
+package org.apache.usergrid.persistence.collection.impl;
+
+
+import java.util.List;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
+import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
+import org.apache.usergrid.persistence.collection.serialization.impl.VersionIterator;
+import org.apache.usergrid.persistence.core.entity.EntityVersion;
+import org.apache.usergrid.persistence.core.task.Task;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+/**
+ * Cleans up previous versions from the specified version. Note that this means the version passed in the io event is
+ * retained, the range is exclusive.
+ */
+class EntityVersionCleanupTask implements Task<CollectionIoEvent<EntityVersion>, CollectionIoEvent<EntityVersion>> {
+
+ private static final Logger LOG = LoggerFactory.getLogger( EntityVersionCleanupTask.class );
+
+
+ private final CollectionIoEvent<EntityVersion> collectionIoEvent;
+ private final List<EntityVersionDeleted> listeners;
+
+ private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
+ private final MvccEntitySerializationStrategy entitySerializationStrategy;
+
+
+ private EntityVersionCleanupTask( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
+ final MvccEntitySerializationStrategy entitySerializationStrategy,
+ final CollectionIoEvent<EntityVersion> collectionIoEvent,
+ final List<EntityVersionDeleted> listeners ) {
+ this.collectionIoEvent = collectionIoEvent;
+ this.listeners = listeners;
+ this.logEntrySerializationStrategy = logEntrySerializationStrategy;
+ this.entitySerializationStrategy = entitySerializationStrategy;
+ }
+
+
+ @Override
+ public CollectionIoEvent<EntityVersion> getId() {
+ return collectionIoEvent;
+ }
+
+
+ @Override
+ public void exceptionThrown( final Throwable throwable ) {
+ LOG.error( "Unable to run update task for event {}", collectionIoEvent, throwable );
+ }
+
+
+ @Override
+ public void rejected() {
+ //Our task was rejected meaning our queue was full. We need this operation to run,
+ // so we'll run it in our current thread
+
+ try {
+ call();
+ }
+ catch ( Exception e ) {
+ throw new RuntimeException( "Exception thrown in call task", e );
+ }
+ }
+
+
+ @Override
+ public CollectionIoEvent<EntityVersion> call() throws Exception {
+
+ final CollectionScope scope = collectionIoEvent.getEntityCollection();
+ final Id entityId = collectionIoEvent.getEvent().getId();
+ final UUID maxVersion = collectionIoEvent.getEvent().getVersion();
+
+
+ VersionIterator versionIterator =
+ new VersionIterator( logEntrySerializationStrategy, scope, entityId, maxVersion, 1000 );
+
+
+ UUID currentVersion = null;
+
+ //for every entry, we want to clean it up with listeners
+
+ while ( versionIterator.hasNext() ) {
+
+ currentVersion = versionIterator.next();
+
+
+ //execute all the listeners
+ for ( EntityVersionDeleted listener : listeners ) {
+ listener.versionDeleted( scope, entityId, currentVersion );
+ }
+
+ //we do multiple invocations on purpose. Our log is our source of versions, only delete from it
+ //after every successful invocation of listeners and entity removal
+ entitySerializationStrategy.delete( scope, entityId, currentVersion ).execute();
+
+ logEntrySerializationStrategy.delete( scope, entityId, currentVersion ).execute();
+
+
+ }
+
+
+ return collectionIoEvent;
+ }
+}
+
+
+
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c0b78b9f/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/VersionIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/VersionIterator.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/VersionIterator.java
new file mode 100644
index 0000000..323f12d
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/VersionIterator.java
@@ -0,0 +1,111 @@
+package org.apache.usergrid.persistence.collection.serialization.impl;
+
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+
+/**
+ * Iterator that will iterate all versions of the entity from the log from < the specified maxVersion
+ *
+ */
+public class VersionIterator implements Iterator<UUID> {
+
+
+ private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
+ private final CollectionScope scope;
+ private final Id entityId;
+ private final int pageSize;
+
+
+ private Iterator<MvccLogEntry> elementItr;
+ private UUID nextStart;
+
+
+ /**
+ *
+ * @param logEntrySerializationStrategy The serialization strategy to get the log entries
+ * @param scope The scope of the entity
+ * @param entityId The id of the entity
+ * @param maxVersion The max version of the entity. Iterator will iterate from max to min starting with the version < max
+ * @param pageSize The fetch size to get when querying the serialization strategy
+ */
+ public VersionIterator( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
+ final CollectionScope scope, final Id entityId, final UUID maxVersion,
+ final int pageSize ) {
+ this.logEntrySerializationStrategy = logEntrySerializationStrategy;
+ this.scope = scope;
+ this.entityId = entityId;
+ this.nextStart = maxVersion;
+ this.pageSize = pageSize;
+ }
+
+
+ @Override
+ public boolean hasNext() {
+ if ( elementItr == null || !elementItr.hasNext() && nextStart != null ) {
+ try {
+ advance();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to query cassandra", e );
+ }
+ }
+
+ return elementItr.hasNext();
+ }
+
+
+ @Override
+ public UUID next() {
+ if ( !hasNext() ) {
+ throw new NoSuchElementException( "No more elements exist" );
+ }
+
+ return elementItr.next().getVersion();
+ }
+
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException( "Remove is unsupported" );
+ }
+
+
+ /**
+ * Advance our iterator
+ */
+ public void advance() throws ConnectionException {
+
+ final int requestedSize = pageSize + 1;
+
+ //loop through even entry that's < this one and remove it
+ List<MvccLogEntry> results = logEntrySerializationStrategy.load( scope, entityId, nextStart, requestedSize );
+
+ //we always remove the first version if it's equal since it's returned
+ if ( results.size() > 0 && results.get( 0 ).getVersion().equals( nextStart ) ) {
+ results.remove( 0 );
+ }
+
+
+ //we have results, set our next start
+ if ( results.size() == requestedSize ) {
+ nextStart = results.get( results.size() - 1 ).getVersion();
+ }
+ //nothing left to do
+ else {
+ nextStart = null;
+ }
+
+ elementItr = results.iterator();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c0b78b9f/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
index 8184937..40ee5cf 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
@@ -32,6 +32,10 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
private final ListeningExecutorService executorService;
+ private final String name;
+ private final int poolSize;
+ private final int queueLength;
+
/**
* @param name The name of this instance of the task executor
@@ -44,11 +48,14 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
Preconditions.checkArgument( poolSize > 0, "poolSize must be > than 0" );
Preconditions.checkArgument( queueLength > -1, "queueLength must be 0 or more" );
+ this.name = name;
+ this.poolSize = poolSize;
+ this.queueLength = queueLength;
final BlockingQueue<Runnable> queue =
queueLength > 0 ? new ArrayBlockingQueue<Runnable>( queueLength ) : new SynchronousQueue<Runnable>();
- executorService = MoreExecutors.listeningDecorator( new MaxSizeThreadPool( name, poolSize, queue ) );
+ executorService = MoreExecutors.listeningDecorator( new MaxSizeThreadPool( queue ) );
}
@@ -105,11 +112,11 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
/**
* Create a thread pool that will reject work if our audit tasks become overwhelmed
*/
- private static final class MaxSizeThreadPool extends ThreadPoolExecutor {
+ private final class MaxSizeThreadPool extends ThreadPoolExecutor {
- public MaxSizeThreadPool( final String name, final int workerSize, BlockingQueue<Runnable> queue ) {
+ public MaxSizeThreadPool( BlockingQueue<Runnable> queue ) {
- super( 1, workerSize, 30, TimeUnit.SECONDS, queue, new CountingThreadFactory( name ),
+ super( 1, poolSize, 30, TimeUnit.SECONDS, queue, new CountingThreadFactory( ),
new RejectedHandler() );
}
}
@@ -118,15 +125,10 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
/**
* Thread factory that will name and count threads for easier debugging
*/
- private static final class CountingThreadFactory implements ThreadFactory {
+ private final class CountingThreadFactory implements ThreadFactory {
private final AtomicLong threadCounter = new AtomicLong();
- private final String name;
-
-
- private CountingThreadFactory( final String name ) {this.name = name;}
-
@Override
public Thread newThread( final Runnable r ) {
@@ -144,12 +146,12 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
/**
* The handler that will handle rejected executions and signal the interface
*/
- private static final class RejectedHandler implements RejectedExecutionHandler {
+ private final class RejectedHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution( final Runnable r, final ThreadPoolExecutor executor ) {
- LOG.warn( "Audit queue full, rejecting audit task {}", r );
+ LOG.warn( "{} task queue full, rejecting task {}", name, r );
throw new RejectedExecutionException( "Unable to run task, queue full" );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c0b78b9f/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
index 8b1ed22..518b461 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
@@ -27,7 +27,7 @@ public interface Task<V, I> extends Callable<V> {
* Invoked when we weren't able to run this task by the the thread attempting to schedule the task.
* If this task MUST be run immediately, you can invoke the call method from within this event to invoke the
* task in the scheduling thread. Note that this has performance implications to the user. If you can drop the
- * request and process later (lazy repair for instanc\\\\\\\\\\\\\\\\\\\\\\hjn ) do so.
+ * request and process later (lazy repair for instance ) do so.
*
*/
void rejected();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c0b78b9f/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
index b5491bc..619ac14 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
@@ -1,6 +1,9 @@
package org.apache.usergrid.persistence.core.task;
+import com.google.common.util.concurrent.ListenableFuture;
+
+
/**
* An interface for execution of tasks
*/
@@ -10,5 +13,5 @@ public interface TaskExecutor {
* Submit the task asynchronously
* @param task
*/
- public <V, I> com.google.common.util.concurrent.ListenableFuture<V> submit( Task<V, I> task );
+ public <V, I> ListenableFuture<V> submit( Task<V, I> task );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c0b78b9f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/event/EdgeDeleted.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/event/EdgeDeleted.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/event/EdgeDeleted.java
new file mode 100644
index 0000000..631de59
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/event/EdgeDeleted.java
@@ -0,0 +1,8 @@
+package org.apache.usergrid.persistence.graph.event;
+
+
+/**
+ *
+ *
+ */
+public interface EdgeDeleted {}
[11/15] git commit: Resolved conflicts when guice modules are
integrated
Posted by to...@apache.org.
Resolved conflicts when guice modules are integrated
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/429b12b4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/429b12b4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/429b12b4
Branch: refs/heads/two-dot-o
Commit: 429b12b4f5da4af4e8ce868f0042897b1f73a52a
Parents: 324a333
Author: Todd Nine <to...@apache.org>
Authored: Mon Sep 29 18:06:43 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Mon Sep 29 18:06:43 2014 -0600
----------------------------------------------------------------------
.../collection/guice/CollectionModule.java | 1 +
.../guice/CollectionTaskExecutor.java | 35 ++++++++++++++++++++
.../impl/EntityCollectionManagerImpl.java | 2 ++
.../persistence/graph/guice/GraphModule.java | 1 +
.../graph/guice/GraphTaskExecutor.java | 33 ++++++++++++++++++
.../impl/shard/impl/NodeShardCacheImpl.java | 5 ---
.../shard/impl/ShardGroupCompactionImpl.java | 3 +-
7 files changed, 74 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/429b12b4/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
index 306f6e0..9d949e8 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
@@ -100,6 +100,7 @@ public class CollectionModule extends AbstractModule {
@Inject
@Singleton
@Provides
+ @CollectionTaskExecutor
public TaskExecutor collectionTaskExecutor(final SerializationFig serializationFig){
return new NamedTaskExecutorImpl( "collectiontasks", serializationFig.getTaskPoolThreadSize(), serializationFig.getTaskPoolQueueSize() );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/429b12b4/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionTaskExecutor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionTaskExecutor.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionTaskExecutor.java
new file mode 100644
index 0000000..7c08437
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionTaskExecutor.java
@@ -0,0 +1,35 @@
+package org.apache.usergrid.persistence.collection.guice;/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import com.google.inject.BindingAnnotation;
+
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+
+
+
+@BindingAnnotation
+@Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+public @interface CollectionTaskExecutor {}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/429b12b4/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index a6fc31b..094baa6 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@ -29,6 +29,7 @@ import org.apache.usergrid.persistence.collection.CollectionScope;
import org.apache.usergrid.persistence.collection.EntityCollectionManager;
import org.apache.usergrid.persistence.collection.event.EntityVersionCreated;
import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
+import org.apache.usergrid.persistence.collection.guice.CollectionTaskExecutor;
import org.apache.usergrid.persistence.collection.guice.Write;
import org.apache.usergrid.persistence.collection.guice.WriteUpdate;
import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
@@ -99,6 +100,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
final WriteOptimisticVerify writeOptimisticVerify,
final WriteCommit writeCommit, final RollbackAction rollback, final Load load,
final MarkStart markStart, final MarkCommit markCommit,
+ @CollectionTaskExecutor
final TaskExecutor taskExecutor,
@Assisted final CollectionScope collectionScope) {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/429b12b4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
index 608f8ce..9538be0 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
@@ -154,6 +154,7 @@ public class GraphModule extends AbstractModule {
@Inject
@Singleton
@Provides
+ @GraphTaskExecutor
public TaskExecutor graphTaskExecutor(final GraphFig graphFig){
return new NamedTaskExecutorImpl( "graphTaskExecutor", graphFig.getShardAuditWorkerCount(), graphFig.getShardAuditWorkerQueueSize() );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/429b12b4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphTaskExecutor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphTaskExecutor.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphTaskExecutor.java
new file mode 100644
index 0000000..fc54598
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphTaskExecutor.java
@@ -0,0 +1,33 @@
+package org.apache.usergrid.persistence.graph.guice;/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import com.google.inject.BindingAnnotation;
+
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+
+@BindingAnnotation
+@Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+public @interface GraphTaskExecutor {}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/429b12b4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
index a2c1aeb..c5f0bfb 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
@@ -62,11 +62,6 @@ import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
-import rx.Observable;
-import rx.Subscriber;
-import rx.functions.Action1;
-import rx.functions.Func1;
-import rx.schedulers.Schedulers;
/**
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/429b12b4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
index 6cd98a7..9f8efc8 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
@@ -50,6 +50,7 @@ import org.apache.usergrid.persistence.core.task.TaskExecutor;
import org.apache.usergrid.persistence.graph.GraphFig;
import org.apache.usergrid.persistence.graph.MarkedEdge;
import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+import org.apache.usergrid.persistence.graph.guice.GraphTaskExecutor;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumnFamilies;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardSerialization;
@@ -110,7 +111,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
final ShardedEdgeSerialization shardedEdgeSerialization,
final EdgeColumnFamilies edgeColumnFamilies, final Keyspace keyspace,
final EdgeShardSerialization edgeShardSerialization,
- final TaskExecutor taskExecutor ) {
+ @GraphTaskExecutor final TaskExecutor taskExecutor ) {
this.timeService = timeService;
this.graphFig = graphFig;
[04/15] git commit: Converted tasks to ForkJoin tasks to allow for
parallel execution.
Posted by to...@apache.org.
Converted tasks to ForkJoin tasks to allow for parallel execution.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/dc3f448c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/dc3f448c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/dc3f448c
Branch: refs/heads/two-dot-o
Commit: dc3f448c946219c44e3dbfadb294fea0b8048343
Parents: c0b78b9
Author: Todd Nine <to...@apache.org>
Authored: Thu Sep 25 14:56:23 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Thu Sep 25 14:56:23 2014 -0600
----------------------------------------------------------------------
.../collection/guice/CollectionModule.java | 2 +-
.../impl/EntityVersionCleanupTask.java | 8 +-
.../persistence/core/task/ImmediateTask.java | 42 +++++++
.../core/task/NamedTaskExecutorImpl.java | 74 ++++++++-----
.../usergrid/persistence/core/task/Task.java | 44 +++++---
.../persistence/core/task/TaskExecutor.java | 2 +-
.../core/task/NamedTaskExecutorImplTest.java | 111 +++++++++++++++----
.../persistence/graph/guice/GraphModule.java | 2 +-
.../impl/shard/ShardGroupCompaction.java | 7 +-
.../shard/impl/ShardGroupCompactionImpl.java | 44 ++------
10 files changed, 227 insertions(+), 109 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dc3f448c/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
index 306f6e0..a6230e1 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
@@ -101,7 +101,7 @@ public class CollectionModule extends AbstractModule {
@Singleton
@Provides
public TaskExecutor collectionTaskExecutor(final SerializationFig serializationFig){
- return new NamedTaskExecutorImpl( "collectiontasks", serializationFig.getTaskPoolThreadSize(), serializationFig.getTaskPoolQueueSize() );
+ return new NamedTaskExecutorImpl( "collectiontasks", serializationFig.getTaskPoolThreadSize() );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dc3f448c/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
index 11e2da9..85afce6 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
@@ -22,7 +22,7 @@ import org.apache.usergrid.persistence.model.entity.Id;
* Cleans up previous versions from the specified version. Note that this means the version passed in the io event is
* retained, the range is exclusive.
*/
-class EntityVersionCleanupTask implements Task<CollectionIoEvent<EntityVersion>, CollectionIoEvent<EntityVersion>> {
+class EntityVersionCleanupTask extends Task<CollectionIoEvent<EntityVersion>, CollectionIoEvent<EntityVersion>> {
private static final Logger LOG = LoggerFactory.getLogger( EntityVersionCleanupTask.class );
@@ -63,7 +63,7 @@ class EntityVersionCleanupTask implements Task<CollectionIoEvent<EntityVersion>,
// so we'll run it in our current thread
try {
- call();
+ executeTask();
}
catch ( Exception e ) {
throw new RuntimeException( "Exception thrown in call task", e );
@@ -72,7 +72,7 @@ class EntityVersionCleanupTask implements Task<CollectionIoEvent<EntityVersion>,
@Override
- public CollectionIoEvent<EntityVersion> call() throws Exception {
+ public CollectionIoEvent<EntityVersion> executeTask() throws Exception {
final CollectionScope scope = collectionIoEvent.getEntityCollection();
final Id entityId = collectionIoEvent.getEvent().getId();
@@ -102,8 +102,6 @@ class EntityVersionCleanupTask implements Task<CollectionIoEvent<EntityVersion>,
entitySerializationStrategy.delete( scope, entityId, currentVersion ).execute();
logEntrySerializationStrategy.delete( scope, entityId, currentVersion ).execute();
-
-
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dc3f448c/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/ImmediateTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/ImmediateTask.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/ImmediateTask.java
new file mode 100644
index 0000000..627e7e8
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/ImmediateTask.java
@@ -0,0 +1,42 @@
+package org.apache.usergrid.persistence.core.task;
+
+
+/**
+ * Does not perform computation, just returns the value passed to it
+ *
+ */
+public class ImmediateTask<V, I> extends Task<V, I> {
+
+ private final I id;
+ private final V returned;
+
+
+ protected ImmediateTask( final I id, final V returned ) {
+ this.id = id;
+ this.returned = returned;
+ }
+
+
+ @Override
+ public I getId() {
+ return id;
+ }
+
+
+ @Override
+ public V executeTask() throws Exception {
+ return returned;
+ }
+
+
+ @Override
+ public void exceptionThrown( final Throwable throwable ) {
+ //no op
+ }
+
+
+ @Override
+ public void rejected() {
+ //no op
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dc3f448c/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
index 40ee5cf..aba8cd5 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
@@ -1,11 +1,11 @@
package org.apache.usergrid.persistence.core.task;
-import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -17,10 +17,6 @@ import org.slf4j.Logger;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
/**
@@ -30,54 +26,45 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
private static final Logger LOG = org.slf4j.LoggerFactory.getLogger( NamedTaskExecutorImpl.class );
- private final ListeningExecutorService executorService;
+ private final NamedForkJoinPool executorService;
private final String name;
private final int poolSize;
- private final int queueLength;
/**
* @param name The name of this instance of the task executor
* @param poolSize The size of the pool. This is the number of concurrent tasks that can execute at once.
- * @param queueLength The length of tasks to keep in the queue
*/
- public NamedTaskExecutorImpl( final String name, final int poolSize, final int queueLength ) {
+ public NamedTaskExecutorImpl( final String name, final int poolSize) {
Preconditions.checkNotNull( name );
Preconditions.checkArgument( name.length() > 0, "name must have a length" );
Preconditions.checkArgument( poolSize > 0, "poolSize must be > than 0" );
- Preconditions.checkArgument( queueLength > -1, "queueLength must be 0 or more" );
this.name = name;
this.poolSize = poolSize;
- this.queueLength = queueLength;
- final BlockingQueue<Runnable> queue =
- queueLength > 0 ? new ArrayBlockingQueue<Runnable>( queueLength ) : new SynchronousQueue<Runnable>();
+// final BlockingQueue<Runnable> queue =
+// queueLength > 0 ? new ArrayBlockingQueue<Runnable>( queueLength ) : new SynchronousQueue<Runnable>();
+//
+// executorService = MoreExecutors.listeningDecorator( new MaxSizeThreadPool( queue ) );
- executorService = MoreExecutors.listeningDecorator( new MaxSizeThreadPool( queue ) );
+ this.executorService = new NamedForkJoinPool(poolSize);
}
@Override
- public <V, I> ListenableFuture<V> submit( final Task<V, I> task ) {
-
- final ListenableFuture<V> future;
+ public <V, I> Task<V, I> submit( final Task<V, I> task ) {
try {
- future = executorService.submit( task );
-
- /**
- * Log our success or failures for debugging purposes
- */
- Futures.addCallback( future, new TaskFutureCallBack<V, I>( task ) );
+ executorService.submit( task );
}
catch ( RejectedExecutionException ree ) {
task.rejected();
- return Futures.immediateCancelledFuture();
}
- return future;
+ return task;
+
}
@@ -109,6 +96,41 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
}
+ private final class NamedForkJoinPool extends ForkJoinPool{
+
+ private NamedForkJoinPool( final int workerThreadCount ) {
+ //TODO, verify the scheduler at the end
+ super( workerThreadCount, defaultForkJoinWorkerThreadFactory, new TaskExceptionHandler(), true );
+ }
+
+
+
+ }
+
+ private final class TaskExceptionHandler implements Thread.UncaughtExceptionHandler{
+
+ @Override
+ public void uncaughtException( final Thread t, final Throwable e ) {
+ LOG.error( "Uncaught exception on thread {} was {}", t, e );
+ }
+ }
+
+
+
+
+ private final class NamedWorkerThread extends ForkJoinWorkerThread{
+
+ /**
+ * Creates a ForkJoinWorkerThread operating in the given pool.
+ *
+ * @param pool the pool this thread works in
+ *
+ * @throws NullPointerException if pool is null
+ */
+ protected NamedWorkerThread(final String name, final ForkJoinPool pool ) {
+ super( pool );
+ }
+ }
/**
* Create a thread pool that will reject work if our audit tasks become overwhelmed
*/
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dc3f448c/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
index 518b461..eb04c2c 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
@@ -1,37 +1,47 @@
package org.apache.usergrid.persistence.core.task;
-import java.util.concurrent.Callable;
-
+import java.util.concurrent.RecursiveTask;
/**
* The task to execute
*/
-public interface Task<V, I> extends Callable<V> {
+public abstract class Task<V, I> extends RecursiveTask<V> {
/**
* Get the unique identifier of this task. This may be used to collapse runnables over a time period in the future
- *
- * @return
*/
- I getId();
+ public abstract I getId();
+
+
+ @Override
+ protected V compute() {
+ try {
+ return executeTask();
+ }
+ catch ( Exception e ) {
+ exceptionThrown( e );
+ throw new RuntimeException( e );
+ }
+ }
+
/**
- * Invoked when this task throws an uncaught exception.
- * @param throwable
+ * Execute the task
*/
- void exceptionThrown(final Throwable throwable);
+ public abstract V executeTask() throws Exception;
/**
- * Invoked when we weren't able to run this task by the the thread attempting to schedule the task.
- * If this task MUST be run immediately, you can invoke the call method from within this event to invoke the
- * task in the scheduling thread. Note that this has performance implications to the user. If you can drop the
- * request and process later (lazy repair for instance ) do so.
- *
+ * Invoked when this task throws an uncaught exception.
*/
- void rejected();
-
-
+ public abstract void exceptionThrown( final Throwable throwable );
+ /**
+ * Invoked when we weren't able to run this task by the the thread attempting to schedule the task. If this task
+ * MUST be run immediately, you can invoke the call method from within this event to invoke the task in the
+ * scheduling thread. Note that this has performance implications to the user. If you can drop the request and
+ * process later (lazy repair for instance ) do so.
+ */
+ public abstract void rejected();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dc3f448c/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
index 619ac14..a3553c7 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
@@ -13,5 +13,5 @@ public interface TaskExecutor {
* Submit the task asynchronously
* @param task
*/
- public <V, I> ListenableFuture<V> submit( Task<V, I> task );
+ public <V, I> Task<V, I > submit( Task<V, I> task );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dc3f448c/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
index 9da9263..f65d5a6 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
@@ -11,8 +11,6 @@ import org.junit.Test;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
-import junit.framework.TestCase;
-
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
@@ -25,7 +23,7 @@ public class NamedTaskExecutorImplTest {
@Test
public void jobSuccess() throws InterruptedException {
- final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1, 0 );
+ final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1 );
final CountDownLatch exceptionLatch = new CountDownLatch( 0 );
final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
@@ -46,7 +44,7 @@ public class NamedTaskExecutorImplTest {
@Test
public void exceptionThrown() throws InterruptedException {
- final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1, 0 );
+ final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1 );
final CountDownLatch exceptionLatch = new CountDownLatch( 1 );
final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
@@ -55,9 +53,10 @@ public class NamedTaskExecutorImplTest {
final RuntimeException re = new RuntimeException( "throwing exception" );
final TestTask<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {
+
+
@Override
- public Void call() throws Exception {
- super.call();
+ public Void executeTask() {
throw re;
}
};
@@ -77,7 +76,7 @@ public class NamedTaskExecutorImplTest {
@Test
public void noCapacity() throws InterruptedException {
- final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1, 0 );
+ final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1 );
final CountDownLatch exceptionLatch = new CountDownLatch( 0 );
final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
@@ -86,8 +85,8 @@ public class NamedTaskExecutorImplTest {
final TestTask<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {
@Override
- public Void call() throws Exception {
- super.call();
+ public Void executeTask() throws Exception {
+ super.executeTask();
//park this thread so it takes up a task and the next is rejected
final Object mutex = new Object();
@@ -131,22 +130,22 @@ public class NamedTaskExecutorImplTest {
public void noCapacityWithQueue() throws InterruptedException {
final int threadPoolSize = 1;
- final int queueSize = 10;
+
- final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", threadPoolSize, queueSize );
+ final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", threadPoolSize );
final CountDownLatch exceptionLatch = new CountDownLatch( 0 );
final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
final CountDownLatch runLatch = new CountDownLatch( 1 );
- int iterations = threadPoolSize + queueSize;
+ int iterations = threadPoolSize ;
- for(int i = 0; i < iterations; i ++) {
+ for ( int i = 0; i < iterations; i++ ) {
final TestTask<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {
@Override
- public Void call() throws Exception {
- super.call();
+ public Void executeTask() throws Exception {
+ super.executeTask();
//park this thread so it takes up a task and the next is rejected
final Object mutex = new Object();
@@ -162,7 +161,6 @@ public class NamedTaskExecutorImplTest {
}
-
runLatch.await( 1000, TimeUnit.MILLISECONDS );
//now submit the second task
@@ -187,12 +185,81 @@ public class NamedTaskExecutorImplTest {
}
- private static abstract class TestTask<V> implements Task<V, UUID> {
+ @Test
+ public void jobTreeResult() throws InterruptedException {
+
+ final int threadPoolSize = 4;
+
+
+ final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", threadPoolSize );
+
+ final CountDownLatch exceptionLatch = new CountDownLatch( 0 );
+ final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
+
+ //accomodates for splitting the job 1->2->4 and joining
+ final CountDownLatch runLatch = new CountDownLatch( 7 );
+
+
+ TestRecursiveTask task = new TestRecursiveTask( exceptionLatch, rejectedLatch, runLatch, 1, 3 );
+
+ executor.submit( task );
+
+
+ //compute our result
+ Integer result = task.join();
+
+ //result should be 1+2*2+3*4
+ final int expected = 4*3;
+
+ assertEquals(expected, result.intValue());
+
+ //just to check our latches
+ runLatch.await( 1000, TimeUnit.MILLISECONDS );
+
+ //now submit the second task
+
+
+ }
+
+
+ private static class TestRecursiveTask extends TestTask<Integer> {
+
+ private final int depth;
+ private final int maxDepth;
+
+ private TestRecursiveTask( final CountDownLatch exceptionLatch, final CountDownLatch rejectedLatch,
+ final CountDownLatch runLatch, final int depth, final int maxDepth ) {
+ super( exceptionLatch, rejectedLatch, runLatch );
+ this.depth = depth;
+ this.maxDepth = maxDepth;
+ }
+
+
+ @Override
+ public Integer executeTask() throws Exception {
+
+ if(depth == maxDepth ){
+ return depth;
+ }
+
+ TestRecursiveTask left = new TestRecursiveTask(exceptionLatch, rejectedLatch, runLatch, depth+1, maxDepth );
+
+ TestRecursiveTask right = new TestRecursiveTask(exceptionLatch, rejectedLatch, runLatch, depth+1, maxDepth );
+
+ //run our left in another thread
+ left.fork();
+
+ return right.compute() + left.join();
+ }
+ }
+
+
+ private static abstract class TestTask<V> extends Task<V, UUID> {
- private final List<Throwable> exceptions;
- private final CountDownLatch exceptionLatch;
- private final CountDownLatch rejectedLatch;
- private final CountDownLatch runLatch;
+ protected final List<Throwable> exceptions;
+ protected final CountDownLatch exceptionLatch;
+ protected final CountDownLatch rejectedLatch;
+ protected final CountDownLatch runLatch;
private TestTask( final CountDownLatch exceptionLatch, final CountDownLatch rejectedLatch,
@@ -225,7 +292,7 @@ public class NamedTaskExecutorImplTest {
@Override
- public V call() throws Exception {
+ public V executeTask() throws Exception {
runLatch.countDown();
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dc3f448c/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
index 608f8ce..a3978fc 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
@@ -155,7 +155,7 @@ public class GraphModule extends AbstractModule {
@Singleton
@Provides
public TaskExecutor graphTaskExecutor(final GraphFig graphFig){
- return new NamedTaskExecutorImpl( "graphTaskExecutor", graphFig.getShardAuditWorkerCount(), graphFig.getShardAuditWorkerQueueSize() );
+ return new NamedTaskExecutorImpl( "graphTaskExecutor", graphFig.getShardAuditWorkerCount() );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dc3f448c/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
index 4fe1a63..15acaa8 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
@@ -27,6 +27,8 @@ import java.util.HashSet;
import java.util.Set;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.task.Task;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardGroupCompactionImpl;
import com.google.common.util.concurrent.ListenableFuture;
@@ -42,9 +44,8 @@ public interface ShardGroupCompaction {
*
* @return A ListenableFuture with the result. Note that some
*/
- public ListenableFuture<AuditResult> evaluateShardGroup( final ApplicationScope scope,
- final DirectedEdgeMeta edgeMeta,
- final ShardEntryGroup group );
+ public Task<AuditResult, ShardGroupCompactionImpl.ShardAuditKey> evaluateShardGroup(
+ final ApplicationScope scope, final DirectedEdgeMeta edgeMeta, final ShardEntryGroup group );
public enum AuditResult {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dc3f448c/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
index be7fbe4..751b4d9 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
@@ -37,14 +37,13 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import javax.annotation.Nullable;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.core.consistency.TimeService;
import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.task.ImmediateTask;
import org.apache.usergrid.persistence.core.task.Task;
import org.apache.usergrid.persistence.core.task.TaskExecutor;
import org.apache.usergrid.persistence.graph.GraphFig;
@@ -66,9 +65,6 @@ import com.google.common.hash.HashFunction;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
import com.google.common.hash.PrimitiveSink;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.netflix.astyanax.Keyspace;
@@ -277,45 +273,29 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
@Override
- public ListenableFuture<AuditResult> evaluateShardGroup( final ApplicationScope scope,
- final DirectedEdgeMeta edgeMeta,
- final ShardEntryGroup group ) {
+ public Task<AuditResult, ShardAuditKey> evaluateShardGroup( final ApplicationScope scope,
+ final DirectedEdgeMeta edgeMeta,
+ final ShardEntryGroup group ) {
final double repairChance = random.nextDouble();
//don't audit, we didn't hit our chance
if ( repairChance > graphFig.getShardRepairChance() ) {
- return Futures.immediateFuture( AuditResult.NOT_CHECKED );
+ return new ImmediateTask<AuditResult, ShardAuditKey>( new ShardAuditKey( scope, edgeMeta, group ), AuditResult.NOT_CHECKED ) {};
}
/**
* Try and submit. During back pressure, we may not be able to submit, that's ok. Better to drop than to
* hose the system
*/
- ListenableFuture<AuditResult> future = taskExecutor.submit( new ShardAuditTask( scope, edgeMeta, group ) );
+ return taskExecutor.submit( new ShardAuditTask( scope, edgeMeta, group ) );
- /**
- * Log our success or failures for debugging purposes
- */
- Futures.addCallback( future, new FutureCallback<AuditResult>() {
- @Override
- public void onSuccess( @Nullable final AuditResult result ) {
- LOG.debug( "Successfully completed audit of task {}", result );
- }
-
-
- @Override
- public void onFailure( final Throwable t ) {
- LOG.error( "Unable to perform audit. Exception is ", t );
- }
- } );
- return future;
}
- private final class ShardAuditTask implements Task<AuditResult, ShardAuditKey> {
+ private final class ShardAuditTask extends Task<AuditResult, ShardAuditKey> {
private final ApplicationScope scope;
private final DirectedEdgeMeta edgeMeta;
@@ -350,7 +330,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
@Override
- public AuditResult call() throws Exception {
+ public AuditResult executeTask() throws Exception {
/**
* We don't have a compaction pending. Run an audit on the shards
*/
@@ -401,10 +381,8 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
*/
try {
CompactionResult result = compact( scope, edgeMeta, group );
- LOG.info(
- "Compaction result for compaction of scope {} with edge meta data of {} and shard group " +
- "{} is {}",
- new Object[] { scope, edgeMeta, group, result } );
+ LOG.info( "Compaction result for compaction of scope {} with edge meta data of {} and shard group "
+ + "{} is {}", new Object[] { scope, edgeMeta, group, result } );
}
finally {
shardCompactionTaskTracker.complete( scope, edgeMeta, group );
@@ -418,7 +396,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
}
- private static final class ShardAuditKey {
+ public static final class ShardAuditKey {
private final ApplicationScope scope;
private final DirectedEdgeMeta edgeMeta;
private final ShardEntryGroup group;
[15/15] git commit: Merge branch 'eventsystem' into two-dot-o
Posted by to...@apache.org.
Merge branch 'eventsystem' into two-dot-o
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/2678eae9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/2678eae9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/2678eae9
Branch: refs/heads/two-dot-o
Commit: 2678eae9e7b61911d02c60394ae2035fb4a83955
Parents: abbd76e d85b97c
Author: Todd Nine <to...@apache.org>
Authored: Wed Oct 1 10:31:56 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Wed Oct 1 10:31:56 2014 -0600
----------------------------------------------------------------------
.../collection/EntityCollectionManager.java | 12 +-
.../collection/event/EntityDeleted.java | 25 +
.../collection/event/EntityVersionCreated.java | 25 +
.../collection/event/EntityVersionDeleted.java | 29 +
.../collection/guice/CollectionModule.java | 18 +-
.../guice/CollectionTaskExecutor.java | 35 +
.../impl/EntityCollectionManagerImpl.java | 14 +
.../impl/EntityVersionCleanupTask.java | 198 ++++++
.../serialization/SerializationFig.java | 32 +-
.../serialization/impl/LogEntryIterator.java | 114 +++
.../EntityCollectionManagerStressTest.java | 2 +
.../impl/EntityVersionCleanupTaskTest.java | 690 +++++++++++++++++++
.../impl/LogEntryIteratorTest.java | 131 ++++
.../collection/util/LogEntryMock.java | 152 ++++
.../core/astyanax/AstyanaxKeyspaceProvider.java | 2 +
.../persistence/core/guice/CommonModule.java | 2 +
.../core/task/NamedTaskExecutorImpl.java | 167 +++++
.../usergrid/persistence/core/task/Task.java | 31 +
.../persistence/core/task/TaskExecutor.java | 23 +
.../core/task/NamedTaskExecutorImplTest.java | 227 ++++++
.../usergrid/persistence/graph/GraphFig.java | 2 +
.../persistence/graph/event/EdgeDeleted.java | 8 +
.../persistence/graph/guice/GraphModule.java | 16 +
.../graph/guice/GraphTaskExecutor.java | 33 +
.../impl/shard/impl/NodeShardCacheImpl.java | 5 -
.../shard/impl/ShardGroupCompactionImpl.java | 175 +++--
.../graph/GraphManagerStressTest.java | 1 +
.../impl/shard/ShardGroupCompactionTest.java | 5 +-
28 files changed, 2081 insertions(+), 93 deletions(-)
----------------------------------------------------------------------
[09/15] git commit: Merge branch 'two-dot-o' into eventsystem
Posted by to...@apache.org.
Merge branch 'two-dot-o' into eventsystem
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/9c63ce63
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/9c63ce63
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/9c63ce63
Branch: refs/heads/two-dot-o
Commit: 9c63ce63fd23445d46d0371d8728b17e276bde12
Parents: 1060478 9e2743d
Author: Todd Nine <to...@apache.org>
Authored: Mon Sep 29 14:52:25 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Mon Sep 29 14:52:25 2014 -0600
----------------------------------------------------------------------
portal/js/global/ug-service.js | 1100 +++--
portal/js/libs/usergrid.sdk.js | 4264 +++++++++---------
stack/core/pom.xml | 10 -
.../corepersistence/CpEntityManager.java | 1 +
.../corepersistence/CpEntityManagerFactory.java | 184 +-
.../corepersistence/CpRelationManager.java | 58 +-
.../HybridEntityManagerFactory.java | 10 +
.../mq/cassandra/io/AbstractSearch.java | 17 +-
.../persistence/EntityManagerFactory.java | 4 +
.../cassandra/EntityManagerFactoryImpl.java | 51 +
.../persistence/PerformanceEntityReadTest.java | 2 +
.../persistence/PerformanceEntityWriteTest.java | 2 +
.../cassandra/QueryProcessorTest.java | 2 -
stack/core/src/test/resources/log4j.properties | 1 +
.../index/impl/EsEntityIndexImpl.java | 122 +-
.../persistence/index/impl/EsQueryVistor.java | 150 +-
.../index/impl/ElasticSearchTest.java | 274 --
.../impl/EntityConnectionIndexImplTest.java | 2 -
.../index/impl/EntityIndexMapUtils.java | 12 +-
.../persistence/index/impl/EntityIndexTest.java | 77 +-
.../apache/usergrid/rest/AbstractRestIT.java | 6 +-
.../usergrid/rest/ConcurrentRestITSuite.java | 54 +-
.../apache/usergrid/rest/NotificationsIT.java | 162 +-
.../org/apache/usergrid/rest/RestITSuite.java | 45 +-
.../ApplicationRequestCounterIT.java | 97 -
.../rest/applications/DevicesResourceIT.java | 87 -
.../collection/BadGrammarQueryTest.java | 79 -
.../collection/CollectionsResourceIT.java | 205 +
.../collection/PagingResourceIT.java | 283 --
.../activities/ActivityResourceIT.java | 188 +
.../collection/activities/AndOrQueryTest.java | 203 -
.../collection/activities/OrderByTest.java | 172 -
.../activities/PagingEntitiesTest.java | 141 -
.../collection/devices/DevicesResourceIT.java | 87 +
.../collection/groups/GeoPagingTest.java | 133 -
.../collection/groups/GroupResourceIT.java | 295 ++
.../collection/paging/PagingEntitiesTest.java | 141 +
.../collection/paging/PagingResourceIT.java | 301 ++
.../users/ConnectionResourceTest.java | 271 ++
.../collection/users/OwnershipResourceIT.java | 379 ++
.../collection/users/PermissionsResourceIT.java | 768 ++++
.../collection/users/RetrieveUsersTest.java | 87 +
.../collection/users/UserResourceIT.java | 1418 ++++++
.../users/extensions/TestResource.java | 51 +
.../events/ApplicationRequestCounterIT.java | 97 +
.../applications/queries/AndOrQueryTest.java | 203 +
.../queries/BadGrammarQueryTest.java | 79 +
.../applications/queries/GeoPagingTest.java | 133 +
.../applications/queries/MatrixQueryTests.java | 202 +
.../rest/applications/queries/OrderByTest.java | 172 +
.../applications/users/ActivityResourceIT.java | 188 -
.../users/CollectionsResourceIT.java | 205 -
.../users/ConnectionResourceTest.java | 271 --
.../applications/users/GroupResourceIT.java | 295 --
.../applications/users/MatrixQueryTests.java | 202 -
.../applications/users/OwnershipResourceIT.java | 379 --
.../users/PermissionsResourceIT.java | 768 ----
.../applications/users/RetrieveUsersTest.java | 87 -
.../rest/applications/users/UserResourceIT.java | 1418 ------
.../users/extensions/TestResource.java | 51 -
.../usergrid/rest/management/AccessTokenIT.java | 350 ++
.../usergrid/rest/management/AdminUsersIT.java | 807 ++++
.../rest/management/ManagementResourceIT.java | 397 +-
.../rest/management/OrganizationsIT.java | 378 ++
.../organizations/OrganizationResourceIT.java | 90 -
.../organizations/OrganizationsResourceIT.java | 322 --
.../rest/management/users/MUUserResourceIT.java | 654 ---
.../UsersOrganizationsResourceIT.java | 72 -
stack/rest/src/test/resources/log4j.properties | 7 +-
.../notifications/ApplicationQueueManager.java | 40 +-
.../notifications/NotificationsService.java | 7 +-
.../notifications/NotificationsTaskManager.java | 33 -
.../services/notifications/QueueListener.java | 71 +-
.../services/notifications/QueueManager.java | 4 -
.../notifications/SingleQueueTaskManager.java | 63 +-
.../services/notifications/TaskTracker.java | 4 +-
.../resources/usergrid-services-context.xml | 9 +-
.../apns/NotificationsServiceIT.java | 6 +-
.../gcm/NotificationsServiceIT.java | 9 +-
stack/tools/README.md | 18 +
.../org/apache/usergrid/tools/IndexRebuild.java | 99 +-
.../org/apache/usergrid/tools/ToolBase.java | 26 +-
82 files changed, 10302 insertions(+), 9910 deletions(-)
----------------------------------------------------------------------
[13/15] git commit: Refactored listener to take buffers of versions
for efficiency
Posted by to...@apache.org.
Refactored listener to take buffers of versions for efficiency
Refactored task to use RX for observable streams
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/8cadba29
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/8cadba29
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/8cadba29
Branch: refs/heads/two-dot-o
Commit: 8cadba29c80766bc8fe12755ac02cca2f741f077
Parents: 1a41624
Author: Todd Nine <to...@apache.org>
Authored: Wed Oct 1 10:30:28 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Wed Oct 1 10:30:28 2014 -0600
----------------------------------------------------------------------
.../collection/event/EntityVersionDeleted.java | 5 +-
.../impl/EntityVersionCleanupTask.java | 85 ++++---
.../serialization/impl/LogEntryIterator.java | 9 +-
.../impl/EntityVersionCleanupTaskTest.java | 230 ++++++++++++-------
4 files changed, 213 insertions(+), 116 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8cadba29/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionDeleted.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionDeleted.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionDeleted.java
index 3b76e84..ff7d960 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionDeleted.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionDeleted.java
@@ -1,6 +1,7 @@
package org.apache.usergrid.persistence.collection.event;
+import java.util.List;
import java.util.UUID;
import org.apache.usergrid.persistence.collection.CollectionScope;
@@ -21,8 +22,8 @@ public interface EntityVersionDeleted {
*
* @param scope The scope of the entity
* @param entityId The entity Id that was removed
- * @param entityVersion The version that was removed
+ * @param entityVersions The versions that are to be removed
*/
- public void versionDeleted(final CollectionScope scope, final Id entityId, final UUID entityVersion);
+ public void versionDeleted(final CollectionScope scope, final Id entityId, final List<UUID> entityVersions);
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8cadba29/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
index d7ece40..2d30d36 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
@@ -1,9 +1,9 @@
package org.apache.usergrid.persistence.collection.impl;
+import java.util.Iterator;
import java.util.List;
import java.util.UUID;
-import java.util.concurrent.ExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -15,9 +15,14 @@ import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerialization
import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
import org.apache.usergrid.persistence.collection.serialization.impl.LogEntryIterator;
+import org.apache.usergrid.persistence.core.rx.ObservableIterator;
import org.apache.usergrid.persistence.core.task.Task;
import org.apache.usergrid.persistence.model.entity.Id;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;
@@ -37,6 +42,7 @@ public class EntityVersionCleanupTask implements Task<Void> {
private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
private final MvccEntitySerializationStrategy entitySerializationStrategy;
+ private final Keyspace keyspace;
private final SerializationFig serializationFig;
@@ -48,12 +54,13 @@ public class EntityVersionCleanupTask implements Task<Void> {
public EntityVersionCleanupTask( final SerializationFig serializationFig,
final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
final MvccEntitySerializationStrategy entitySerializationStrategy,
- final List<EntityVersionDeleted> listeners, final CollectionScope scope,
- final Id entityId, final UUID version ) {
+ final Keyspace keyspace, final List<EntityVersionDeleted> listeners,
+ final CollectionScope scope, final Id entityId, final UUID version ) {
this.serializationFig = serializationFig;
this.logEntrySerializationStrategy = logEntrySerializationStrategy;
this.entitySerializationStrategy = entitySerializationStrategy;
+ this.keyspace = keyspace;
this.listeners = listeners;
this.scope = scope;
this.entityId = entityId;
@@ -91,36 +98,67 @@ public class EntityVersionCleanupTask implements Task<Void> {
final UUID maxVersion = version;
- LogEntryIterator logEntryIterator =
- new LogEntryIterator( logEntrySerializationStrategy, scope, entityId, maxVersion,
- serializationFig.getHistorySize() );
+ Observable<MvccLogEntry> versions = Observable.create( new ObservableIterator( "versionIterators" ) {
+ @Override
+ protected Iterator getIterator() {
+ return new LogEntryIterator( logEntrySerializationStrategy, scope, entityId, maxVersion,
+ serializationFig.getBufferSize() );
+ }
+ } );
- //for every entry, we want to clean it up with listeners
+ //get the uuid from the version
+ versions.map( new Func1<MvccLogEntry, UUID>() {
+ @Override
+ public UUID call( final MvccLogEntry mvccLogEntry ) {
+ return mvccLogEntry.getVersion();
+ }
+ } )
+ //buffer our versions
+ .buffer( serializationFig.getBufferSize() )
+ //for each buffer set, delete all of them
+ .doOnNext( new Action1<List<UUID>>() {
+ @Override
+ public void call( final List<UUID> versions ) {
- while ( logEntryIterator.hasNext() ) {
+ //Fire all the listeners
+ fireEvents( versions );
- final MvccLogEntry logEntry = logEntryIterator.next();
+ MutationBatch entityBatch = keyspace.prepareMutationBatch();
+ MutationBatch logBatch = keyspace.prepareMutationBatch();
+ for ( UUID version : versions ) {
+ final MutationBatch entityDelete = entitySerializationStrategy.delete( scope, entityId, version );
- final UUID version = logEntry.getVersion();
+ entityBatch.mergeShallow( entityDelete );
+ final MutationBatch logDelete = logEntrySerializationStrategy.delete( scope, entityId, version );
- fireEvents();
+ logBatch.mergeShallow( logDelete );
+ }
- //we do multiple invocations on purpose. Our log is our source of versions, only delete from it
- //after every successful invocation of listeners and entity removal
- entitySerializationStrategy.delete( scope, entityId, version ).execute();
- logEntrySerializationStrategy.delete( scope, entityId, version ).execute();
- }
+ try {
+ entityBatch.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to delete entities in cleanup", e );
+ }
+ try {
+ logBatch.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to delete entities from the log", e );
+ }
+ }
+ } ).count().toBlocking().last();
return null;
}
- private void fireEvents() throws ExecutionException, InterruptedException {
+ private void fireEvents( final List<UUID> versions ) {
final int listenerSize = listeners.size();
@@ -129,7 +167,7 @@ public class EntityVersionCleanupTask implements Task<Void> {
}
if ( listenerSize == 1 ) {
- listeners.get( 0 ).versionDeleted( scope, entityId, version );
+ listeners.get( 0 ).versionDeleted( scope, entityId, versions );
return;
}
@@ -146,7 +184,7 @@ public class EntityVersionCleanupTask implements Task<Void> {
return entityVersionDeletedObservable.doOnNext( new Action1<EntityVersionDeleted>() {
@Override
public void call( final EntityVersionDeleted listener ) {
- listener.versionDeleted( scope, entityId, version );
+ listener.versionDeleted( scope, entityId, versions );
}
} );
}
@@ -154,15 +192,6 @@ public class EntityVersionCleanupTask implements Task<Void> {
LOG.debug( "Finished firing {} listeners", listenerSize );
}
-
-
- private static interface ListenerRunner {
-
- /**
- * Run the listeners
- */
- public void runListeners();
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8cadba29/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java
index 53eb6e3..d87f850 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java
@@ -11,12 +11,12 @@ import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerialization
import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
import org.apache.usergrid.persistence.model.entity.Id;
+import com.google.common.base.Preconditions;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
/**
* Iterator that will iterate all versions of the entity from the log from < the specified maxVersion
- *
*/
public class LogEntryIterator implements Iterator<MvccLogEntry> {
@@ -32,16 +32,19 @@ public class LogEntryIterator implements Iterator<MvccLogEntry> {
/**
- *
* @param logEntrySerializationStrategy The serialization strategy to get the log entries
* @param scope The scope of the entity
* @param entityId The id of the entity
- * @param maxVersion The max version of the entity. Iterator will iterate from max to min starting with the version < max
+ * @param maxVersion The max version of the entity. Iterator will iterate from max to min starting with the version
+ * < max
* @param pageSize The fetch size to get when querying the serialization strategy
*/
public LogEntryIterator( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
final CollectionScope scope, final Id entityId, final UUID maxVersion,
final int pageSize ) {
+
+ Preconditions.checkArgument( pageSize > 0, "pageSize must be > 0" );
+
this.logEntrySerializationStrategy = logEntrySerializationStrategy;
this.scope = scope;
this.entityId = entityId;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8cadba29/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
index a34b4f8..1fce6e2 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
@@ -28,7 +28,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import org.junit.AfterClass;
-import org.junit.Ignore;
import org.junit.Test;
import org.apache.usergrid.persistence.collection.CollectionScope;
@@ -43,6 +42,7 @@ import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.entity.SimpleId;
import com.google.common.util.concurrent.ListenableFuture;
+import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
@@ -69,13 +69,13 @@ public class EntityVersionCleanupTaskTest {
}
- @Test
+ @Test(timeout=10000)
public void noListenerOneVersion() throws ExecutionException, InterruptedException, ConnectionException {
final SerializationFig serializationFig = mock( SerializationFig.class );
- when( serializationFig.getHistorySize() ).thenReturn( 10 );
+ when( serializationFig.getBufferSize() ).thenReturn( 10 );
final MvccEntitySerializationStrategy mvccEntitySerializationStrategy =
mock( MvccEntitySerializationStrategy.class );
@@ -83,6 +83,15 @@ public class EntityVersionCleanupTaskTest {
final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy =
mock( MvccLogEntrySerializationStrategy.class );
+ final Keyspace keyspace = mock( Keyspace.class );
+
+ final MutationBatch entityBatch = mock( MutationBatch.class );
+
+ final MutationBatch logBatch = mock( MutationBatch.class );
+
+ when( keyspace.prepareMutationBatch() ).thenReturn( entityBatch ).thenReturn( logBatch );
+
+
//intentionally no events
final List<EntityVersionDeleted> listeners = new ArrayList<EntityVersionDeleted>();
@@ -105,20 +114,18 @@ public class EntityVersionCleanupTaskTest {
EntityVersionCleanupTask cleanupTask =
new EntityVersionCleanupTask( serializationFig, mvccLogEntrySerializationStrategy,
- mvccEntitySerializationStrategy, listeners, appScope, entityId, version );
+ mvccEntitySerializationStrategy, keyspace, listeners, appScope, entityId, version );
- final MutationBatch firstBatch = mock( MutationBatch.class );
+ final MutationBatch newBatch = mock( MutationBatch.class );
//set up returning a mutator
when( mvccEntitySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
- .thenReturn( firstBatch );
-
+ .thenReturn( newBatch );
- final MutationBatch secondBatch = mock( MutationBatch.class );
when( mvccLogEntrySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
- .thenReturn( secondBatch );
+ .thenReturn( newBatch );
//start the task
@@ -128,22 +135,22 @@ public class EntityVersionCleanupTaskTest {
future.get();
//verify it was run
- verify( firstBatch ).execute();
+ verify( entityBatch ).execute();
- verify( secondBatch ).execute();
+ verify( logBatch ).execute();
}
/**
* Tests the cleanup task on the first version ceated
*/
- @Test
+ @Test(timeout=10000)
public void noListenerNoVersions() throws ExecutionException, InterruptedException, ConnectionException {
final SerializationFig serializationFig = mock( SerializationFig.class );
- when( serializationFig.getHistorySize() ).thenReturn( 10 );
+ when( serializationFig.getBufferSize() ).thenReturn( 10 );
final MvccEntitySerializationStrategy mvccEntitySerializationStrategy =
mock( MvccEntitySerializationStrategy.class );
@@ -151,6 +158,15 @@ public class EntityVersionCleanupTaskTest {
final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy =
mock( MvccLogEntrySerializationStrategy.class );
+ final Keyspace keyspace = mock( Keyspace.class );
+
+
+ final MutationBatch entityBatch = mock( MutationBatch.class );
+ final MutationBatch logBatch = mock( MutationBatch.class );
+
+ when( keyspace.prepareMutationBatch() ).thenReturn( entityBatch ).thenReturn( logBatch );
+
+
//intentionally no events
final List<EntityVersionDeleted> listeners = new ArrayList<EntityVersionDeleted>();
@@ -173,20 +189,18 @@ public class EntityVersionCleanupTaskTest {
EntityVersionCleanupTask cleanupTask =
new EntityVersionCleanupTask( serializationFig, mvccLogEntrySerializationStrategy,
- mvccEntitySerializationStrategy, listeners, appScope, entityId, version );
+ mvccEntitySerializationStrategy, keyspace, listeners, appScope, entityId, version );
- final MutationBatch firstBatch = mock( MutationBatch.class );
+ final MutationBatch batch = mock( MutationBatch.class );
//set up returning a mutator
when( mvccEntitySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
- .thenReturn( firstBatch );
-
+ .thenReturn( batch );
- final MutationBatch secondBatch = mock( MutationBatch.class );
when( mvccLogEntrySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
- .thenReturn( secondBatch );
+ .thenReturn( batch );
//start the task
@@ -196,19 +210,19 @@ public class EntityVersionCleanupTaskTest {
future.get();
//verify it was run
- verify( firstBatch, never() ).execute();
+ verify( entityBatch, never() ).execute();
- verify( secondBatch, never() ).execute();
+ verify( logBatch, never() ).execute();
}
- @Test
+ @Test(timeout=10000)
public void singleListenerSingleVersion() throws ExecutionException, InterruptedException, ConnectionException {
final SerializationFig serializationFig = mock( SerializationFig.class );
- when( serializationFig.getHistorySize() ).thenReturn( 10 );
+ when( serializationFig.getBufferSize() ).thenReturn( 10 );
final MvccEntitySerializationStrategy mvccEntitySerializationStrategy =
mock( MvccEntitySerializationStrategy.class );
@@ -216,6 +230,15 @@ public class EntityVersionCleanupTaskTest {
final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy =
mock( MvccLogEntrySerializationStrategy.class );
+ final Keyspace keyspace = mock( Keyspace.class );
+
+
+ final MutationBatch entityBatch = mock( MutationBatch.class );
+ final MutationBatch logBatch = mock( MutationBatch.class );
+
+ when( keyspace.prepareMutationBatch() ).thenReturn( entityBatch ).thenReturn( logBatch );
+
+
//create a latch for the event listener, and add it to the list of events
final int sizeToReturn = 1;
@@ -246,20 +269,18 @@ public class EntityVersionCleanupTaskTest {
EntityVersionCleanupTask cleanupTask =
new EntityVersionCleanupTask( serializationFig, mvccLogEntrySerializationStrategy,
- mvccEntitySerializationStrategy, listeners, appScope, entityId, version );
+ mvccEntitySerializationStrategy, keyspace, listeners, appScope, entityId, version );
- final MutationBatch firstBatch = mock( MutationBatch.class );
+ final MutationBatch batch = mock( MutationBatch.class );
//set up returning a mutator
when( mvccEntitySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
- .thenReturn( firstBatch );
-
+ .thenReturn( batch );
- final MutationBatch secondBatch = mock( MutationBatch.class );
when( mvccLogEntrySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
- .thenReturn( secondBatch );
+ .thenReturn( batch );
//start the task
@@ -270,23 +291,23 @@ public class EntityVersionCleanupTaskTest {
//we deleted the version
//verify it was run
- verify( firstBatch ).execute();
+ verify( entityBatch ).execute();
- verify( secondBatch ).execute();
+ verify( logBatch ).execute();
//the latch was executed
latch.await();
}
- @Test
+ @Test(timeout=10000)
public void multipleListenerMultipleVersions()
throws ExecutionException, InterruptedException, ConnectionException {
final SerializationFig serializationFig = mock( SerializationFig.class );
- when( serializationFig.getHistorySize() ).thenReturn( 10 );
+ when( serializationFig.getBufferSize() ).thenReturn( 10 );
final MvccEntitySerializationStrategy mvccEntitySerializationStrategy =
mock( MvccEntitySerializationStrategy.class );
@@ -294,12 +315,22 @@ public class EntityVersionCleanupTaskTest {
final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy =
mock( MvccLogEntrySerializationStrategy.class );
+ final Keyspace keyspace = mock( Keyspace.class );
+
+
+
+ final MutationBatch entityBatch = mock( MutationBatch.class );
+ final MutationBatch logBatch = mock( MutationBatch.class );
+
+ when( keyspace.prepareMutationBatch() ).thenReturn( entityBatch ).thenReturn( logBatch );
+
+
//create a latch for the event listener, and add it to the list of events
final int sizeToReturn = 10;
- final CountDownLatch latch = new CountDownLatch( sizeToReturn * 3 );
+ final CountDownLatch latch = new CountDownLatch( sizeToReturn/serializationFig.getBufferSize() * 3 );
final EntityVersionDeletedTest listener1 = new EntityVersionDeletedTest( latch );
final EntityVersionDeletedTest listener2 = new EntityVersionDeletedTest( latch );
@@ -329,20 +360,18 @@ public class EntityVersionCleanupTaskTest {
EntityVersionCleanupTask cleanupTask =
new EntityVersionCleanupTask( serializationFig, mvccLogEntrySerializationStrategy,
- mvccEntitySerializationStrategy, listeners, appScope, entityId, version );
+ mvccEntitySerializationStrategy, keyspace, listeners, appScope, entityId, version );
- final MutationBatch firstBatch = mock( MutationBatch.class );
+ final MutationBatch batch = mock( MutationBatch.class );
//set up returning a mutator
when( mvccEntitySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
- .thenReturn( firstBatch );
+ .thenReturn( batch );
- final MutationBatch secondBatch = mock( MutationBatch.class );
-
when( mvccLogEntrySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
- .thenReturn( secondBatch );
+ .thenReturn( batch );
//start the task
@@ -353,9 +382,15 @@ public class EntityVersionCleanupTaskTest {
//we deleted the version
//verify we deleted everything
- verify( firstBatch, times( sizeToReturn ) ).execute();
+ verify( entityBatch, times( sizeToReturn ) ).mergeShallow( any( MutationBatch.class ) );
+
+ verify( logBatch, times( sizeToReturn ) ).mergeShallow( any( MutationBatch.class ) );
+
+
- verify( secondBatch, times( sizeToReturn ) ).execute();
+ verify( logBatch ).execute();
+
+ verify( entityBatch ).execute();
//the latch was executed
latch.await();
@@ -364,18 +399,15 @@ public class EntityVersionCleanupTaskTest {
/**
* Tests what happens when our listeners are VERY slow
- * @throws ExecutionException
- * @throws InterruptedException
- * @throws ConnectionException
*/
- @Test
+ @Test(timeout=10000)
public void multipleListenerMultipleVersionsNoThreadsToRun()
throws ExecutionException, InterruptedException, ConnectionException {
final SerializationFig serializationFig = mock( SerializationFig.class );
- when( serializationFig.getHistorySize() ).thenReturn( 10 );
+ when( serializationFig.getBufferSize() ).thenReturn( 10 );
final MvccEntitySerializationStrategy mvccEntitySerializationStrategy =
mock( MvccEntitySerializationStrategy.class );
@@ -383,6 +415,17 @@ public class EntityVersionCleanupTaskTest {
final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy =
mock( MvccLogEntrySerializationStrategy.class );
+ final Keyspace keyspace = mock( Keyspace.class );
+
+
+
+ final MutationBatch entityBatch = mock( MutationBatch.class );
+ final MutationBatch logBatch = mock( MutationBatch.class );
+
+ when( keyspace.prepareMutationBatch() ).thenReturn( entityBatch ).thenReturn( logBatch );
+
+
+
//create a latch for the event listener, and add it to the list of events
final int sizeToReturn = 10;
@@ -390,7 +433,7 @@ public class EntityVersionCleanupTaskTest {
final int listenerCount = 5;
- final CountDownLatch latch = new CountDownLatch( sizeToReturn * listenerCount );
+ final CountDownLatch latch = new CountDownLatch( sizeToReturn/serializationFig.getBufferSize() * listenerCount );
final Semaphore waitSemaphore = new Semaphore( 0 );
@@ -426,20 +469,18 @@ public class EntityVersionCleanupTaskTest {
EntityVersionCleanupTask cleanupTask =
new EntityVersionCleanupTask( serializationFig, mvccLogEntrySerializationStrategy,
- mvccEntitySerializationStrategy, listeners, appScope, entityId, version );
+ mvccEntitySerializationStrategy, keyspace, listeners, appScope, entityId, version );
- final MutationBatch firstBatch = mock( MutationBatch.class );
+ final MutationBatch batch = mock( MutationBatch.class );
//set up returning a mutator
when( mvccEntitySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
- .thenReturn( firstBatch );
-
+ .thenReturn( batch );
- final MutationBatch secondBatch = mock( MutationBatch.class );
when( mvccLogEntrySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
- .thenReturn( secondBatch );
+ .thenReturn( batch );
//start the task
@@ -448,7 +489,7 @@ public class EntityVersionCleanupTaskTest {
/**
* While we're not done, release latches every 200 ms
*/
- while(!future.isDone()) {
+ while ( !future.isDone() ) {
Thread.sleep( 200 );
waitSemaphore.release( listenerCount );
}
@@ -456,37 +497,41 @@ public class EntityVersionCleanupTaskTest {
//wait for the task
future.get();
+
+
//we deleted the version
//verify we deleted everything
- verify( firstBatch, times( sizeToReturn ) ).execute();
+ verify( logBatch, times( sizeToReturn ) ).mergeShallow( any( MutationBatch.class ) );
+
+ verify( entityBatch, times( sizeToReturn ) ).mergeShallow( any( MutationBatch.class ) );
+
+
+ verify( logBatch ).execute();
+
+ verify( entityBatch ).execute();
+
- verify( secondBatch, times( sizeToReturn ) ).execute();
//the latch was executed
latch.await();
}
-
/**
* Tests that our task will run in the caller if there's no threads, ensures that the task runs
- * @throws ExecutionException
- * @throws InterruptedException
- * @throws ConnectionException
*/
- @Test
- public void runsWhenRejected()
- throws ExecutionException, InterruptedException, ConnectionException {
+ @Test(timeout=10000)
+ public void runsWhenRejected() throws ExecutionException, InterruptedException, ConnectionException {
/**
* only 1 thread on purpose, we want to saturate the task
*/
- final TaskExecutor taskExecutor = new NamedTaskExecutorImpl( "test", 1, 0);
+ final TaskExecutor taskExecutor = new NamedTaskExecutorImpl( "test", 1, 0 );
final SerializationFig serializationFig = mock( SerializationFig.class );
- when( serializationFig.getHistorySize() ).thenReturn( 10 );
+ when( serializationFig.getBufferSize() ).thenReturn( 10 );
final MvccEntitySerializationStrategy mvccEntitySerializationStrategy =
mock( MvccEntitySerializationStrategy.class );
@@ -494,6 +539,16 @@ public class EntityVersionCleanupTaskTest {
final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy =
mock( MvccLogEntrySerializationStrategy.class );
+ final Keyspace keyspace1 = mock( Keyspace.class );
+ final Keyspace keyspace2 = mock( Keyspace.class );
+
+
+ final MutationBatch entityBatch = mock( MutationBatch.class );
+ final MutationBatch logBatch = mock( MutationBatch.class );
+
+ when( keyspace1.prepareMutationBatch() ).thenReturn( entityBatch ).thenReturn( logBatch );
+ when( keyspace2.prepareMutationBatch() ).thenReturn( entityBatch ).thenReturn( logBatch );
+
//create a latch for the event listener, and add it to the list of events
final int sizeToReturn = 10;
@@ -501,7 +556,7 @@ public class EntityVersionCleanupTaskTest {
final int listenerCount = 2;
- final CountDownLatch latch = new CountDownLatch( sizeToReturn * listenerCount );
+ final CountDownLatch latch = new CountDownLatch( sizeToReturn/serializationFig.getBufferSize() * listenerCount );
final Semaphore waitSemaphore = new Semaphore( 0 );
@@ -509,7 +564,6 @@ public class EntityVersionCleanupTaskTest {
final EntityVersionDeletedTest runListener = new EntityVersionDeletedTest( latch );
-
final Id applicationId = new SimpleId( "application" );
@@ -528,37 +582,37 @@ public class EntityVersionCleanupTaskTest {
EntityVersionCleanupTask firstTask =
new EntityVersionCleanupTask( serializationFig, mvccLogEntrySerializationStrategy,
- mvccEntitySerializationStrategy, Arrays.<EntityVersionDeleted>asList(slowListener), appScope, entityId, version );
-
+ mvccEntitySerializationStrategy, keyspace1, Arrays.<EntityVersionDeleted>asList( slowListener ),
+ appScope, entityId, version );
//change the listeners to one that is just invoked quickly
EntityVersionCleanupTask secondTask =
- new EntityVersionCleanupTask( serializationFig, mvccLogEntrySerializationStrategy,
- mvccEntitySerializationStrategy, Arrays.<EntityVersionDeleted>asList(runListener), appScope, entityId, version );
+ new EntityVersionCleanupTask( serializationFig, mvccLogEntrySerializationStrategy,
+ mvccEntitySerializationStrategy, keyspace2, Arrays.<EntityVersionDeleted>asList( runListener ),
+ appScope, entityId, version );
- final MutationBatch firstBatch = mock( MutationBatch.class );
+ final MutationBatch batch = mock( MutationBatch.class );
//set up returning a mutator
when( mvccEntitySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
- .thenReturn( firstBatch );
+ .thenReturn( batch );
- final MutationBatch secondBatch = mock( MutationBatch.class );
when( mvccLogEntrySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
- .thenReturn( secondBatch );
+ .thenReturn( batch );
//start the task
- ListenableFuture<Void> future1 = taskExecutor.submit( firstTask );
+ ListenableFuture<Void> future1 = taskExecutor.submit( firstTask );
//now start another task while the slow running task is running
- ListenableFuture<Void> future2 = taskExecutor.submit( secondTask );
+ ListenableFuture<Void> future2 = taskExecutor.submit( secondTask );
//get the second task, we shouldn't have been able to queue it, therefore it should just run in process
future2.get();
@@ -566,7 +620,7 @@ public class EntityVersionCleanupTaskTest {
/**
* While we're not done, release latches every 200 ms
*/
- while(!future1.isDone()) {
+ while ( !future1.isDone() ) {
Thread.sleep( 200 );
waitSemaphore.release( listenerCount );
}
@@ -576,9 +630,19 @@ public class EntityVersionCleanupTaskTest {
//we deleted the version
//verify we deleted everything
- verify( firstBatch, times( sizeToReturn*2 ) ).execute();
- verify( secondBatch, times( sizeToReturn*2 ) ).execute();
+
+ //we deleted the version
+ //verify we deleted everything
+ verify( logBatch, times( sizeToReturn* 2 ) ).mergeShallow( any( MutationBatch.class ) );
+
+ verify( entityBatch, times( sizeToReturn * 2) ).mergeShallow( any( MutationBatch.class ) );
+
+
+ verify( logBatch, times(2) ).execute();
+
+ verify( entityBatch, times(2) ).execute();
+
//the latch was executed
latch.await();
@@ -595,7 +659,7 @@ public class EntityVersionCleanupTaskTest {
@Override
- public void versionDeleted( final CollectionScope scope, final Id entityId, final UUID entityVersion ) {
+ public void versionDeleted( final CollectionScope scope, final Id entityId, final List<UUID> entityVersion ) {
invocationLatch.countDown();
}
}
@@ -612,7 +676,7 @@ public class EntityVersionCleanupTaskTest {
@Override
- public void versionDeleted( final CollectionScope scope, final Id entityId, final UUID entityVersion ) {
+ public void versionDeleted( final CollectionScope scope, final Id entityId, final List<UUID> entityVersion ) {
//wait for unblock to happen before counting down invocation latches
try {
blockLatch.acquire();