You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/03/25 16:21:02 UTC

[03/43] git commit: Added basic Q logic for in memory queue as proof of concept. Need to test and add reserved capacity for writes.

Added basic Q logic for in memory queue as proof of concept.  Need to test and add reserved capacity for writes.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/a586a342
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/a586a342
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/a586a342

Branch: refs/heads/two-dot-o
Commit: a586a342e176a20d76db501b36a50feeb8655f16
Parents: 0133aef
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Feb 26 22:01:22 2014 -0800
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Feb 26 22:01:22 2014 -0800

----------------------------------------------------------------------
 .../collection/EntityCollectionManager.java     |   3 +
 .../usergrid/persistence/graph/EdgeManager.java |   2 +-
 .../usergrid/persistence/graph/GraphFig.java    |   6 ++
 .../graph/consistency/AsyncProcessor.java       |  19 +++-
 .../graph/consistency/AsyncProcessorImpl.java   |  39 +++----
 .../graph/consistency/AsynchonrousEvent.java    |  18 ++++
 .../consistency/AsynchronousEventListener.java  |  18 ++++
 .../graph/consistency/ErrorListener.java        |  10 ++
 .../graph/consistency/LocalTimeoutQueue.java    |  23 +++--
 .../consistency/SimpleAsynchonrousEvent.java    |  30 ++++++
 .../graph/consistency/SimpleTimeoutEvent.java   |  30 ------
 .../graph/consistency/TimeoutEvent.java         |  18 ----
 .../graph/consistency/TimeoutEventListener.java |  18 ----
 .../graph/consistency/TimeoutQueue.java         |   8 +-
 .../persistence/graph/guice/EdgeDelete.java     |  37 +++++++
 .../persistence/graph/guice/EdgeWrite.java      |  37 +++++++
 .../persistence/graph/guice/GraphModule.java    |  29 +++---
 .../persistence/graph/guice/NodeDelete.java     |  37 +++++++
 .../persistence/graph/impl/EdgeManagerImpl.java | 103 +++++++++++++++++--
 .../EdgeMetadataSerialization.java              |   2 +-
 .../graph/serialization/EdgeSerialization.java  |   4 +-
 .../graph/serialization/NodeSerialization.java  |   2 +-
 .../impl/EdgeSerializationImpl.java             |   4 +-
 .../graph/consistency/AsyncProcessorTest.java   |  44 ++++----
 .../consistency/LocalTimeoutQueueTest.java      |  28 ++---
 25 files changed, 399 insertions(+), 170 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a586a342/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
