You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/10/01 22:01:02 UTC

[03/19] git commit: Added events and task cleanup

Added events and task cleanup


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/c0b78b9f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/c0b78b9f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/c0b78b9f

Branch: refs/heads/two-dot-o-rebuildable-index
Commit: c0b78b9fab07cbc2f5975d89664ff8a523b98b9a
Parents: 4c72f5c
Author: Todd Nine <to...@apache.org>
Authored: Thu Sep 25 12:09:11 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Thu Sep 25 12:09:11 2014 -0600

----------------------------------------------------------------------
 .../collection/EntityCollectionManager.java     |  12 +-
 .../collection/event/EntityDeleted.java         |  25 ++++
 .../collection/event/EntityVersionCreated.java  |  25 ++++
 .../collection/event/EntityVersionDeleted.java  |  28 +++++
 .../collection/event/EntityVersionRemoved.java  |  26 -----
 .../impl/EntityCollectionManagerImpl.java       |  12 ++
 .../impl/EntityVersionCleanupTask.java          | 115 +++++++++++++++++++
 .../serialization/impl/VersionIterator.java     | 111 ++++++++++++++++++
 .../core/task/NamedTaskExecutorImpl.java        |  26 +++--
 .../usergrid/persistence/core/task/Task.java    |   2 +-
 .../persistence/core/task/TaskExecutor.java     |   5 +-
 .../persistence/graph/event/EdgeDeleted.java    |   8 ++
 12 files changed, 349 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c0b78b9f/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
