You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2014/10/01 18:32:05 UTC

[01/15] git commit: Added task execution framework

Repository: incubator-usergrid
Updated Branches:
  refs/heads/two-dot-o abbd76eb4 -> 2678eae9e


Added task execution framework


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/66e508a9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/66e508a9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/66e508a9

Branch: refs/heads/two-dot-o
Commit: 66e508a942d33051c95bfcd80e19b87a40812370
Parents: c667c18
Author: Todd Nine <to...@apache.org>
Authored: Wed Sep 24 16:47:24 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Wed Sep 24 16:47:24 2014 -0600

----------------------------------------------------------------------
 .../core/task/NamedTaskExecutorImpl.java        | 162 +++++++++++++
 .../usergrid/persistence/core/task/Task.java    |  37 +++
 .../persistence/core/task/TaskExecutor.java     |  14 ++
 .../core/task/NamedTaskExecutorImplTest.java    | 233 +++++++++++++++++++
 4 files changed, 446 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/66e508a9/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
new file mode 100644
index 0000000..8ba73a0
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
@@ -0,0 +1,162 @@
+package org.apache.usergrid.persistence.core.task;
+
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.annotation.Nullable;
+
+import org.slf4j.Logger;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+
+
+/**
+ * Implementation of the task executor with a unique name and size
+ */
+public class NamedTaskExecutorImpl implements TaskExecutor {
+
+    private static final Logger LOG = org.slf4j.LoggerFactory.getLogger( NamedTaskExecutorImpl.class );
+
+    private final ListeningExecutorService executorService;
+
+
+    /**
+     * @param name The name of this instance of the task executor
+     * @param poolSize The size of the pool.  This is the number of concurrent tasks that can execute at once.
+     * @param queueLength The length of tasks to keep in the queue
+     */
+    public NamedTaskExecutorImpl( final String name, final int poolSize, final int queueLength ) {
+        Preconditions.checkNotNull( name );
+        Preconditions.checkArgument( name.length() > 0, "name must have a length" );
+        Preconditions.checkArgument( poolSize > 0, "poolSize must be > than 0" );
+        Preconditions.checkArgument( queueLength > -1, "queueLength must be 0 or more" );
+
+
+        final BlockingQueue<Runnable> queue =
+                queueLength > 0 ? new ArrayBlockingQueue<Runnable>( queueLength ) : new SynchronousQueue<Runnable>();
+
+        executorService = MoreExecutors.listeningDecorator( new MaxSizeThreadPool( name, poolSize, queue ) );
+    }
+
+
+    @Override
+    public <V, I> void submit( final Task<V, I> task ) {
+
+        final ListenableFuture<V> future;
+
+        try {
+            future = executorService.submit( task );
+
+            /**
+             * Log our success or failures for debugging purposes
+             */
+            Futures.addCallback( future, new TaskFutureCallBack<V, I>( task ) );
+        }
+        catch ( RejectedExecutionException ree ) {
+            task.rejected();
+            return;
+        }
+    }
+
+
+    /**
+     * Callback attached to each submitted task's future; logs the outcome and reports failures back to the task.
+     */
+    private static final class TaskFutureCallBack<V, I> implements FutureCallback<V> {
+
+        private final Task<V, I> task;
+
+
+        private TaskFutureCallBack( Task<V, I> task ) {
+            this.task = task;
+        }
+
+        /** Logs the completed task at debug level; the "{}" placeholder is required for the task to be printed. */
+        @Override
+        public void onSuccess( @Nullable final V result ) {
+            LOG.debug( "Successfully completed task {}", task );
+        }
+
+        /** Logs the failure (with stack trace), then passes the throwable to {@link Task#exceptionThrown}. */
+        @Override
+        public void onFailure( final Throwable t ) {
+            LOG.error( "Unable to execute task.  Exception is ", t );
+
+            task.exceptionThrown( t );
+        }
+    }
+
+
+    /**
+     * Create a thread pool that will reject work if our audit tasks become overwhelmed
+     */
+    private static final class MaxSizeThreadPool extends ThreadPoolExecutor {
+
+        public MaxSizeThreadPool( final String name, final int workerSize, BlockingQueue<Runnable> queue ) {
+
+            super( 1, workerSize, 30, TimeUnit.SECONDS, queue, new CountingThreadFactory( name ),
+                    new RejectedHandler() );
+        }
+    }
+
+
+    /**
+     * Thread factory that will name and count threads for easier debugging
+     */
+    private static final class CountingThreadFactory implements ThreadFactory {
+
+        private final AtomicLong threadCounter = new AtomicLong();
+
+        private final String name;
+
+
+        private CountingThreadFactory( final String name ) {this.name = name;}
+
+
+        @Override
+        public Thread newThread( final Runnable r ) {
+            final long newValue = threadCounter.incrementAndGet();
+
+            Thread t = new Thread( r, name + "-" + newValue );
+
+            t.setDaemon( true );
+
+            return t;
+        }
+    }
+
+
+    /**
+     * The handler that will handle rejected executions and signal the interface
+     */
+    private static final class RejectedHandler implements RejectedExecutionHandler {
+
+
+        @Override
+        public void rejectedExecution( final Runnable r, final ThreadPoolExecutor executor ) {
+
+            //            ListenableFutureTask<Task<?, ?>> future = ( ListenableFutureTask<Task<?, ?>> ) r;
+            //
+            //            future.
+            //            final Task<?, ?> task = ( Task<?, ?> ) r;
+            LOG.warn( "Audit queue full, rejecting audit task {}", r );
+
+            throw new RejectedExecutionException( "Unable to run task, queue full" );
+            //            LOG.warn( "Audit queue full, rejecting audit task {}", task );
+            //            task.rejected();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/66e508a9/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
new file mode 100644
index 0000000..8b1ed22
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
@@ -0,0 +1,37 @@
+package org.apache.usergrid.persistence.core.task;
+
+
+import java.util.concurrent.Callable;
+
+
+
+/**
+ * The task to execute
+ */
+public interface Task<V, I> extends Callable<V> {
+
+    /**
+     * Get the unique identifier of this task.  This may be used to collapse runnables over a time period in the future
+     *
+     * @return
+     */
+    I getId();
+
+    /**
+     * Invoked when this task throws an uncaught exception.
+     * @param throwable
+     */
+    void exceptionThrown(final Throwable throwable);
+
+    /**
+     * Invoked when we weren't able to run this task by the thread attempting to schedule the task.
+     * If this task MUST be run immediately, you can invoke the call method from within this event to invoke the
+     * task in the scheduling thread.  Note that this has performance implications to the user.  If you can drop the
+     * request and process later (lazy repair for instance) do so.
+     *
+     */
+    void rejected();
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/66e508a9/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
new file mode 100644
index 0000000..e60da83
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
@@ -0,0 +1,14 @@
+package org.apache.usergrid.persistence.core.task;
+
+
+/**
+ * An interface for execution of tasks
+ */
+public interface TaskExecutor {
+
+    /**
+     * Submit the task asynchronously
+     * @param task
+     */
+    public <V, I> void submit(Task<V, I> task);
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/66e508a9/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
new file mode 100644
index 0000000..9da9263
--- /dev/null
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
@@ -0,0 +1,233 @@
+package org.apache.usergrid.persistence.core.task;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import junit.framework.TestCase;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+
+
+/**
+ * Tests for the namedtask execution impl
+ */
+public class NamedTaskExecutorImplTest {
+
+
+    @Test
+    public void jobSuccess() throws InterruptedException {
+        final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1, 0 );
+
+        final CountDownLatch exceptionLatch = new CountDownLatch( 0 );
+        final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
+        final CountDownLatch runLatch = new CountDownLatch( 1 );
+
+        final Task<Void, UUID> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {};
+
+        executor.submit( task );
+
+
+        runLatch.await( 1000, TimeUnit.MILLISECONDS );
+
+        assertEquals( 0l, exceptionLatch.getCount() );
+
+        assertEquals( 0l, rejectedLatch.getCount() );
+    }
+
+
+    @Test
+    public void exceptionThrown() throws InterruptedException {
+        final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1, 0 );
+
+        final CountDownLatch exceptionLatch = new CountDownLatch( 1 );
+        final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
+        final CountDownLatch runLatch = new CountDownLatch( 1 );
+
+        final RuntimeException re = new RuntimeException( "throwing exception" );
+
+        final TestTask<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {
+            @Override
+            public Void call() throws Exception {
+                super.call();
+                throw re;
+            }
+        };
+
+        executor.submit( task );
+
+
+        runLatch.await( 1000, TimeUnit.MILLISECONDS );
+        exceptionLatch.await( 1000, TimeUnit.MILLISECONDS );
+
+        assertSame( re, task.exceptions.get( 0 ) );
+
+
+        assertEquals( 0l, rejectedLatch.getCount() );
+    }
+
+
+    @Test
+    public void noCapacity() throws InterruptedException {
+        //a single worker and no queue: while the first task is parked, a second submit must be rejected
+        final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1, 0 );
+
+        final CountDownLatch exceptionLatch = new CountDownLatch( 0 );
+        final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
+        final CountDownLatch runLatch = new CountDownLatch( 1 );
+
+        final TestTask<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {
+            @Override
+            public Void call() throws Exception {
+                super.call();
+
+                //park this thread so it takes up a task and the next is rejected
+                final Object mutex = new Object();
+
+                synchronized ( mutex ) {
+                    mutex.wait();
+                }
+
+                return null;
+            }
+        };
+
+        executor.submit( task );
+
+
+        runLatch.await( 1000, TimeUnit.MILLISECONDS );
+
+        //now submit the second task, wired to its own latches so that its rejection can be observed
+
+
+        final CountDownLatch secondRejectedLatch = new CountDownLatch( 1 );
+        final CountDownLatch secondExceptionLatch = new CountDownLatch( 0 );
+        final CountDownLatch secondRunLatch = new CountDownLatch( 1 );
+
+        final TestTask<Void> testTask =
+                new TestTask<Void>( secondExceptionLatch, secondRejectedLatch, secondRunLatch ) {};
+
+        executor.submit( testTask );
+
+
+        secondRejectedLatch.await( 1000, TimeUnit.MILLISECONDS );
+
+        //the second task must have been rejected and must never have run
+        assertEquals( 0l, secondRejectedLatch.getCount() );
+        assertEquals( 1l, secondRunLatch.getCount() );
+        assertEquals( 0l, secondExceptionLatch.getCount() );
+    }
+
+
+    @Test
+    public void noCapacityWithQueue() throws InterruptedException {
+        //one worker plus a bounded queue: once both are saturated, the next submit must be rejected
+        final int threadPoolSize = 1;
+        final int queueSize = 10;
+
+        final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", threadPoolSize, queueSize );
+
+        final CountDownLatch exceptionLatch = new CountDownLatch( 0 );
+        final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
+        final CountDownLatch runLatch = new CountDownLatch( 1 );
+
+        int iterations = threadPoolSize + queueSize;
+
+        for(int i = 0; i < iterations; i ++) {
+
+            final TestTask<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {
+                @Override
+                public Void call() throws Exception {
+                    super.call();
+
+                    //park this thread so it takes up a task and the next is rejected
+                    final Object mutex = new Object();
+
+                    synchronized ( mutex ) {
+                        mutex.wait();
+                    }
+
+                    return null;
+                }
+            };
+            executor.submit( task );
+        }
+
+
+
+        runLatch.await( 1000, TimeUnit.MILLISECONDS );
+
+        //now submit one more task, wired to its own latches so that its rejection can be observed
+
+
+        final CountDownLatch secondRejectedLatch = new CountDownLatch( 1 );
+        final CountDownLatch secondExceptionLatch = new CountDownLatch( 0 );
+        final CountDownLatch secondRunLatch = new CountDownLatch( 1 );
+
+        final TestTask<Void> testTask =
+                new TestTask<Void>( secondExceptionLatch, secondRejectedLatch, secondRunLatch ) {};
+
+        executor.submit( testTask );
+
+
+        secondRejectedLatch.await( 1000, TimeUnit.MILLISECONDS );
+
+        //the extra task must have been rejected and must never have run
+        assertEquals( 0l, secondRejectedLatch.getCount() );
+        assertEquals( 1l, secondRunLatch.getCount() );
+        assertEquals( 0l, secondExceptionLatch.getCount() );
+    }
+
+
+    private static abstract class TestTask<V> implements Task<V, UUID> {
+
+        private final List<Throwable> exceptions;
+        private final CountDownLatch exceptionLatch;
+        private final CountDownLatch rejectedLatch;
+        private final CountDownLatch runLatch;
+
+
+        private TestTask( final CountDownLatch exceptionLatch, final CountDownLatch rejectedLatch,
+                          final CountDownLatch runLatch ) {
+            this.exceptionLatch = exceptionLatch;
+            this.rejectedLatch = rejectedLatch;
+            this.runLatch = runLatch;
+
+            this.exceptions = new ArrayList<>();
+        }
+
+
+        @Override
+        public UUID getId() {
+            return UUIDGenerator.newTimeUUID();
+        }
+
+
+        @Override
+        public void exceptionThrown( final Throwable throwable ) {
+            this.exceptions.add( throwable );
+            exceptionLatch.countDown();
+        }
+
+
+        @Override
+        public void rejected() {
+            rejectedLatch.countDown();
+        }
+
+
+        @Override
+        public V call() throws Exception {
+            runLatch.countDown();
+            return null;
+        }
+    }
+}


[07/15] git commit: Revert "Converted tasks to ForkJoin tasks to allow for parallel execution."

Posted by to...@apache.org.
Revert "Converted tasks to ForkJoin tasks to allow for parallel execution."

This reverts commit dc3f448c946219c44e3dbfadb294fea0b8048343.

The ForkJoinPool does not have the following characteristics

Ability to name a pool
Ability to set queue size for rejection

As a result, it is not well suited for our asynchronous I/O tasks, and I'm moving back to ExecutorService

Conflicts:
	stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
	stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/ImmediateTask.java
	stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
	stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
	stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
	stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
	stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
	stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/af3726b8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/af3726b8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/af3726b8

Branch: refs/heads/two-dot-o
Commit: af3726b894051fde9ce63c8db1db0996aa4f2992
Parents: d4f80e7
Author: Todd Nine <to...@apache.org>
Authored: Sat Sep 27 17:58:24 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Sat Sep 27 17:58:24 2014 -0600

----------------------------------------------------------------------
 .../collection/guice/CollectionModule.java      |   2 +-
 .../persistence/core/task/ImmediateTask.java    |  36 ------
 .../core/task/NamedTaskExecutorImpl.java        | 126 +++++++++++++------
 .../usergrid/persistence/core/task/Task.java    |  53 +++-----
 .../persistence/core/task/TaskExecutor.java     |   3 +-
 .../core/task/NamedTaskExecutorImplTest.java    | 116 ++++-------------
 .../persistence/graph/guice/GraphModule.java    |   2 +-
 .../impl/shard/ShardGroupCompaction.java        |   7 +-
 .../shard/impl/ShardGroupCompactionImpl.java    |  51 ++++++--
 9 files changed, 180 insertions(+), 216 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/af3726b8/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
index a6230e1..306f6e0 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
@@ -101,7 +101,7 @@ public class CollectionModule extends AbstractModule {
     @Singleton
     @Provides
     public TaskExecutor collectionTaskExecutor(final SerializationFig serializationFig){
-        return new NamedTaskExecutorImpl( "collectiontasks", serializationFig.getTaskPoolThreadSize() );
+        return new NamedTaskExecutorImpl( "collectiontasks", serializationFig.getTaskPoolThreadSize(), serializationFig.getTaskPoolQueueSize() );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/af3726b8/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/ImmediateTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/ImmediateTask.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/ImmediateTask.java
deleted file mode 100644
index f6c28a3..0000000
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/ImmediateTask.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package org.apache.usergrid.persistence.core.task;
-
-
-/**
- * Does not perform computation, just returns the value passed to it
- *
- */
-public class ImmediateTask<V> extends Task<V> {
-
-
-    private final V returned;
-
-
-    protected ImmediateTask( final V returned ) {
-        this.returned = returned;
-    }
-
-
-
-    @Override
-    public V executeTask() throws Exception {
-        return returned;
-    }
-
-
-    @Override
-    public void exceptionThrown( final Throwable throwable ) {
-          //no op
-    }
-
-
-    @Override
-    public void rejected() {
-        //no op
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/af3726b8/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
index bb9cac8..d4cc915 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
@@ -1,21 +1,3 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
 package org.apache.usergrid.persistence.core.task;
 
 
@@ -24,6 +6,7 @@ import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.ForkJoinWorkerThread;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -35,6 +18,10 @@ import org.slf4j.Logger;
 
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 
 
 /**
@@ -44,64 +31,131 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
 
     private static final Logger LOG = org.slf4j.LoggerFactory.getLogger( NamedTaskExecutorImpl.class );
 
-    private final NamedForkJoinPool executorService;
+    private final ListeningExecutorService executorService;
 
     private final String name;
     private final int poolSize;
+    private final int queueLength;
 
 
     /**
      * @param name The name of this instance of the task executor
      * @param poolSize The size of the pool.  This is the number of concurrent tasks that can execute at once.
+     * @param queueLength The length of tasks to keep in the queue
      */
-    public NamedTaskExecutorImpl( final String name, final int poolSize ) {
-
-        //TODO, figure out how to name the fork/join threads in the pool
+    public NamedTaskExecutorImpl( final String name, final int poolSize, final int queueLength ) {
         Preconditions.checkNotNull( name );
         Preconditions.checkArgument( name.length() > 0, "name must have a length" );
         Preconditions.checkArgument( poolSize > 0, "poolSize must be > than 0" );
+        Preconditions.checkArgument( queueLength > -1, "queueLength must be 0 or more" );
 
         this.name = name;
         this.poolSize = poolSize;
+        this.queueLength = queueLength;
+
+        final BlockingQueue<Runnable> queue =
+                queueLength > 0 ? new ArrayBlockingQueue<Runnable>( queueLength ) : new SynchronousQueue<Runnable>();
 
-        this.executorService = new NamedForkJoinPool( poolSize );
+        executorService = MoreExecutors.listeningDecorator( new MaxSizeThreadPool( queue ) );
     }
 
 
     @Override
-    public <V> Task<V> submit( final Task<V> task ) {
+    public <V, I> ListenableFuture<V> submit( final Task<V, I> task ) {
+
+        final ListenableFuture<V> future;
 
         try {
-            executorService.submit( task );
+            future = executorService.submit( task );
+
+            /**
+             * Log our success or failures for debugging purposes
+             */
+            Futures.addCallback( future, new TaskFutureCallBack<V, I>( task ) );
         }
         catch ( RejectedExecutionException ree ) {
             task.rejected();
+            return Futures.immediateCancelledFuture();
         }
 
-        return task;
+        return future;
     }
 
 
-    @Override
-    public void shutdown() {
-        executorService.shutdownNow();
+    /**
+     * Callback for when the task succeeds or fails.
+     */
+    private static final class TaskFutureCallBack<V, I> implements FutureCallback<V> {
+
+        private final Task<V, I> task;
+
+
+        private TaskFutureCallBack( Task<V, I> task ) {
+            this.task = task;
+        }
+
+
+        @Override
+        public void onSuccess( @Nullable final V result ) {
+            LOG.debug( "Successfully completed task {}", task ); // "{}" is needed for SLF4J to print the task
+        }
+
+
+        @Override
+        public void onFailure( final Throwable t ) {
+            LOG.error( "Unable to execute task.  Exception is ", t );
+
+            task.exceptionThrown( t );
+        }
     }
 
 
-    private final class NamedForkJoinPool extends ForkJoinPool {
+    /**
+     * Create a thread pool that will reject work if our audit tasks become overwhelmed
+     */
+    private final class MaxSizeThreadPool extends ThreadPoolExecutor {
+
+        public MaxSizeThreadPool( BlockingQueue<Runnable> queue ) {
 
-        private NamedForkJoinPool( final int workerThreadCount ) {
-            //TODO, verify the scheduler at the end
-            super( workerThreadCount, defaultForkJoinWorkerThreadFactory, new TaskExceptionHandler(), true );
+            super( 1, poolSize, 30, TimeUnit.SECONDS, queue, new CountingThreadFactory( ),
+                    new RejectedHandler() );
         }
     }
 
 
-    private final class TaskExceptionHandler implements Thread.UncaughtExceptionHandler {
+    /**
+     * Thread factory that will name and count threads for easier debugging
+     */
+    private final class CountingThreadFactory implements ThreadFactory {
+
+        private final AtomicLong threadCounter = new AtomicLong();
+
 
         @Override
-        public void uncaughtException( final Thread t, final Throwable e ) {
-            LOG.error( "Uncaught exception on thread {} was {}", t, e );
+        public Thread newThread( final Runnable r ) {
+            final long newValue = threadCounter.incrementAndGet();
+
+            Thread t = new Thread( r, name + "-" + newValue );
+
+            t.setDaemon( true );
+
+            return t;
         }
     }
+
+
+    /**
+     * The handler that will handle rejected executions and signal the interface
+     */
+    private final class RejectedHandler implements RejectedExecutionHandler {
+
+
+        @Override
+        public void rejectedExecution( final Runnable r, final ThreadPoolExecutor executor ) {
+            LOG.warn( "{} task queue full, rejecting task {}", name, r );
+
+            throw new RejectedExecutionException( "Unable to run task, queue full" );
+        }
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/af3726b8/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
index 5d35ce4..9a0b857 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
@@ -1,35 +1,19 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
 package org.apache.usergrid.persistence.core.task;
 
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ForkJoinTask;
 import java.util.concurrent.RecursiveTask;
 
 
 /**
  * The task to execute
  */
-public abstract class Task<V> extends RecursiveTask<V> {
+public interface Task<V, I> extends Callable<V> {
+
+    /**
+     * Get the unique identifier of this task.  This may be used to collapse runnables over a time period in the future
+     */
+    public abstract I getId();
+
 
     @Override
     protected V compute() {
@@ -43,22 +27,21 @@ public abstract class Task<V> extends RecursiveTask<V> {
     }
 
 
-
-    /**
-     * Execute the task
-     */
-    public abstract V executeTask() throws Exception;
-
     /**
      * Invoked when this task throws an uncaught exception.
+     * @param throwable
      */
-    public abstract void exceptionThrown( final Throwable throwable );
+    void exceptionThrown(final Throwable throwable);
 
     /**
-     * Invoked when we weren't able to run this task by the the thread attempting to schedule the task. If this task
-     * MUST be run immediately, you can invoke the call method from within this event to invoke the task in the
-     * scheduling thread.  Note that this has performance implications to the user.  If you can drop the request and
-     * process later (lazy repair for instance ) do so.
+     * Invoked when we weren't able to run this task by the thread attempting to schedule the task.
+     * If this task MUST be run immediately, you can invoke the call method from within this event to invoke the
+     * task in the scheduling thread.  Note that this has performance implications to the user.  If you can drop the
+     * request and process later (lazy repair for instance ) do so.
+     *
      */
-    public abstract void rejected();
+    void rejected();
+
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/af3726b8/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
index c6f5915..aaa9d60 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
@@ -13,10 +13,11 @@ public interface TaskExecutor {
      * Submit the task asynchronously
      * @param task
      */
-    public <V> Task<V > submit( Task<V> task );
+    public <V> ListenableFuture<V> submit( Task<V> task );
 
     /**
      * Stop the task executor without waiting for scheduled threads to run
      */
     public void shutdown();
+    
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/af3726b8/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
index 9656b5f..fa63eee 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
@@ -23,13 +23,13 @@ public class NamedTaskExecutorImplTest {
 
     @Test
     public void jobSuccess() throws InterruptedException {
-        final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1 );
+        final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1, 0 );
 
         final CountDownLatch exceptionLatch = new CountDownLatch( 0 );
         final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
         final CountDownLatch runLatch = new CountDownLatch( 1 );
 
-        final Task<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {};
+        final Task<Void, UUID> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {};
 
         executor.submit( task );
 
@@ -44,7 +44,7 @@ public class NamedTaskExecutorImplTest {
 
     @Test
     public void exceptionThrown() throws InterruptedException {
-        final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1 );
+        final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1, 0 );
 
         final CountDownLatch exceptionLatch = new CountDownLatch( 1 );
         final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
@@ -53,10 +53,9 @@ public class NamedTaskExecutorImplTest {
         final RuntimeException re = new RuntimeException( "throwing exception" );
 
         final TestTask<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {
-
-
             @Override
-            public Void executeTask() {
+            public Void call() throws Exception {
+                super.call();
                 throw re;
             }
         };
@@ -76,7 +75,7 @@ public class NamedTaskExecutorImplTest {
 
     @Test
     public void noCapacity() throws InterruptedException {
-        final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1 );
+        final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1, 0 );
 
         final CountDownLatch exceptionLatch = new CountDownLatch( 0 );
         final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
@@ -85,8 +84,8 @@ public class NamedTaskExecutorImplTest {
 
         final TestTask<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {
             @Override
-            public Void executeTask() throws Exception {
-                super.executeTask();
+            public Void call() throws Exception {
+                super.call();
 
                 //park this thread so it takes up a task and the next is rejected
                 final Object mutex = new Object();
@@ -130,22 +129,22 @@ public class NamedTaskExecutorImplTest {
     public void noCapacityWithQueue() throws InterruptedException {
 
         final int threadPoolSize = 1;
-       
+        final int queueSize = 10;
 
-        final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", threadPoolSize );
+        final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", threadPoolSize, queueSize );
 
         final CountDownLatch exceptionLatch = new CountDownLatch( 0 );
         final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
         final CountDownLatch runLatch = new CountDownLatch( 1 );
 
-        int iterations = threadPoolSize ;
+        int iterations = threadPoolSize + queueSize;
 
-        for ( int i = 0; i < iterations; i++ ) {
+        for(int i = 0; i < iterations; i ++) {
 
             final TestTask<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {
                 @Override
-                public Void executeTask() throws Exception {
-                    super.executeTask();
+                public Void call() throws Exception {
+                    super.call();
 
                     //park this thread so it takes up a task and the next is rejected
                     final Object mutex = new Object();
@@ -161,6 +160,7 @@ public class NamedTaskExecutorImplTest {
         }
 
 
+
         runLatch.await( 1000, TimeUnit.MILLISECONDS );
 
         //now submit the second task
@@ -185,81 +185,12 @@ public class NamedTaskExecutorImplTest {
     }
 
 
-    @Test
-    public void jobTreeResult() throws InterruptedException {
-
-        final int threadPoolSize = 4;
-       
-
-        final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", threadPoolSize );
-
-        final CountDownLatch exceptionLatch = new CountDownLatch( 0 );
-        final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
-
-        //accomodates for splitting the job 1->2->4 and joining
-        final CountDownLatch runLatch = new CountDownLatch( 7 );
-
-
-        TestRecursiveTask task = new TestRecursiveTask( exceptionLatch, rejectedLatch, runLatch, 1, 3 );
+    private static abstract class TestTask<V> implements Task<V, UUID> {
 
-         executor.submit( task );
-
-
-        //compute our result
-        Integer result = task.join();
-
-        //result should be 1+2*2+3*4
-        final int expected = 4*3;
-
-        assertEquals(expected, result.intValue());
-
-        //just to check our latches
-        runLatch.await( 1000, TimeUnit.MILLISECONDS );
-
-        //now submit the second task
-
-
-    }
-
-
-    private static class TestRecursiveTask extends TestTask<Integer> {
-
-        private final int depth;
-        private final int maxDepth;
-
-        private TestRecursiveTask( final CountDownLatch exceptionLatch, final CountDownLatch rejectedLatch,
-                                   final CountDownLatch runLatch, final int depth, final int maxDepth ) {
-            super( exceptionLatch, rejectedLatch, runLatch );
-            this.depth = depth;
-            this.maxDepth = maxDepth;
-        }
-
-
-        @Override
-        public Integer executeTask() throws Exception {
-
-            if(depth == maxDepth ){
-                return depth;
-            }
-
-            TestRecursiveTask left = new TestRecursiveTask(exceptionLatch, rejectedLatch, runLatch, depth+1, maxDepth  );
-
-            TestRecursiveTask right = new TestRecursiveTask(exceptionLatch, rejectedLatch, runLatch, depth+1, maxDepth  );
-
-            //run our left in another thread
-            left.fork();
-
-            return right.compute() + left.join();
-        }
-    }
-
-
-    private static abstract class TestTask<V> extends Task<V> {
-
-        protected final List<Throwable> exceptions;
-        protected final CountDownLatch exceptionLatch;
-        protected final CountDownLatch rejectedLatch;
-        protected final CountDownLatch runLatch;
+        private final List<Throwable> exceptions;
+        private final CountDownLatch exceptionLatch;
+        private final CountDownLatch rejectedLatch;
+        private final CountDownLatch runLatch;
 
 
         private TestTask( final CountDownLatch exceptionLatch, final CountDownLatch rejectedLatch,
@@ -272,6 +203,11 @@ public class NamedTaskExecutorImplTest {
         }
 
 
+        @Override
+        public UUID getId() {
+            return UUIDGenerator.newTimeUUID();
+        }
+
 
         @Override
         public void exceptionThrown( final Throwable throwable ) {
@@ -287,7 +223,7 @@ public class NamedTaskExecutorImplTest {
 
 
         @Override
-        public V executeTask() throws Exception {
+        public V call() throws Exception {
             runLatch.countDown();
             return null;
         }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/af3726b8/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
index a3978fc..608f8ce 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
@@ -155,7 +155,7 @@ public class GraphModule extends AbstractModule {
     @Singleton
     @Provides
     public TaskExecutor graphTaskExecutor(final GraphFig graphFig){
-        return new NamedTaskExecutorImpl( "graphTaskExecutor",  graphFig.getShardAuditWorkerCount()  );
+        return new NamedTaskExecutorImpl( "graphTaskExecutor",  graphFig.getShardAuditWorkerCount(), graphFig.getShardAuditWorkerQueueSize()  );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/af3726b8/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
index ed63587..4fe1a63 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
@@ -27,8 +27,6 @@ import java.util.HashSet;
 import java.util.Set;
 
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.task.Task;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardGroupCompactionImpl;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
@@ -44,8 +42,9 @@ public interface ShardGroupCompaction {
      *
      * @return A ListenableFuture with the result.  Note that some
      */
-    public Task<AuditResult> evaluateShardGroup(
-            final ApplicationScope scope, final DirectedEdgeMeta edgeMeta, final ShardEntryGroup group );
+    public ListenableFuture<AuditResult> evaluateShardGroup( final ApplicationScope scope,
+                                                             final DirectedEdgeMeta edgeMeta,
+                                                             final ShardEntryGroup group );
 
 
     public enum AuditResult {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/af3726b8/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
index a84a0f0..be7fbe4 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
@@ -37,13 +37,14 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
+import javax.annotation.Nullable;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.core.consistency.TimeService;
 import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.task.ImmediateTask;
 import org.apache.usergrid.persistence.core.task.Task;
 import org.apache.usergrid.persistence.core.task.TaskExecutor;
 import org.apache.usergrid.persistence.graph.GraphFig;
@@ -65,6 +66,9 @@ import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hasher;
 import com.google.common.hash.Hashing;
 import com.google.common.hash.PrimitiveSink;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.netflix.astyanax.Keyspace;
@@ -273,29 +277,45 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
 
 
     @Override
-    public Task<AuditResult> evaluateShardGroup( final ApplicationScope scope,
-                                                                final DirectedEdgeMeta edgeMeta,
-                                                                final ShardEntryGroup group ) {
+    public ListenableFuture<AuditResult> evaluateShardGroup( final ApplicationScope scope,
+                                                             final DirectedEdgeMeta edgeMeta,
+                                                             final ShardEntryGroup group ) {
 
         final double repairChance = random.nextDouble();
 
 
         //don't audit, we didn't hit our chance
         if ( repairChance > graphFig.getShardRepairChance() ) {
-            return new ImmediateTask<AuditResult>(  AuditResult.NOT_CHECKED ) {};
+            return Futures.immediateFuture( AuditResult.NOT_CHECKED );
         }
 
         /**
          * Try and submit.  During back pressure, we may not be able to submit, that's ok.  Better to drop than to
          * hose the system
          */
-       return taskExecutor.submit( new ShardAuditTask( scope, edgeMeta, group ) );
+        ListenableFuture<AuditResult> future = taskExecutor.submit( new ShardAuditTask( scope, edgeMeta, group ) );
+
+        /**
+         * Log our success or failures for debugging purposes
+         */
+        Futures.addCallback( future, new FutureCallback<AuditResult>() {
+            @Override
+            public void onSuccess( @Nullable final AuditResult result ) {
+                LOG.debug( "Successfully completed audit of task {}", result );
+            }
 
 
+            @Override
+            public void onFailure( final Throwable t ) {
+                LOG.error( "Unable to perform audit.  Exception is ", t );
+            }
+        } );
+
+        return future;
     }
 
 
-    private final class ShardAuditTask extends Task<AuditResult> {
+    private final class ShardAuditTask implements Task<AuditResult, ShardAuditKey> {
 
         private final ApplicationScope scope;
         private final DirectedEdgeMeta edgeMeta;
@@ -310,6 +330,11 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
         }
 
 
+        @Override
+        public ShardAuditKey getId() {
+            return new ShardAuditKey( scope, edgeMeta, group );
+        }
+
 
         @Override
         public void exceptionThrown( final Throwable throwable ) {
@@ -320,12 +345,12 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
         @Override
         public void rejected() {
             //ignore, if this happens we don't care, we're saturated, we can check later
-            LOG.error( "Rejected audit for shard of with scope {}, edgeMeta of {} and group of {}", scope, edgeMeta, group );
+            LOG.error( "Rejected audit for shard of {}", getId() );
         }
 
 
         @Override
-        public AuditResult executeTask() throws Exception {
+        public AuditResult call() throws Exception {
             /**
              * We don't have a compaction pending.  Run an audit on the shards
              */
@@ -376,8 +401,10 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
                  */
                 try {
                     CompactionResult result = compact( scope, edgeMeta, group );
-                    LOG.info( "Compaction result for compaction of scope {} with edge meta data of {} and shard group "
-                                    + "{} is {}", new Object[] { scope, edgeMeta, group, result } );
+                    LOG.info(
+                            "Compaction result for compaction of scope {} with edge meta data of {} and shard group " +
+                                    "{} is {}",
+                            new Object[] { scope, edgeMeta, group, result } );
                 }
                 finally {
                     shardCompactionTaskTracker.complete( scope, edgeMeta, group );
@@ -391,7 +418,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
     }
 
 
-    public static final class ShardAuditKey {
+    private static final class ShardAuditKey {
         private final ApplicationScope scope;
         private final DirectedEdgeMeta edgeMeta;
         private final ShardEntryGroup group;


[06/15] git commit: Finished testing of the EntityVersionCleanupTask

Posted by to...@apache.org.
Finished testing of the EntityVersionCleanupTask


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/d4f80e7c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/d4f80e7c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/d4f80e7c

Branch: refs/heads/two-dot-o
Commit: d4f80e7c85093e17081a4210491022b9001a02b7
Parents: 018dbee
Author: Todd Nine <to...@apache.org>
Authored: Sat Sep 27 15:25:09 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Sat Sep 27 17:07:50 2014 -0600

----------------------------------------------------------------------
 .../impl/EntityVersionCleanupTask.java          | 121 ++--
 .../impl/EntityVersionCleanupTaskTest.java      | 583 ++++++++++++++++++-
 .../persistence/core/task/ImmediateTask.java    |  12 +-
 .../core/task/NamedTaskExecutorImpl.java        |  10 +-
 .../usergrid/persistence/core/task/Task.java    |  27 +-
 .../persistence/core/task/TaskExecutor.java     |   7 +-
 .../core/task/NamedTaskExecutorImplTest.java    |   9 +-
 .../impl/shard/ShardGroupCompaction.java        |   2 +-
 .../shard/impl/ShardGroupCompactionImpl.java    |  13 +-
 9 files changed, 684 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d4f80e7c/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
index 29ca3ac..3d1483d 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
@@ -1,10 +1,10 @@
 package org.apache.usergrid.persistence.collection.impl;
 
 
-import java.util.ArrayList;
 import java.util.List;
+import java.util.Stack;
 import java.util.UUID;
-import java.util.concurrent.ForkJoinTask;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.RecursiveTask;
 
 import org.slf4j.Logger;
@@ -15,10 +15,8 @@ import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
 import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
 import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
-import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
 import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
 import org.apache.usergrid.persistence.collection.serialization.impl.LogEntryIterator;
-import org.apache.usergrid.persistence.core.entity.EntityVersion;
 import org.apache.usergrid.persistence.core.task.Task;
 import org.apache.usergrid.persistence.model.entity.Id;
 
@@ -27,12 +25,11 @@ import org.apache.usergrid.persistence.model.entity.Id;
  * Cleans up previous versions from the specified version. Note that this means the version passed in the io event is
  * retained, the range is exclusive.
  */
-public class EntityVersionCleanupTask extends Task<CollectionIoEvent<EntityVersion>, CollectionIoEvent<EntityVersion>> {
+public class EntityVersionCleanupTask extends Task<Void> {
 
     private static final Logger LOG = LoggerFactory.getLogger( EntityVersionCleanupTask.class );
 
 
-    private final CollectionIoEvent<EntityVersion> collectionIoEvent;
     private final List<EntityVersionDeleted> listeners;
 
     private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
@@ -40,29 +37,31 @@ public class EntityVersionCleanupTask extends Task<CollectionIoEvent<EntityVersi
 
     private final SerializationFig serializationFig;
 
+    private final CollectionScope scope;
+    private final Id entityId;
+    private final UUID version;
 
-    private EntityVersionCleanupTask( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
-                                      final MvccEntitySerializationStrategy entitySerializationStrategy,
-                                      final CollectionIoEvent<EntityVersion> collectionIoEvent,
-                                      final List<EntityVersionDeleted> listeners,
-                                      final SerializationFig serializationFig ) {
-        this.collectionIoEvent = collectionIoEvent;
-        this.listeners = listeners;
-        this.logEntrySerializationStrategy = logEntrySerializationStrategy;
-        this.entitySerializationStrategy = entitySerializationStrategy;
-        this.serializationFig = serializationFig;
-    }
 
+    public EntityVersionCleanupTask( final SerializationFig serializationFig,
+                                     final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
+                                     final MvccEntitySerializationStrategy entitySerializationStrategy,
+                                     final List<EntityVersionDeleted> listeners, final CollectionScope scope,
+                                     final Id entityId, final UUID version ) {
 
-    @Override
-    public CollectionIoEvent<EntityVersion> getId() {
-        return collectionIoEvent;
+        this.serializationFig = serializationFig;
+        this.logEntrySerializationStrategy = logEntrySerializationStrategy;
+        this.entitySerializationStrategy = entitySerializationStrategy;
+        this.listeners = listeners;
+        this.scope = scope;
+        this.entityId = entityId;
+        this.version = version;
     }
 
 
     @Override
     public void exceptionThrown( final Throwable throwable ) {
-        LOG.error( "Unable to run update task for event {}", collectionIoEvent, throwable );
+        LOG.error( "Unable to run update task for collection {} with entity {} and version {}",
+                new Object[] { scope, entityId, version }, throwable );
     }
 
 
@@ -81,15 +80,15 @@ public class EntityVersionCleanupTask extends Task<CollectionIoEvent<EntityVersi
 
 
     @Override
-    public CollectionIoEvent<EntityVersion> executeTask() throws Exception {
+    public Void executeTask() throws Exception {
 
-        final CollectionScope scope = collectionIoEvent.getEntityCollection();
-        final Id entityId = collectionIoEvent.getEvent().getId();
-        final UUID maxVersion = collectionIoEvent.getEvent().getVersion();
+
+        final UUID maxVersion = version;
 
 
         LogEntryIterator logEntryIterator =
-                new LogEntryIterator( logEntrySerializationStrategy, scope, entityId, maxVersion, serializationFig.getHistorySize() );
+                new LogEntryIterator( logEntrySerializationStrategy, scope, entityId, maxVersion,
+                        serializationFig.getHistorySize() );
 
 
         //for every entry, we want to clean it up with listeners
@@ -100,39 +99,73 @@ public class EntityVersionCleanupTask extends Task<CollectionIoEvent<EntityVersi
 
 
             final UUID version = logEntry.getVersion();
-            List<ForkJoinTask<Void>> tasks = new ArrayList<>();
 
 
-            //execute all the listeners
-            for (final  EntityVersionDeleted listener : listeners ) {
+            fireEvents();
 
-                tasks.add( new RecursiveTask<Void>() {
-                    @Override
-                    protected Void compute() {
-                        listener.versionDeleted( scope, entityId, version );
-                        return null;
-                    }
-                }.fork() );
+            //we do multiple invocations on purpose.  Our log is our source of versions, only delete from it
+            //after every successful invocation of listeners and entity removal
+            entitySerializationStrategy.delete( scope, entityId, version ).execute();
 
+            logEntrySerializationStrategy.delete( scope, entityId, version ).execute();
+        }
 
-            }
 
-            //wait for them to complete
+        return null;
+    }
 
-            joinAll(tasks);
 
-            //we do multiple invocations on purpose.  Our log is our source of versions, only delete from it
-            //after every successful invocation of listeners and entity removal
-            entitySerializationStrategy.delete( scope, entityId, version ).execute();
+    private void fireEvents() throws ExecutionException, InterruptedException {
 
-            logEntrySerializationStrategy.delete( scope, entityId, version ).execute();
+        if ( listeners.size() == 0 ) {
+            return;
         }
 
+        //stack to track forked tasks
+        final Stack<RecursiveTask<Void>> tasks = new Stack<>();
+
+
+        //we don't want to fork the final listener, we'll run that in our current thread
+        final int forkedTaskSize = listeners.size() - 1;
+
+
+        //execute all the listeners
+        for ( int i = 0; i < forkedTaskSize; i++ ) {
+
+            final EntityVersionDeleted listener = listeners.get( i );
+
+            final RecursiveTask<Void> task = createTask( listener );
+
+            task.fork();
 
-        return collectionIoEvent;
+            tasks.push( task );
+        }
+
+
+        final RecursiveTask<Void> lastTask = createTask( listeners.get( forkedTaskSize ) );
+
+        lastTask.invoke();
+
+
+        //wait for them to complete
+        while ( !tasks.isEmpty() ) {
+            tasks.pop().get();
+        }
     }
 
 
+    /**
+     * Return the new task to execute
+     */
+    private RecursiveTask<Void> createTask( final EntityVersionDeleted listener ) {
+        return new RecursiveTask<Void>() {
+            @Override
+            protected Void compute() {
+                listener.versionDeleted( scope, entityId, version );
+                return null;
+            }
+        };
+    }
 }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d4f80e7c/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
index 050ea9e..2df75f1 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
@@ -22,10 +22,34 @@ package org.apache.usergrid.persistence.collection.impl;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Semaphore;
 
+import org.junit.AfterClass;
 import org.junit.Test;
 
-import static org.junit.Assert.*;
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
+import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
+import org.apache.usergrid.persistence.collection.util.LogEntryMock;
+import org.apache.usergrid.persistence.core.task.NamedTaskExecutorImpl;
+import org.apache.usergrid.persistence.core.task.TaskExecutor;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 
 /**
@@ -33,8 +57,563 @@ import static org.junit.Assert.*;
  */
 public class EntityVersionCleanupTaskTest {
 
+    private static final TaskExecutor taskExecutor = new NamedTaskExecutorImpl( "test", 4 );
+
+
+    @AfterClass
+    public static void shutdown() {
+        taskExecutor.shutdown();
+    }
+
+
+    @Test
+    public void noListenerOneVersion() throws ExecutionException, InterruptedException, ConnectionException {
+
+
+        final SerializationFig serializationFig = mock( SerializationFig.class );
+
+        when( serializationFig.getHistorySize() ).thenReturn( 10 );
+
+        final MvccEntitySerializationStrategy mvccEntitySerializationStrategy =
+                mock( MvccEntitySerializationStrategy.class );
+
+        final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy =
+                mock( MvccLogEntrySerializationStrategy.class );
+
+
+        //intentionally no events
+        final List<EntityVersionDeleted> listeners = new ArrayList<EntityVersionDeleted>();
+
+        final Id applicationId = new SimpleId( "application" );
+
+
+        final CollectionScope appScope = new CollectionScopeImpl( applicationId, applicationId, "users" );
+
+        final Id entityId = new SimpleId( "user" );
+
+
+        //mock up a single log entry for our first test
+        final LogEntryMock logEntryMock =
+                LogEntryMock.createLogEntryMock( mvccLogEntrySerializationStrategy, appScope, entityId, 2 );
+
+
+        final UUID version = logEntryMock.getEntries().iterator().next().getVersion();
+
+
+        EntityVersionCleanupTask cleanupTask =
+                new EntityVersionCleanupTask( serializationFig, mvccLogEntrySerializationStrategy,
+                        mvccEntitySerializationStrategy, listeners, appScope, entityId, version );
+
+        final MutationBatch firstBatch = mock( MutationBatch.class );
+
+
+        //set up returning a mutator
+        when( mvccEntitySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
+                .thenReturn( firstBatch );
+
+
+        final MutationBatch secondBatch = mock( MutationBatch.class );
+
+        when( mvccLogEntrySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
+                .thenReturn( secondBatch );
+
+
+        //start the task
+        taskExecutor.submit( cleanupTask );
+
+        //wait for the task
+        cleanupTask.get();
+
+        //verify it was run
+        verify( firstBatch ).execute();
+
+        verify( secondBatch ).execute();
+    }
+
+
+    /**
+     * Tests the cleanup task on the first version created
+     */
+    @Test
+    public void noListenerNoVersions() throws ExecutionException, InterruptedException, ConnectionException {
+
+
+        final SerializationFig serializationFig = mock( SerializationFig.class );
+
+        when( serializationFig.getHistorySize() ).thenReturn( 10 );
+
+        final MvccEntitySerializationStrategy mvccEntitySerializationStrategy =
+                mock( MvccEntitySerializationStrategy.class );
+
+        final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy =
+                mock( MvccLogEntrySerializationStrategy.class );
+
+
+        //intentionally no events
+        final List<EntityVersionDeleted> listeners = new ArrayList<EntityVersionDeleted>();
+
+        final Id applicationId = new SimpleId( "application" );
+
+
+        final CollectionScope appScope = new CollectionScopeImpl( applicationId, applicationId, "users" );
+
+        final Id entityId = new SimpleId( "user" );
+
+
+        //mock up a single log entry for our first test
+        final LogEntryMock logEntryMock =
+                LogEntryMock.createLogEntryMock( mvccLogEntrySerializationStrategy, appScope, entityId, 1 );
+
+
+        final UUID version = logEntryMock.getEntries().iterator().next().getVersion();
+
+
+        EntityVersionCleanupTask cleanupTask =
+                new EntityVersionCleanupTask( serializationFig, mvccLogEntrySerializationStrategy,
+                        mvccEntitySerializationStrategy, listeners, appScope, entityId, version );
+
+        final MutationBatch firstBatch = mock( MutationBatch.class );
+
+
+        //set up returning a mutator
+        when( mvccEntitySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
+                .thenReturn( firstBatch );
+
+
+        final MutationBatch secondBatch = mock( MutationBatch.class );
+
+        when( mvccLogEntrySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
+                .thenReturn( secondBatch );
+
+
+        //start the task
+        taskExecutor.submit( cleanupTask );
+
+        //wait for the task
+        cleanupTask.get();
+
+        //verify it was run
+        verify( firstBatch, never() ).execute();
+
+        verify( secondBatch, never() ).execute();
+    }
+
+
     @Test
-    public void multiPageTask(){
+    public void singleListenerSingleVersion() throws ExecutionException, InterruptedException, ConnectionException {
+
+
+        final SerializationFig serializationFig = mock( SerializationFig.class );
+
+        when( serializationFig.getHistorySize() ).thenReturn( 10 );
+
+        final MvccEntitySerializationStrategy mvccEntitySerializationStrategy =
+                mock( MvccEntitySerializationStrategy.class );
+
+        final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy =
+                mock( MvccLogEntrySerializationStrategy.class );
+
+
+        //create a latch for the event listener, and add it to the list of events
+        final int sizeToReturn = 1;
+
+        final CountDownLatch latch = new CountDownLatch( sizeToReturn );
+
+        final EntityVersionDeletedTest eventListener = new EntityVersionDeletedTest( latch );
+
+        final List<EntityVersionDeleted> listeners = new ArrayList<EntityVersionDeleted>();
+
+        listeners.add( eventListener );
+
+        final Id applicationId = new SimpleId( "application" );
+
+
+        final CollectionScope appScope = new CollectionScopeImpl( applicationId, applicationId, "users" );
+
+        final Id entityId = new SimpleId( "user" );
+
+
+        //mock up a single log entry for our first test
+        final LogEntryMock logEntryMock = LogEntryMock
+                .createLogEntryMock( mvccLogEntrySerializationStrategy, appScope, entityId, sizeToReturn + 1 );
+
+
+        final UUID version = logEntryMock.getEntries().iterator().next().getVersion();
+
+
+        EntityVersionCleanupTask cleanupTask =
+                new EntityVersionCleanupTask( serializationFig, mvccLogEntrySerializationStrategy,
+                        mvccEntitySerializationStrategy, listeners, appScope, entityId, version );
+
+        final MutationBatch firstBatch = mock( MutationBatch.class );
+
+
+        //set up returning a mutator
+        when( mvccEntitySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
+                .thenReturn( firstBatch );
+
+
+        final MutationBatch secondBatch = mock( MutationBatch.class );
+
+        when( mvccLogEntrySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
+                .thenReturn( secondBatch );
+
+
+        //start the task
+        taskExecutor.submit( cleanupTask );
+
+        //wait for the task
+        cleanupTask.get();
+
+        //we deleted the version
+        //verify it was run
+        verify( firstBatch ).execute();
+
+        verify( secondBatch ).execute();
+
+        //the latch was executed
+        latch.await();
+    }
+
+
+    @Test
+    public void multipleListenerMultipleVersions()
+            throws ExecutionException, InterruptedException, ConnectionException {
+
+
+        final SerializationFig serializationFig = mock( SerializationFig.class );
+
+        when( serializationFig.getHistorySize() ).thenReturn( 10 );
+
+        final MvccEntitySerializationStrategy mvccEntitySerializationStrategy =
+                mock( MvccEntitySerializationStrategy.class );
+
+        final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy =
+                mock( MvccLogEntrySerializationStrategy.class );
+
+
+        //create a latch for the event listener, and add it to the list of events
+        final int sizeToReturn = 10;
+
+
+        final CountDownLatch latch = new CountDownLatch( sizeToReturn * 3 );
+
+        final EntityVersionDeletedTest listener1 = new EntityVersionDeletedTest( latch );
+        final EntityVersionDeletedTest listener2 = new EntityVersionDeletedTest( latch );
+        final EntityVersionDeletedTest listener3 = new EntityVersionDeletedTest( latch );
+
+        final List<EntityVersionDeleted> listeners = new ArrayList<EntityVersionDeleted>();
+
+        listeners.add( listener1 );
+        listeners.add( listener2 );
+        listeners.add( listener3 );
+
+        final Id applicationId = new SimpleId( "application" );
+
+
+        final CollectionScope appScope = new CollectionScopeImpl( applicationId, applicationId, "users" );
+
+        final Id entityId = new SimpleId( "user" );
+
+
+        //mock up a single log entry for our first test
+        final LogEntryMock logEntryMock = LogEntryMock
+                .createLogEntryMock( mvccLogEntrySerializationStrategy, appScope, entityId, sizeToReturn + 1 );
+
+
+        final UUID version = logEntryMock.getEntries().iterator().next().getVersion();
+
+
+        EntityVersionCleanupTask cleanupTask =
+                new EntityVersionCleanupTask( serializationFig, mvccLogEntrySerializationStrategy,
+                        mvccEntitySerializationStrategy, listeners, appScope, entityId, version );
+
+        final MutationBatch firstBatch = mock( MutationBatch.class );
+
+
+        //set up returning a mutator
+        when( mvccEntitySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
+                .thenReturn( firstBatch );
+
+
+        final MutationBatch secondBatch = mock( MutationBatch.class );
+
+        when( mvccLogEntrySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
+                .thenReturn( secondBatch );
+
+
+        //start the task
+        taskExecutor.submit( cleanupTask );
+
+        //wait for the task
+        cleanupTask.get();
+
+        //we deleted the version
+        //verify we deleted everything
+        verify( firstBatch, times( sizeToReturn ) ).execute();
+
+        verify( secondBatch, times( sizeToReturn ) ).execute();
+
+        //the latch was executed
+        latch.await();
+    }
+
+
+    /**
+     * Tests what happens when our listeners are VERY slow
+     * @throws ExecutionException
+     * @throws InterruptedException
+     * @throws ConnectionException
+     */
+    @Test
+    public void multipleListenerMultipleVersionsNoThreadsToRun()
+            throws ExecutionException, InterruptedException, ConnectionException {
+
+
+        final SerializationFig serializationFig = mock( SerializationFig.class );
+
+        when( serializationFig.getHistorySize() ).thenReturn( 10 );
+
+        final MvccEntitySerializationStrategy mvccEntitySerializationStrategy =
+                mock( MvccEntitySerializationStrategy.class );
+
+        final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy =
+                mock( MvccLogEntrySerializationStrategy.class );
+
+
+        //create a latch for the event listener, and add it to the list of events
+        final int sizeToReturn = 10;
+
+
+        final int listenerCount = 5;
+
+        final CountDownLatch latch = new CountDownLatch( sizeToReturn * listenerCount );
+        final Semaphore waitSemaphore = new Semaphore( 0 );
+
+
+        final SlowListener listener1 = new SlowListener( latch, waitSemaphore );
+        final SlowListener listener2 = new SlowListener( latch, waitSemaphore );
+        final SlowListener listener3 = new SlowListener( latch, waitSemaphore );
+        final SlowListener listener4 = new SlowListener( latch, waitSemaphore );
+        final SlowListener listener5 = new SlowListener( latch, waitSemaphore );
+
+        final List<EntityVersionDeleted> listeners = new ArrayList<>();
+
+        listeners.add( listener1 );
+        listeners.add( listener2 );
+        listeners.add( listener3 );
+        listeners.add( listener4 );
+        listeners.add( listener5 );
+
+        final Id applicationId = new SimpleId( "application" );
+
+
+        final CollectionScope appScope = new CollectionScopeImpl( applicationId, applicationId, "users" );
+
+        final Id entityId = new SimpleId( "user" );
+
+
+        //mock up a single log entry for our first test
+        final LogEntryMock logEntryMock = LogEntryMock
+                .createLogEntryMock( mvccLogEntrySerializationStrategy, appScope, entityId, sizeToReturn + 1 );
+
+
+        final UUID version = logEntryMock.getEntries().iterator().next().getVersion();
+
+
+        EntityVersionCleanupTask cleanupTask =
+                new EntityVersionCleanupTask( serializationFig, mvccLogEntrySerializationStrategy,
+                        mvccEntitySerializationStrategy, listeners, appScope, entityId, version );
+
+        final MutationBatch firstBatch = mock( MutationBatch.class );
+
+
+        //set up returning a mutator
+        when( mvccEntitySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
+                .thenReturn( firstBatch );
+
+
+        final MutationBatch secondBatch = mock( MutationBatch.class );
+
+        when( mvccLogEntrySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
+                .thenReturn( secondBatch );
+
+
+        //start the task
+        taskExecutor.submit( cleanupTask );
+
+        /**
+         * While we're not done, release semaphore permits every 200 ms so the blocked listeners can proceed
+         */
+        while(!cleanupTask.isDone()) {
+            Thread.sleep( 200 );
+            waitSemaphore.release( listenerCount );
+        }
+
+        //wait for the task
+        cleanupTask.get();
+
+        //we deleted the version
+        //verify we deleted everything
+        verify( firstBatch, times( sizeToReturn ) ).execute();
+
+        verify( secondBatch, times( sizeToReturn ) ).execute();
+
+        //the latch was executed
+        latch.await();
+    }
+
+
+
+    /**
+     * Tests that our task will run in the caller if there are no threads available, ensuring that the task still runs
+     * @throws ExecutionException
+     * @throws InterruptedException
+     * @throws ConnectionException
+     */
+    @Test
+    public void runsWhenRejected()
+            throws ExecutionException, InterruptedException, ConnectionException {
+
+
+        /**
+         * only 1 thread on purpose, we want to saturate the task
+         */
+        final TaskExecutor taskExecutor = new NamedTaskExecutorImpl( "test", 1);
+
+        final SerializationFig serializationFig = mock( SerializationFig.class );
+
+        when( serializationFig.getHistorySize() ).thenReturn( 10 );
+
+        final MvccEntitySerializationStrategy mvccEntitySerializationStrategy =
+                mock( MvccEntitySerializationStrategy.class );
+
+        final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy =
+                mock( MvccLogEntrySerializationStrategy.class );
+
+
+        //create a latch for the event listener, and add it to the list of events
+        final int sizeToReturn = 10;
+
+
+        final int listenerCount = 1;
+
+        final CountDownLatch latch = new CountDownLatch( sizeToReturn * listenerCount );
+        final Semaphore waitSemaphore = new Semaphore( 0 );
+
+
+        final SlowListener listener1 = new SlowListener( latch, waitSemaphore );
+
+        final List<EntityVersionDeleted> listeners = new ArrayList<>();
+
+        listeners.add( listener1 );
+
+        final Id applicationId = new SimpleId( "application" );
+
+
+        final CollectionScope appScope = new CollectionScopeImpl( applicationId, applicationId, "users" );
+
+        final Id entityId = new SimpleId( "user" );
+
+
+        //mock up a single log entry for our first test
+        final LogEntryMock logEntryMock = LogEntryMock
+                .createLogEntryMock( mvccLogEntrySerializationStrategy, appScope, entityId, sizeToReturn + 1 );
+
+
+        final UUID version = logEntryMock.getEntries().iterator().next().getVersion();
+
+
+        EntityVersionCleanupTask firstTask =
+                new EntityVersionCleanupTask( serializationFig, mvccLogEntrySerializationStrategy,
+                        mvccEntitySerializationStrategy, listeners, appScope, entityId, version );
+
+        EntityVersionCleanupTask secondTask =
+                      new EntityVersionCleanupTask( serializationFig, mvccLogEntrySerializationStrategy,
+                              mvccEntitySerializationStrategy, listeners, appScope, entityId, version );
+
+
+        final MutationBatch firstBatch = mock( MutationBatch.class );
+
+
+        //set up returning a mutator
+        when( mvccEntitySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
+                .thenReturn( firstBatch );
+
+
+        final MutationBatch secondBatch = mock( MutationBatch.class );
+
+        when( mvccLogEntrySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
+                .thenReturn( secondBatch );
+
+
+        //start the task
+        taskExecutor.submit( firstTask );
+
+        //now start another task while the slow running task is running
+        taskExecutor.submit( secondTask );
+
+        //get the second task, we shouldn't have been able to queue it, therefore it should just run in process
+        secondTask.get();
+
+        /**
+         * While we're not done, release semaphore permits every 200 ms so the blocked listener can proceed
+         */
+        while(!firstTask.isDone()) {
+            Thread.sleep( 200 );
+            waitSemaphore.release( listenerCount );
+        }
+
+        //wait for the task
+        firstTask.get();
+
+        //we deleted the version
+        //verify we deleted everything
+        verify( firstBatch, times( sizeToReturn*2 ) ).execute();
+
+        verify( secondBatch, times( sizeToReturn*2 ) ).execute();
+
+        //the latch was executed
+        latch.await();
+    }
+
+
+    private static class EntityVersionDeletedTest implements EntityVersionDeleted {
+        final CountDownLatch invocationLatch;
+
+
+        private EntityVersionDeletedTest( final CountDownLatch invocationLatch ) {
+            this.invocationLatch = invocationLatch;
+        }
+
+
+        @Override
+        public void versionDeleted( final CollectionScope scope, final Id entityId, final UUID entityVersion ) {
+            invocationLatch.countDown();
+        }
+    }
+
+
+    private static class SlowListener extends EntityVersionDeletedTest {
+        final Semaphore blockLatch;
+
+
+        private SlowListener( final CountDownLatch invocationLatch, final Semaphore blockLatch ) {
+            super( invocationLatch );
+            this.blockLatch = blockLatch;
+        }
+
 
+        @Override
+        public void versionDeleted( final CollectionScope scope, final Id entityId, final UUID entityVersion ) {
+            //wait for unblock to happen before counting down invocation latches
+            try {
+                blockLatch.acquire();
+            }
+            catch ( InterruptedException e ) {
+                throw new RuntimeException( e );
+            }
+            super.versionDeleted( scope, entityId, entityVersion );
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d4f80e7c/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/ImmediateTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/ImmediateTask.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/ImmediateTask.java
index 627e7e8..f6c28a3 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/ImmediateTask.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/ImmediateTask.java
@@ -5,23 +5,17 @@ package org.apache.usergrid.persistence.core.task;
  * Does not perform computation, just returns the value passed to it
  *
  */
-public class ImmediateTask<V, I> extends Task<V, I> {
+public class ImmediateTask<V> extends Task<V> {
+
 
-    private final I id;
     private final V returned;
 
 
-    protected ImmediateTask( final I id, final V returned ) {
-        this.id = id;
+    protected ImmediateTask( final V returned ) {
         this.returned = returned;
     }
 
 
-    @Override
-    public I getId() {
-        return id;
-    }
-
 
     @Override
     public V executeTask() throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d4f80e7c/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
index 4fc72c8..bb9cac8 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
@@ -55,6 +55,8 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
      * @param poolSize The size of the pool.  This is the number of concurrent tasks that can execute at once.
      */
     public NamedTaskExecutorImpl( final String name, final int poolSize ) {
+
+        //TODO, figure out how to name the fork/join threads in the pool
         Preconditions.checkNotNull( name );
         Preconditions.checkArgument( name.length() > 0, "name must have a length" );
         Preconditions.checkArgument( poolSize > 0, "poolSize must be > than 0" );
@@ -67,7 +69,7 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
 
 
     @Override
-    public <V, I> Task<V, I> submit( final Task<V, I> task ) {
+    public <V> Task<V> submit( final Task<V> task ) {
 
         try {
             executorService.submit( task );
@@ -80,6 +82,12 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
     }
 
 
+    @Override
+    public void shutdown() {
+        executorService.shutdownNow();
+    }
+
+
     private final class NamedForkJoinPool extends ForkJoinPool {
 
         private NamedForkJoinPool( final int workerThreadCount ) {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d4f80e7c/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
index 4dccc72..5d35ce4 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
@@ -29,13 +29,7 @@ import java.util.concurrent.RecursiveTask;
 /**
  * The task to execute
  */
-public abstract class Task<V, I> extends RecursiveTask<V> {
-
-    /**
-     * Get the unique identifier of this task.  This may be used to collapse runnables over a time period in the future
-     */
-    public abstract I getId();
-
+public abstract class Task<V> extends RecursiveTask<V> {
 
     @Override
     protected V compute() {
@@ -49,25 +43,6 @@ public abstract class Task<V, I> extends RecursiveTask<V> {
     }
 
 
-    /**
-     * Fork all tasks in the list except the last one.  The last will be run in the caller thread
-     * The others will wait for join
-     * @param tasks
-     *
-     */
-    public <V, T extends ForkJoinTask<V>> List<V> joinAll(List<T> tasks) throws ExecutionException, InterruptedException {
-
-        //don't fork the last one
-       List<V> results = new ArrayList<>(tasks.size());
-
-        for(T task: tasks){
-            results.add( task.join() );
-        }
-
-        return results;
-
-    }
-
 
     /**
      * Execute the task

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d4f80e7c/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
index a3553c7..c6f5915 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
@@ -13,5 +13,10 @@ public interface TaskExecutor {
      * Submit the task asynchronously
      * @param task
      */
-    public <V, I> Task<V, I > submit( Task<V, I> task );
+    public <V> Task<V > submit( Task<V> task );
+
+    /**
+     * Stop the task executor without waiting for scheduled threads to run
+     */
+    public void shutdown();
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d4f80e7c/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
index f65d5a6..9656b5f 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
@@ -29,7 +29,7 @@ public class NamedTaskExecutorImplTest {
         final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
         final CountDownLatch runLatch = new CountDownLatch( 1 );
 
-        final Task<Void, UUID> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {};
+        final Task<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {};
 
         executor.submit( task );
 
@@ -254,7 +254,7 @@ public class NamedTaskExecutorImplTest {
     }
 
 
-    private static abstract class TestTask<V> extends Task<V, UUID> {
+    private static abstract class TestTask<V> extends Task<V> {
 
         protected final List<Throwable> exceptions;
         protected final CountDownLatch exceptionLatch;
@@ -272,11 +272,6 @@ public class NamedTaskExecutorImplTest {
         }
 
 
-        @Override
-        public UUID getId() {
-            return UUIDGenerator.newTimeUUID();
-        }
-
 
         @Override
         public void exceptionThrown( final Throwable throwable ) {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d4f80e7c/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
index 15acaa8..ed63587 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
@@ -44,7 +44,7 @@ public interface ShardGroupCompaction {
      *
      * @return A ListenableFuture with the result.  Note that some
      */
-    public Task<AuditResult, ShardGroupCompactionImpl.ShardAuditKey> evaluateShardGroup(
+    public Task<AuditResult> evaluateShardGroup(
             final ApplicationScope scope, final DirectedEdgeMeta edgeMeta, final ShardEntryGroup group );
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d4f80e7c/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
index 751b4d9..a84a0f0 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
@@ -273,7 +273,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
 
 
     @Override
-    public Task<AuditResult, ShardAuditKey> evaluateShardGroup( final ApplicationScope scope,
+    public Task<AuditResult> evaluateShardGroup( final ApplicationScope scope,
                                                                 final DirectedEdgeMeta edgeMeta,
                                                                 final ShardEntryGroup group ) {
 
@@ -282,7 +282,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
 
         //don't audit, we didn't hit our chance
         if ( repairChance > graphFig.getShardRepairChance() ) {
-            return new ImmediateTask<AuditResult, ShardAuditKey>(  new ShardAuditKey( scope, edgeMeta, group ), AuditResult.NOT_CHECKED ) {};
+            return new ImmediateTask<AuditResult>(  AuditResult.NOT_CHECKED ) {};
         }
 
         /**
@@ -295,7 +295,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
     }
 
 
-    private final class ShardAuditTask extends Task<AuditResult, ShardAuditKey> {
+    private final class ShardAuditTask extends Task<AuditResult> {
 
         private final ApplicationScope scope;
         private final DirectedEdgeMeta edgeMeta;
@@ -310,11 +310,6 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
         }
 
 
-        @Override
-        public ShardAuditKey getId() {
-            return new ShardAuditKey( scope, edgeMeta, group );
-        }
-
 
         @Override
         public void exceptionThrown( final Throwable throwable ) {
@@ -325,7 +320,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
         @Override
         public void rejected() {
             //ignore, if this happens we don't care, we're saturated, we can check later
-            LOG.error( "Rejected audit for shard of {}", getId() );
+            LOG.error( "Rejected audit for shard of with scope {}, edgeMeta of {} and group of {}", scope, edgeMeta, group );
         }
 
 


[02/15] git commit: Migrated graph over to new task executor

Posted by to...@apache.org.
Migrated graph over to new task executor


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/4c72f5c6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/4c72f5c6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/4c72f5c6

Branch: refs/heads/two-dot-o
Commit: 4c72f5c636bae9b16df7578302538c7b4d9600e7
Parents: 66e508a
Author: Todd Nine <to...@apache.org>
Authored: Wed Sep 24 17:39:07 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Wed Sep 24 17:39:07 2014 -0600

----------------------------------------------------------------------
 .../collection/event/EntityVersionRemoved.java  |  26 +++
 .../collection/guice/CollectionModule.java      |  17 +-
 .../serialization/SerializationFig.java         |  32 ++-
 .../core/astyanax/AstyanaxKeyspaceProvider.java |   2 +
 .../persistence/core/guice/CommonModule.java    |   2 +
 .../core/task/NamedTaskExecutorImpl.java        |  14 +-
 .../persistence/core/task/TaskExecutor.java     |   2 +-
 .../usergrid/persistence/graph/GraphFig.java    |   2 +
 .../persistence/graph/guice/GraphModule.java    |  15 ++
 .../shard/impl/ShardGroupCompactionImpl.java    | 202 ++++++++++++-------
 .../impl/shard/ShardGroupCompactionTest.java    |   5 +-
 11 files changed, 227 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4c72f5c6/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionRemoved.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionRemoved.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionRemoved.java
new file mode 100644
index 0000000..dca575d
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionRemoved.java
@@ -0,0 +1,26 @@
+package org.apache.usergrid.persistence.collection.event;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.CollectionScope;
+
+
+/**
+ *
+ * Invoked when an entity version is removed.  Note that this is not a deletion of the entity itself,
+ * only of the specified version.
+ *
+ */
+public interface EntityVersionRemoved {
+
+
+    /**
+     * The version specified was removed.
+     * @param scope
+     * @param entityId
+     * @param entityVersion
+     */
+    public void versionRemoved(final CollectionScope scope, final UUID entityId, final UUID entityVersion);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4c72f5c6/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
index 84f69db..306f6e0 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
@@ -36,6 +36,8 @@ import org.apache.usergrid.persistence.collection.serialization.SerializationFig
 import org.apache.usergrid.persistence.collection.serialization.impl.SerializationModule;
 import org.apache.usergrid.persistence.collection.service.UUIDService;
 import org.apache.usergrid.persistence.collection.service.impl.ServiceModule;
+import org.apache.usergrid.persistence.core.task.NamedTaskExecutorImpl;
+import org.apache.usergrid.persistence.core.task.TaskExecutor;
 
 import com.google.inject.AbstractModule;
 import com.google.inject.Inject;
@@ -79,8 +81,7 @@ public class CollectionModule extends AbstractModule {
     @Singleton
     @Inject
     @Write
-
-    public WriteStart write (MvccLogEntrySerializationStrategy logStrategy, UUIDService uuidService) {
+    public WriteStart write (final MvccLogEntrySerializationStrategy logStrategy) {
         final WriteStart writeStart = new WriteStart( logStrategy, MvccEntity.Status.COMPLETE);
 
         return writeStart;
@@ -90,13 +91,21 @@ public class CollectionModule extends AbstractModule {
     @Singleton
     @Inject
     @WriteUpdate
-
-    public WriteStart writeUpdate (MvccLogEntrySerializationStrategy logStrategy, UUIDService uuidService) {
+    public WriteStart writeUpdate (final MvccLogEntrySerializationStrategy logStrategy) {
         final WriteStart writeStart = new WriteStart( logStrategy, MvccEntity.Status.PARTIAL );
 
         return writeStart;
     }
 
+    @Inject
+    @Singleton
+    @Provides
+    public TaskExecutor collectionTaskExecutor(final SerializationFig serializationFig){
+        return new NamedTaskExecutorImpl( "collectiontasks", serializationFig.getTaskPoolThreadSize(), serializationFig.getTaskPoolQueueSize() );
+    }
+
+
+
 
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4c72f5c6/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java
index 81302a6..7e69a19 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java
@@ -15,25 +15,45 @@ public interface SerializationFig extends GuicyFig {
 
     /**
      * Time to live timeout in seconds.
+     *
      * @return Timeout in seconds.
      */
-    @Key( "collection.stage.transient.timeout" )
-    @Default( "5" )
+    @Key("collection.stage.transient.timeout")
+    @Default("5")
     int getTimeout();
 
     /**
      * Number of history items to return for delete.
+     *
      * @return Timeout in seconds.
      */
-    @Key( "collection.delete.history.size" )
-    @Default( "100" )
+    @Key("collection.delete.history.size")
+    @Default("100")
     int getHistorySize();
 
     /**
      * Number of items to buffer.
+     *
      * @return Timeout in seconds.
      */
-    @Key( "collection.buffer.size" )
-    @Default( "10" )
+    @Key("collection.buffer.size")
+    @Default("10")
     int getBufferSize();
+
+
+    /**
+     * The number of threads to have in the task pool
+     */
+    @Key( "collection.task.pool.threadsize" )
+    @Default( "20" )
+    int getTaskPoolThreadSize();
+
+
+
+    /**
+     * The size of the task queue for the task pool
+     */
+    @Key( "collection.task.pool.queuesize" )
+    @Default( "20" )
+    int getTaskPoolQueueSize();
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4c72f5c6/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/AstyanaxKeyspaceProvider.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/AstyanaxKeyspaceProvider.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/AstyanaxKeyspaceProvider.java
index 7caeaeb..8bd5a9f 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/AstyanaxKeyspaceProvider.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/AstyanaxKeyspaceProvider.java
@@ -24,6 +24,7 @@ import java.util.Set;
 
 import com.google.inject.Inject;
 import com.google.inject.Provider;
+import com.google.inject.Singleton;
 import com.netflix.astyanax.AstyanaxConfiguration;
 import com.netflix.astyanax.AstyanaxContext;
 import com.netflix.astyanax.Keyspace;
@@ -41,6 +42,7 @@ import com.netflix.astyanax.thrift.ThriftFamilyFactory;
  *
  * @author tnine
  */
+@Singleton
 public class AstyanaxKeyspaceProvider implements Provider<Keyspace> {
     private final CassandraFig cassandraFig;
     private final CassandraConfig cassandraConfig;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4c72f5c6/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
index a4cc98a..5f461bb 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
@@ -63,4 +63,6 @@ public class CommonModule extends AbstractModule {
 
 
 
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4c72f5c6/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
index 8ba73a0..8184937 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
@@ -53,7 +53,7 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
 
 
     @Override
-    public <V, I> void submit( final Task<V, I> task ) {
+    public <V, I> ListenableFuture<V> submit( final Task<V, I> task ) {
 
         final ListenableFuture<V> future;
 
@@ -67,8 +67,10 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
         }
         catch ( RejectedExecutionException ree ) {
             task.rejected();
-            return;
+            return Futures.immediateCancelledFuture();
         }
+
+        return future;
     }
 
 
@@ -147,16 +149,10 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
 
         @Override
         public void rejectedExecution( final Runnable r, final ThreadPoolExecutor executor ) {
-
-            //            ListenableFutureTask<Task<?, ?>> future = ( ListenableFutureTask<Task<?, ?>> ) r;
-            //
-            //            future.
-            //            final Task<?, ?> task = ( Task<?, ?> ) r;
             LOG.warn( "Audit queue full, rejecting audit task {}", r );
 
             throw new RejectedExecutionException( "Unable to run task, queue full" );
-            //            LOG.warn( "Audit queue full, rejecting audit task {}", task );
-            //            task.rejected();
         }
+
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4c72f5c6/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
index e60da83..b5491bc 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
@@ -10,5 +10,5 @@ public interface TaskExecutor {
      * Submit the task asynchronously
      * @param task
      */
-    public <V, I> void submit(Task<V, I> task);
+    public <V, I> com.google.common.util.concurrent.ListenableFuture<V> submit( Task<V, I> task );
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4c72f5c6/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
index 0a6ecfa..894e74a 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
@@ -90,6 +90,8 @@ public interface GraphFig extends GuicyFig {
     public static final String COUNTER_WRITE_FLUSH_QUEUE_SIZE = "usergrid.graph.shard.counter.queue.size";
 
 
+
+
     @Default("1000")
     @Key(SCAN_PAGE_SIZE)
     int getScanPageSize();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4c72f5c6/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
index f0e954b..608f8ce 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
@@ -24,6 +24,8 @@ import org.safehaus.guicyfig.GuicyFigModule;
 import org.apache.usergrid.persistence.core.consistency.TimeService;
 import org.apache.usergrid.persistence.core.consistency.TimeServiceImpl;
 import org.apache.usergrid.persistence.core.migration.Migration;
+import org.apache.usergrid.persistence.core.task.NamedTaskExecutorImpl;
+import org.apache.usergrid.persistence.core.task.TaskExecutor;
 import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
@@ -62,7 +64,10 @@ import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.Sizeb
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.SizebasedEdgeShardStrategy;
 
 import com.google.inject.AbstractModule;
+import com.google.inject.Inject;
 import com.google.inject.Key;
+import com.google.inject.Provides;
+import com.google.inject.Singleton;
 import com.google.inject.assistedinject.FactoryModuleBuilder;
 import com.google.inject.multibindings.Multibinder;
 
@@ -144,6 +149,16 @@ public class GraphModule extends AbstractModule {
         migrationBinding.addBinding().to( Key.get( EdgeShardSerialization.class ) );
         migrationBinding.addBinding().to( Key.get( NodeShardCounterSerialization.class ) );
     }
+
+
+    @Inject
+    @Singleton
+    @Provides
+    public TaskExecutor graphTaskExecutor(final GraphFig graphFig){
+        return new NamedTaskExecutorImpl( "graphTaskExecutor",  graphFig.getShardAuditWorkerCount(), graphFig.getShardAuditWorkerQueueSize()  );
+    }
+
+
 }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4c72f5c6/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
index 5076424..be7fbe4 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
@@ -30,7 +30,6 @@ import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ThreadFactory;
@@ -46,6 +45,8 @@ import org.slf4j.LoggerFactory;
 import org.apache.usergrid.persistence.core.consistency.TimeService;
 import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.task.Task;
+import org.apache.usergrid.persistence.core.task.TaskExecutor;
 import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.MarkedEdge;
 import org.apache.usergrid.persistence.graph.SearchByEdgeType;
@@ -68,8 +69,6 @@ import com.google.common.hash.PrimitiveSink;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.netflix.astyanax.Keyspace;
@@ -91,7 +90,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
     private static final HashFunction MURMUR_128 = Hashing.murmur3_128();
 
 
-    private final ListeningExecutorService executorService;
+    private final TaskExecutor taskExecutor;
     private final TimeService timeService;
     private final GraphFig graphFig;
     private final NodeShardAllocation nodeShardAllocation;
@@ -110,7 +109,8 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
                                      final NodeShardAllocation nodeShardAllocation,
                                      final ShardedEdgeSerialization shardedEdgeSerialization,
                                      final EdgeColumnFamilies edgeColumnFamilies, final Keyspace keyspace,
-                                     final EdgeShardSerialization edgeShardSerialization ) {
+                                     final EdgeShardSerialization edgeShardSerialization,
+                                     final TaskExecutor taskExecutor ) {
 
         this.timeService = timeService;
         this.graphFig = graphFig;
@@ -124,8 +124,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
         this.shardCompactionTaskTracker = new ShardCompactionTaskTracker();
         this.shardAuditTaskTracker = new ShardAuditTaskTracker();
 
-        executorService = MoreExecutors.listeningDecorator(
-                new MaxSizeThreadPool( graphFig.getShardAuditWorkerCount(), graphFig.getShardAuditWorkerQueueSize() ) );
+        this.taskExecutor = taskExecutor;
     }
 
 
@@ -232,7 +231,8 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
         resultBuilder.withCopiedEdges( edgeCount ).withSourceShards( sourceShards ).withTargetShard( targetShard );
 
         /**
-         * We didn't move anything this pass, mark the shard as compacted.  If we move something, it means that we missed it on the first pass
+         * We didn't move anything this pass, mark the shard as compacted.  If we move something,
+         * it means that we missed it on the first pass
          * or someone is still not writing to the target shard only.
          */
         if ( edgeCount == 0 ) {
@@ -293,91 +293,153 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
          * Try and submit.  During back pressure, we may not be able to submit, that's ok.  Better to drop than to
          * hose the system
          */
-        ListenableFuture<AuditResult> future = executorService.submit( new Callable<AuditResult>() {
+        ListenableFuture<AuditResult> future = taskExecutor.submit( new ShardAuditTask( scope, edgeMeta, group ) );
+
+        /**
+         * Log our success or failures for debugging purposes
+         */
+        Futures.addCallback( future, new FutureCallback<AuditResult>() {
             @Override
-            public AuditResult call() throws Exception {
+            public void onSuccess( @Nullable final AuditResult result ) {
+                LOG.debug( "Successfully completed audit of task {}", result );
+            }
 
 
-                /**
-                 * We don't have a compaction pending.  Run an audit on the shards
-                 */
-                if ( !group.isCompactionPending() ) {
+            @Override
+            public void onFailure( final Throwable t ) {
+                LOG.error( "Unable to perform audit.  Exception is ", t );
+            }
+        } );
 
-                    /**
-                     * Check if we should allocate, we may want to
-                     */
+        return future;
+    }
 
-                    /**
-                     * It's already compacting, don't do anything
-                     */
-                    if ( !shardAuditTaskTracker.canStartTask( scope, edgeMeta, group ) ) {
-                        return AuditResult.CHECKED_NO_OP;
-                    }
 
-                    try {
+    private final class ShardAuditTask implements Task<AuditResult, ShardAuditKey> {
 
-                        final boolean created = nodeShardAllocation.auditShard( scope, group, edgeMeta );
-                        if ( !created ) {
-                            return AuditResult.CHECKED_NO_OP;
-                        }
-                    }
-                    finally {
-                        shardAuditTaskTracker.complete( scope, edgeMeta, group );
-                    }
+        private final ApplicationScope scope;
+        private final DirectedEdgeMeta edgeMeta;
+        private final ShardEntryGroup group;
 
 
-                    return AuditResult.CHECKED_CREATED;
-                }
+        public ShardAuditTask( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta,
+                               final ShardEntryGroup group ) {
+            this.scope = scope;
+            this.edgeMeta = edgeMeta;
+            this.group = group;
+        }
+
+
+        @Override
+        public ShardAuditKey getId() {
+            return new ShardAuditKey( scope, edgeMeta, group );
+        }
+
+
+        @Override
+        public void exceptionThrown( final Throwable throwable ) {
+            LOG.error( "Unable to execute audit for shard of {}", throwable );
+        }
+
+
+        @Override
+        public void rejected() {
+            //ignore, if this happens we don't care, we're saturated, we can check later
+            LOG.error( "Rejected audit for shard of {}", getId() );
+        }
 
-                //check our taskmanager
 
+        @Override
+        public AuditResult call() throws Exception {
+            /**
+             * We don't have a compaction pending.  Run an audit on the shards
+             */
+            if ( !group.isCompactionPending() ) {
 
                 /**
-                 * Do the compaction
+                 * Check if we should allocate, we may want to
                  */
-                if ( group.shouldCompact( timeService.getCurrentTime() ) ) {
-                    /**
-                     * It's already compacting, don't do anything
-                     */
-                    if ( !shardCompactionTaskTracker.canStartTask( scope, edgeMeta, group ) ) {
-                        return AuditResult.COMPACTING;
-                    }
 
-                    /**
-                     * We use a finally b/c we always want to remove the task track
-                     */
-                    try {
-                        CompactionResult result = compact( scope, edgeMeta, group );
-                        LOG.info( "Compaction result for compaction of scope {} with edge meta data of {} and shard group {} is {}", new Object[]{scope, edgeMeta, group, result} );
-                    }
-                    finally {
-                        shardCompactionTaskTracker.complete( scope, edgeMeta, group );
+                /**
+                 * It's already being audited, don't do anything
+                 */
+                if ( !shardAuditTaskTracker.canStartTask( scope, edgeMeta, group ) ) {
+                    return AuditResult.CHECKED_NO_OP;
+                }
+
+                try {
+
+                    final boolean created = nodeShardAllocation.auditShard( scope, group, edgeMeta );
+                    if ( !created ) {
+                        return AuditResult.CHECKED_NO_OP;
                     }
-                    return AuditResult.COMPACTED;
+                }
+                finally {
+                    shardAuditTaskTracker.complete( scope, edgeMeta, group );
                 }
 
-                //no op, there's nothing we need to do to this shard
-                return AuditResult.NOT_CHECKED;
-            }
-        } );
 
-        /**
-         * Log our success or failures for debugging purposes
-         */
-        Futures.addCallback( future, new FutureCallback<AuditResult>() {
-            @Override
-            public void onSuccess( @Nullable final AuditResult result ) {
-                LOG.debug( "Successfully completed audit of task {}", result );
+                return AuditResult.CHECKED_CREATED;
             }
 
+            //check our taskmanager
 
-            @Override
-            public void onFailure( final Throwable t ) {
-                LOG.error( "Unable to perform audit.  Exception is ", t );
+
+            /**
+             * Do the compaction
+             */
+            if ( group.shouldCompact( timeService.getCurrentTime() ) ) {
+                /**
+                 * It's already compacting, don't do anything
+                 */
+                if ( !shardCompactionTaskTracker.canStartTask( scope, edgeMeta, group ) ) {
+                    return AuditResult.COMPACTING;
+                }
+
+                /**
+                 * We use a finally b/c we always want to remove the task track
+                 */
+                try {
+                    CompactionResult result = compact( scope, edgeMeta, group );
+                    LOG.info(
+                            "Compaction result for compaction of scope {} with edge meta data of {} and shard group " +
+                                    "{} is {}",
+                            new Object[] { scope, edgeMeta, group, result } );
+                }
+                finally {
+                    shardCompactionTaskTracker.complete( scope, edgeMeta, group );
+                }
+                return AuditResult.COMPACTED;
             }
-        } );
 
-        return future;
+            //no op, there's nothing we need to do to this shard
+            return AuditResult.NOT_CHECKED;
+        }
+    }
+
+
+    private static final class ShardAuditKey {
+        private final ApplicationScope scope;
+        private final DirectedEdgeMeta edgeMeta;
+        private final ShardEntryGroup group;
+
+
+        private ShardAuditKey( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta,
+                               final ShardEntryGroup group ) {
+            this.scope = scope;
+            this.edgeMeta = edgeMeta;
+            this.group = group;
+        }
+
+
+        @Override
+        public String toString() {
+            return "ShardAuditKey{" +
+                    "scope=" + scope +
+                    ", edgeMeta=" + edgeMeta +
+                    ", group=" + group +
+                    '}';
+        }
     }
 
 
@@ -531,7 +593,6 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
     }
 
 
-
     public static final class CompactionResult {
 
         public final long copiedEdges;
@@ -541,7 +602,6 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
         public final Shard compactedShard;
 
 
-
         private CompactionResult( final long copiedEdges, final Shard targetShard, final Set<Shard> sourceShards,
                                   final Set<Shard> removedShards, final Shard compactedShard ) {
             this.copiedEdges = copiedEdges;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4c72f5c6/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java
index 1513e85..9f0792d 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java
@@ -35,6 +35,7 @@ import org.mockito.Matchers;
 import org.apache.usergrid.persistence.core.consistency.TimeService;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
+import org.apache.usergrid.persistence.core.task.TaskExecutor;
 import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.SearchByEdgeType;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardGroupCompactionImpl;
@@ -84,6 +85,8 @@ public class ShardGroupCompactionTest {
 
         final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
 
+        final TaskExecutor taskExecutor = mock(TaskExecutor.class);
+
         final long delta = 10000;
 
         final long createTime = 20000;
@@ -100,7 +103,7 @@ public class ShardGroupCompactionTest {
 
         ShardGroupCompactionImpl compaction =
                 new ShardGroupCompactionImpl( timeService, graphFig, nodeShardAllocation, shardedEdgeSerialization,
-                        edgeColumnFamilies, keyspace, edgeShardSerialization );
+                        edgeColumnFamilies, keyspace, edgeShardSerialization, taskExecutor );
 
 
         DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromSourceNode( createId("source"), "test" );


[05/15] git commit: Added tests for the log entry iterator

Posted by to...@apache.org.
Added tests for the log entry iterator


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/018dbeeb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/018dbeeb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/018dbeeb

Branch: refs/heads/two-dot-o
Commit: 018dbeeb8ad46bf536132120f78736fa2b0dd367
Parents: dc3f448
Author: Todd Nine <to...@apache.org>
Authored: Fri Sep 26 11:21:45 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Fri Sep 26 16:21:18 2014 -0600

----------------------------------------------------------------------
 .../impl/EntityVersionCleanupTask.java          |  52 +++++--
 .../serialization/impl/LogEntryIterator.java    | 111 ++++++++++++++
 .../serialization/impl/VersionIterator.java     | 111 --------------
 .../impl/EntityVersionCleanupTaskTest.java      |  40 +++++
 .../impl/LogEntryIteratorTest.java              | 131 ++++++++++++++++
 .../collection/util/LogEntryMock.java           | 152 +++++++++++++++++++
 .../core/task/NamedTaskExecutorImpl.java        | 131 +++-------------
 .../usergrid/persistence/core/task/Task.java    |  42 +++++
 8 files changed, 539 insertions(+), 231 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/018dbeeb/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
index 85afce6..29ca3ac 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
@@ -1,8 +1,11 @@
 package org.apache.usergrid.persistence.collection.impl;
 
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.ForkJoinTask;
+import java.util.concurrent.RecursiveTask;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -11,8 +14,10 @@ import org.apache.usergrid.persistence.collection.CollectionScope;
 import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
 import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
 import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
-import org.apache.usergrid.persistence.collection.serialization.impl.VersionIterator;
+import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
+import org.apache.usergrid.persistence.collection.serialization.impl.LogEntryIterator;
 import org.apache.usergrid.persistence.core.entity.EntityVersion;
 import org.apache.usergrid.persistence.core.task.Task;
 import org.apache.usergrid.persistence.model.entity.Id;
@@ -22,7 +27,7 @@ import org.apache.usergrid.persistence.model.entity.Id;
  * Cleans up previous versions from the specified version. Note that this means the version passed in the io event is
  * retained, the range is exclusive.
  */
-class EntityVersionCleanupTask extends Task<CollectionIoEvent<EntityVersion>, CollectionIoEvent<EntityVersion>> {
+public class EntityVersionCleanupTask extends Task<CollectionIoEvent<EntityVersion>, CollectionIoEvent<EntityVersion>> {
 
     private static final Logger LOG = LoggerFactory.getLogger( EntityVersionCleanupTask.class );
 
@@ -33,15 +38,19 @@ class EntityVersionCleanupTask extends Task<CollectionIoEvent<EntityVersion>, Co
     private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
     private final MvccEntitySerializationStrategy entitySerializationStrategy;
 
+    private final SerializationFig serializationFig;
+
 
     private EntityVersionCleanupTask( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
                                       final MvccEntitySerializationStrategy entitySerializationStrategy,
                                       final CollectionIoEvent<EntityVersion> collectionIoEvent,
-                                      final List<EntityVersionDeleted> listeners ) {
+                                      final List<EntityVersionDeleted> listeners,
+                                      final SerializationFig serializationFig ) {
         this.collectionIoEvent = collectionIoEvent;
         this.listeners = listeners;
         this.logEntrySerializationStrategy = logEntrySerializationStrategy;
         this.entitySerializationStrategy = entitySerializationStrategy;
+        this.serializationFig = serializationFig;
     }
 
 
@@ -79,34 +88,51 @@ class EntityVersionCleanupTask extends Task<CollectionIoEvent<EntityVersion>, Co
         final UUID maxVersion = collectionIoEvent.getEvent().getVersion();
 
 
-        VersionIterator versionIterator =
-                new VersionIterator( logEntrySerializationStrategy, scope, entityId, maxVersion, 1000 );
-
+        LogEntryIterator logEntryIterator =
+                new LogEntryIterator( logEntrySerializationStrategy, scope, entityId, maxVersion, serializationFig.getHistorySize() );
 
-        UUID currentVersion = null;
 
         //for every entry, we want to clean it up with listeners
 
-        while ( versionIterator.hasNext() ) {
+        while ( logEntryIterator.hasNext() ) {
+
+            final MvccLogEntry logEntry = logEntryIterator.next();
 
-            currentVersion = versionIterator.next();
+
+            final UUID version = logEntry.getVersion();
+            List<ForkJoinTask<Void>> tasks = new ArrayList<>();
 
 
             //execute all the listeners
-            for ( EntityVersionDeleted listener : listeners ) {
-                listener.versionDeleted( scope, entityId, currentVersion );
+            for (final  EntityVersionDeleted listener : listeners ) {
+
+                tasks.add( new RecursiveTask<Void>() {
+                    @Override
+                    protected Void compute() {
+                        listener.versionDeleted( scope, entityId, version );
+                        return null;
+                    }
+                }.fork() );
+
+
             }
 
+            //wait for them to complete
+
+            joinAll(tasks);
+
             //we do multiple invocations on purpose.  Our log is our source of versions, only delete from it
             //after every successful invocation of listeners and entity removal
-            entitySerializationStrategy.delete( scope, entityId, currentVersion ).execute();
+            entitySerializationStrategy.delete( scope, entityId, version ).execute();
 
-            logEntrySerializationStrategy.delete( scope, entityId, currentVersion ).execute();
+            logEntrySerializationStrategy.delete( scope, entityId, version ).execute();
         }
 
 
         return collectionIoEvent;
     }
+
+
 }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/018dbeeb/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java
new file mode 100644
index 0000000..53eb6e3
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java
@@ -0,0 +1,111 @@
+package org.apache.usergrid.persistence.collection.serialization.impl;
+
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+
+/**
+ * Iterator that will iterate all versions of the entity from the log that are < the specified maxVersion
+ *
+ */
+public class LogEntryIterator implements Iterator<MvccLogEntry> {
+
+
+    private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
+    private final CollectionScope scope;
+    private final Id entityId;
+    private final int pageSize;
+
+
+    private Iterator<MvccLogEntry> elementItr;
+    private UUID nextStart;
+
+
+    /**
+     *
+     * @param logEntrySerializationStrategy The serialization strategy to get the log entries
+     * @param scope The scope of the entity
+     * @param entityId The id of the entity
+     * @param maxVersion The max version of the entity.  Iterator will iterate from max to min starting with the version < max
+     * @param pageSize The fetch size to get when querying the serialization strategy
+     */
+    public LogEntryIterator( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
+                             final CollectionScope scope, final Id entityId, final UUID maxVersion,
+                             final int pageSize ) {
+        this.logEntrySerializationStrategy = logEntrySerializationStrategy;
+        this.scope = scope;
+        this.entityId = entityId;
+        this.nextStart = maxVersion;
+        this.pageSize = pageSize;
+    }
+
+
+    @Override
+    public boolean hasNext() {
+        if ( elementItr == null || !elementItr.hasNext() && nextStart != null ) {
+            try {
+                advance();
+            }
+            catch ( ConnectionException e ) {
+                throw new RuntimeException( "Unable to query cassandra", e );
+            }
+        }
+
+        return elementItr.hasNext();
+    }
+
+
+    @Override
+    public MvccLogEntry next() {
+        if ( !hasNext() ) {
+            throw new NoSuchElementException( "No more elements exist" );
+        }
+
+        return elementItr.next();
+    }
+
+
+    @Override
+    public void remove() {
+        throw new UnsupportedOperationException( "Remove is unsupported" );
+    }
+
+
+    /**
+     * Advance our iterator
+     */
+    public void advance() throws ConnectionException {
+
+        final int requestedSize = pageSize + 1;
+
+        //loop through every entry that's < this one and remove it
+        List<MvccLogEntry> results = logEntrySerializationStrategy.load( scope, entityId, nextStart, requestedSize );
+
+        //we always remove the first version if it's equal since it's returned
+        if ( results.size() > 0 && results.get( 0 ).getVersion().equals( nextStart ) ) {
+            results.remove( 0 );
+        }
+
+
+        //we have results, set our next start
+        if ( results.size() == pageSize ) {
+            nextStart = results.get( results.size() - 1 ).getVersion();
+        }
+        //nothing left to do
+        else {
+            nextStart = null;
+        }
+
+        elementItr = results.iterator();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/018dbeeb/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/VersionIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/VersionIterator.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/VersionIterator.java
deleted file mode 100644
index 323f12d..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/VersionIterator.java
+++ /dev/null
@@ -1,111 +0,0 @@
-package org.apache.usergrid.persistence.collection.serialization.impl;
-
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-import java.util.UUID;
-
-import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
-import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-
-
-/**
- * Iterator that will iterate all versions of the entity from the log from < the specified maxVersion
- *
- */
-public class VersionIterator implements Iterator<UUID> {
-
-
-    private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
-    private final CollectionScope scope;
-    private final Id entityId;
-    private final int pageSize;
-
-
-    private Iterator<MvccLogEntry> elementItr;
-    private UUID nextStart;
-
-
-    /**
-     *
-     * @param logEntrySerializationStrategy The serialization strategy to get the log entries
-     * @param scope The scope of the entity
-     * @param entityId The id of the entity
-     * @param maxVersion The max version of the entity.  Iterator will iterate from max to min starting with the version < max
-     * @param pageSize The fetch size to get when querying the serialization strategy
-     */
-    public VersionIterator( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
-                            final CollectionScope scope, final Id entityId, final UUID maxVersion,
-                            final int pageSize ) {
-        this.logEntrySerializationStrategy = logEntrySerializationStrategy;
-        this.scope = scope;
-        this.entityId = entityId;
-        this.nextStart = maxVersion;
-        this.pageSize = pageSize;
-    }
-
-
-    @Override
-    public boolean hasNext() {
-        if ( elementItr == null || !elementItr.hasNext() && nextStart != null ) {
-            try {
-                advance();
-            }
-            catch ( ConnectionException e ) {
-                throw new RuntimeException( "Unable to query cassandra", e );
-            }
-        }
-
-        return elementItr.hasNext();
-    }
-
-
-    @Override
-    public UUID next() {
-        if ( !hasNext() ) {
-            throw new NoSuchElementException( "No more elements exist" );
-        }
-
-        return elementItr.next().getVersion();
-    }
-
-
-    @Override
-    public void remove() {
-        throw new UnsupportedOperationException( "Remove is unsupported" );
-    }
-
-
-    /**
-     * Advance our iterator
-     */
-    public void advance() throws ConnectionException {
-
-        final int requestedSize = pageSize + 1;
-
-        //loop through even entry that's < this one and remove it
-        List<MvccLogEntry> results = logEntrySerializationStrategy.load( scope, entityId, nextStart, requestedSize );
-
-        //we always remove the first version if it's equal since it's returned
-        if ( results.size() > 0 && results.get( 0 ).getVersion().equals( nextStart ) ) {
-            results.remove( 0 );
-        }
-
-
-        //we have results, set our next start
-        if ( results.size() == requestedSize ) {
-            nextStart = results.get( results.size() - 1 ).getVersion();
-        }
-        //nothing left to do
-        else {
-            nextStart = null;
-        }
-
-        elementItr = results.iterator();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/018dbeeb/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
new file mode 100644
index 0000000..050ea9e
--- /dev/null
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.collection.impl;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+
+/**
+ * Cleanup task tests
+ */
+public class EntityVersionCleanupTaskTest {
+
+    @Test
+    public void multiPageTask(){
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/018dbeeb/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIteratorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIteratorTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIteratorTest.java
new file mode 100644
index 0000000..9ee284b
--- /dev/null
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIteratorTest.java
@@ -0,0 +1,131 @@
+package org.apache.usergrid.persistence.collection.serialization.impl;
+
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.UUID;
+
+import org.junit.Test;
+
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
+import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
+import org.apache.usergrid.persistence.collection.util.LogEntryMock;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Tests iterator paging
+ */
+public class LogEntryIteratorTest {
+
+
+    @Test
+    public void empty() throws ConnectionException {
+
+        final MvccLogEntrySerializationStrategy logEntrySerializationStrategy =
+                mock( MvccLogEntrySerializationStrategy.class );
+
+        final CollectionScope scope =
+                new CollectionScopeImpl( new SimpleId( "application" ), new SimpleId( "owner" ), "entities" );
+
+        final Id entityId = new SimpleId( "entity" );
+
+        final int pageSize = 100;
+
+
+        //set the start version, it should be discarded
+        UUID start = UUIDGenerator.newTimeUUID();
+
+        when( logEntrySerializationStrategy.load( same( scope ), same( entityId ), same( start ), same( pageSize ) ) )
+                .thenReturn( new ArrayList<MvccLogEntry>() );
+
+
+        //now iterate, we should get nothing since the log is empty
+        LogEntryIterator itr = new LogEntryIterator( logEntrySerializationStrategy, scope, entityId, start, pageSize );
+
+
+        assertFalse( itr.hasNext() );
+    }
+
+
+    @Test
+    public void partialLastPage() throws ConnectionException {
+
+
+        final int pageSize = 10;
+        final int totalPages = 3;
+        final int lastPageSize = pageSize / 2;
+
+        //have one half page
+
+        pageElements( pageSize, totalPages, lastPageSize );
+    }
+
+
+    @Test
+    public void emptyLastPage() throws ConnectionException {
+
+
+        final int pageSize = 10;
+        final int totalPages = 3;
+        final int lastPageSize = 0;
+
+        //the last page is empty
+
+        pageElements( pageSize, totalPages, lastPageSize );
+    }
+
+
+    public void pageElements( final int pageSize, final int totalPages, final int lastPageSize )
+            throws ConnectionException {
+
+        final MvccLogEntrySerializationStrategy logEntrySerializationStrategy =
+                mock( MvccLogEntrySerializationStrategy.class );
+
+        final CollectionScope scope =
+                new CollectionScopeImpl( new SimpleId( "application" ), new SimpleId( "owner" ), "entities" );
+
+        final Id entityId = new SimpleId( "entity" );
+
+
+        //total entries: all full pages plus the (possibly partial) last page
+        final int toGenerate = pageSize * totalPages + lastPageSize;
+
+
+        final LogEntryMock mockResults =
+                LogEntryMock.createLogEntryMock( logEntrySerializationStrategy, scope, entityId, toGenerate );
+
+        Iterator<MvccLogEntry> expectedEntries = mockResults.getEntries().iterator();
+
+        //this element should be skipped
+        UUID start = expectedEntries.next().getVersion();
+
+        //now iterate we should get everything
+        LogEntryIterator itr = new LogEntryIterator( logEntrySerializationStrategy, scope, entityId, start, pageSize );
+
+
+        while ( expectedEntries.hasNext() && itr.hasNext() ) {
+            final MvccLogEntry expected = expectedEntries.next();
+
+            final MvccLogEntry returned = itr.next();
+
+            assertEquals( expected, returned );
+        }
+
+
+        assertFalse( itr.hasNext() );
+        assertFalse( expectedEntries.hasNext() );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/018dbeeb/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/util/LogEntryMock.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/util/LogEntryMock.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/util/LogEntryMock.java
new file mode 100644
index 0000000..a25bc94
--- /dev/null
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/util/LogEntryMock.java
@@ -0,0 +1,152 @@
+package org.apache.usergrid.persistence.collection.util;/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.TreeMap;
+import java.util.UUID;
+
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
+import org.apache.usergrid.persistence.collection.mvcc.entity.Stage;
+import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import com.fasterxml.uuid.UUIDComparator;
+import com.google.common.base.Preconditions;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Utility for constructing representative log entries for mock serialization from high to low
+ */
+public class LogEntryMock {
+
+
+    private final TreeMap<UUID, MvccLogEntry> entries = new TreeMap<>(ReversedUUIDComparator.INSTANCE);
+
+    private final Id entityId;
+
+
+    /**
+     * Create a mock list of versions of the specified size
+     *
+     * @param entityId The entity Id to use
+     * @param size The size to use
+     */
+    private LogEntryMock(final Id entityId, final int size ) {
+
+        this.entityId = entityId;
+
+        for ( int i = 0; i < size; i++ ) {
+
+            final UUID version = UUIDGenerator.newTimeUUID();
+
+            entries.put( version,
+                    new MvccLogEntryImpl( entityId, version, Stage.ACTIVE, MvccLogEntry.State.COMPLETE ) );
+        }
+    }
+
+
+    /**
+     * Init the mock with the given data structure
+     * @param logEntrySerializationStrategy The strategy to mock
+     * @param scope
+     * @throws ConnectionException
+     */
+    private void initMock(  final MvccLogEntrySerializationStrategy logEntrySerializationStrategy, final  CollectionScope scope )
+
+            throws ConnectionException {
+
+        //wire up the mocks
+        when(logEntrySerializationStrategy.load( same( scope ), same( entityId ), any(UUID.class), any(Integer.class)  )).thenAnswer( new Answer<List<MvccLogEntry>>() {
+
+
+            @Override
+            public List<MvccLogEntry> answer( final InvocationOnMock invocation ) throws Throwable {
+                final UUID startVersion = ( UUID ) invocation.getArguments()[2];
+                final int count = (Integer)invocation.getArguments()[3];
+
+                final List<MvccLogEntry> results = new ArrayList<>( count );
+
+                final Iterator<MvccLogEntry> itr = entries.tailMap( startVersion, true ).values().iterator();
+
+                for(int i = 0; i < count && itr.hasNext(); i ++){
+                    results.add( itr.next() );
+                }
+
+
+                return results;
+            }
+        } );
+    }
+
+
+    /**
+     * Get the entries (ordered from high to low) this mock contains
+     * @return
+     */
+    public Collection<MvccLogEntry> getEntries(){
+        return entries.values();
+    }
+
+    /**
+     *
+     * @param logEntrySerializationStrategy The mock to use
+     * @param scope The scope to use
+     * @param entityId The entityId to use
+     * @param size The number of entries to mock
+     * @throws ConnectionException
+     */
+    public static LogEntryMock createLogEntryMock(final MvccLogEntrySerializationStrategy logEntrySerializationStrategy, final  CollectionScope scope,final Id entityId, final int size )
+
+            throws ConnectionException {
+        LogEntryMock mock = new LogEntryMock( entityId, size );
+        mock.initMock( logEntrySerializationStrategy, scope );
+
+        return mock;
+    }
+
+
+    private static final class ReversedUUIDComparator implements Comparator<UUID> {
+
+        public static final ReversedUUIDComparator INSTANCE = new ReversedUUIDComparator();
+
+
+        @Override
+        public int compare( final UUID o1, final UUID o2 ) {
+            return UUIDComparator.staticCompare( o1, o2 ) * -1;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/018dbeeb/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
index aba8cd5..4fc72c8 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.usergrid.persistence.core.task;
 
 
@@ -36,7 +54,7 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
      * @param name The name of this instance of the task executor
      * @param poolSize The size of the pool.  This is the number of concurrent tasks that can execute at once.
      */
-    public NamedTaskExecutorImpl( final String name, final int poolSize) {
+    public NamedTaskExecutorImpl( final String name, final int poolSize ) {
         Preconditions.checkNotNull( name );
         Preconditions.checkArgument( name.length() > 0, "name must have a length" );
         Preconditions.checkArgument( poolSize > 0, "poolSize must be > than 0" );
@@ -44,12 +62,7 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
         this.name = name;
         this.poolSize = poolSize;
 
-//        final BlockingQueue<Runnable> queue =
-//                queueLength > 0 ? new ArrayBlockingQueue<Runnable>( queueLength ) : new SynchronousQueue<Runnable>();
-//
-//        executorService = MoreExecutors.listeningDecorator( new MaxSizeThreadPool( queue ) );
-
-       this.executorService =  new NamedForkJoinPool(poolSize);
+        this.executorService = new NamedForkJoinPool( poolSize );
     }
 
 
@@ -57,126 +70,30 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
     public <V, I> Task<V, I> submit( final Task<V, I> task ) {
 
         try {
-           executorService.submit( task );
+            executorService.submit( task );
         }
         catch ( RejectedExecutionException ree ) {
             task.rejected();
         }
 
         return task;
-
     }
 
 
-    /**
-     * Callback for when the task succeeds or fails.
-     */
-    private static final class TaskFutureCallBack<V, I> implements FutureCallback<V> {
-
-        private final Task<V, I> task;
-
-
-        private TaskFutureCallBack( Task<V, I> task ) {
-            this.task = task;
-        }
-
-
-        @Override
-        public void onSuccess( @Nullable final V result ) {
-            LOG.debug( "Successfully completed task ", task );
-        }
-
-
-        @Override
-        public void onFailure( final Throwable t ) {
-            LOG.error( "Unable to execute task.  Exception is ", t );
-
-            task.exceptionThrown( t );
-        }
-    }
-
-
-    private final class NamedForkJoinPool extends ForkJoinPool{
+    private final class NamedForkJoinPool extends ForkJoinPool {
 
         private NamedForkJoinPool( final int workerThreadCount ) {
             //TODO, verify the scheduler at the end
             super( workerThreadCount, defaultForkJoinWorkerThreadFactory, new TaskExceptionHandler(), true );
         }
-
-
-
     }
 
-    private final class TaskExceptionHandler implements Thread.UncaughtExceptionHandler{
+
+    private final class TaskExceptionHandler implements Thread.UncaughtExceptionHandler {
 
         @Override
         public void uncaughtException( final Thread t, final Throwable e ) {
             LOG.error( "Uncaught exception on thread {} was {}", t, e );
         }
     }
-
-
-
-
-    private final class NamedWorkerThread extends ForkJoinWorkerThread{
-
-        /**
-         * Creates a ForkJoinWorkerThread operating in the given pool.
-         *
-         * @param pool the pool this thread works in
-         *
-         * @throws NullPointerException if pool is null
-         */
-        protected NamedWorkerThread(final String name,  final ForkJoinPool pool ) {
-            super( pool );
-        }
-    }
-    /**
-     * Create a thread pool that will reject work if our audit tasks become overwhelmed
-     */
-    private final class MaxSizeThreadPool extends ThreadPoolExecutor {
-
-        public MaxSizeThreadPool( BlockingQueue<Runnable> queue ) {
-
-            super( 1, poolSize, 30, TimeUnit.SECONDS, queue, new CountingThreadFactory( ),
-                    new RejectedHandler() );
-        }
-    }
-
-
-    /**
-     * Thread factory that will name and count threads for easier debugging
-     */
-    private final class CountingThreadFactory implements ThreadFactory {
-
-        private final AtomicLong threadCounter = new AtomicLong();
-
-
-        @Override
-        public Thread newThread( final Runnable r ) {
-            final long newValue = threadCounter.incrementAndGet();
-
-            Thread t = new Thread( r, name + "-" + newValue );
-
-            t.setDaemon( true );
-
-            return t;
-        }
-    }
-
-
-    /**
-     * The handler that will handle rejected executions and signal the interface
-     */
-    private final class RejectedHandler implements RejectedExecutionHandler {
-
-
-        @Override
-        public void rejectedExecution( final Runnable r, final ThreadPoolExecutor executor ) {
-            LOG.warn( "{} task queue full, rejecting task {}", name, r );
-
-            throw new RejectedExecutionException( "Unable to run task, queue full" );
-        }
-
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/018dbeeb/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
index eb04c2c..4dccc72 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
@@ -1,6 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.usergrid.persistence.core.task;
 
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinTask;
 import java.util.concurrent.RecursiveTask;
 
 
@@ -28,6 +50,26 @@ public abstract class Task<V, I> extends RecursiveTask<V> {
 
 
     /**
+     * Join every task in the list, in list order, and collect the results.  Nothing is forked
+     * here; each task is only joined.
+     * @param tasks The tasks to join
+     *
+     */
+    public <V, T extends ForkJoinTask<V>> List<V> joinAll(List<T> tasks) throws ExecutionException, InterruptedException {
+
+        //join each task in list order and gather its result
+       List<V> results = new ArrayList<>(tasks.size());
+
+        for(T task: tasks){
+            results.add( task.join() );
+        }
+
+        return results;
+
+    }
+
+
+    /**
      * Execute the task
      */
     public abstract V executeTask() throws Exception;


[10/15] git commit: Removed stress tests that should not be part of the build

Posted by to...@apache.org.
Removed stress tests that should not be part of the build


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/324a333b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/324a333b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/324a333b

Branch: refs/heads/two-dot-o
Commit: 324a333badf673108dd2e59971f178f9af204d03
Parents: 9c63ce6
Author: Todd Nine <to...@apache.org>
Authored: Mon Sep 29 16:00:02 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Mon Sep 29 16:00:02 2014 -0600

----------------------------------------------------------------------
 .../persistence/collection/EntityCollectionManagerStressTest.java  | 2 ++
 .../apache/usergrid/persistence/graph/GraphManagerStressTest.java  | 1 +
 2 files changed, 3 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/324a333b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerStressTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerStressTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerStressTest.java
index 62a42ec..3f5b071 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerStressTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerStressTest.java
@@ -23,6 +23,7 @@ import java.util.Set;
 
 import org.jukito.UseModules;
 import org.junit.ClassRule;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -50,6 +51,7 @@ import static org.junit.Assert.assertNotNull;
 
 @RunWith(ITRunner.class)
 @UseModules(TestCollectionModule.class)
+@Ignore("Stress test should not be run in embedded mode")
 public class EntityCollectionManagerStressTest {
     private static final Logger log = LoggerFactory.getLogger( 
             EntityCollectionManagerStressTest.class );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/324a333b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerStressTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerStressTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerStressTest.java
index a570b7c..aa2d027 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerStressTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerStressTest.java
@@ -58,6 +58,7 @@ import static org.mockito.Mockito.when;
 
 @RunWith(ITRunner.class)
 @UseModules(TestGraphModule.class)
+@Ignore("Stress test should not be run in embedded mode")
 public class GraphManagerStressTest {
     private static final Logger log = LoggerFactory.getLogger( GraphManagerStressTest.class );
 


[08/15] git commit: Finished updating back to executor pool

Posted by to...@apache.org.
Finished updating back to executor pool


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/10604788
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/10604788
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/10604788

Branch: refs/heads/two-dot-o
Commit: 1060478811e928c45b15ff5b96cd4b9888671d77
Parents: af3726b
Author: Todd Nine <to...@apache.org>
Authored: Mon Sep 29 09:20:26 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Mon Sep 29 09:20:26 2014 -0600

----------------------------------------------------------------------
 .../impl/EntityVersionCleanupTask.java          | 81 ++++++++++----------
 .../impl/EntityVersionCleanupTaskTest.java      | 55 +++++++------
 .../core/task/NamedTaskExecutorImpl.java        | 20 +++--
 .../usergrid/persistence/core/task/Task.java    | 22 +-----
 .../core/task/NamedTaskExecutorImplTest.java    | 12 +--
 .../shard/impl/ShardGroupCompactionImpl.java    | 40 ++--------
 6 files changed, 96 insertions(+), 134 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/10604788/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
index 3d1483d..d7ece40 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
@@ -2,10 +2,8 @@ package org.apache.usergrid.persistence.collection.impl;
 
 
 import java.util.List;
-import java.util.Stack;
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.RecursiveTask;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -20,12 +18,17 @@ import org.apache.usergrid.persistence.collection.serialization.impl.LogEntryIte
 import org.apache.usergrid.persistence.core.task.Task;
 import org.apache.usergrid.persistence.model.entity.Id;
 
+import rx.Observable;
+import rx.functions.Action1;
+import rx.functions.Func1;
+import rx.schedulers.Schedulers;
+
 
 /**
  * Cleans up previous versions from the specified version. Note that this means the version passed in the io event is
  * retained, the range is exclusive.
  */
-public class EntityVersionCleanupTask extends Task<Void> {
+public class EntityVersionCleanupTask implements Task<Void> {
 
     private static final Logger LOG = LoggerFactory.getLogger( EntityVersionCleanupTask.class );
 
@@ -66,21 +69,23 @@ public class EntityVersionCleanupTask extends Task<Void> {
 
 
     @Override
-    public void rejected() {
+    public Void rejected() {
         //Our task was rejected meaning our queue was full.  We need this operation to run,
         // so we'll run it in our current thread
 
         try {
-            executeTask();
+            call();
         }
         catch ( Exception e ) {
             throw new RuntimeException( "Exception thrown in call task", e );
         }
+
+        return null;
     }
 
 
     @Override
-    public Void executeTask() throws Exception {
+    public Void call() throws Exception {
 
 
         final UUID maxVersion = version;
@@ -117,54 +122,46 @@ public class EntityVersionCleanupTask extends Task<Void> {
 
     private void fireEvents() throws ExecutionException, InterruptedException {
 
-        if ( listeners.size() == 0 ) {
+        final int listenerSize = listeners.size();
+
+        if ( listenerSize == 0 ) {
             return;
         }
 
-        //stack to track forked tasks
-        final Stack<RecursiveTask<Void>> tasks = new Stack<>();
-
-
-        //we don't want to fork the final listener, we'll run that in our current thread
-        final int forkedTaskSize = listeners.size() - 1;
-
-
-        //execute all the listeners
-        for ( int i = 0; i < forkedTaskSize; i++ ) {
-
-            final EntityVersionDeleted listener = listeners.get( i );
-
-            final RecursiveTask<Void> task = createTask( listener );
-
-            task.fork();
-
-            tasks.push( task );
+        if ( listenerSize == 1 ) {
+            listeners.get( 0 ).versionDeleted( scope, entityId, version );
+            return;
         }
 
+        LOG.debug( "Started firing {} listeners", listenerSize );
 
-        final RecursiveTask<Void> lastTask = createTask( listeners.get( forkedTaskSize ) );
+        //if we have more than 1, run them on the rx scheduler for a max of 8 operations at a time
+        Observable.from( listeners )
+                  .parallel( new Func1<Observable<EntityVersionDeleted>, Observable<EntityVersionDeleted>>() {
 
-        lastTask.invoke();
+                      @Override
+                      public Observable<EntityVersionDeleted> call(
+                              final Observable<EntityVersionDeleted> entityVersionDeletedObservable ) {
 
+                          return entityVersionDeletedObservable.doOnNext( new Action1<EntityVersionDeleted>() {
+                              @Override
+                              public void call( final EntityVersionDeleted listener ) {
+                                  listener.versionDeleted( scope, entityId, version );
+                              }
+                          } );
+                      }
+                  }, Schedulers.io() ).toBlocking().last();
 
-        //wait for them to complete
-        while ( !tasks.isEmpty() ) {
-            tasks.pop().get();
-        }
+        LOG.debug( "Finished firing {} listeners", listenerSize );
     }
 
 
-    /**
-     * Return the new task to execute
-     */
-    private RecursiveTask<Void> createTask( final EntityVersionDeleted listener ) {
-        return new RecursiveTask<Void>() {
-            @Override
-            protected Void compute() {
-                listener.versionDeleted( scope, entityId, version );
-                return null;
-            }
-        };
+    private static interface ListenerRunner {
+
+        /**
+         * Run the listeners
+         */
+        public void runListeners();
     }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/10604788/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
index 2df75f1..a34b4f8 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
@@ -20,6 +20,7 @@ package org.apache.usergrid.persistence.collection.impl;
 
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
@@ -27,6 +28,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Semaphore;
 
 import org.junit.AfterClass;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import org.apache.usergrid.persistence.collection.CollectionScope;
@@ -40,6 +42,7 @@ import org.apache.usergrid.persistence.core.task.TaskExecutor;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
 
+import com.google.common.util.concurrent.ListenableFuture;
 import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
@@ -57,7 +60,7 @@ import static org.mockito.Mockito.when;
  */
 public class EntityVersionCleanupTaskTest {
 
-    private static final TaskExecutor taskExecutor = new NamedTaskExecutorImpl( "test", 4 );
+    private static final TaskExecutor taskExecutor = new NamedTaskExecutorImpl( "test", 4, 0 );
 
 
     @AfterClass
@@ -119,10 +122,10 @@ public class EntityVersionCleanupTaskTest {
 
 
         //start the task
-        taskExecutor.submit( cleanupTask );
+        ListenableFuture<Void> future = taskExecutor.submit( cleanupTask );
 
         //wait for the task
-        cleanupTask.get();
+        future.get();
 
         //verify it was run
         verify( firstBatch ).execute();
@@ -187,10 +190,10 @@ public class EntityVersionCleanupTaskTest {
 
 
         //start the task
-        taskExecutor.submit( cleanupTask );
+        ListenableFuture<Void> future = taskExecutor.submit( cleanupTask );
 
         //wait for the task
-        cleanupTask.get();
+        future.get();
 
         //verify it was run
         verify( firstBatch, never() ).execute();
@@ -260,10 +263,10 @@ public class EntityVersionCleanupTaskTest {
 
 
         //start the task
-        taskExecutor.submit( cleanupTask );
+        ListenableFuture<Void> future = taskExecutor.submit( cleanupTask );
 
         //wait for the task
-        cleanupTask.get();
+        future.get();
 
         //we deleted the version
         //verify it was run
@@ -343,10 +346,10 @@ public class EntityVersionCleanupTaskTest {
 
 
         //start the task
-        taskExecutor.submit( cleanupTask );
+        ListenableFuture<Void> future = taskExecutor.submit( cleanupTask );
 
         //wait for the task
-        cleanupTask.get();
+        future.get();
 
         //we deleted the version
         //verify we deleted everything
@@ -440,18 +443,18 @@ public class EntityVersionCleanupTaskTest {
 
 
         //start the task
-        taskExecutor.submit( cleanupTask );
+        ListenableFuture<Void> future = taskExecutor.submit( cleanupTask );
 
         /**
          * While we're not done, release latches every 200 ms
          */
-        while(!cleanupTask.isDone()) {
+        while(!future.isDone()) {
             Thread.sleep( 200 );
             waitSemaphore.release( listenerCount );
         }
 
         //wait for the task
-        cleanupTask.get();
+        future.get();
 
         //we deleted the version
         //verify we deleted everything
@@ -479,7 +482,7 @@ public class EntityVersionCleanupTaskTest {
         /**
          * only 1 thread on purpose, we want to saturate the task
          */
-        final TaskExecutor taskExecutor = new NamedTaskExecutorImpl( "test", 1);
+        final TaskExecutor taskExecutor = new NamedTaskExecutorImpl( "test", 1, 0);
 
         final SerializationFig serializationFig = mock( SerializationFig.class );
 
@@ -496,17 +499,16 @@ public class EntityVersionCleanupTaskTest {
         final int sizeToReturn = 10;
 
 
-        final int listenerCount = 1;
+        final int listenerCount = 2;
 
         final CountDownLatch latch = new CountDownLatch( sizeToReturn * listenerCount );
         final Semaphore waitSemaphore = new Semaphore( 0 );
 
 
-        final SlowListener listener1 = new SlowListener( latch, waitSemaphore );
+        final SlowListener slowListener = new SlowListener( latch, waitSemaphore );
+        final EntityVersionDeletedTest runListener = new EntityVersionDeletedTest( latch );
 
-        final List<EntityVersionDeleted> listeners = new ArrayList<>();
 
-        listeners.add( listener1 );
 
         final Id applicationId = new SimpleId( "application" );
 
@@ -526,11 +528,16 @@ public class EntityVersionCleanupTaskTest {
 
         EntityVersionCleanupTask firstTask =
                 new EntityVersionCleanupTask( serializationFig, mvccLogEntrySerializationStrategy,
-                        mvccEntitySerializationStrategy, listeners, appScope, entityId, version );
+                        mvccEntitySerializationStrategy, Arrays.<EntityVersionDeleted>asList(slowListener), appScope, entityId, version );
+
+
+
+        //change the listeners to one that is just invoked quickly
+
 
         EntityVersionCleanupTask secondTask =
                       new EntityVersionCleanupTask( serializationFig, mvccLogEntrySerializationStrategy,
-                              mvccEntitySerializationStrategy, listeners, appScope, entityId, version );
+                              mvccEntitySerializationStrategy, Arrays.<EntityVersionDeleted>asList(runListener), appScope, entityId, version );
 
 
         final MutationBatch firstBatch = mock( MutationBatch.class );
@@ -548,24 +555,24 @@ public class EntityVersionCleanupTaskTest {
 
 
         //start the task
-        taskExecutor.submit( firstTask );
+        ListenableFuture<Void> future1 =  taskExecutor.submit( firstTask );
 
         //now start another task while the slow running task is running
-        taskExecutor.submit( secondTask );
+        ListenableFuture<Void> future2 =  taskExecutor.submit( secondTask );
 
         //get the second task, we shouldn't have been able to queue it, therefore it should just run in process
-        secondTask.get();
+        future2.get();
 
         /**
          * While we're not done, release latches every 200 ms
          */
-        while(!firstTask.isDone()) {
+        while(!future1.isDone()) {
             Thread.sleep( 200 );
             waitSemaphore.release( listenerCount );
         }
 
         //wait for the task
-        firstTask.get();
+        future1.get();
 
         //we deleted the version
         //verify we deleted everything

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/10604788/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
index d4cc915..b18687a 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
@@ -1,6 +1,7 @@
 package org.apache.usergrid.persistence.core.task;
 
 
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.ForkJoinWorkerThread;
@@ -61,7 +62,7 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
 
 
     @Override
-    public <V, I> ListenableFuture<V> submit( final Task<V, I> task ) {
+    public <V> ListenableFuture<V> submit( final Task<V> task ) {
 
         final ListenableFuture<V> future;
 
@@ -71,26 +72,31 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
             /**
              * Log our success or failures for debugging purposes
              */
-            Futures.addCallback( future, new TaskFutureCallBack<V, I>( task ) );
+            Futures.addCallback( future, new TaskFutureCallBack<V>( task ) );
         }
         catch ( RejectedExecutionException ree ) {
-            task.rejected();
-            return Futures.immediateCancelledFuture();
+            return Futures.immediateFuture( task.rejected());
         }
 
         return future;
     }
 
 
+    @Override
+    public void shutdown() {
+        this.executorService.shutdownNow();
+    }
+
+
     /**
      * Callback for when the task succeeds or fails.
      */
-    private static final class TaskFutureCallBack<V, I> implements FutureCallback<V> {
+    private static final class TaskFutureCallBack<V> implements FutureCallback<V> {
 
-        private final Task<V, I> task;
+        private final Task<V> task;
 
 
-        private TaskFutureCallBack( Task<V, I> task ) {
+        private TaskFutureCallBack( Task<V> task ) {
             this.task = task;
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/10604788/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
index 9a0b857..5890627 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
@@ -1,30 +1,14 @@
 package org.apache.usergrid.persistence.core.task;
 
 
+import java.util.concurrent.Callable;
 import java.util.concurrent.RecursiveTask;
 
 
 /**
  * The task to execute
  */
-public interface Task<V, I> extends Callable<V> {
-
-    /**
-     * Get the unique identifier of this task.  This may be used to collapse runnables over a time period in the future
-     */
-    public abstract I getId();
-
-
-    @Override
-    protected V compute() {
-        try {
-            return executeTask();
-        }
-        catch ( Exception e ) {
-            exceptionThrown( e );
-            throw new RuntimeException( e );
-        }
-    }
+public interface Task<V> extends Callable<V> {
 
 
     /**
@@ -40,7 +24,7 @@ public interface Task<V, I> extends Callable<V> {
      * request and process later (lazy repair for instance ) do so.
      *
      */
-    void rejected();
+    V rejected();
 
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/10604788/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
index fa63eee..34f57e5 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
@@ -29,7 +29,7 @@ public class NamedTaskExecutorImplTest {
         final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
         final CountDownLatch runLatch = new CountDownLatch( 1 );
 
-        final Task<Void, UUID> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {};
+        final Task<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {};
 
         executor.submit( task );
 
@@ -185,7 +185,7 @@ public class NamedTaskExecutorImplTest {
     }
 
 
-    private static abstract class TestTask<V> implements Task<V, UUID> {
+    private static abstract class TestTask<V> implements Task<V> {
 
         private final List<Throwable> exceptions;
         private final CountDownLatch exceptionLatch;
@@ -203,11 +203,6 @@ public class NamedTaskExecutorImplTest {
         }
 
 
-        @Override
-        public UUID getId() {
-            return UUIDGenerator.newTimeUUID();
-        }
-
 
         @Override
         public void exceptionThrown( final Throwable throwable ) {
@@ -217,8 +212,9 @@ public class NamedTaskExecutorImplTest {
 
 
         @Override
-        public void rejected() {
+        public V rejected() {
             rejectedLatch.countDown();
+            return null;
         }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/10604788/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
index be7fbe4..6cd98a7 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
@@ -315,7 +315,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
     }
 
 
-    private final class ShardAuditTask implements Task<AuditResult, ShardAuditKey> {
+    private final class ShardAuditTask implements Task<AuditResult> {
 
         private final ApplicationScope scope;
         private final DirectedEdgeMeta edgeMeta;
@@ -329,23 +329,18 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
             this.group = group;
         }
 
-
-        @Override
-        public ShardAuditKey getId() {
-            return new ShardAuditKey( scope, edgeMeta, group );
-        }
-
-
-        @Override
+         @Override
         public void exceptionThrown( final Throwable throwable ) {
             LOG.error( "Unable to execute audit for shard of {}", throwable );
         }
 
 
         @Override
-        public void rejected() {
+        public AuditResult rejected() {
             //ignore, if this happens we don't care, we're saturated, we can check later
-            LOG.error( "Rejected audit for shard of {}", getId() );
+            LOG.error( "Rejected audit for shard of scope {} edge, meta {} and group {}", scope, edgeMeta, group );
+
+            return AuditResult.NOT_CHECKED;
         }
 
 
@@ -418,29 +413,6 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
     }
 
 
-    private static final class ShardAuditKey {
-        private final ApplicationScope scope;
-        private final DirectedEdgeMeta edgeMeta;
-        private final ShardEntryGroup group;
-
-
-        private ShardAuditKey( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta,
-                               final ShardEntryGroup group ) {
-            this.scope = scope;
-            this.edgeMeta = edgeMeta;
-            this.group = group;
-        }
-
-
-        @Override
-        public String toString() {
-            return "ShardAuditKey{" +
-                    "scope=" + scope +
-                    ", edgeMeta=" + edgeMeta +
-                    ", group=" + group +
-                    '}';
-        }
-    }
 
 
     /**


[14/15] git commit: Merge branch 'two-dot-o' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into eventsystem

Posted by to...@apache.org.
Merge branch 'two-dot-o' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into eventsystem


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/d85b97c4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/d85b97c4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/d85b97c4

Branch: refs/heads/two-dot-o
Commit: d85b97c423f31afda0e846a9071e03efe0505d26
Parents: 8cadba2 abbd76e
Author: Todd Nine <to...@apache.org>
Authored: Wed Oct 1 10:30:54 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Wed Oct 1 10:30:54 2014 -0600

----------------------------------------------------------------------
 .../usergrid/mq/cassandra/QueueManagerImpl.java |  18 +-
 .../mq/cassandra/io/AbstractSearch.java         | 199 ++++++++++++-------
 .../mq/cassandra/io/ConsumerTransaction.java    |  19 +-
 .../services/notifications/QueueListener.java   |   4 +-
 4 files changed, 153 insertions(+), 87 deletions(-)
----------------------------------------------------------------------



[12/15] git commit: Merge branch 'two-dot-o' into eventsystem

Posted by to...@apache.org.
Merge branch 'two-dot-o' into eventsystem


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/1a416245
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/1a416245
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/1a416245

Branch: refs/heads/two-dot-o
Commit: 1a4162459ae8c31a489937d8a5d2f76f5ca6c2e2
Parents: 429b12b d6fd7dd
Author: Todd Nine <to...@apache.org>
Authored: Tue Sep 30 10:25:44 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Tue Sep 30 10:25:44 2014 -0600

----------------------------------------------------------------------
 .../corepersistence/CpRelationManager.java      |  6 ++--
 .../index/impl/EsEntityIndexImpl.java           | 13 ++++++++
 .../persistence/index/impl/EsQueryVistor.java   |  3 +-
 .../java/org/apache/usergrid/rest/BasicIT.java  | 34 ++++++++++++++------
 .../notifications/ApplicationQueueManager.java  |  2 +-
 .../services/notifications/QueueListener.java   |  6 ++--
 6 files changed, 48 insertions(+), 16 deletions(-)
----------------------------------------------------------------------



[03/15] git commit: Added events and task cleanup

Posted by to...@apache.org.
Added events and task cleanup


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/c0b78b9f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/c0b78b9f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/c0b78b9f

Branch: refs/heads/two-dot-o
Commit: c0b78b9fab07cbc2f5975d89664ff8a523b98b9a
Parents: 4c72f5c
Author: Todd Nine <to...@apache.org>
Authored: Thu Sep 25 12:09:11 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Thu Sep 25 12:09:11 2014 -0600

----------------------------------------------------------------------
 .../collection/EntityCollectionManager.java     |  12 +-
 .../collection/event/EntityDeleted.java         |  25 ++++
 .../collection/event/EntityVersionCreated.java  |  25 ++++
 .../collection/event/EntityVersionDeleted.java  |  28 +++++
 .../collection/event/EntityVersionRemoved.java  |  26 -----
 .../impl/EntityCollectionManagerImpl.java       |  12 ++
 .../impl/EntityVersionCleanupTask.java          | 115 +++++++++++++++++++
 .../serialization/impl/VersionIterator.java     | 111 ++++++++++++++++++
 .../core/task/NamedTaskExecutorImpl.java        |  26 +++--
 .../usergrid/persistence/core/task/Task.java    |   2 +-
 .../persistence/core/task/TaskExecutor.java     |   5 +-
 .../persistence/graph/event/EdgeDeleted.java    |   8 ++
 12 files changed, 349 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c0b78b9f/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
index 84bcea6..ee3a5d1 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
@@ -27,14 +27,14 @@ import rx.Observable;
 
 /**
  *
- *
- * @author: tnine
+ * The operations for performing changes on an entity
  *
  */
 public interface EntityCollectionManager {
 
     /**
-     * Write the entity in the entity collection.
+     * Write the entity in the entity collection.  This is an entire entity; its contents will
+     * completely overwrite the previous values, if it exists.
      *
      * @param entity The entity to update
      */
@@ -52,10 +52,10 @@ public interface EntityCollectionManager {
     public Observable<Entity> load( Id entityId );
 
 
-    //TODO add partial update
-
     /**
-     * Takes the change and reloads an entity with all changes applied.
+     * Takes the change and reloads an entity with all changes in this entity applied.
+     * The resulting entity from calling load will be the previous version of this entity + the entity
+     * in this object applied to it.
      * @param entity
      * @return
      */

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c0b78b9f/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityDeleted.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityDeleted.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityDeleted.java
new file mode 100644
index 0000000..1c075f1
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityDeleted.java
@@ -0,0 +1,25 @@
+package org.apache.usergrid.persistence.collection.event;
+
+
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+/**
+ *
+ * Invoked when an entity is deleted.  The delete log entry is not removed until all instances of this listener have completed.
+ * If any listener fails with an exception, the entity will not be removed.
+ *
+ */
+public interface EntityDeleted {
+
+
+    /**
+     * The event fired when an entity is deleted
+     *
+     * @param scope The scope of the entity
+     * @param entityId The id of the entity
+     */
+    public void deleted( final CollectionScope scope, final Id entityId);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c0b78b9f/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionCreated.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionCreated.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionCreated.java
new file mode 100644
index 0000000..9d7761c
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionCreated.java
@@ -0,0 +1,25 @@
+package org.apache.usergrid.persistence.collection.event;
+
+
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.model.entity.Entity;
+
+
+/**
+ *
+ * Invoked after a new version of an entity has been created.  The entity should be a complete
+ * view of the entity.
+ *
+ */
+public interface EntityVersionCreated {
+
+
+    /**
+     * The new version of the entity.  Note that this should be a fully merged view of the entity.
+     * In the case of partial updates, the passed entity should be fully merged with its previous entries
+     * @param scope The scope of the entity
+     * @param entity The fully loaded and merged entity
+     */
+    public void versionCreated( final CollectionScope scope, final Entity entity );
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c0b78b9f/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionDeleted.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionDeleted.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionDeleted.java
new file mode 100644
index 0000000..3b76e84
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionDeleted.java
@@ -0,0 +1,28 @@
+package org.apache.usergrid.persistence.collection.event;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+/**
+ *
+ * Invoked when an entity version is removed.  Note that this is not a deletion of the entity itself,
+ * only the version itself.
+ *
+ */
+public interface EntityVersionDeleted {
+
+
+    /**
+     * The version specified was removed.
+     *
+     * @param scope The scope of the entity
+     * @param entityId The entity Id that was removed
+     * @param entityVersion The version that was removed
+     */
+    public void versionDeleted(final CollectionScope scope, final Id entityId, final UUID entityVersion);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c0b78b9f/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionRemoved.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionRemoved.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionRemoved.java
deleted file mode 100644
index dca575d..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionRemoved.java
+++ /dev/null
@@ -1,26 +0,0 @@
-package org.apache.usergrid.persistence.collection.event;
-
-
-import java.util.UUID;
-
-import org.apache.usergrid.persistence.collection.CollectionScope;
-
-
-/**
- *
- * Invoked when an entity version is removed.  Note that this is not a deletion of the entity itself,
- * only the version itself.
- *
- */
-public interface EntityVersionRemoved {
-
-
-    /**
-     * The version specified was removed.
-     * @param scope
-     * @param entityId
-     * @param entityVersion
-     */
-    public void versionRemoved(final CollectionScope scope, final UUID entityId, final UUID entityVersion);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c0b78b9f/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index 96a7ed2..a6fc31b 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@ -19,11 +19,16 @@
 package org.apache.usergrid.persistence.collection.impl;
 
 
+import java.util.List;
+import java.util.UUID;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.collection.CollectionScope;
 import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.collection.event.EntityVersionCreated;
+import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
 import org.apache.usergrid.persistence.collection.guice.Write;
 import org.apache.usergrid.persistence.collection.guice.WriteUpdate;
 import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
@@ -38,6 +43,8 @@ import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteOptimist
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteStart;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteUniqueVerify;
 import org.apache.usergrid.persistence.collection.service.UUIDService;
+import org.apache.usergrid.persistence.core.task.Task;
+import org.apache.usergrid.persistence.core.task.TaskExecutor;
 import org.apache.usergrid.persistence.core.util.ValidationUtils;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
@@ -83,6 +90,8 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
     private final MarkStart markStart;
     private final MarkCommit markCommit;
 
+    private final TaskExecutor taskExecutor;
+
     @Inject
     public EntityCollectionManagerImpl( final UUIDService uuidService, @Write final WriteStart writeStart,
                                         @WriteUpdate final WriteStart writeUpdate,
@@ -90,6 +99,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
                                         final WriteOptimisticVerify writeOptimisticVerify,
                                         final WriteCommit writeCommit, final RollbackAction rollback, final Load load,
                                         final MarkStart markStart, final MarkCommit markCommit,
+                                        final TaskExecutor taskExecutor,
                                         @Assisted final CollectionScope collectionScope) {
 
         Preconditions.checkNotNull( uuidService, "uuidService must be defined" );
@@ -109,6 +119,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
 
         this.uuidService = uuidService;
         this.collectionScope = collectionScope;
+        this.taskExecutor = taskExecutor;
     }
 
 
@@ -240,4 +251,5 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
 
 
 
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c0b78b9f/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
new file mode 100644
index 0000000..11e2da9
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
@@ -0,0 +1,115 @@
+package org.apache.usergrid.persistence.collection.impl;
+
+
+import java.util.List;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
+import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
+import org.apache.usergrid.persistence.collection.serialization.impl.VersionIterator;
+import org.apache.usergrid.persistence.core.entity.EntityVersion;
+import org.apache.usergrid.persistence.core.task.Task;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+/**
+ * Cleans up previous versions from the specified version. Note that this means the version passed in the io event is
+ * retained, the range is exclusive.
+ */
+class EntityVersionCleanupTask implements Task<CollectionIoEvent<EntityVersion>, CollectionIoEvent<EntityVersion>> {
+
+    private static final Logger LOG = LoggerFactory.getLogger( EntityVersionCleanupTask.class );
+
+
+    private final CollectionIoEvent<EntityVersion> collectionIoEvent;
+    private final List<EntityVersionDeleted> listeners;
+
+    private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
+    private final MvccEntitySerializationStrategy entitySerializationStrategy;
+
+
+    private EntityVersionCleanupTask( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
+                                      final MvccEntitySerializationStrategy entitySerializationStrategy,
+                                      final CollectionIoEvent<EntityVersion> collectionIoEvent,
+                                      final List<EntityVersionDeleted> listeners ) {
+        this.collectionIoEvent = collectionIoEvent;
+        this.listeners = listeners;
+        this.logEntrySerializationStrategy = logEntrySerializationStrategy;
+        this.entitySerializationStrategy = entitySerializationStrategy;
+    }
+
+
+    @Override
+    public CollectionIoEvent<EntityVersion> getId() {
+        return collectionIoEvent;
+    }
+
+
+    @Override
+    public void exceptionThrown( final Throwable throwable ) {
+        LOG.error( "Unable to run update task for event {}", collectionIoEvent, throwable );
+    }
+
+
+    @Override
+    public void rejected() {
+        //Our task was rejected meaning our queue was full.  We need this operation to run,
+        // so we'll run it in our current thread
+
+        try {
+            call();
+        }
+        catch ( Exception e ) {
+            throw new RuntimeException( "Exception thrown in call task", e );
+        }
+    }
+
+
+    @Override
+    public CollectionIoEvent<EntityVersion> call() throws Exception {
+
+        final CollectionScope scope = collectionIoEvent.getEntityCollection();
+        final Id entityId = collectionIoEvent.getEvent().getId();
+        final UUID maxVersion = collectionIoEvent.getEvent().getVersion();
+
+
+        VersionIterator versionIterator =
+                new VersionIterator( logEntrySerializationStrategy, scope, entityId, maxVersion, 1000 );
+
+
+        UUID currentVersion = null;
+
+        //for every entry, we want to clean it up with listeners
+
+        while ( versionIterator.hasNext() ) {
+
+            currentVersion = versionIterator.next();
+
+
+            //execute all the listeners
+            for ( EntityVersionDeleted listener : listeners ) {
+                listener.versionDeleted( scope, entityId, currentVersion );
+            }
+
+            //we do multiple invocations on purpose.  Our log is our source of versions, only delete from it
+            //after every successful invocation of listeners and entity removal
+            entitySerializationStrategy.delete( scope, entityId, currentVersion ).execute();
+
+            logEntrySerializationStrategy.delete( scope, entityId, currentVersion ).execute();
+
+
+        }
+
+
+        return collectionIoEvent;
+    }
+}
+
+
+

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c0b78b9f/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/VersionIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/VersionIterator.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/VersionIterator.java
new file mode 100644
index 0000000..323f12d
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/VersionIterator.java
@@ -0,0 +1,111 @@
+package org.apache.usergrid.persistence.collection.serialization.impl;
+
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+
+/**
+ * Iterator that will iterate all versions of the entity in the log that are < the specified maxVersion
+ *
+ */
+public class VersionIterator implements Iterator<UUID> {
+
+
+    private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
+    private final CollectionScope scope;
+    private final Id entityId;
+    private final int pageSize;
+
+
+    private Iterator<MvccLogEntry> elementItr;
+    private UUID nextStart;
+
+
+    /**
+     *
+     * @param logEntrySerializationStrategy The serialization strategy to get the log entries
+     * @param scope The scope of the entity
+     * @param entityId The id of the entity
+     * @param maxVersion The max version of the entity.  Iterator will iterate from max to min starting with the version < max
+     * @param pageSize The fetch size to get when querying the serialization strategy
+     */
+    public VersionIterator( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
+                            final CollectionScope scope, final Id entityId, final UUID maxVersion,
+                            final int pageSize ) {
+        this.logEntrySerializationStrategy = logEntrySerializationStrategy;
+        this.scope = scope;
+        this.entityId = entityId;
+        this.nextStart = maxVersion;
+        this.pageSize = pageSize;
+    }
+
+
+    @Override
+    public boolean hasNext() {
+        if ( elementItr == null || !elementItr.hasNext() && nextStart != null ) {
+            try {
+                advance();
+            }
+            catch ( ConnectionException e ) {
+                throw new RuntimeException( "Unable to query cassandra", e );
+            }
+        }
+
+        return elementItr.hasNext();
+    }
+
+
+    @Override
+    public UUID next() {
+        if ( !hasNext() ) {
+            throw new NoSuchElementException( "No more elements exist" );
+        }
+
+        return elementItr.next().getVersion();
+    }
+
+
+    @Override
+    public void remove() {
+        throw new UnsupportedOperationException( "Remove is unsupported" );
+    }
+
+
+    /**
+     * Advance our iterator
+     */
+    public void advance() throws ConnectionException {
+
+        final int requestedSize = pageSize + 1;
+
+        //loop through every entry that's < this one and remove it
+        List<MvccLogEntry> results = logEntrySerializationStrategy.load( scope, entityId, nextStart, requestedSize );
+
+        //we always remove the first version if it's equal since it's returned
+        if ( results.size() > 0 && results.get( 0 ).getVersion().equals( nextStart ) ) {
+            results.remove( 0 );
+        }
+
+
+        //we have results, set our next start
+        if ( results.size() == requestedSize ) {
+            nextStart = results.get( results.size() - 1 ).getVersion();
+        }
+        //nothing left to do
+        else {
+            nextStart = null;
+        }
+
+        elementItr = results.iterator();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c0b78b9f/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
index 8184937..40ee5cf 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
@@ -32,6 +32,10 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
 
     private final ListeningExecutorService executorService;
 
+    private final String name;
+    private final int poolSize;
+    private final int queueLength;
+
 
     /**
      * @param name The name of this instance of the task executor
@@ -44,11 +48,14 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
         Preconditions.checkArgument( poolSize > 0, "poolSize must be > than 0" );
         Preconditions.checkArgument( queueLength > -1, "queueLength must be 0 or more" );
 
+        this.name = name;
+        this.poolSize = poolSize;
+        this.queueLength = queueLength;
 
         final BlockingQueue<Runnable> queue =
                 queueLength > 0 ? new ArrayBlockingQueue<Runnable>( queueLength ) : new SynchronousQueue<Runnable>();
 
-        executorService = MoreExecutors.listeningDecorator( new MaxSizeThreadPool( name, poolSize, queue ) );
+        executorService = MoreExecutors.listeningDecorator( new MaxSizeThreadPool( queue ) );
     }
 
 
@@ -105,11 +112,11 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
     /**
      * Create a thread pool that will reject work if our audit tasks become overwhelmed
      */
-    private static final class MaxSizeThreadPool extends ThreadPoolExecutor {
+    private final class MaxSizeThreadPool extends ThreadPoolExecutor {
 
-        public MaxSizeThreadPool( final String name, final int workerSize, BlockingQueue<Runnable> queue ) {
+        public MaxSizeThreadPool( BlockingQueue<Runnable> queue ) {
 
-            super( 1, workerSize, 30, TimeUnit.SECONDS, queue, new CountingThreadFactory( name ),
+            super( 1, poolSize, 30, TimeUnit.SECONDS, queue, new CountingThreadFactory( ),
                     new RejectedHandler() );
         }
     }
@@ -118,15 +125,10 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
     /**
      * Thread factory that will name and count threads for easier debugging
      */
-    private static final class CountingThreadFactory implements ThreadFactory {
+    private final class CountingThreadFactory implements ThreadFactory {
 
         private final AtomicLong threadCounter = new AtomicLong();
 
-        private final String name;
-
-
-        private CountingThreadFactory( final String name ) {this.name = name;}
-
 
         @Override
         public Thread newThread( final Runnable r ) {
@@ -144,12 +146,12 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
     /**
      * The handler that will handle rejected executions and signal the interface
      */
-    private static final class RejectedHandler implements RejectedExecutionHandler {
+    private final class RejectedHandler implements RejectedExecutionHandler {
 
 
         @Override
         public void rejectedExecution( final Runnable r, final ThreadPoolExecutor executor ) {
-            LOG.warn( "Audit queue full, rejecting audit task {}", r );
+            LOG.warn( "{} task queue full, rejecting task {}", name, r );
 
             throw new RejectedExecutionException( "Unable to run task, queue full" );
         }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c0b78b9f/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
index 8b1ed22..518b461 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
@@ -27,7 +27,7 @@ public interface Task<V, I> extends Callable<V> {
      * Invoked when we weren't able to run this task by the the thread attempting to schedule the task.
      * If this task MUST be run immediately, you can invoke the call method from within this event to invoke the
      * task in the scheduling thread.  Note that this has performance implications to the user.  If you can drop the
-     * request and process later (lazy repair for instanc\\\\\\\\\\\\\\\\\\\\\\hjn ) do so.
+     * request and process later (lazy repair for instance ) do so.
      *
      */
     void rejected();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c0b78b9f/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
index b5491bc..619ac14 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
@@ -1,6 +1,9 @@
 package org.apache.usergrid.persistence.core.task;
 
 
+import com.google.common.util.concurrent.ListenableFuture;
+
+
 /**
  * An interface for execution of tasks
  */
@@ -10,5 +13,5 @@ public interface TaskExecutor {
      * Submit the task asynchronously
      * @param task
      */
-    public <V, I> com.google.common.util.concurrent.ListenableFuture<V> submit( Task<V, I> task );
+    public <V, I> ListenableFuture<V> submit( Task<V, I> task );
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c0b78b9f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/event/EdgeDeleted.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/event/EdgeDeleted.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/event/EdgeDeleted.java
new file mode 100644
index 0000000..631de59
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/event/EdgeDeleted.java
@@ -0,0 +1,8 @@
+package org.apache.usergrid.persistence.graph.event;
+
+
+/**
+ *
+ *
+ */
+public interface EdgeDeleted {}


[11/15] git commit: Resolved conflicts when guice modules are integrated

Posted by to...@apache.org.
Resolved conflicts when guice modules are integrated


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/429b12b4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/429b12b4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/429b12b4

Branch: refs/heads/two-dot-o
Commit: 429b12b4f5da4af4e8ce868f0042897b1f73a52a
Parents: 324a333
Author: Todd Nine <to...@apache.org>
Authored: Mon Sep 29 18:06:43 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Mon Sep 29 18:06:43 2014 -0600

----------------------------------------------------------------------
 .../collection/guice/CollectionModule.java      |  1 +
 .../guice/CollectionTaskExecutor.java           | 35 ++++++++++++++++++++
 .../impl/EntityCollectionManagerImpl.java       |  2 ++
 .../persistence/graph/guice/GraphModule.java    |  1 +
 .../graph/guice/GraphTaskExecutor.java          | 33 ++++++++++++++++++
 .../impl/shard/impl/NodeShardCacheImpl.java     |  5 ---
 .../shard/impl/ShardGroupCompactionImpl.java    |  3 +-
 7 files changed, 74 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/429b12b4/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
index 306f6e0..9d949e8 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
@@ -100,6 +100,7 @@ public class CollectionModule extends AbstractModule {
     @Inject
     @Singleton
     @Provides
+    @CollectionTaskExecutor
     public TaskExecutor collectionTaskExecutor(final SerializationFig serializationFig){
         return new NamedTaskExecutorImpl( "collectiontasks", serializationFig.getTaskPoolThreadSize(), serializationFig.getTaskPoolQueueSize() );
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/429b12b4/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionTaskExecutor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionTaskExecutor.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionTaskExecutor.java
new file mode 100644
index 0000000..7c08437
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionTaskExecutor.java
@@ -0,0 +1,35 @@
+package org.apache.usergrid.persistence.collection.guice;/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import com.google.inject.BindingAnnotation;
+
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+
+
+
+@BindingAnnotation
+@Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+public @interface CollectionTaskExecutor {}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/429b12b4/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index a6fc31b..094baa6 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@ -29,6 +29,7 @@ import org.apache.usergrid.persistence.collection.CollectionScope;
 import org.apache.usergrid.persistence.collection.EntityCollectionManager;
 import org.apache.usergrid.persistence.collection.event.EntityVersionCreated;
 import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
+import org.apache.usergrid.persistence.collection.guice.CollectionTaskExecutor;
 import org.apache.usergrid.persistence.collection.guice.Write;
 import org.apache.usergrid.persistence.collection.guice.WriteUpdate;
 import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
@@ -99,6 +100,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
                                         final WriteOptimisticVerify writeOptimisticVerify,
                                         final WriteCommit writeCommit, final RollbackAction rollback, final Load load,
                                         final MarkStart markStart, final MarkCommit markCommit,
+                                        @CollectionTaskExecutor
                                         final TaskExecutor taskExecutor,
                                         @Assisted final CollectionScope collectionScope) {
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/429b12b4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
index 608f8ce..9538be0 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
@@ -154,6 +154,7 @@ public class GraphModule extends AbstractModule {
     @Inject
     @Singleton
     @Provides
+    @GraphTaskExecutor
     public TaskExecutor graphTaskExecutor(final GraphFig graphFig){
         return new NamedTaskExecutorImpl( "graphTaskExecutor",  graphFig.getShardAuditWorkerCount(), graphFig.getShardAuditWorkerQueueSize()  );
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/429b12b4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphTaskExecutor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphTaskExecutor.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphTaskExecutor.java
new file mode 100644
index 0000000..fc54598
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphTaskExecutor.java
@@ -0,0 +1,33 @@
+package org.apache.usergrid.persistence.graph.guice;/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import com.google.inject.BindingAnnotation;
+
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+
+@BindingAnnotation
+@Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+public @interface GraphTaskExecutor {}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/429b12b4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
index a2c1aeb..c5f0bfb 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
@@ -62,11 +62,6 @@ import com.google.common.util.concurrent.ListeningScheduledExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.inject.Inject;
 
-import rx.Observable;
-import rx.Subscriber;
-import rx.functions.Action1;
-import rx.functions.Func1;
-import rx.schedulers.Schedulers;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/429b12b4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
index 6cd98a7..9f8efc8 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
@@ -50,6 +50,7 @@ import org.apache.usergrid.persistence.core.task.TaskExecutor;
 import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.MarkedEdge;
 import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+import org.apache.usergrid.persistence.graph.guice.GraphTaskExecutor;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumnFamilies;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardSerialization;
@@ -110,7 +111,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
                                      final ShardedEdgeSerialization shardedEdgeSerialization,
                                      final EdgeColumnFamilies edgeColumnFamilies, final Keyspace keyspace,
                                      final EdgeShardSerialization edgeShardSerialization,
-                                     final TaskExecutor taskExecutor ) {
+                                     @GraphTaskExecutor final TaskExecutor taskExecutor ) {
 
         this.timeService = timeService;
         this.graphFig = graphFig;


[04/15] git commit: Converted tasks to ForkJoin tasks to allow for parallel execution.

Posted by to...@apache.org.
Converted tasks to ForkJoin tasks to allow for parallel execution.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/dc3f448c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/dc3f448c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/dc3f448c

Branch: refs/heads/two-dot-o
Commit: dc3f448c946219c44e3dbfadb294fea0b8048343
Parents: c0b78b9
Author: Todd Nine <to...@apache.org>
Authored: Thu Sep 25 14:56:23 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Thu Sep 25 14:56:23 2014 -0600

----------------------------------------------------------------------
 .../collection/guice/CollectionModule.java      |   2 +-
 .../impl/EntityVersionCleanupTask.java          |   8 +-
 .../persistence/core/task/ImmediateTask.java    |  42 +++++++
 .../core/task/NamedTaskExecutorImpl.java        |  74 ++++++++-----
 .../usergrid/persistence/core/task/Task.java    |  44 +++++---
 .../persistence/core/task/TaskExecutor.java     |   2 +-
 .../core/task/NamedTaskExecutorImplTest.java    | 111 +++++++++++++++----
 .../persistence/graph/guice/GraphModule.java    |   2 +-
 .../impl/shard/ShardGroupCompaction.java        |   7 +-
 .../shard/impl/ShardGroupCompactionImpl.java    |  44 ++------
 10 files changed, 227 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dc3f448c/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
index 306f6e0..a6230e1 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
@@ -101,7 +101,7 @@ public class CollectionModule extends AbstractModule {
     @Singleton
     @Provides
     public TaskExecutor collectionTaskExecutor(final SerializationFig serializationFig){
-        return new NamedTaskExecutorImpl( "collectiontasks", serializationFig.getTaskPoolThreadSize(), serializationFig.getTaskPoolQueueSize() );
+        return new NamedTaskExecutorImpl( "collectiontasks", serializationFig.getTaskPoolThreadSize() );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dc3f448c/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
index 11e2da9..85afce6 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
@@ -22,7 +22,7 @@ import org.apache.usergrid.persistence.model.entity.Id;
  * Cleans up previous versions from the specified version. Note that this means the version passed in the io event is
  * retained, the range is exclusive.
  */
-class EntityVersionCleanupTask implements Task<CollectionIoEvent<EntityVersion>, CollectionIoEvent<EntityVersion>> {
+class EntityVersionCleanupTask extends Task<CollectionIoEvent<EntityVersion>, CollectionIoEvent<EntityVersion>> {
 
     private static final Logger LOG = LoggerFactory.getLogger( EntityVersionCleanupTask.class );
 
@@ -63,7 +63,7 @@ class EntityVersionCleanupTask implements Task<CollectionIoEvent<EntityVersion>,
         // so we'll run it in our current thread
 
         try {
-            call();
+            executeTask();
         }
         catch ( Exception e ) {
             throw new RuntimeException( "Exception thrown in call task", e );
@@ -72,7 +72,7 @@ class EntityVersionCleanupTask implements Task<CollectionIoEvent<EntityVersion>,
 
 
     @Override
-    public CollectionIoEvent<EntityVersion> call() throws Exception {
+    public CollectionIoEvent<EntityVersion> executeTask() throws Exception {
 
         final CollectionScope scope = collectionIoEvent.getEntityCollection();
         final Id entityId = collectionIoEvent.getEvent().getId();
@@ -102,8 +102,6 @@ class EntityVersionCleanupTask implements Task<CollectionIoEvent<EntityVersion>,
             entitySerializationStrategy.delete( scope, entityId, currentVersion ).execute();
 
             logEntrySerializationStrategy.delete( scope, entityId, currentVersion ).execute();
-
-
         }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dc3f448c/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/ImmediateTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/ImmediateTask.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/ImmediateTask.java
new file mode 100644
index 0000000..627e7e8
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/ImmediateTask.java
@@ -0,0 +1,42 @@
+package org.apache.usergrid.persistence.core.task;
+
+
+/**
+ * Does not perform computation, just returns the value passed to it
+ *
+ */
+public class ImmediateTask<V, I> extends Task<V, I> {
+
+    private final I id;
+    private final V returned;
+
+
+    protected ImmediateTask( final I id, final V returned ) {
+        this.id = id;
+        this.returned = returned;
+    }
+
+
+    @Override
+    public I getId() {
+        return id;
+    }
+
+
+    @Override
+    public V executeTask() throws Exception {
+        return returned;
+    }
+
+
+    @Override
+    public void exceptionThrown( final Throwable throwable ) {
+          //no op
+    }
+
+
+    @Override
+    public void rejected() {
+        //no op
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dc3f448c/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
index 40ee5cf..aba8cd5 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
@@ -1,11 +1,11 @@
 package org.apache.usergrid.persistence.core.task;
 
 
-import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinWorkerThread;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -17,10 +17,6 @@ import org.slf4j.Logger;
 
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
 
 
 /**
@@ -30,54 +26,45 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
 
     private static final Logger LOG = org.slf4j.LoggerFactory.getLogger( NamedTaskExecutorImpl.class );
 
-    private final ListeningExecutorService executorService;
+    private final NamedForkJoinPool executorService;
 
     private final String name;
     private final int poolSize;
-    private final int queueLength;
 
 
     /**
      * @param name The name of this instance of the task executor
      * @param poolSize The size of the pool.  This is the number of concurrent tasks that can execute at once.
-     * @param queueLength The length of tasks to keep in the queue
      */
-    public NamedTaskExecutorImpl( final String name, final int poolSize, final int queueLength ) {
+    public NamedTaskExecutorImpl( final String name, final int poolSize) {
         Preconditions.checkNotNull( name );
         Preconditions.checkArgument( name.length() > 0, "name must have a length" );
         Preconditions.checkArgument( poolSize > 0, "poolSize must be > than 0" );
-        Preconditions.checkArgument( queueLength > -1, "queueLength must be 0 or more" );
 
         this.name = name;
         this.poolSize = poolSize;
-        this.queueLength = queueLength;
 
-        final BlockingQueue<Runnable> queue =
-                queueLength > 0 ? new ArrayBlockingQueue<Runnable>( queueLength ) : new SynchronousQueue<Runnable>();
+//        final BlockingQueue<Runnable> queue =
+//                queueLength > 0 ? new ArrayBlockingQueue<Runnable>( queueLength ) : new SynchronousQueue<Runnable>();
+//
+//        executorService = MoreExecutors.listeningDecorator( new MaxSizeThreadPool( queue ) );
 
-        executorService = MoreExecutors.listeningDecorator( new MaxSizeThreadPool( queue ) );
+       this.executorService =  new NamedForkJoinPool(poolSize);
     }
 
 
     @Override
-    public <V, I> ListenableFuture<V> submit( final Task<V, I> task ) {
-
-        final ListenableFuture<V> future;
+    public <V, I> Task<V, I> submit( final Task<V, I> task ) {
 
         try {
-            future = executorService.submit( task );
-
-            /**
-             * Log our success or failures for debugging purposes
-             */
-            Futures.addCallback( future, new TaskFutureCallBack<V, I>( task ) );
+           executorService.submit( task );
         }
         catch ( RejectedExecutionException ree ) {
             task.rejected();
-            return Futures.immediateCancelledFuture();
         }
 
-        return future;
+        return task;
+
     }
 
 
@@ -109,6 +96,41 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
     }
 
 
+    private final class NamedForkJoinPool extends ForkJoinPool{
+
+        private NamedForkJoinPool( final int workerThreadCount ) {
+            //TODO, verify the scheduler at the end
+            super( workerThreadCount, defaultForkJoinWorkerThreadFactory, new TaskExceptionHandler(), true );
+        }
+
+
+
+    }
+
+    private final class TaskExceptionHandler implements Thread.UncaughtExceptionHandler{
+
+        @Override
+        public void uncaughtException( final Thread t, final Throwable e ) {
+            LOG.error( "Uncaught exception on thread {} was {}", t, e );
+        }
+    }
+
+
+
+
+    private final class NamedWorkerThread extends ForkJoinWorkerThread{
+
+        /**
+         * Creates a ForkJoinWorkerThread operating in the given pool.
+         *
+         * @param pool the pool this thread works in
+         *
+         * @throws NullPointerException if pool is null
+         */
+        protected NamedWorkerThread(final String name,  final ForkJoinPool pool ) {
+            super( pool );
+        }
+    }
     /**
      * Create a thread pool that will reject work if our audit tasks become overwhelmed
      */

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dc3f448c/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
index 518b461..eb04c2c 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
@@ -1,37 +1,47 @@
 package org.apache.usergrid.persistence.core.task;
 
 
-import java.util.concurrent.Callable;
-
+import java.util.concurrent.RecursiveTask;
 
 
 /**
  * The task to execute
  */
-public interface Task<V, I> extends Callable<V> {
+public abstract class Task<V, I> extends RecursiveTask<V> {
 
     /**
      * Get the unique identifier of this task.  This may be used to collapse runnables over a time period in the future
-     *
-     * @return
      */
-    I getId();
+    public abstract I getId();
+
+
+    @Override
+    protected V compute() {
+        try {
+            return executeTask();
+        }
+        catch ( Exception e ) {
+            exceptionThrown( e );
+            throw new RuntimeException( e );
+        }
+    }
+
 
     /**
-     * Invoked when this task throws an uncaught exception.
-     * @param throwable
+     * Execute the task
      */
-    void exceptionThrown(final Throwable throwable);
+    public abstract V executeTask() throws Exception;
 
     /**
-     * Invoked when we weren't able to run this task by the the thread attempting to schedule the task.
-     * If this task MUST be run immediately, you can invoke the call method from within this event to invoke the
-     * task in the scheduling thread.  Note that this has performance implications to the user.  If you can drop the
-     * request and process later (lazy repair for instance ) do so.
-     *
+     * Invoked when this task throws an uncaught exception.
      */
-    void rejected();
-
-
+    public abstract void exceptionThrown( final Throwable throwable );
 
+    /**
+     * Invoked when we weren't able to run this task by the thread attempting to schedule the task. If this task
+     * MUST be run immediately, you can invoke the executeTask method from within this event to invoke the task in the
+     * scheduling thread.  Note that this has performance implications to the user.  If you can drop the request and
+     * process later (lazy repair for instance ) do so.
+     */
+    public abstract void rejected();
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dc3f448c/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
index 619ac14..a3553c7 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
@@ -13,5 +13,5 @@ public interface TaskExecutor {
      * Submit the task asynchronously
      * @param task
      */
-    public <V, I> ListenableFuture<V> submit( Task<V, I> task );
+    public <V, I> Task<V, I > submit( Task<V, I> task );
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dc3f448c/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
index 9da9263..f65d5a6 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
@@ -11,8 +11,6 @@ import org.junit.Test;
 
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 
-import junit.framework.TestCase;
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertSame;
 
@@ -25,7 +23,7 @@ public class NamedTaskExecutorImplTest {
 
     @Test
     public void jobSuccess() throws InterruptedException {
-        final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1, 0 );
+        final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1 );
 
         final CountDownLatch exceptionLatch = new CountDownLatch( 0 );
         final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
@@ -46,7 +44,7 @@ public class NamedTaskExecutorImplTest {
 
     @Test
     public void exceptionThrown() throws InterruptedException {
-        final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1, 0 );
+        final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1 );
 
         final CountDownLatch exceptionLatch = new CountDownLatch( 1 );
         final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
@@ -55,9 +53,10 @@ public class NamedTaskExecutorImplTest {
         final RuntimeException re = new RuntimeException( "throwing exception" );
 
         final TestTask<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {
+
+
             @Override
-            public Void call() throws Exception {
-                super.call();
+            public Void executeTask() {
                 throw re;
             }
         };
@@ -77,7 +76,7 @@ public class NamedTaskExecutorImplTest {
 
     @Test
     public void noCapacity() throws InterruptedException {
-        final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1, 0 );
+        final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1 );
 
         final CountDownLatch exceptionLatch = new CountDownLatch( 0 );
         final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
@@ -86,8 +85,8 @@ public class NamedTaskExecutorImplTest {
 
         final TestTask<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {
             @Override
-            public Void call() throws Exception {
-                super.call();
+            public Void executeTask() throws Exception {
+                super.executeTask();
 
                 //park this thread so it takes up a task and the next is rejected
                 final Object mutex = new Object();
@@ -131,22 +130,22 @@ public class NamedTaskExecutorImplTest {
     public void noCapacityWithQueue() throws InterruptedException {
 
         final int threadPoolSize = 1;
-        final int queueSize = 10;
+       
 
-        final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", threadPoolSize, queueSize );
+        final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", threadPoolSize );
 
         final CountDownLatch exceptionLatch = new CountDownLatch( 0 );
         final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
         final CountDownLatch runLatch = new CountDownLatch( 1 );
 
-        int iterations = threadPoolSize + queueSize;
+        int iterations = threadPoolSize ;
 
-        for(int i = 0; i < iterations; i ++) {
+        for ( int i = 0; i < iterations; i++ ) {
 
             final TestTask<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {
                 @Override
-                public Void call() throws Exception {
-                    super.call();
+                public Void executeTask() throws Exception {
+                    super.executeTask();
 
                     //park this thread so it takes up a task and the next is rejected
                     final Object mutex = new Object();
@@ -162,7 +161,6 @@ public class NamedTaskExecutorImplTest {
         }
 
 
-
         runLatch.await( 1000, TimeUnit.MILLISECONDS );
 
         //now submit the second task
@@ -187,12 +185,81 @@ public class NamedTaskExecutorImplTest {
     }
 
 
-    private static abstract class TestTask<V> implements Task<V, UUID> {
+    @Test
+    public void jobTreeResult() throws InterruptedException {
+
+        final int threadPoolSize = 4;
+       
+
+        final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", threadPoolSize );
+
+        final CountDownLatch exceptionLatch = new CountDownLatch( 0 );
+        final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
+
+        //accommodates for splitting the job 1->2->4 and joining
+        final CountDownLatch runLatch = new CountDownLatch( 7 );
+
+
+        TestRecursiveTask task = new TestRecursiveTask( exceptionLatch, rejectedLatch, runLatch, 1, 3 );
+
+         executor.submit( task );
+
+
+        //compute our result
+        Integer result = task.join();
+
+        //result should be 4*3: only the 4 leaf tasks at max depth (3) contribute, each returning its depth
+        final int expected = 4*3;
+
+        assertEquals(expected, result.intValue());
+
+        //just to check our latches
+        runLatch.await( 1000, TimeUnit.MILLISECONDS );
+
+        //now submit the second task
+
+
+    }
+
+
+    private static class TestRecursiveTask extends TestTask<Integer> {
+
+        private final int depth;
+        private final int maxDepth;
+
+        private TestRecursiveTask( final CountDownLatch exceptionLatch, final CountDownLatch rejectedLatch,
+                                   final CountDownLatch runLatch, final int depth, final int maxDepth ) {
+            super( exceptionLatch, rejectedLatch, runLatch );
+            this.depth = depth;
+            this.maxDepth = maxDepth;
+        }
+
+
+        @Override
+        public Integer executeTask() throws Exception {
+
+            if(depth == maxDepth ){
+                return depth;
+            }
+
+            TestRecursiveTask left = new TestRecursiveTask(exceptionLatch, rejectedLatch, runLatch, depth+1, maxDepth  );
+
+            TestRecursiveTask right = new TestRecursiveTask(exceptionLatch, rejectedLatch, runLatch, depth+1, maxDepth  );
+
+            //run our left in another thread
+            left.fork();
+
+            return right.compute() + left.join();
+        }
+    }
+
+
+    private static abstract class TestTask<V> extends Task<V, UUID> {
 
-        private final List<Throwable> exceptions;
-        private final CountDownLatch exceptionLatch;
-        private final CountDownLatch rejectedLatch;
-        private final CountDownLatch runLatch;
+        protected final List<Throwable> exceptions;
+        protected final CountDownLatch exceptionLatch;
+        protected final CountDownLatch rejectedLatch;
+        protected final CountDownLatch runLatch;
 
 
         private TestTask( final CountDownLatch exceptionLatch, final CountDownLatch rejectedLatch,
@@ -225,7 +292,7 @@ public class NamedTaskExecutorImplTest {
 
 
         @Override
-        public V call() throws Exception {
+        public V executeTask() throws Exception {
             runLatch.countDown();
             return null;
         }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dc3f448c/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
index 608f8ce..a3978fc 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
@@ -155,7 +155,7 @@ public class GraphModule extends AbstractModule {
     @Singleton
     @Provides
     public TaskExecutor graphTaskExecutor(final GraphFig graphFig){
-        return new NamedTaskExecutorImpl( "graphTaskExecutor",  graphFig.getShardAuditWorkerCount(), graphFig.getShardAuditWorkerQueueSize()  );
+        return new NamedTaskExecutorImpl( "graphTaskExecutor",  graphFig.getShardAuditWorkerCount()  );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dc3f448c/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
index 4fe1a63..15acaa8 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
@@ -27,6 +27,8 @@ import java.util.HashSet;
 import java.util.Set;
 
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.task.Task;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardGroupCompactionImpl;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
@@ -42,9 +44,8 @@ public interface ShardGroupCompaction {
      *
      * @return A ListenableFuture with the result.  Note that some
      */
-    public ListenableFuture<AuditResult> evaluateShardGroup( final ApplicationScope scope,
-                                                             final DirectedEdgeMeta edgeMeta,
-                                                             final ShardEntryGroup group );
+    public Task<AuditResult, ShardGroupCompactionImpl.ShardAuditKey> evaluateShardGroup(
+            final ApplicationScope scope, final DirectedEdgeMeta edgeMeta, final ShardEntryGroup group );
 
 
     public enum AuditResult {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dc3f448c/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
index be7fbe4..751b4d9 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
@@ -37,14 +37,13 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
-import javax.annotation.Nullable;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.core.consistency.TimeService;
 import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.task.ImmediateTask;
 import org.apache.usergrid.persistence.core.task.Task;
 import org.apache.usergrid.persistence.core.task.TaskExecutor;
 import org.apache.usergrid.persistence.graph.GraphFig;
@@ -66,9 +65,6 @@ import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hasher;
 import com.google.common.hash.Hashing;
 import com.google.common.hash.PrimitiveSink;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.netflix.astyanax.Keyspace;
@@ -277,45 +273,29 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
 
 
     @Override
-    public ListenableFuture<AuditResult> evaluateShardGroup( final ApplicationScope scope,
-                                                             final DirectedEdgeMeta edgeMeta,
-                                                             final ShardEntryGroup group ) {
+    public Task<AuditResult, ShardAuditKey> evaluateShardGroup( final ApplicationScope scope,
+                                                                final DirectedEdgeMeta edgeMeta,
+                                                                final ShardEntryGroup group ) {
 
         final double repairChance = random.nextDouble();
 
 
         //don't audit, we didn't hit our chance
         if ( repairChance > graphFig.getShardRepairChance() ) {
-            return Futures.immediateFuture( AuditResult.NOT_CHECKED );
+            return new ImmediateTask<AuditResult, ShardAuditKey>(  new ShardAuditKey( scope, edgeMeta, group ), AuditResult.NOT_CHECKED ) {};
         }
 
         /**
          * Try and submit.  During back pressure, we may not be able to submit, that's ok.  Better to drop than to
          * hose the system
          */
-        ListenableFuture<AuditResult> future = taskExecutor.submit( new ShardAuditTask( scope, edgeMeta, group ) );
+       return taskExecutor.submit( new ShardAuditTask( scope, edgeMeta, group ) );
 
-        /**
-         * Log our success or failures for debugging purposes
-         */
-        Futures.addCallback( future, new FutureCallback<AuditResult>() {
-            @Override
-            public void onSuccess( @Nullable final AuditResult result ) {
-                LOG.debug( "Successfully completed audit of task {}", result );
-            }
-
-
-            @Override
-            public void onFailure( final Throwable t ) {
-                LOG.error( "Unable to perform audit.  Exception is ", t );
-            }
-        } );
 
-        return future;
     }
 
 
-    private final class ShardAuditTask implements Task<AuditResult, ShardAuditKey> {
+    private final class ShardAuditTask extends Task<AuditResult, ShardAuditKey> {
 
         private final ApplicationScope scope;
         private final DirectedEdgeMeta edgeMeta;
@@ -350,7 +330,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
 
 
         @Override
-        public AuditResult call() throws Exception {
+        public AuditResult executeTask() throws Exception {
             /**
              * We don't have a compaction pending.  Run an audit on the shards
              */
@@ -401,10 +381,8 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
                  */
                 try {
                     CompactionResult result = compact( scope, edgeMeta, group );
-                    LOG.info(
-                            "Compaction result for compaction of scope {} with edge meta data of {} and shard group " +
-                                    "{} is {}",
-                            new Object[] { scope, edgeMeta, group, result } );
+                    LOG.info( "Compaction result for compaction of scope {} with edge meta data of {} and shard group "
+                                    + "{} is {}", new Object[] { scope, edgeMeta, group, result } );
                 }
                 finally {
                     shardCompactionTaskTracker.complete( scope, edgeMeta, group );
@@ -418,7 +396,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
     }
 
 
-    private static final class ShardAuditKey {
+    public static final class ShardAuditKey {
         private final ApplicationScope scope;
         private final DirectedEdgeMeta edgeMeta;
         private final ShardEntryGroup group;


[15/15] git commit: Merge branch 'eventsystem' into two-dot-o

Posted by to...@apache.org.
Merge branch 'eventsystem' into two-dot-o


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/2678eae9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/2678eae9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/2678eae9

Branch: refs/heads/two-dot-o
Commit: 2678eae9e7b61911d02c60394ae2035fb4a83955
Parents: abbd76e d85b97c
Author: Todd Nine <to...@apache.org>
Authored: Wed Oct 1 10:31:56 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Wed Oct 1 10:31:56 2014 -0600

----------------------------------------------------------------------
 .../collection/EntityCollectionManager.java     |  12 +-
 .../collection/event/EntityDeleted.java         |  25 +
 .../collection/event/EntityVersionCreated.java  |  25 +
 .../collection/event/EntityVersionDeleted.java  |  29 +
 .../collection/guice/CollectionModule.java      |  18 +-
 .../guice/CollectionTaskExecutor.java           |  35 +
 .../impl/EntityCollectionManagerImpl.java       |  14 +
 .../impl/EntityVersionCleanupTask.java          | 198 ++++++
 .../serialization/SerializationFig.java         |  32 +-
 .../serialization/impl/LogEntryIterator.java    | 114 +++
 .../EntityCollectionManagerStressTest.java      |   2 +
 .../impl/EntityVersionCleanupTaskTest.java      | 690 +++++++++++++++++++
 .../impl/LogEntryIteratorTest.java              | 131 ++++
 .../collection/util/LogEntryMock.java           | 152 ++++
 .../core/astyanax/AstyanaxKeyspaceProvider.java |   2 +
 .../persistence/core/guice/CommonModule.java    |   2 +
 .../core/task/NamedTaskExecutorImpl.java        | 167 +++++
 .../usergrid/persistence/core/task/Task.java    |  31 +
 .../persistence/core/task/TaskExecutor.java     |  23 +
 .../core/task/NamedTaskExecutorImplTest.java    | 227 ++++++
 .../usergrid/persistence/graph/GraphFig.java    |   2 +
 .../persistence/graph/event/EdgeDeleted.java    |   8 +
 .../persistence/graph/guice/GraphModule.java    |  16 +
 .../graph/guice/GraphTaskExecutor.java          |  33 +
 .../impl/shard/impl/NodeShardCacheImpl.java     |   5 -
 .../shard/impl/ShardGroupCompactionImpl.java    | 175 +++--
 .../graph/GraphManagerStressTest.java           |   1 +
 .../impl/shard/ShardGroupCompactionTest.java    |   5 +-
 28 files changed, 2081 insertions(+), 93 deletions(-)
----------------------------------------------------------------------



[09/15] git commit: Merge branch 'two-dot-o' into eventsystem

Posted by to...@apache.org.
Merge branch 'two-dot-o' into eventsystem


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/9c63ce63
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/9c63ce63
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/9c63ce63

Branch: refs/heads/two-dot-o
Commit: 9c63ce63fd23445d46d0371d8728b17e276bde12
Parents: 1060478 9e2743d
Author: Todd Nine <to...@apache.org>
Authored: Mon Sep 29 14:52:25 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Mon Sep 29 14:52:25 2014 -0600

----------------------------------------------------------------------
 portal/js/global/ug-service.js                  | 1100 +++--
 portal/js/libs/usergrid.sdk.js                  | 4264 +++++++++---------
 stack/core/pom.xml                              |   10 -
 .../corepersistence/CpEntityManager.java        |    1 +
 .../corepersistence/CpEntityManagerFactory.java |  184 +-
 .../corepersistence/CpRelationManager.java      |   58 +-
 .../HybridEntityManagerFactory.java             |   10 +
 .../mq/cassandra/io/AbstractSearch.java         |   17 +-
 .../persistence/EntityManagerFactory.java       |    4 +
 .../cassandra/EntityManagerFactoryImpl.java     |   51 +
 .../persistence/PerformanceEntityReadTest.java  |    2 +
 .../persistence/PerformanceEntityWriteTest.java |    2 +
 .../cassandra/QueryProcessorTest.java           |    2 -
 stack/core/src/test/resources/log4j.properties  |    1 +
 .../index/impl/EsEntityIndexImpl.java           |  122 +-
 .../persistence/index/impl/EsQueryVistor.java   |  150 +-
 .../index/impl/ElasticSearchTest.java           |  274 --
 .../impl/EntityConnectionIndexImplTest.java     |    2 -
 .../index/impl/EntityIndexMapUtils.java         |   12 +-
 .../persistence/index/impl/EntityIndexTest.java |   77 +-
 .../apache/usergrid/rest/AbstractRestIT.java    |    6 +-
 .../usergrid/rest/ConcurrentRestITSuite.java    |   54 +-
 .../apache/usergrid/rest/NotificationsIT.java   |  162 +-
 .../org/apache/usergrid/rest/RestITSuite.java   |   45 +-
 .../ApplicationRequestCounterIT.java            |   97 -
 .../rest/applications/DevicesResourceIT.java    |   87 -
 .../collection/BadGrammarQueryTest.java         |   79 -
 .../collection/CollectionsResourceIT.java       |  205 +
 .../collection/PagingResourceIT.java            |  283 --
 .../activities/ActivityResourceIT.java          |  188 +
 .../collection/activities/AndOrQueryTest.java   |  203 -
 .../collection/activities/OrderByTest.java      |  172 -
 .../activities/PagingEntitiesTest.java          |  141 -
 .../collection/devices/DevicesResourceIT.java   |   87 +
 .../collection/groups/GeoPagingTest.java        |  133 -
 .../collection/groups/GroupResourceIT.java      |  295 ++
 .../collection/paging/PagingEntitiesTest.java   |  141 +
 .../collection/paging/PagingResourceIT.java     |  301 ++
 .../users/ConnectionResourceTest.java           |  271 ++
 .../collection/users/OwnershipResourceIT.java   |  379 ++
 .../collection/users/PermissionsResourceIT.java |  768 ++++
 .../collection/users/RetrieveUsersTest.java     |   87 +
 .../collection/users/UserResourceIT.java        | 1418 ++++++
 .../users/extensions/TestResource.java          |   51 +
 .../events/ApplicationRequestCounterIT.java     |   97 +
 .../applications/queries/AndOrQueryTest.java    |  203 +
 .../queries/BadGrammarQueryTest.java            |   79 +
 .../applications/queries/GeoPagingTest.java     |  133 +
 .../applications/queries/MatrixQueryTests.java  |  202 +
 .../rest/applications/queries/OrderByTest.java  |  172 +
 .../applications/users/ActivityResourceIT.java  |  188 -
 .../users/CollectionsResourceIT.java            |  205 -
 .../users/ConnectionResourceTest.java           |  271 --
 .../applications/users/GroupResourceIT.java     |  295 --
 .../applications/users/MatrixQueryTests.java    |  202 -
 .../applications/users/OwnershipResourceIT.java |  379 --
 .../users/PermissionsResourceIT.java            |  768 ----
 .../applications/users/RetrieveUsersTest.java   |   87 -
 .../rest/applications/users/UserResourceIT.java | 1418 ------
 .../users/extensions/TestResource.java          |   51 -
 .../usergrid/rest/management/AccessTokenIT.java |  350 ++
 .../usergrid/rest/management/AdminUsersIT.java  |  807 ++++
 .../rest/management/ManagementResourceIT.java   |  397 +-
 .../rest/management/OrganizationsIT.java        |  378 ++
 .../organizations/OrganizationResourceIT.java   |   90 -
 .../organizations/OrganizationsResourceIT.java  |  322 --
 .../rest/management/users/MUUserResourceIT.java |  654 ---
 .../UsersOrganizationsResourceIT.java           |   72 -
 stack/rest/src/test/resources/log4j.properties  |    7 +-
 .../notifications/ApplicationQueueManager.java  |   40 +-
 .../notifications/NotificationsService.java     |    7 +-
 .../notifications/NotificationsTaskManager.java |   33 -
 .../services/notifications/QueueListener.java   |   71 +-
 .../services/notifications/QueueManager.java    |    4 -
 .../notifications/SingleQueueTaskManager.java   |   63 +-
 .../services/notifications/TaskTracker.java     |    4 +-
 .../resources/usergrid-services-context.xml     |    9 +-
 .../apns/NotificationsServiceIT.java            |    6 +-
 .../gcm/NotificationsServiceIT.java             |    9 +-
 stack/tools/README.md                           |   18 +
 .../org/apache/usergrid/tools/IndexRebuild.java |   99 +-
 .../org/apache/usergrid/tools/ToolBase.java     |   26 +-
 82 files changed, 10302 insertions(+), 9910 deletions(-)
----------------------------------------------------------------------



[13/15] git commit: Refactored listener to take buffers of versions for efficiency

Posted by to...@apache.org.
Refactored listener to take buffers of versions for efficiency

Refactored task to use RX for observable streams


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/8cadba29
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/8cadba29
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/8cadba29

Branch: refs/heads/two-dot-o
Commit: 8cadba29c80766bc8fe12755ac02cca2f741f077
Parents: 1a41624
Author: Todd Nine <to...@apache.org>
Authored: Wed Oct 1 10:30:28 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Wed Oct 1 10:30:28 2014 -0600

----------------------------------------------------------------------
 .../collection/event/EntityVersionDeleted.java  |   5 +-
 .../impl/EntityVersionCleanupTask.java          |  85 ++++---
 .../serialization/impl/LogEntryIterator.java    |   9 +-
 .../impl/EntityVersionCleanupTaskTest.java      | 230 ++++++++++++-------
 4 files changed, 213 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8cadba29/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionDeleted.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionDeleted.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionDeleted.java
index 3b76e84..ff7d960 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionDeleted.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionDeleted.java
@@ -1,6 +1,7 @@
 package org.apache.usergrid.persistence.collection.event;
 
 
+import java.util.List;
 import java.util.UUID;
 
 import org.apache.usergrid.persistence.collection.CollectionScope;
@@ -21,8 +22,8 @@ public interface EntityVersionDeleted {
      *
      * @param scope The scope of the entity
      * @param entityId The entity Id that was removed
-     * @param entityVersion The version that was removed
+     * @param entityVersions The versions that are to be removed
      */
-    public void versionDeleted(final CollectionScope scope, final Id entityId, final UUID entityVersion);
+    public void versionDeleted(final CollectionScope scope, final Id entityId, final List<UUID> entityVersions);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8cadba29/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
index d7ece40..2d30d36 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
@@ -1,9 +1,9 @@
 package org.apache.usergrid.persistence.collection.impl;
 
 
+import java.util.Iterator;
 import java.util.List;
 import java.util.UUID;
-import java.util.concurrent.ExecutionException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -15,9 +15,14 @@ import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerialization
 import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
 import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
 import org.apache.usergrid.persistence.collection.serialization.impl.LogEntryIterator;
+import org.apache.usergrid.persistence.core.rx.ObservableIterator;
 import org.apache.usergrid.persistence.core.task.Task;
 import org.apache.usergrid.persistence.model.entity.Id;
 
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
 import rx.Observable;
 import rx.functions.Action1;
 import rx.functions.Func1;
@@ -37,6 +42,7 @@ public class EntityVersionCleanupTask implements Task<Void> {
 
     private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
     private final MvccEntitySerializationStrategy entitySerializationStrategy;
+    private final Keyspace keyspace;
 
     private final SerializationFig serializationFig;
 
@@ -48,12 +54,13 @@ public class EntityVersionCleanupTask implements Task<Void> {
     public EntityVersionCleanupTask( final SerializationFig serializationFig,
                                      final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
                                      final MvccEntitySerializationStrategy entitySerializationStrategy,
-                                     final List<EntityVersionDeleted> listeners, final CollectionScope scope,
-                                     final Id entityId, final UUID version ) {
+                                     final Keyspace keyspace, final List<EntityVersionDeleted> listeners,
+                                     final CollectionScope scope, final Id entityId, final UUID version ) {
 
         this.serializationFig = serializationFig;
         this.logEntrySerializationStrategy = logEntrySerializationStrategy;
         this.entitySerializationStrategy = entitySerializationStrategy;
+        this.keyspace = keyspace;
         this.listeners = listeners;
         this.scope = scope;
         this.entityId = entityId;
@@ -91,36 +98,67 @@ public class EntityVersionCleanupTask implements Task<Void> {
         final UUID maxVersion = version;
 
 
-        LogEntryIterator logEntryIterator =
-                new LogEntryIterator( logEntrySerializationStrategy, scope, entityId, maxVersion,
-                        serializationFig.getHistorySize() );
+        Observable<MvccLogEntry> versions = Observable.create( new ObservableIterator( "versionIterators" ) {
+            @Override
+            protected Iterator getIterator() {
+                return new LogEntryIterator( logEntrySerializationStrategy, scope, entityId, maxVersion,
+                        serializationFig.getBufferSize() );
+            }
+        } );
 
 
-        //for every entry, we want to clean it up with listeners
+        //get the uuid from the version
+        versions.map( new Func1<MvccLogEntry, UUID>() {
+            @Override
+            public UUID call( final MvccLogEntry mvccLogEntry ) {
+                return mvccLogEntry.getVersion();
+            }
+        } )
+                //buffer our versions
+         .buffer( serializationFig.getBufferSize() )
+         //for each buffer set, delete all of them
+         .doOnNext( new Action1<List<UUID>>() {
+            @Override
+            public void call( final List<UUID> versions ) {
 
-        while ( logEntryIterator.hasNext() ) {
+                //Fire all the listeners
+                fireEvents( versions );
 
-            final MvccLogEntry logEntry = logEntryIterator.next();
+                MutationBatch entityBatch = keyspace.prepareMutationBatch();
+                MutationBatch logBatch = keyspace.prepareMutationBatch();
 
+                for ( UUID version : versions ) {
+                    final MutationBatch entityDelete = entitySerializationStrategy.delete( scope, entityId, version );
 
-            final UUID version = logEntry.getVersion();
+                    entityBatch.mergeShallow( entityDelete );
 
+                    final MutationBatch logDelete = logEntrySerializationStrategy.delete( scope, entityId, version );
 
-            fireEvents();
+                    logBatch.mergeShallow( logDelete );
+                }
 
-            //we do multiple invocations on purpose.  Our log is our source of versions, only delete from it
-            //after every successful invocation of listeners and entity removal
-            entitySerializationStrategy.delete( scope, entityId, version ).execute();
 
-            logEntrySerializationStrategy.delete( scope, entityId, version ).execute();
-        }
+                try {
+                    entityBatch.execute();
+                }
+                catch ( ConnectionException e ) {
+                    throw new RuntimeException( "Unable to delete entities in cleanup", e );
+                }
 
+                try {
+                    logBatch.execute();
+                }
+                catch ( ConnectionException e ) {
+                    throw new RuntimeException( "Unable to delete entities from the log", e );
+                }
+            }
+        } ).count().toBlocking().last();
 
         return null;
     }
 
 
-    private void fireEvents() throws ExecutionException, InterruptedException {
+    private void fireEvents( final List<UUID> versions ) {
 
         final int listenerSize = listeners.size();
 
@@ -129,7 +167,7 @@ public class EntityVersionCleanupTask implements Task<Void> {
         }
 
         if ( listenerSize == 1 ) {
-            listeners.get( 0 ).versionDeleted( scope, entityId, version );
+            listeners.get( 0 ).versionDeleted( scope, entityId, versions );
             return;
         }
 
@@ -146,7 +184,7 @@ public class EntityVersionCleanupTask implements Task<Void> {
                           return entityVersionDeletedObservable.doOnNext( new Action1<EntityVersionDeleted>() {
                               @Override
                               public void call( final EntityVersionDeleted listener ) {
-                                  listener.versionDeleted( scope, entityId, version );
+                                  listener.versionDeleted( scope, entityId, versions );
                               }
                           } );
                       }
@@ -154,15 +192,6 @@ public class EntityVersionCleanupTask implements Task<Void> {
 
         LOG.debug( "Finished firing {} listeners", listenerSize );
     }
-
-
-    private static interface ListenerRunner {
-
-        /**
-         * Run the listeners
-         */
-        public void runListeners();
-    }
 }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8cadba29/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java
index 53eb6e3..d87f850 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java
@@ -11,12 +11,12 @@ import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerialization
 import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
 import org.apache.usergrid.persistence.model.entity.Id;
 
+import com.google.common.base.Preconditions;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
 
 /**
  * Iterator that iterates all versions of the entity in the log that are < the specified maxVersion
- *
  */
 public class LogEntryIterator implements Iterator<MvccLogEntry> {
 
@@ -32,16 +32,19 @@ public class LogEntryIterator implements Iterator<MvccLogEntry> {
 
 
     /**
-     *
      * @param logEntrySerializationStrategy The serialization strategy to get the log entries
      * @param scope The scope of the entity
      * @param entityId The id of the entity
-     * @param maxVersion The max version of the entity.  Iterator will iterate from max to min starting with the version < max
+     * @param maxVersion The max version of the entity.  Iterator will iterate from max to min starting with the version
+     * < max
      * @param pageSize The fetch size to get when querying the serialization strategy
      */
     public LogEntryIterator( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
                              final CollectionScope scope, final Id entityId, final UUID maxVersion,
                              final int pageSize ) {
+
+        Preconditions.checkArgument( pageSize > 0, "pageSize must be > 0" );
+
         this.logEntrySerializationStrategy = logEntrySerializationStrategy;
         this.scope = scope;
         this.entityId = entityId;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8cadba29/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
index a34b4f8..1fce6e2 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
@@ -28,7 +28,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Semaphore;
 
 import org.junit.AfterClass;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import org.apache.usergrid.persistence.collection.CollectionScope;
@@ -43,6 +42,7 @@ import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
 
 import com.google.common.util.concurrent.ListenableFuture;
+import com.netflix.astyanax.Keyspace;
 import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
@@ -69,13 +69,13 @@ public class EntityVersionCleanupTaskTest {
     }
 
 
-    @Test
+    @Test(timeout=10000)
     public void noListenerOneVersion() throws ExecutionException, InterruptedException, ConnectionException {
 
 
         final SerializationFig serializationFig = mock( SerializationFig.class );
 
-        when( serializationFig.getHistorySize() ).thenReturn( 10 );
+        when( serializationFig.getBufferSize() ).thenReturn( 10 );
 
         final MvccEntitySerializationStrategy mvccEntitySerializationStrategy =
                 mock( MvccEntitySerializationStrategy.class );
@@ -83,6 +83,15 @@ public class EntityVersionCleanupTaskTest {
         final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy =
                 mock( MvccLogEntrySerializationStrategy.class );
 
+        final Keyspace keyspace = mock( Keyspace.class );
+
+        final MutationBatch entityBatch = mock( MutationBatch.class );
+
+        final MutationBatch logBatch = mock( MutationBatch.class );
+
+        when( keyspace.prepareMutationBatch() ).thenReturn( entityBatch ).thenReturn( logBatch );
+
+
 
         //intentionally no events
         final List<EntityVersionDeleted> listeners = new ArrayList<EntityVersionDeleted>();
@@ -105,20 +114,18 @@ public class EntityVersionCleanupTaskTest {
 
         EntityVersionCleanupTask cleanupTask =
                 new EntityVersionCleanupTask( serializationFig, mvccLogEntrySerializationStrategy,
-                        mvccEntitySerializationStrategy, listeners, appScope, entityId, version );
+                        mvccEntitySerializationStrategy, keyspace, listeners, appScope, entityId, version );
 
-        final MutationBatch firstBatch = mock( MutationBatch.class );
+        final MutationBatch newBatch = mock( MutationBatch.class );
 
 
         //set up returning a mutator
         when( mvccEntitySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
-                .thenReturn( firstBatch );
-
+                .thenReturn( newBatch );
 
-        final MutationBatch secondBatch = mock( MutationBatch.class );
 
         when( mvccLogEntrySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
-                .thenReturn( secondBatch );
+                .thenReturn( newBatch );
 
 
         //start the task
@@ -128,22 +135,22 @@ public class EntityVersionCleanupTaskTest {
         future.get();
 
         //verify it was run
-        verify( firstBatch ).execute();
+        verify( entityBatch ).execute();
 
-        verify( secondBatch ).execute();
+        verify( logBatch ).execute();
     }
 
 
     /**
      * Tests the cleanup task on the first version created
      */
-    @Test
+    @Test(timeout=10000)
     public void noListenerNoVersions() throws ExecutionException, InterruptedException, ConnectionException {
 
 
         final SerializationFig serializationFig = mock( SerializationFig.class );
 
-        when( serializationFig.getHistorySize() ).thenReturn( 10 );
+        when( serializationFig.getBufferSize() ).thenReturn( 10 );
 
         final MvccEntitySerializationStrategy mvccEntitySerializationStrategy =
                 mock( MvccEntitySerializationStrategy.class );
@@ -151,6 +158,15 @@ public class EntityVersionCleanupTaskTest {
         final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy =
                 mock( MvccLogEntrySerializationStrategy.class );
 
+        final Keyspace keyspace = mock( Keyspace.class );
+
+
+        final MutationBatch entityBatch = mock( MutationBatch.class );
+        final MutationBatch logBatch = mock( MutationBatch.class );
+
+        when( keyspace.prepareMutationBatch() ).thenReturn( entityBatch ).thenReturn( logBatch );
+
+
 
         //intentionally no events
         final List<EntityVersionDeleted> listeners = new ArrayList<EntityVersionDeleted>();
@@ -173,20 +189,18 @@ public class EntityVersionCleanupTaskTest {
 
         EntityVersionCleanupTask cleanupTask =
                 new EntityVersionCleanupTask( serializationFig, mvccLogEntrySerializationStrategy,
-                        mvccEntitySerializationStrategy, listeners, appScope, entityId, version );
+                        mvccEntitySerializationStrategy, keyspace, listeners, appScope, entityId, version );
 
-        final MutationBatch firstBatch = mock( MutationBatch.class );
+        final MutationBatch batch = mock( MutationBatch.class );
 
 
         //set up returning a mutator
         when( mvccEntitySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
-                .thenReturn( firstBatch );
-
+                .thenReturn( batch );
 
-        final MutationBatch secondBatch = mock( MutationBatch.class );
 
         when( mvccLogEntrySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
-                .thenReturn( secondBatch );
+                .thenReturn( batch );
 
 
         //start the task
@@ -196,19 +210,19 @@ public class EntityVersionCleanupTaskTest {
         future.get();
 
         //verify it was run
-        verify( firstBatch, never() ).execute();
+        verify( entityBatch, never() ).execute();
 
-        verify( secondBatch, never() ).execute();
+        verify( logBatch, never() ).execute();
     }
 
 
-    @Test
+    @Test(timeout=10000)
     public void singleListenerSingleVersion() throws ExecutionException, InterruptedException, ConnectionException {
 
 
         final SerializationFig serializationFig = mock( SerializationFig.class );
 
-        when( serializationFig.getHistorySize() ).thenReturn( 10 );
+        when( serializationFig.getBufferSize() ).thenReturn( 10 );
 
         final MvccEntitySerializationStrategy mvccEntitySerializationStrategy =
                 mock( MvccEntitySerializationStrategy.class );
@@ -216,6 +230,15 @@ public class EntityVersionCleanupTaskTest {
         final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy =
                 mock( MvccLogEntrySerializationStrategy.class );
 
+        final Keyspace keyspace = mock( Keyspace.class );
+
+
+        final MutationBatch entityBatch = mock( MutationBatch.class );
+        final MutationBatch logBatch = mock( MutationBatch.class );
+
+        when( keyspace.prepareMutationBatch() ).thenReturn( entityBatch ).thenReturn( logBatch );
+
+
 
         //create a latch for the event listener, and add it to the list of events
         final int sizeToReturn = 1;
@@ -246,20 +269,18 @@ public class EntityVersionCleanupTaskTest {
 
         EntityVersionCleanupTask cleanupTask =
                 new EntityVersionCleanupTask( serializationFig, mvccLogEntrySerializationStrategy,
-                        mvccEntitySerializationStrategy, listeners, appScope, entityId, version );
+                        mvccEntitySerializationStrategy, keyspace, listeners, appScope, entityId, version );
 
-        final MutationBatch firstBatch = mock( MutationBatch.class );
+        final MutationBatch batch = mock( MutationBatch.class );
 
 
         //set up returning a mutator
         when( mvccEntitySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
-                .thenReturn( firstBatch );
-
+                .thenReturn( batch );
 
-        final MutationBatch secondBatch = mock( MutationBatch.class );
 
         when( mvccLogEntrySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
-                .thenReturn( secondBatch );
+                .thenReturn( batch );
 
 
         //start the task
@@ -270,23 +291,23 @@ public class EntityVersionCleanupTaskTest {
 
         //we deleted the version
         //verify it was run
-        verify( firstBatch ).execute();
+        verify( entityBatch ).execute();
 
-        verify( secondBatch ).execute();
+        verify( logBatch ).execute();
 
         //the latch was executed
         latch.await();
     }
 
 
-    @Test
+    @Test(timeout=10000)
     public void multipleListenerMultipleVersions()
             throws ExecutionException, InterruptedException, ConnectionException {
 
 
         final SerializationFig serializationFig = mock( SerializationFig.class );
 
-        when( serializationFig.getHistorySize() ).thenReturn( 10 );
+        when( serializationFig.getBufferSize() ).thenReturn( 10 );
 
         final MvccEntitySerializationStrategy mvccEntitySerializationStrategy =
                 mock( MvccEntitySerializationStrategy.class );
@@ -294,12 +315,22 @@ public class EntityVersionCleanupTaskTest {
         final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy =
                 mock( MvccLogEntrySerializationStrategy.class );
 
+        final Keyspace keyspace = mock( Keyspace.class );
+
+
+
+        final MutationBatch entityBatch = mock( MutationBatch.class );
+        final MutationBatch logBatch = mock( MutationBatch.class );
+
+        when( keyspace.prepareMutationBatch() ).thenReturn( entityBatch ).thenReturn( logBatch );
+
+
 
         //create a latch for the event listener, and add it to the list of events
         final int sizeToReturn = 10;
 
 
-        final CountDownLatch latch = new CountDownLatch( sizeToReturn * 3 );
+        final CountDownLatch latch = new CountDownLatch( sizeToReturn/serializationFig.getBufferSize() * 3 );
 
         final EntityVersionDeletedTest listener1 = new EntityVersionDeletedTest( latch );
         final EntityVersionDeletedTest listener2 = new EntityVersionDeletedTest( latch );
@@ -329,20 +360,18 @@ public class EntityVersionCleanupTaskTest {
 
         EntityVersionCleanupTask cleanupTask =
                 new EntityVersionCleanupTask( serializationFig, mvccLogEntrySerializationStrategy,
-                        mvccEntitySerializationStrategy, listeners, appScope, entityId, version );
+                        mvccEntitySerializationStrategy, keyspace, listeners, appScope, entityId, version );
 
-        final MutationBatch firstBatch = mock( MutationBatch.class );
+        final MutationBatch batch = mock( MutationBatch.class );
 
 
         //set up returning a mutator
         when( mvccEntitySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
-                .thenReturn( firstBatch );
+                .thenReturn( batch );
 
 
-        final MutationBatch secondBatch = mock( MutationBatch.class );
-
         when( mvccLogEntrySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
-                .thenReturn( secondBatch );
+                .thenReturn( batch );
 
 
         //start the task
@@ -353,9 +382,15 @@ public class EntityVersionCleanupTaskTest {
 
         //we deleted the version
         //verify we deleted everything
-        verify( firstBatch, times( sizeToReturn ) ).execute();
+        verify( entityBatch, times( sizeToReturn ) ).mergeShallow( any( MutationBatch.class ) );
+
+        verify( logBatch, times( sizeToReturn ) ).mergeShallow( any( MutationBatch.class ) );
+
+
 
-        verify( secondBatch, times( sizeToReturn ) ).execute();
+        verify( logBatch ).execute();
+
+        verify( entityBatch ).execute();
 
         //the latch was executed
         latch.await();
@@ -364,18 +399,15 @@ public class EntityVersionCleanupTaskTest {
 
     /**
      * Tests what happens when our listeners are VERY slow
-     * @throws ExecutionException
-     * @throws InterruptedException
-     * @throws ConnectionException
      */
-    @Test
+    @Test(timeout=10000)
     public void multipleListenerMultipleVersionsNoThreadsToRun()
             throws ExecutionException, InterruptedException, ConnectionException {
 
 
         final SerializationFig serializationFig = mock( SerializationFig.class );
 
-        when( serializationFig.getHistorySize() ).thenReturn( 10 );
+        when( serializationFig.getBufferSize() ).thenReturn( 10 );
 
         final MvccEntitySerializationStrategy mvccEntitySerializationStrategy =
                 mock( MvccEntitySerializationStrategy.class );
@@ -383,6 +415,17 @@ public class EntityVersionCleanupTaskTest {
         final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy =
                 mock( MvccLogEntrySerializationStrategy.class );
 
+        final Keyspace keyspace = mock( Keyspace.class );
+
+
+
+        final MutationBatch entityBatch = mock( MutationBatch.class );
+        final MutationBatch logBatch = mock( MutationBatch.class );
+
+        when( keyspace.prepareMutationBatch() ).thenReturn( entityBatch ).thenReturn( logBatch );
+
+
+
 
         //create a latch for the event listener, and add it to the list of events
         final int sizeToReturn = 10;
@@ -390,7 +433,7 @@ public class EntityVersionCleanupTaskTest {
 
         final int listenerCount = 5;
 
-        final CountDownLatch latch = new CountDownLatch( sizeToReturn * listenerCount );
+        final CountDownLatch latch = new CountDownLatch( sizeToReturn/serializationFig.getBufferSize() * listenerCount );
         final Semaphore waitSemaphore = new Semaphore( 0 );
 
 
@@ -426,20 +469,18 @@ public class EntityVersionCleanupTaskTest {
 
         EntityVersionCleanupTask cleanupTask =
                 new EntityVersionCleanupTask( serializationFig, mvccLogEntrySerializationStrategy,
-                        mvccEntitySerializationStrategy, listeners, appScope, entityId, version );
+                        mvccEntitySerializationStrategy, keyspace, listeners, appScope, entityId, version );
 
-        final MutationBatch firstBatch = mock( MutationBatch.class );
+        final MutationBatch batch = mock( MutationBatch.class );
 
 
         //set up returning a mutator
         when( mvccEntitySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
-                .thenReturn( firstBatch );
-
+                .thenReturn( batch );
 
-        final MutationBatch secondBatch = mock( MutationBatch.class );
 
         when( mvccLogEntrySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
-                .thenReturn( secondBatch );
+                .thenReturn( batch );
 
 
         //start the task
@@ -448,7 +489,7 @@ public class EntityVersionCleanupTaskTest {
         /**
          * While we're not done, release latches every 200 ms
          */
-        while(!future.isDone()) {
+        while ( !future.isDone() ) {
             Thread.sleep( 200 );
             waitSemaphore.release( listenerCount );
         }
@@ -456,37 +497,41 @@ public class EntityVersionCleanupTaskTest {
         //wait for the task
         future.get();
 
+
+
         //we deleted the version
         //verify we deleted everything
-        verify( firstBatch, times( sizeToReturn ) ).execute();
+        verify( logBatch, times( sizeToReturn ) ).mergeShallow( any( MutationBatch.class ) );
+
+        verify( entityBatch, times( sizeToReturn ) ).mergeShallow( any( MutationBatch.class ) );
+
+
+        verify( logBatch ).execute();
+
+        verify( entityBatch ).execute();
+
 
-        verify( secondBatch, times( sizeToReturn ) ).execute();
 
         //the latch was executed
         latch.await();
     }
 
 
-
     /**
      * Tests that our task will run in the caller if there's no threads, ensures that the task runs
-     * @throws ExecutionException
-     * @throws InterruptedException
-     * @throws ConnectionException
      */
-    @Test
-    public void runsWhenRejected()
-            throws ExecutionException, InterruptedException, ConnectionException {
+    @Test(timeout=10000)
+    public void runsWhenRejected() throws ExecutionException, InterruptedException, ConnectionException {
 
 
         /**
          * only 1 thread on purpose, we want to saturate the task
          */
-        final TaskExecutor taskExecutor = new NamedTaskExecutorImpl( "test", 1, 0);
+        final TaskExecutor taskExecutor = new NamedTaskExecutorImpl( "test", 1, 0 );
 
         final SerializationFig serializationFig = mock( SerializationFig.class );
 
-        when( serializationFig.getHistorySize() ).thenReturn( 10 );
+        when( serializationFig.getBufferSize() ).thenReturn( 10 );
 
         final MvccEntitySerializationStrategy mvccEntitySerializationStrategy =
                 mock( MvccEntitySerializationStrategy.class );
@@ -494,6 +539,16 @@ public class EntityVersionCleanupTaskTest {
         final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy =
                 mock( MvccLogEntrySerializationStrategy.class );
 
+        final Keyspace keyspace1 = mock( Keyspace.class );
+        final Keyspace keyspace2 = mock( Keyspace.class );
+
+
+        final MutationBatch entityBatch = mock( MutationBatch.class );
+        final MutationBatch logBatch = mock( MutationBatch.class );
+
+        when( keyspace1.prepareMutationBatch() ).thenReturn( entityBatch ).thenReturn( logBatch );
+        when( keyspace2.prepareMutationBatch() ).thenReturn( entityBatch ).thenReturn( logBatch );
+
 
         //create a latch for the event listener, and add it to the list of events
         final int sizeToReturn = 10;
@@ -501,7 +556,7 @@ public class EntityVersionCleanupTaskTest {
 
         final int listenerCount = 2;
 
-        final CountDownLatch latch = new CountDownLatch( sizeToReturn * listenerCount );
+        final CountDownLatch latch = new CountDownLatch( sizeToReturn/serializationFig.getBufferSize() * listenerCount );
         final Semaphore waitSemaphore = new Semaphore( 0 );
 
 
@@ -509,7 +564,6 @@ public class EntityVersionCleanupTaskTest {
         final EntityVersionDeletedTest runListener = new EntityVersionDeletedTest( latch );
 
 
-
         final Id applicationId = new SimpleId( "application" );
 
 
@@ -528,37 +582,37 @@ public class EntityVersionCleanupTaskTest {
 
         EntityVersionCleanupTask firstTask =
                 new EntityVersionCleanupTask( serializationFig, mvccLogEntrySerializationStrategy,
-                        mvccEntitySerializationStrategy, Arrays.<EntityVersionDeleted>asList(slowListener), appScope, entityId, version );
-
+                        mvccEntitySerializationStrategy, keyspace1, Arrays.<EntityVersionDeleted>asList( slowListener ),
+                        appScope, entityId, version );
 
 
         //change the listeners to one that is just invoked quickly
 
 
         EntityVersionCleanupTask secondTask =
-                      new EntityVersionCleanupTask( serializationFig, mvccLogEntrySerializationStrategy,
-                              mvccEntitySerializationStrategy, Arrays.<EntityVersionDeleted>asList(runListener), appScope, entityId, version );
+                new EntityVersionCleanupTask( serializationFig, mvccLogEntrySerializationStrategy,
+                        mvccEntitySerializationStrategy, keyspace2, Arrays.<EntityVersionDeleted>asList( runListener ),
+                        appScope, entityId, version );
 
 
-        final MutationBatch firstBatch = mock( MutationBatch.class );
+        final MutationBatch batch = mock( MutationBatch.class );
 
 
         //set up returning a mutator
         when( mvccEntitySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
-                .thenReturn( firstBatch );
+                .thenReturn( batch );
 
 
-        final MutationBatch secondBatch = mock( MutationBatch.class );
 
         when( mvccLogEntrySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
-                .thenReturn( secondBatch );
+                .thenReturn( batch );
 
 
         //start the task
-        ListenableFuture<Void> future1 =  taskExecutor.submit( firstTask );
+        ListenableFuture<Void> future1 = taskExecutor.submit( firstTask );
 
         //now start another task while the slow running task is running
-        ListenableFuture<Void> future2 =  taskExecutor.submit( secondTask );
+        ListenableFuture<Void> future2 = taskExecutor.submit( secondTask );
 
         //get the second task, we shouldn't have been able to queue it, therefore it should just run in process
         future2.get();
@@ -566,7 +620,7 @@ public class EntityVersionCleanupTaskTest {
         /**
          * While we're not done, release latches every 200 ms
          */
-        while(!future1.isDone()) {
+        while ( !future1.isDone() ) {
             Thread.sleep( 200 );
             waitSemaphore.release( listenerCount );
         }
@@ -576,9 +630,19 @@ public class EntityVersionCleanupTaskTest {
 
         //we deleted the version
         //verify we deleted everything
-        verify( firstBatch, times( sizeToReturn*2 ) ).execute();
 
-        verify( secondBatch, times( sizeToReturn*2 ) ).execute();
+
+        //both tasks were handed the same mocked batches, so the merge and execute
+        //counts verified below are totals across the two task runs
+        verify( logBatch, times( sizeToReturn* 2 ) ).mergeShallow( any( MutationBatch.class ) );
+
+        verify( entityBatch, times( sizeToReturn * 2) ).mergeShallow( any( MutationBatch.class ) );
+
+
+        verify( logBatch, times(2) ).execute();
+
+        verify( entityBatch, times(2) ).execute();
+
 
         //the latch was executed
         latch.await();
@@ -595,7 +659,7 @@ public class EntityVersionCleanupTaskTest {
 
 
         @Override
-        public void versionDeleted( final CollectionScope scope, final Id entityId, final UUID entityVersion ) {
+        public void versionDeleted( final CollectionScope scope, final Id entityId, final List<UUID> entityVersion ) {
             invocationLatch.countDown();
         }
     }
@@ -612,7 +676,7 @@ public class EntityVersionCleanupTaskTest {
 
 
         @Override
-        public void versionDeleted( final CollectionScope scope, final Id entityId, final UUID entityVersion ) {
+        public void versionDeleted( final CollectionScope scope, final Id entityId, final List<UUID> entityVersion ) {
             //wait for unblock to happen before counting down invocation latches
             try {
                 blockLatch.acquire();