index d91a9b3..381c651 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
@@ -50,4 +50,7 @@ public interface EntityCollectionManager {
      * Load the entity with the given entity Id
      */
     public Observable<Entity> load( Id entityId );
+
+
+    //TODO add partial update
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a586a342/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/EdgeManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/EdgeManager.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/EdgeManager.java
index 6a6a092..9f9c510 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/EdgeManager.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/EdgeManager.java
@@ -66,7 +66,7 @@ public interface EdgeManager {
      * @param edge The edge to delete
      *
      *
-     * Delete the edge. Implementation should also delete the incoming (reversed) edge.
+     * EdgeDelete the edge. Implementation should also delete the incoming (reversed) edge.
      */
     Observable<Edge> deleteEdge( Edge edge );
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a586a342/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
index c5a2d5a..bbf83be 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
@@ -19,6 +19,8 @@ public interface GraphFig extends GuicyFig {
 
     public static final String WRITE_CL = "usergrid.graph.write.cl";
 
+    public static final String WRITE_TIMEOUT  = "usergrid.graph.write.timeout";
+
     @Default( "1000" )
     @Key( SCAN_PAGE_SIZE )
     int getScanPageSize();
@@ -31,6 +33,10 @@ public interface GraphFig extends GuicyFig {
     @Key( WRITE_CL )
     String getWriteCL();
 
+    @Default("10000")
+    @Key( WRITE_TIMEOUT )
+    long getWriteTimeout();
+
 
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a586a342/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java
index 4ea3ef8..8d603b3 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java
@@ -19,16 +19,29 @@ public interface AsyncProcessor<T> {
      * @param event The event to be scheduled for verification
      * @param timeout  The time in milliseconds we should wait before the event should fire
      */
-    public TimeoutEvent<T> setVerification( T event, long timeout );
+    public AsynchonrousEvent<T> setVerification( T event, long timeout );
 
 
     /**
-     * Start processing the event immediately asynchronously.  In the event an exception is thrown, the TimeoutEvent should be re-tried.
+     * Start processing the event immediately asynchronously.  In the event an exception is thrown, the AsynchonrousEvent should be re-tried.
      * It is up to the implementer to commit the event so that it does not fire again.  This should never throw exceptions.
      *
      * @param event The event to start
      */
-    public void start(TimeoutEvent<T> event);
+    public void start(AsynchonrousEvent<T> event);
+
+    /**
+     * Add the error listener to the list of listeners
+     * @param listener
+     */
+    public void addErrorListener( ErrorListener<T> listener );
+
+    /**
+     * Add the listener to this instance
+     * @param listener
+     */
+    public void addListener(AsynchronousEventListener<T> listener);
+
 
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a586a342/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
index 031eb87..3a00436 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
@@ -7,9 +7,9 @@ import java.util.List;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.eventbus.EventBus;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
+import com.google.inject.assistedinject.Assisted;
 import com.netflix.hystrix.HystrixCommand;
 import com.netflix.hystrix.HystrixCommandGroupKey;
 
@@ -27,24 +27,23 @@ public class AsyncProcessorImpl<T> implements AsyncProcessor<T> {
 
     private final TimeoutQueue<T> queue;
     private final Scheduler scheduler;
-    private final TimeoutEventListener<T> listener;
+    private final List<AsynchronousEventListener<T>> listeners = new ArrayList<AsynchronousEventListener<T>>(  );
 
     private static final Logger LOG = LoggerFactory.getLogger( AsyncProcessor.class );
 
-    private List<ErrorListener> listeners = new ArrayList<ErrorListener>();
+    private List<ErrorListener> errorListeners = new ArrayList<ErrorListener>();
 
 
     @Inject
-    public AsyncProcessorImpl( final TimeoutQueue<T> queue, final TimeoutEventListener<T> listener, final Scheduler scheduler ) {
+    public AsyncProcessorImpl( final TimeoutQueue<T> queue, final Scheduler scheduler ) {
         this.queue = queue;
-        this.listener = listener;
         this.scheduler = scheduler;
     }
 
 
 
     @Override
-    public TimeoutEvent<T> setVerification( final T event, final long timeout ) {
+    public AsynchonrousEvent<T> setVerification( final T event, final long timeout ) {
         return queue.queue( event, timeout );
     }
 
@@ -52,7 +51,7 @@ public class AsyncProcessorImpl<T> implements AsyncProcessor<T> {
 
 
     @Override
-    public void start( final TimeoutEvent<T> event ) {
+    public void start( final AsynchonrousEvent<T> event ) {
 
 
         //run this in a timeout command so it doesn't run forever. If it times out, it will simply resume later
@@ -60,8 +59,12 @@ public class AsyncProcessorImpl<T> implements AsyncProcessor<T> {
 
             @Override
             protected Void run() throws Exception {
-                final T busEvent = event.getEvent();
-                listener.receive( busEvent );
+                final T rootEvent = event.getEvent();
+
+                for(AsynchronousEventListener<T> listener: listeners){
+                    listener.receive( rootEvent );
+                }
+
                 return null;
             }
         }.toObservable( scheduler ).subscribe( new Observer<Void>() {
@@ -75,7 +78,7 @@ public class AsyncProcessorImpl<T> implements AsyncProcessor<T> {
             public void onError( final Throwable throwable ) {
                 LOG.error( "Unable to process async event", throwable );
 
-                for ( ErrorListener listener : listeners ) {
+                for ( ErrorListener listener : errorListeners ) {
                     listener.onError( event, throwable );
                 }
             }
@@ -90,19 +93,19 @@ public class AsyncProcessorImpl<T> implements AsyncProcessor<T> {
     }
 
 
-    /**
-     * Add an error listener
-     */
-    public void addListener( ErrorListener listener ) {
+    @Override
+    public void addListener( final AsynchronousEventListener<T> listener ) {
         this.listeners.add( listener );
     }
 
 
     /**
-     * Internal listener for errors, really only used for testing.  Can be used to hook into error state
+     * Add an error listeners
      */
-    public static interface ErrorListener {
-
-        public <T> void onError( TimeoutEvent<T> event, Throwable t );
+    public void addErrorListener( ErrorListener<T> listener ) {
+        this.errorListeners.add( listener );
     }
+
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a586a342/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsynchonrousEvent.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsynchonrousEvent.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsynchonrousEvent.java
new file mode 100644
index 0000000..6bb3937
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsynchonrousEvent.java
@@ -0,0 +1,18 @@
+package org.apache.usergrid.persistence.graph.consistency;
+
+
+/**
+ * An interface for a timeout event
+ */
+public interface AsynchonrousEvent<T> {
+
+    /**
+     * @return The event to fire when our timeout is reached
+     */
+    T getEvent();
+
+    /**
+     * @return The time in epoch millis the event will time out
+     */
+    long getTimeout();
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a586a342/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsynchronousEventListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsynchronousEventListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsynchronousEventListener.java
new file mode 100644
index 0000000..428d850
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsynchronousEventListener.java
@@ -0,0 +1,18 @@
+package org.apache.usergrid.persistence.graph.consistency;
+
+
+/**
+ *
+ *
+ */
+public interface AsynchronousEventListener<T> {
+
+
+    /**
+     * The handler to receive the event.  Any exception that is thrown is considered
+     * a failure, and the event will be re-fired.
+     * @param event
+     */
+    void receive(T event);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a586a342/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/ErrorListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/ErrorListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/ErrorListener.java
new file mode 100644
index 0000000..6c7c360
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/ErrorListener.java
@@ -0,0 +1,10 @@
+package org.apache.usergrid.persistence.graph.consistency;
+
+
+/**
+ * Internal listener for errors, really only used for testing.  Can be used to hook into error state
+ */
+public interface ErrorListener <T> {
+
+    void onError( AsynchonrousEvent<T> event, Throwable t );
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a586a342/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/LocalTimeoutQueue.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/LocalTimeoutQueue.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/LocalTimeoutQueue.java
index 03b7863..6159b93 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/LocalTimeoutQueue.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/LocalTimeoutQueue.java
@@ -8,19 +8,22 @@ import java.util.List;
 import java.util.concurrent.PriorityBlockingQueue;
 
 import com.google.inject.Inject;
+import com.google.inject.Singleton;
 
 
 /**
  * Simple implementation of our timeout queue using an in memory PriorityBlockingQueue.
  *
  * This SHOULD NOT be used in a production environment.  This is for development/testing runtimes only.
+ * This should not be a singleton, we can have multiple instances of
  */
+@Singleton
 public class LocalTimeoutQueue<T> implements TimeoutQueue<T> {
 
     /**
      * For in memory queueing
      */
-    private final PriorityBlockingQueue<TimeoutEvent<T>> queue = new PriorityBlockingQueue<TimeoutEvent<T>>( 1000, new TimeoutEventCompatator<T>() );
+    private final PriorityBlockingQueue<AsynchonrousEvent<T>> queue = new PriorityBlockingQueue<AsynchonrousEvent<T>>( 1000, new TimeoutEventCompatator<T>() );
 
     private final TimeService timeService;
 
@@ -32,9 +35,9 @@ public class LocalTimeoutQueue<T> implements TimeoutQueue<T> {
 
 
     @Override
-    public TimeoutEvent<T> queue( final T event, final long timeout ) {
+    public AsynchonrousEvent<T> queue( final T event, final long timeout ) {
         final long scheduledTimeout = timeService.getCurrentTime() + timeout;
-        final TimeoutEvent<T> queuedEvent = new SimpleTimeoutEvent<T>( event, scheduledTimeout );
+        final AsynchonrousEvent<T> queuedEvent = new SimpleAsynchonrousEvent<T>( event, scheduledTimeout );
 
         queue.add( queuedEvent );
 
@@ -43,16 +46,16 @@ public class LocalTimeoutQueue<T> implements TimeoutQueue<T> {
 
 
     @Override
-    public Collection<TimeoutEvent<T>> take( final int maxSize, final long timeout ) {
+    public Collection<AsynchonrousEvent<T>> take( final int maxSize, final long timeout ) {
 
         final long now = timeService.getCurrentTime();
         final long newTimeout = now+timeout;
 
-        List<TimeoutEvent<T>> results = new ArrayList<TimeoutEvent<T>>(maxSize);
+        List<AsynchonrousEvent<T>> results = new ArrayList<AsynchonrousEvent<T>>(maxSize);
 
         for(int i = 0; i < maxSize; i ++){
 
-            TimeoutEvent<T> queuedEvent = queue.peek();
+            AsynchonrousEvent<T> queuedEvent = queue.peek();
 
             //nothing to do
             if(queuedEvent == null){
@@ -65,7 +68,7 @@ public class LocalTimeoutQueue<T> implements TimeoutQueue<T> {
                 break;
             }
 
-            final TimeoutEvent<T> newEvent =  new SimpleTimeoutEvent<T>( queuedEvent.getEvent(), newTimeout );
+            final AsynchonrousEvent<T> newEvent =  new SimpleAsynchonrousEvent<T>( queuedEvent.getEvent(), newTimeout );
 
             //re schedule a new event to replace this one
             queue.add(newEvent);
@@ -82,16 +85,16 @@ public class LocalTimeoutQueue<T> implements TimeoutQueue<T> {
 
 
     @Override
-    public boolean remove( final TimeoutEvent<T> event ) {
+    public boolean remove( final AsynchonrousEvent<T> event ) {
         return queue.remove( event );
     }
 
 
-    private static class TimeoutEventCompatator<T> implements Comparator<TimeoutEvent<T>> {
+    private static class TimeoutEventCompatator<T> implements Comparator<AsynchonrousEvent<T>> {
 
 
         @Override
-        public int compare( final TimeoutEvent<T> o1, final TimeoutEvent<T> o2 ) {
+        public int compare( final AsynchonrousEvent<T> o1, final AsynchonrousEvent<T> o2 ) {
             return Long.compare( o1.getTimeout(), o2.getTimeout() );
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a586a342/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/SimpleAsynchonrousEvent.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/SimpleAsynchonrousEvent.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/SimpleAsynchonrousEvent.java
new file mode 100644
index 0000000..4f1de81
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/SimpleAsynchonrousEvent.java
@@ -0,0 +1,30 @@
+package org.apache.usergrid.persistence.graph.consistency;
+
+
+/**
+ *
+ *
+ */
+public class SimpleAsynchonrousEvent<T> implements AsynchonrousEvent<T> {
+
+    private final T event;
+    private final long timeout;
+
+
+    public SimpleAsynchonrousEvent( final T event, final long timeout ) {
+        this.event = event;
+        this.timeout = timeout;
+    }
+
+
+    @Override
+    public T getEvent() {
+       return event;
+    }
+
+
+    @Override
+    public long getTimeout() {
+        return timeout;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a586a342/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/SimpleTimeoutEvent.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/SimpleTimeoutEvent.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/SimpleTimeoutEvent.java
deleted file mode 100644
index 73ef653..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/SimpleTimeoutEvent.java
+++ /dev/null
@@ -1,30 +0,0 @@
-package org.apache.usergrid.persistence.graph.consistency;
-
-
-/**
- *
- *
- */
-public class SimpleTimeoutEvent<T> implements TimeoutEvent<T> {
-
-    private final T event;
-    private final long timeout;
-
-
-    public SimpleTimeoutEvent( final T event, final long timeout ) {
-        this.event = event;
-        this.timeout = timeout;
-    }
-
-
-    @Override
-    public T getEvent() {
-       return event;
-    }
-
-
-    @Override
-    public long getTimeout() {
-        return timeout;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a586a342/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeoutEvent.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeoutEvent.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeoutEvent.java
deleted file mode 100644
index c482730..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeoutEvent.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package org.apache.usergrid.persistence.graph.consistency;
-
-
-/**
- * An interface for a timeout event
- */
-public interface TimeoutEvent<T> {
-
-    /**
-     * @return The event to fire when our timeout is reached
-     */
-    T getEvent();
-
-    /**
-     * @return The time in epoch millis the event will time out
-     */
-    long getTimeout();
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a586a342/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeoutEventListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeoutEventListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeoutEventListener.java
deleted file mode 100644
index 0bd60e7..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeoutEventListener.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package org.apache.usergrid.persistence.graph.consistency;
-
-
-/**
- *
- *
- */
-public interface TimeoutEventListener<T> {
-
-
-    /**
-     * The handler to receive the event.  Any exception that is thrown is considered
-     * a failure, and the event will be re-fired.
-     * @param event
-     */
-    void receive(T event);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a586a342/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeoutQueue.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeoutQueue.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeoutQueue.java
index 3cb5001..2d16960 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeoutQueue.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeoutQueue.java
@@ -14,9 +14,9 @@ public interface TimeoutQueue<T> {
      *
      * @param event The event to queue
      * @param timeout The timeout to set on the queue element
-     * @return The TimeoutEvent that has been queued
+     * @return The AsynchonrousEvent that has been queued
      */
-    public TimeoutEvent<T> queue( T event, long timeout );
+    public AsynchonrousEvent<T> queue( T event, long timeout );
 
 
     /**
@@ -29,7 +29,7 @@ public interface TimeoutQueue<T> {
      *
      * @return A collection of events.
      */
-    public Collection<TimeoutEvent<T>> take( int maxSize, long timeout );
+    public Collection<AsynchonrousEvent<T>> take( int maxSize, long timeout );
 
 
     /**
@@ -39,5 +39,5 @@ public interface TimeoutQueue<T> {
      *
      * @return True if the element was removed.  False otherwise
      */
-    public boolean remove( TimeoutEvent<T> event );
+    public boolean remove( AsynchonrousEvent<T> event );
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a586a342/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/EdgeDelete.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/EdgeDelete.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/EdgeDelete.java
new file mode 100644
index 0000000..adf9157
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/EdgeDelete.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.guice;
+
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import com.google.inject.BindingAnnotation;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+
+@BindingAnnotation
+@Target( { FIELD, PARAMETER, METHOD } )
+@Retention( RUNTIME )
+public @interface EdgeDelete {}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a586a342/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/EdgeWrite.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/EdgeWrite.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/EdgeWrite.java
new file mode 100644
index 0000000..b8afb13
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/EdgeWrite.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.guice;
+
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import com.google.inject.BindingAnnotation;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+
+@BindingAnnotation
+@Target( { FIELD, PARAMETER, METHOD } )
+@Retention( RUNTIME )
+public @interface EdgeWrite {}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a586a342/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
index fd56fcc..ed9448b 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
@@ -30,6 +30,8 @@ import org.apache.usergrid.persistence.graph.EdgeManagerFactory;
 import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.consistency.AsyncProcessor;
 import org.apache.usergrid.persistence.graph.consistency.AsyncProcessorImpl;
+import org.apache.usergrid.persistence.graph.consistency.LocalTimeoutQueue;
+import org.apache.usergrid.persistence.graph.consistency.TimeoutQueue;
 import org.apache.usergrid.persistence.graph.impl.CollectionIndexObserver;
 import org.apache.usergrid.persistence.graph.impl.EdgeManagerImpl;
 import org.apache.usergrid.persistence.graph.serialization.CassandraConfig;
@@ -43,6 +45,7 @@ import org.apache.usergrid.persistence.graph.serialization.impl.NodeSerializatio
 
 import com.google.common.eventbus.EventBus;
 import com.google.inject.AbstractModule;
+import com.google.inject.Provides;
 import com.google.inject.TypeLiteral;
 import com.google.inject.assistedinject.FactoryModuleBuilder;
 import com.google.inject.matcher.Matchers;
@@ -94,23 +97,19 @@ public class GraphModule extends AbstractModule {
          * Graph event bus, will need to be refactored into it's own classes
          */
 
-//        final EventBus eventBus = new EventBus("AsyncProcessorBus");
-//        bind(EventBus.class).toInstance(eventBus);
+          // create a guice factor for getting our collection manager
 
-        //auto register every impl on the event bus
-//        bindListener( Matchers.any(), new TypeListener() {
-//           @Override
-//           public <I> void hear(@SuppressWarnings("unused") final TypeLiteral<I> typeLiteral, final TypeEncounter<I> typeEncounter) {
-//               typeEncounter.register(new InjectionListener<I>() {
-//                   @Override public void afterInjection(final I instance) {
-//                       eventBus.register(instance);
-//                   }
-//               });
-//           }
-//        });
+        //local queue.  Need to
+        bind(TimeoutQueue.class).to( LocalTimeoutQueue.class );
+
+        bind(AsyncProcessor.class).annotatedWith( EdgeDelete.class ).to( AsyncProcessorImpl.class );
+        bind(AsyncProcessor.class).annotatedWith( EdgeWrite.class ).to( AsyncProcessorImpl.class );
+        bind(AsyncProcessor.class).annotatedWith( NodeDelete.class ).to( AsyncProcessorImpl.class );
+    }
 
-        bind(AsyncProcessor.class).to(AsyncProcessorImpl.class);
 
 
-    }
 }
+
+
+

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a586a342/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/NodeDelete.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/NodeDelete.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/NodeDelete.java
new file mode 100644
index 0000000..eadbb1e
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/NodeDelete.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.guice;
+
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import com.google.inject.BindingAnnotation;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+
+@BindingAnnotation
+@Target( { FIELD, PARAMETER, METHOD } )
+@Retention( RUNTIME )
+public @interface NodeDelete {}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a586a342/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
index 03d6390..6d49591 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
@@ -20,11 +20,9 @@
 package org.apache.usergrid.persistence.graph.impl;
 
 
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.UUID;
 
 import org.apache.usergrid.persistence.collection.OrganizationScope;
@@ -37,6 +35,9 @@ import org.apache.usergrid.persistence.graph.SearchByEdgeType;
 import org.apache.usergrid.persistence.graph.SearchByIdType;
 import org.apache.usergrid.persistence.graph.SearchEdgeType;
 import org.apache.usergrid.persistence.graph.SearchIdType;
+import org.apache.usergrid.persistence.graph.consistency.AsyncProcessor;
+import org.apache.usergrid.persistence.graph.consistency.AsynchonrousEvent;
+import org.apache.usergrid.persistence.graph.consistency.AsynchronousEventListener;
 import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
 import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
 import org.apache.usergrid.persistence.graph.serialization.NodeSerialization;
@@ -50,6 +51,7 @@ import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
 import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+import org.apache.usergrid.persistence.graph.guice.*;
 
 import rx.Observable;
 import rx.Scheduler;
@@ -74,13 +76,27 @@ public class EdgeManagerImpl implements EdgeManager {
 
     private final NodeSerialization nodeSerialization;
 
+    private final AsyncProcessor<Edge> edgeWriteAsyncProcessor;
+    private final AsyncProcessor<Edge> edgeDeleteAsyncProcessor;
+    private final AsyncProcessor<Id> nodeDeleteAsyncProcessor;
+
     private final GraphFig graphFig;
 
 
     @Inject
     public EdgeManagerImpl( final Scheduler scheduler, final EdgeMetadataSerialization edgeMetadataSerialization,
                             final EdgeSerialization edgeSerialization, final NodeSerialization nodeSerialization,
-                            final GraphFig graphFig, @Assisted final OrganizationScope scope ) {
+                            final GraphFig graphFig,
+                            @EdgeWrite final AsyncProcessor edgeWrite,
+                            @EdgeDelete final AsyncProcessor edgeDelete,
+                            @NodeDelete final AsyncProcessor nodeDelete,
+
+
+                            @Assisted final OrganizationScope scope ) {
+        ValidationUtils.validateOrganizationScope( scope );
+
+
+        this.scope = scope;
         this.scheduler = scheduler;
         this.edgeMetadataSerialization = edgeMetadataSerialization;
         this.edgeSerialization = edgeSerialization;
@@ -88,10 +104,34 @@ public class EdgeManagerImpl implements EdgeManager {
         this.graphFig = graphFig;
 
 
-        ValidationUtils.validateOrganizationScope( scope );
+        this.edgeWriteAsyncProcessor = edgeWrite;
 
 
-        this.scope = scope;
+        this.edgeWriteAsyncProcessor.addListener( new AsynchronousEventListener<Edge>() {
+            @Override
+            public void receive( final Edge edge ) {
+                repairEdgeAsync( edge );
+            }
+        } );
+
+
+        this.edgeDeleteAsyncProcessor = edgeDelete;
+
+        this.edgeDeleteAsyncProcessor.addListener( new AsynchronousEventListener<Edge>() {
+            @Override
+            public void receive( final Edge edge ) {
+                deleteEdgeAsync( edge );
+            }
+        } );
+
+        this.nodeDeleteAsyncProcessor = nodeDelete;
+
+        this.nodeDeleteAsyncProcessor.addListener( new AsynchronousEventListener<Id>() {
+            @Override
+            public void receive( final Id event ) {
+                deleteNodeAsync( event );
+            }
+        } );
     }
 
 
@@ -108,6 +148,9 @@ public class EdgeManagerImpl implements EdgeManager {
 
                 mutation.mergeShallow( edgeMutation );
 
+                final AsynchonrousEvent<Edge> event =
+                        edgeWriteAsyncProcessor.setVerification( edge , getTimeout() );
+
                 try {
                     mutation.execute();
                 }
@@ -115,6 +158,8 @@ public class EdgeManagerImpl implements EdgeManager {
                     throw new RuntimeException( "Unable to connect to cassandra", e );
                 }
 
+                edgeWriteAsyncProcessor.start( event );
+
                 return edge;
             }
         } );
@@ -130,6 +175,10 @@ public class EdgeManagerImpl implements EdgeManager {
             public Edge call( final Edge edge ) {
                 final MutationBatch edgeMutation = edgeSerialization.markEdge( scope, edge );
 
+                final AsynchonrousEvent<Edge> event =
+                        edgeDeleteAsyncProcessor.setVerification(  edge , getTimeout() );
+
+
                 try {
                     edgeMutation.execute();
                 }
@@ -137,6 +186,9 @@ public class EdgeManagerImpl implements EdgeManager {
                     throw new RuntimeException( "Unable to connect to cassandra", e );
                 }
 
+                edgeDeleteAsyncProcessor.start( event );
+
+
                 return edge;
             }
         } );
@@ -156,6 +208,10 @@ public class EdgeManagerImpl implements EdgeManager {
 
                 final MutationBatch nodeMutation = nodeSerialization.mark( scope, id, deleteTime );
 
+                final AsynchonrousEvent<Id> event =
+                        nodeDeleteAsyncProcessor.setVerification(  node , getTimeout() );
+
+
                 try {
                     nodeMutation.execute();
                 }
@@ -163,6 +219,8 @@ public class EdgeManagerImpl implements EdgeManager {
                     throw new RuntimeException( "Unable to connect to cassandra", e );
                 }
 
+                nodeDeleteAsyncProcessor.start( event );
+
                 return id;
             }
         } );
@@ -290,6 +348,14 @@ public class EdgeManagerImpl implements EdgeManager {
 
 
     /**
+     * Get our timeout for write consistency
+     */
+    private long getTimeout() {
+        return graphFig.getWriteTimeout() * 2;
+    }
+
+
+    /**
      * Helper filter to perform mapping and return an observable of pre-filtered edges
      */
     private class EdgeBufferFilter implements Func1<List<MarkedEdge>, Observable<MarkedEdge>> {
@@ -303,12 +369,11 @@ public class EdgeManagerImpl implements EdgeManager {
 
 
         /**
-         * Takes a buffered list of marked edges.  It then does a single round trip to fetch marked ids
-         * These are then used in conjunction with the max version filter to filter any edges that should
-         * not be returned
-         * @param markedEdges
-         * @return An observable that emits only edges that can be consumed.  There could be multiple versions
-         * of the same edge so those need de-duped.
+         * Takes a buffered list of marked edges.  It then does a single round trip to fetch marked ids These are then
+         * used in conjunction with the max version filter to filter any edges that should not be returned
+         *
+         * @return An observable that emits only edges that can be consumed.  There could be multiple versions of the
+         *         same edge so those need de-duped.
          */
         @Override
         public Observable<MarkedEdge> call( final List<MarkedEdge> markedEdges ) {
@@ -368,4 +433,20 @@ public class EdgeManagerImpl implements EdgeManager {
             return true;
         }
     }
+
+
+    public void repairEdgeAsync( Edge write ) {
+
+    }
+
+
+    public void deleteEdgeAsync( Edge delete ) {
+
+    }
+
+
+    public void deleteNodeAsync( Id delete ) {
+
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a586a342/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeMetadataSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeMetadataSerialization.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeMetadataSerialization.java
index bb36bff..8806b92 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeMetadataSerialization.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeMetadataSerialization.java
@@ -37,7 +37,7 @@ public interface EdgeMetadataSerialization {
 
 
     /**
-     * Write both the source--->Target edge and the target <----- source edge into the mutation
+     * EdgeWrite both the source--->Target edge and the target <----- source edge into the mutation
      */
     MutationBatch writeEdge( OrganizationScope scope, Edge edge );
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a586a342/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeSerialization.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeSerialization.java
index 1575121..548d821 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeSerialization.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeSerialization.java
@@ -39,7 +39,7 @@ public interface EdgeSerialization {
 
 
     /**
-     * Write both the source--->Target edge and the target <----- source edge into the mutation
+     * EdgeWrite both the source--->Target edge and the target <----- source edge into the mutation
      *
      * @param scope The org scope of the graph
      * @param edge The edge to write
@@ -56,7 +56,7 @@ public interface EdgeSerialization {
     MutationBatch markEdge( OrganizationScope scope, Edge edge);
 
     /**
-     * Write both the source -->target edge and the target<--- source edge into the mutation
+     * EdgeWrite both the source -->target edge and the target<--- source edge into the mutation
      *
      * @param scope The org scope of the graph
      * @param edge The edge to write

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a586a342/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/NodeSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/NodeSerialization.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/NodeSerialization.java
index 9853bce..396cd58 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/NodeSerialization.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/NodeSerialization.java
@@ -50,7 +50,7 @@ public interface NodeSerialization {
 
 
     /**
-     * Delete the mark entry, signaling a delete is complete
+     * EdgeDelete the mark entry, signaling a delete is complete
      * @param scope
      * @param node
      * @return

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a586a342/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
index 4691973..1dbfbae 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
@@ -193,7 +193,7 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
 
 
     /**
-     * Write the edges internally
+     * EdgeWrite the edges internally
      * @param scope  The scope to encapsulate
      * @param edge The edge to write
      * @param op The row operation to invoke
@@ -227,7 +227,7 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
 
 
         /**
-         * Write edges from target<-source
+         * EdgeWrite edges from target<-source
          */
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a586a342/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java
index b6ef52e..c041e91 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java
@@ -21,16 +21,12 @@ package org.apache.usergrid.persistence.graph.consistency;
 
 
 import java.util.Stack;
-import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
-import com.google.common.eventbus.EventBus;
-import com.google.common.eventbus.Subscribe;
-
 import rx.concurrency.Schedulers;
 
 import static org.junit.Assert.assertEquals;
@@ -56,7 +52,7 @@ public class AsyncProcessorTest {
         final TestEvent event = new TestEvent();
 
 
-        final TimeoutEvent<TestEvent> timeoutEvent = new TimeoutEvent<TestEvent>() {
+        final AsynchonrousEvent<TestEvent> asynchonrousEvent = new AsynchonrousEvent<TestEvent>() {
             @Override
             public TestEvent getEvent() {
                 return event;
@@ -76,13 +72,13 @@ public class AsyncProcessorTest {
 
 
         //mock up the queue
-        when( queue.queue( event, timeout ) ).thenReturn( timeoutEvent );
+        when( queue.queue( event, timeout ) ).thenReturn( asynchonrousEvent );
 
 
-        TimeoutEvent<TestEvent> returned = asyncProcessor.setVerification( event, timeout );
+        AsynchonrousEvent<TestEvent> returned = asyncProcessor.setVerification( event, timeout );
 
         //ensure the timeouts are returned from the Queue subsystem
-        assertSame( timeoutEvent, returned );
+        assertSame( asynchonrousEvent, returned );
     }
 
 
@@ -94,7 +90,7 @@ public class AsyncProcessorTest {
         final TestEvent event = new TestEvent();
 
 
-        final TimeoutEvent<TestEvent> timeoutEvent = new TimeoutEvent<TestEvent>() {
+        final AsynchonrousEvent<TestEvent> asynchonrousEvent = new AsynchonrousEvent<TestEvent>() {
             @Override
             public TestEvent getEvent() {
                 return event;
@@ -115,7 +111,7 @@ public class AsyncProcessorTest {
         final CountDownLatch latch = new CountDownLatch( 1 );
 
         //mock up the ack to allow us to block the test until the async confirm fires
-        when( queue.remove( timeoutEvent ) ).thenAnswer( new Answer<Boolean>() {
+        when( queue.remove( asynchonrousEvent ) ).thenAnswer( new Answer<Boolean>() {
             @Override
             public Boolean answer( final InvocationOnMock invocation ) throws Throwable {
                 latch.countDown();
@@ -124,7 +120,7 @@ public class AsyncProcessorTest {
         } );
 
 
-        asyncProcessor.start( timeoutEvent );
+        asyncProcessor.start( asynchonrousEvent );
 
 
         //block until the event is fired.  The correct invocation is implicitly verified by the remove mock
@@ -141,7 +137,7 @@ public class AsyncProcessorTest {
         @Test
     public void verifyErrorExecution() throws InterruptedException {
 
-        final ErrorListener listener = new ErrorListener();
+        final AsynchronousErrorListener listener = new AsynchronousErrorListener();
 
 
         final TestEvent event = new TestEvent();
@@ -149,7 +145,7 @@ public class AsyncProcessorTest {
         final boolean[] invoked = new boolean[] { false, false };
 
 
-        final TimeoutEvent<TestEvent> timeoutEvent = new TimeoutEvent<TestEvent>() {
+        final AsynchonrousEvent<TestEvent> asynchonrousEvent = new AsynchonrousEvent<TestEvent>() {
             @Override
             public TestEvent getEvent() {
                 return event;
@@ -169,20 +165,21 @@ public class AsyncProcessorTest {
 
         final CountDownLatch latch = new CountDownLatch( 1 );
 
-        final TimeoutEvent<?>[] errorEvents = { null };
+        final AsynchonrousEvent<?>[] errorEvents = { null };
 
         //countdown the latch so the test can proceed
-        asyncProcessor.addListener( new AsyncProcessorImpl.ErrorListener() {
+        asyncProcessor.addErrorListener( new ErrorListener<TestEvent>() {
             @Override
-            public <T> void onError( final TimeoutEvent<T> event, final Throwable t ) {
+            public void onError( final AsynchonrousEvent<TestEvent> event, final Throwable t ) {
                 errorEvents[0] = event;
                 invoked[1] = true;
                 latch.countDown();
             }
+
         } );
 
         //throw an error if remove is called.  This shouldn't happen
-        when( queue.remove( timeoutEvent ) ).then( new Answer<Boolean>() {
+        when( queue.remove( asynchonrousEvent ) ).then( new Answer<Boolean>() {
             @Override
             public Boolean answer( final InvocationOnMock invocation ) throws Throwable {
                 invoked[0] = true;
@@ -192,7 +189,7 @@ public class AsyncProcessorTest {
 
 
         //fire the event
-        asyncProcessor.start( timeoutEvent );
+        asyncProcessor.start( asynchonrousEvent );
 
 
         //block until the event is fired.  The invocation verification is part of the error listener unlocking
@@ -213,9 +210,12 @@ public class AsyncProcessorTest {
     /**
      * Construct the async processor
      */
-    public <T> AsyncProcessorImpl<T> constructProcessor( TimeoutQueue<T> queue , TimeoutEventListener<T> listener) {
+    public <T> AsyncProcessorImpl<T> constructProcessor( TimeoutQueue<T> queue , AsynchronousEventListener<T> listener) {
+
+        AsyncProcessorImpl<T> processor =  new AsyncProcessorImpl( queue,Schedulers.threadPoolForIO() );
+        processor.addListener( listener );
 
-        return new AsyncProcessorImpl( queue, listener, Schedulers.threadPoolForIO() );
+        return processor;
     }
 
 
@@ -230,7 +230,7 @@ public class AsyncProcessorTest {
     }
 
 
-    public static class TestListener implements TimeoutEventListener<TestEvent> {
+    public static class TestListener implements AsynchronousEventListener<TestEvent> {
 
         public final Stack<TestEvent> events = new Stack<TestEvent>();
 
@@ -249,7 +249,7 @@ public class AsyncProcessorTest {
     /**
      * Throw error after the event is fired
      */
-    public static class ErrorListener implements TimeoutEventListener<TestEvent> {
+    public static class AsynchronousErrorListener implements AsynchronousEventListener<TestEvent> {
 
         public final Stack<TestEvent> events = new Stack<TestEvent>();
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a586a342/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/LocalTimeoutQueueTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/LocalTimeoutQueueTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/LocalTimeoutQueueTest.java
index f3107d2..0b741c8 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/LocalTimeoutQueueTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/LocalTimeoutQueueTest.java
@@ -34,14 +34,14 @@ public class LocalTimeoutQueueTest {
 
         final TestEvent event = new TestEvent();
 
-        TimeoutEvent<TestEvent> timeoutEvent = queue.queue( event, timeout );
+        AsynchonrousEvent<TestEvent> asynchonrousEvent = queue.queue( event, timeout );
 
-        assertNotNull( timeoutEvent );
+        assertNotNull( asynchonrousEvent );
 
-        assertEquals( event, timeoutEvent.getEvent() );
-        assertEquals( time + timeout, timeoutEvent.getTimeout() );
+        assertEquals( event, asynchonrousEvent.getEvent() );
+        assertEquals( time + timeout, asynchonrousEvent.getTimeout() );
 
-        Collection<TimeoutEvent<TestEvent>> results = queue.take( 100, timeout );
+        Collection<AsynchonrousEvent<TestEvent>> results = queue.take( 100, timeout );
 
         assertEquals( "Time not yet elapsed", 0, results.size() );
 
@@ -55,9 +55,9 @@ public class LocalTimeoutQueueTest {
         assertEquals( "Time elapsed", 1, results.size() );
 
         //validate we get a new timeout event since the old one was re-scheduled
-        Iterator<TimeoutEvent<TestEvent>> events = results.iterator();
+        Iterator<AsynchonrousEvent<TestEvent>> events = results.iterator();
 
-        TimeoutEvent<TestEvent> message = events.next();
+        AsynchonrousEvent<TestEvent> message = events.next();
 
         assertEquals( event, message.getEvent() );
 
@@ -97,18 +97,18 @@ public class LocalTimeoutQueueTest {
 
             final TestEvent event = new TestEvent();
 
-            TimeoutEvent<TestEvent> timeoutEvent = queue.queue( event, timeout );
+            AsynchonrousEvent<TestEvent> asynchonrousEvent = queue.queue( event, timeout );
 
             events.add( event );
 
-            assertNotNull( timeoutEvent );
+            assertNotNull( asynchonrousEvent );
 
-            assertEquals( event, timeoutEvent.getEvent() );
-            assertEquals( time + timeout, timeoutEvent.getTimeout() );
+            assertEquals( event, asynchonrousEvent.getEvent() );
+            assertEquals( time + timeout, asynchonrousEvent.getTimeout() );
         }
 
 
-        Collection<TimeoutEvent<TestEvent>> results = queue.take( 100, timeout );
+        Collection<AsynchonrousEvent<TestEvent>> results = queue.take( 100, timeout );
 
         assertEquals( "Time not yet elapsed", 0, results.size() );
 
@@ -133,11 +133,11 @@ public class LocalTimeoutQueueTest {
             assertEquals( "Time elapsed", 100, results.size() );
 
             //validate we get a new timeout event since the old one was re-scheduled
-            Iterator<TimeoutEvent<TestEvent>> eventIterator = results.iterator();
+            Iterator<AsynchonrousEvent<TestEvent>> eventIterator = results.iterator();
 
             while(eventIterator.hasNext()){
 
-                TimeoutEvent<TestEvent> message = eventIterator.next();
+                AsynchonrousEvent<TestEvent> message = eventIterator.next();
 
                 assertTrue( events.remove( message.getEvent() ) );