index 84bcea6..ee3a5d1 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
@@ -27,14 +27,14 @@ import rx.Observable;
 
 /**
  *
- *
- * @author: tnine
+ * The operations for performing changes on an entity
  *
  */
 public interface EntityCollectionManager {
 
     /**
-     * Write the entity in the entity collection.
+     * Write the entity in the entity collection.  This is an entire entity, its contents will
+     * completely overwrite the previous values, if it exists.
      *
      * @param entity The entity to update
      */
@@ -52,10 +52,10 @@ public interface EntityCollectionManager {
     public Observable<Entity> load( Id entityId );
 
 
-    //TODO add partial update
-
     /**
-     * Takes the change and reloads an entity with all changes applied.
+     * Takes the change and reloads an entity with all changes in this entity applied.
+     * The resulting entity from calling load will be the previous version of this entity + the entity
+     * in this object applied to it.
      * @param entity
      * @return
      */

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c0b78b9f/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityDeleted.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityDeleted.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityDeleted.java
new file mode 100644
index 0000000..1c075f1
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityDeleted.java
@@ -0,0 +1,25 @@
+package org.apache.usergrid.persistence.collection.event;
+
+
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+/**
+ *
+ * Invoked when an entity is deleted.  The delete log entry is not removed until all instances of this listener
+ * have completed.  If any listener fails with an exception, the entity will not be removed.
+ *
+ */
+public interface EntityDeleted {
+
+
+    /**
+     * The event fired when an entity is deleted
+     *
+     * @param scope The scope of the entity
+     * @param entityId The id of the entity
+     */
+    public void deleted( final CollectionScope scope, final Id entityId);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c0b78b9f/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionCreated.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionCreated.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionCreated.java
new file mode 100644
index 0000000..9d7761c
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionCreated.java
@@ -0,0 +1,25 @@
+package org.apache.usergrid.persistence.collection.event;
+
+
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.model.entity.Entity;
+
+
+/**
+ *
+ * Invoked after a new version of an entity has been created.  The entity should be a complete
+ * view of the entity.
+ *
+ */
+public interface EntityVersionCreated {
+
+
+    /**
+     * The new version of the entity.  Note that this should be a fully merged view of the entity.
+     * In the case of partial updates, the passed entity should be fully merged with its previous entries.
+     * @param scope The scope of the entity
+     * @param entity The fully loaded and merged entity
+     */
+    public void versionCreated( final CollectionScope scope, final Entity entity );
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c0b78b9f/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionDeleted.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionDeleted.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionDeleted.java
new file mode 100644
index 0000000..3b76e84
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionDeleted.java
@@ -0,0 +1,28 @@
+package org.apache.usergrid.persistence.collection.event;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+/**
+ *
+ * Invoked when an entity version is removed.  Note that this is not a deletion of the entity itself,
+ * only of the specified version.
+ *
+ */
+public interface EntityVersionDeleted {
+
+
+    /**
+     * The version specified was removed.
+     *
+     * @param scope The scope of the entity
+     * @param entityId The id of the entity whose version was removed
+     * @param entityVersion The version that was removed
+     */
+    public void versionDeleted(final CollectionScope scope, final Id entityId, final UUID entityVersion);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c0b78b9f/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionRemoved.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionRemoved.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionRemoved.java
deleted file mode 100644
index dca575d..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionRemoved.java
+++ /dev/null
@@ -1,26 +0,0 @@
-package org.apache.usergrid.persistence.collection.event;
-
-
-import java.util.UUID;
-
-import org.apache.usergrid.persistence.collection.CollectionScope;
-
-
-/**
- *
- * Invoked when an entity version is removed.  Note that this is not a deletion of the entity itself,
- * only the version itself.
- *
- */
-public interface EntityVersionRemoved {
-
-
-    /**
-     * The version specified was removed.
-     * @param scope
-     * @param entityId
-     * @param entityVersion
-     */
-    public void versionRemoved(final CollectionScope scope, final UUID entityId, final UUID entityVersion);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c0b78b9f/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index 96a7ed2..a6fc31b 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@ -19,11 +19,16 @@
 package org.apache.usergrid.persistence.collection.impl;
 
 
+import java.util.List;
+import java.util.UUID;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.collection.CollectionScope;
 import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.collection.event.EntityVersionCreated;
+import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
 import org.apache.usergrid.persistence.collection.guice.Write;
 import org.apache.usergrid.persistence.collection.guice.WriteUpdate;
 import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
@@ -38,6 +43,8 @@ import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteOptimist
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteStart;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteUniqueVerify;
 import org.apache.usergrid.persistence.collection.service.UUIDService;
+import org.apache.usergrid.persistence.core.task.Task;
+import org.apache.usergrid.persistence.core.task.TaskExecutor;
 import org.apache.usergrid.persistence.core.util.ValidationUtils;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
@@ -83,6 +90,8 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
     private final MarkStart markStart;
     private final MarkCommit markCommit;
 
+    private final TaskExecutor taskExecutor;
+
     @Inject
     public EntityCollectionManagerImpl( final UUIDService uuidService, @Write final WriteStart writeStart,
                                         @WriteUpdate final WriteStart writeUpdate,
@@ -90,6 +99,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
                                         final WriteOptimisticVerify writeOptimisticVerify,
                                         final WriteCommit writeCommit, final RollbackAction rollback, final Load load,
                                         final MarkStart markStart, final MarkCommit markCommit,
+                                        final TaskExecutor taskExecutor,
                                         @Assisted final CollectionScope collectionScope) {
 
         Preconditions.checkNotNull( uuidService, "uuidService must be defined" );
@@ -109,6 +119,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
 
         this.uuidService = uuidService;
         this.collectionScope = collectionScope;
+        this.taskExecutor = taskExecutor;
     }
 
 
@@ -240,4 +251,5 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
 
 
 
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c0b78b9f/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
new file mode 100644
index 0000000..11e2da9
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
@@ -0,0 +1,115 @@
+package org.apache.usergrid.persistence.collection.impl;
+
+
+import java.util.List;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
+import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
+import org.apache.usergrid.persistence.collection.serialization.impl.VersionIterator;
+import org.apache.usergrid.persistence.core.entity.EntityVersion;
+import org.apache.usergrid.persistence.core.task.Task;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+/**
+ * Cleans up previous versions from the specified version. Note that this means the version passed in the io event is
+ * retained, the range is exclusive.
+ */
+class EntityVersionCleanupTask implements Task<CollectionIoEvent<EntityVersion>, CollectionIoEvent<EntityVersion>> {
+
+    private static final Logger LOG = LoggerFactory.getLogger( EntityVersionCleanupTask.class );
+
+    /** Number of log entries fetched per page while iterating the versions */
+    private static final int PAGE_SIZE = 1000;
+
+    private final CollectionIoEvent<EntityVersion> collectionIoEvent;
+    private final List<EntityVersionDeleted> listeners;
+
+    private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
+    private final MvccEntitySerializationStrategy entitySerializationStrategy;
+
+    /**
+     * Package-private so that collaborators in this package can create the task.  With a private constructor and
+     * no factory method the class could never be instantiated.
+     *
+     * @param logEntrySerializationStrategy Used to page through and delete the version log entries
+     * @param entitySerializationStrategy Used to delete the entity data of each version
+     * @param collectionIoEvent The scope, entity id and max (retained) version to clean up from
+     * @param listeners The listeners to notify before each version is deleted
+     */
+    EntityVersionCleanupTask( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
+                              final MvccEntitySerializationStrategy entitySerializationStrategy,
+                              final CollectionIoEvent<EntityVersion> collectionIoEvent,
+                              final List<EntityVersionDeleted> listeners ) {
+        this.collectionIoEvent = collectionIoEvent;
+        this.listeners = listeners;
+        this.logEntrySerializationStrategy = logEntrySerializationStrategy;
+        this.entitySerializationStrategy = entitySerializationStrategy;
+    }
+
+    @Override
+    public CollectionIoEvent<EntityVersion> getId() {
+        return collectionIoEvent;
+    }
+
+    @Override
+    public void exceptionThrown( final Throwable throwable ) {
+        LOG.error( "Unable to run update task for event {}", collectionIoEvent, throwable );
+    }
+
+    @Override
+    public void rejected() {
+        //Our task was rejected meaning our queue was full.  We need this operation to run,
+        // so we'll run it in our current thread
+        try {
+            call();
+        }
+        catch ( Exception e ) {
+            throw new RuntimeException( "Exception thrown in call task", e );
+        }
+    }
+
+    /**
+     * Notifies the listeners of, then deletes, every version in the log below the version in the event.
+     *
+     * @return The event this task was created with
+     */
+    @Override
+    public CollectionIoEvent<EntityVersion> call() throws Exception {
+
+        final CollectionScope scope = collectionIoEvent.getEntityCollection();
+        final Id entityId = collectionIoEvent.getEvent().getId();
+        final UUID maxVersion = collectionIoEvent.getEvent().getVersion();
+
+        final VersionIterator versionIterator =
+                new VersionIterator( logEntrySerializationStrategy, scope, entityId, maxVersion, PAGE_SIZE );
+
+        //for every entry, we want to clean it up with listeners
+        while ( versionIterator.hasNext() ) {
+
+            final UUID currentVersion = versionIterator.next();
+
+            //execute all the listeners
+            for ( EntityVersionDeleted listener : listeners ) {
+                listener.versionDeleted( scope, entityId, currentVersion );
+            }
+
+            //we do multiple invocations on purpose.  Our log is our source of versions, only delete from it
+            //after every successful invocation of listeners and entity removal
+            entitySerializationStrategy.delete( scope, entityId, currentVersion ).execute();
+            logEntrySerializationStrategy.delete( scope, entityId, currentVersion ).execute();
+        }
+
+        return collectionIoEvent;
+    }
+}
+
+
+

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c0b78b9f/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/VersionIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/VersionIterator.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/VersionIterator.java
new file mode 100644
index 0000000..323f12d
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/VersionIterator.java
@@ -0,0 +1,111 @@
+package org.apache.usergrid.persistence.collection.serialization.impl;
+
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+
+/**
+ * Iterator that will iterate all versions of the entity from the log whose version is < the specified maxVersion
+ *
+ */
+public class VersionIterator implements Iterator<UUID> {
+
+    private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
+    private final CollectionScope scope;
+    private final Id entityId;
+    private final int pageSize;
+
+    /** The current page of log entries, null until the first page has been loaded */
+    private Iterator<MvccLogEntry> elementItr;
+    /** The version the next page is loaded from, null when no further page exists */
+    private UUID nextStart;
+
+
+    /**
+     *
+     * @param logEntrySerializationStrategy The serialization strategy to get the log entries
+     * @param scope The scope of the entity
+     * @param entityId The id of the entity
+     * @param maxVersion The max version of the entity.  Iterator will iterate from max to min starting with the version < max
+     * @param pageSize The fetch size to get when querying the serialization strategy
+     */
+    public VersionIterator( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
+                            final CollectionScope scope, final Id entityId, final UUID maxVersion,
+                            final int pageSize ) {
+        this.logEntrySerializationStrategy = logEntrySerializationStrategy;
+        this.scope = scope;
+        this.entityId = entityId;
+        this.nextStart = maxVersion;
+        this.pageSize = pageSize;
+    }
+
+
+    @Override
+    public boolean hasNext() {
+        //load a page if none is loaded yet, or if the current one is exhausted and another page may exist
+        if ( elementItr == null || ( !elementItr.hasNext() && nextStart != null ) ) {
+            try {
+                advance();
+            }
+            catch ( ConnectionException e ) {
+                throw new RuntimeException( "Unable to query cassandra", e );
+            }
+        }
+
+        return elementItr.hasNext();
+    }
+
+
+    @Override
+    public UUID next() {
+        if ( !hasNext() ) {
+            throw new NoSuchElementException( "No more elements exist" );
+        }
+
+        return elementItr.next().getVersion();
+    }
+
+
+    @Override
+    public void remove() {
+        throw new UnsupportedOperationException( "Remove is unsupported" );
+    }
+
+
+    /**
+     * Loads the next page of log entries, starting at nextStart, and works out where the following page starts
+     *
+     * @throws ConnectionException If the log entries cannot be loaded
+     */
+    public void advance() throws ConnectionException {
+        final int requestedSize = pageSize + 1;
+
+        final List<MvccLogEntry> results =
+                logEntrySerializationStrategy.load( scope, entityId, nextStart, requestedSize );
+        final int size = results.size();
+
+        //the start version itself is excluded from the iteration, skip it when the log returns it first
+        final boolean skipFirst = size > 0 && results.get( 0 ).getVersion().equals( nextStart );
+
+        //a full page means more entries may follow.  This must be decided on the size that was loaded, before the
+        //start entry is skipped, otherwise a page never looks full and paging stops after the first page
+        final UUID lastVersion = size == requestedSize ? results.get( size - 1 ).getVersion() : null;
+
+        //the last entry is returned by this page and skipped as the first entry of the next one.  Stop when the
+        //page made no progress past the start version
+        nextStart = ( lastVersion == null || lastVersion.equals( nextStart ) ) ? null : lastVersion;
+
+        //iterate a view rather than removing from the returned list, which may not support removal
+        elementItr = results.subList( skipFirst ? 1 : 0, size ).iterator();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c0b78b9f/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
index 8184937..40ee5cf 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
@@ -32,6 +32,10 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
 
     private final ListeningExecutorService executorService;
 
+    private final String name;
+    private final int poolSize;
+    private final int queueLength;
+
 
     /**
      * @param name The name of this instance of the task executor
@@ -44,11 +48,14 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
         Preconditions.checkArgument( poolSize > 0, "poolSize must be > than 0" );
         Preconditions.checkArgument( queueLength > -1, "queueLength must be 0 or more" );
 
+        this.name = name;
+        this.poolSize = poolSize;
+        this.queueLength = queueLength;
 
         final BlockingQueue<Runnable> queue =
                 queueLength > 0 ? new ArrayBlockingQueue<Runnable>( queueLength ) : new SynchronousQueue<Runnable>();
 
-        executorService = MoreExecutors.listeningDecorator( new MaxSizeThreadPool( name, poolSize, queue ) );
+        executorService = MoreExecutors.listeningDecorator( new MaxSizeThreadPool( queue ) );
     }
 
 
@@ -105,11 +112,11 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
     /**
      * Create a thread pool that will reject work if our audit tasks become overwhelmed
      */
-    private static final class MaxSizeThreadPool extends ThreadPoolExecutor {
+    private final class MaxSizeThreadPool extends ThreadPoolExecutor {
 
-        public MaxSizeThreadPool( final String name, final int workerSize, BlockingQueue<Runnable> queue ) {
+        public MaxSizeThreadPool( BlockingQueue<Runnable> queue ) {
 
-            super( 1, workerSize, 30, TimeUnit.SECONDS, queue, new CountingThreadFactory( name ),
+            super( 1, poolSize, 30, TimeUnit.SECONDS, queue, new CountingThreadFactory( ),
                     new RejectedHandler() );
         }
     }
@@ -118,15 +125,10 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
     /**
      * Thread factory that will name and count threads for easier debugging
      */
-    private static final class CountingThreadFactory implements ThreadFactory {
+    private final class CountingThreadFactory implements ThreadFactory {
 
         private final AtomicLong threadCounter = new AtomicLong();
 
-        private final String name;
-
-
-        private CountingThreadFactory( final String name ) {this.name = name;}
-
 
         @Override
         public Thread newThread( final Runnable r ) {
@@ -144,12 +146,12 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
     /**
      * The handler that will handle rejected executions and signal the interface
      */
-    private static final class RejectedHandler implements RejectedExecutionHandler {
+    private final class RejectedHandler implements RejectedExecutionHandler {
 
 
         @Override
         public void rejectedExecution( final Runnable r, final ThreadPoolExecutor executor ) {
-            LOG.warn( "Audit queue full, rejecting audit task {}", r );
+            LOG.warn( "{} task queue full, rejecting task {}", name, r );
 
             throw new RejectedExecutionException( "Unable to run task, queue full" );
         }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c0b78b9f/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
index 8b1ed22..518b461 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
@@ -27,7 +27,7 @@ public interface Task<V, I> extends Callable<V> {
      * Invoked when we weren't able to run this task by the the thread attempting to schedule the task.
      * If this task MUST be run immediately, you can invoke the call method from within this event to invoke the
      * task in the scheduling thread.  Note that this has performance implications to the user.  If you can drop the
-     * request and process later (lazy repair for instanc\\\\\\\\\\\\\\\\\\\\\\hjn ) do so.
+     * request and process later (lazy repair for instance ) do so.
      *
      */
     void rejected();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c0b78b9f/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
index b5491bc..619ac14 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
@@ -1,6 +1,9 @@
 package org.apache.usergrid.persistence.core.task;
 
 
+import com.google.common.util.concurrent.ListenableFuture;
+
+
 /**
  * An interface for execution of tasks
  */
@@ -10,5 +13,5 @@ public interface TaskExecutor {
      * Submit the task asynchronously
      * @param task
      */
-    public <V, I> com.google.common.util.concurrent.ListenableFuture<V> submit( Task<V, I> task );
+    public <V, I> ListenableFuture<V> submit( Task<V, I> task );
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c0b78b9f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/event/EdgeDeleted.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/event/EdgeDeleted.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/event/EdgeDeleted.java
new file mode 100644
index 0000000..631de59
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/event/EdgeDeleted.java
@@ -0,0 +1,8 @@
+package org.apache.usergrid.persistence.graph.event;
+
+
+/**
+ * Empty interface in the graph event package; it declares no callback methods.
+ * NOTE(review): presumably a placeholder for an edge-deletion listener to be filled in later, confirm.
+ */
+public interface EdgeDeleted {}