You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/03/08 01:20:50 UTC

[1/3] git commit: Refactor to use the observables. Evaluating hazelcast

Repository: incubator-usergrid
Updated Branches:
  refs/heads/asyncqueue 4012eb73f -> 51a9ffd92


Refactor to use the observables. Evaluating hazelcast


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/da4630f5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/da4630f5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/da4630f5

Branch: refs/heads/asyncqueue
Commit: da4630f543fab7eb73fdbfab9535c82c04568179
Parents: 4012eb7
Author: Todd Nine <tn...@apigee.com>
Authored: Mon Mar 3 11:16:30 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon Mar 3 11:16:30 2014 -0700

----------------------------------------------------------------------
 .../graph/consistency/AsyncProcessor.java       |   8 +-
 .../graph/consistency/AsyncProcessorImpl.java   |  64 ++++----
 .../graph/consistency/AsynchronousEvent.java    |  18 ---
 .../consistency/AsynchronousEventListener.java  |  22 ---
 .../graph/consistency/AsynchronousMessage.java  |  18 +++
 .../graph/consistency/ErrorListener.java        |   2 +-
 .../graph/consistency/LocalTimeoutQueue.java    |  21 +--
 .../graph/consistency/MessageListener.java      |  22 +++
 .../consistency/SimpleAsynchronousEvent.java    |  30 ----
 .../consistency/SimpleAsynchronousMessage.java  |  30 ++++
 .../graph/consistency/TimeoutQueue.java         |   8 +-
 .../persistence/graph/impl/EdgeManagerImpl.java | 153 +++++++++----------
 .../graph/consistency/AsyncProcessorTest.java   |  56 ++++---
 .../consistency/LocalTimeoutQueueTest.java      |  28 ++--
 14 files changed, 245 insertions(+), 235 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/da4630f5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java
index 3b3e0b9..f6013d9 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java
@@ -19,16 +19,16 @@ public interface AsyncProcessor<T> {
      * @param event The event to be scheduled for verification
      * @param timeout  The time in milliseconds we should wait before the event should fire
      */
-    public AsynchronousEvent<T> setVerification( T event, long timeout );
+    public AsynchronousMessage<T> setVerification( T event, long timeout );
 
 
     /**
-     * Start processing the event immediately asynchronously.  In the event an exception is thrown, the AsynchronousEvent should be re-tried.
+     * Start processing the event immediately asynchronously.  In the event an exception is thrown, the AsynchronousMessage should be re-tried.
      * It is up to the implementer to commit the event so that it does not fire again.  This should never throw exceptions.
      *
      * @param event The event to start
      */
-    public void start(AsynchronousEvent<T> event);
+    public void start(AsynchronousMessage<T> event);
 
     /**
      * Add the error listener to the list of listeners
@@ -40,7 +40,7 @@ public interface AsyncProcessor<T> {
      * Add the listener to this instance
      * @param listener
      */
-    public void addListener(AsynchronousEventListener<T> listener);
+    public void addListener(MessageListener<T, T> listener);
 
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/da4630f5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
index 5acf728..fbac644 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
@@ -9,11 +9,13 @@ import org.slf4j.LoggerFactory;
 
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
-import com.netflix.hystrix.HystrixCommand;
 import com.netflix.hystrix.HystrixCommandGroupKey;
 
-import rx.Observer;
+import rx.Observable;
 import rx.Scheduler;
+import rx.util.functions.Action0;
+import rx.util.functions.Action1;
+import rx.util.functions.FuncN;
 
 
 /**
@@ -26,7 +28,7 @@ public class AsyncProcessorImpl<T> implements AsyncProcessor<T> {
 
     private final TimeoutQueue<T> queue;
     private final Scheduler scheduler;
-    private final List<AsynchronousEventListener<T>> listeners = new ArrayList<AsynchronousEventListener<T>>(  );
+    private final List<MessageListener<T, T>> listeners = new ArrayList<MessageListener<T, T>>();
 
     private static final Logger LOG = LoggerFactory.getLogger( AsyncProcessor.class );
 
@@ -40,60 +42,57 @@ public class AsyncProcessorImpl<T> implements AsyncProcessor<T> {
     }
 
 
-
     @Override
-    public AsynchronousEvent<T> setVerification( final T event, final long timeout ) {
+    public AsynchronousMessage<T> setVerification( final T event, final long timeout ) {
         return queue.queue( event, timeout );
     }
 
 
-
-
     @Override
-    public void start( final AsynchronousEvent<T> event ) {
+    public void start( final AsynchronousMessage<T> event ) {
 
 
-        //run this in a timeout command so it doesn't run forever. If it times out, it will simply resume later
-        new HystrixCommand<Void>( GRAPH_REPAIR ) {
-
-            @Override
-            protected Void run() throws Exception {
-                final T rootEvent = event.getEvent();
+        final T data = event.getEvent();
+        /**
+         * Execute all listeners in parallel
+         */
+        List<Observable<?>> observables = new ArrayList<Observable<?>>( listeners.size() );
 
-                for(AsynchronousEventListener<T> listener: listeners){
-                    listener.receive( rootEvent );
-                }
+        for ( MessageListener<T, T> listener : listeners ) {
+            observables.add( listener.receive( data ).subscribeOn( scheduler ) );
+        }
 
-                return null;
-            }
-        }.toObservable( scheduler ).subscribe( new Observer<Void>() {
+        //run everything in parallel and zip it up
+        Observable.zip( observables, new FuncN<AsynchronousMessage<T>>() {
             @Override
-            public void onCompleted() {
-                queue.remove( event );
+            public AsynchronousMessage<T> call( final Object... args ) {
+                return event;
             }
-
-
+        } ).doOnError( new Action1<Throwable>() {
             @Override
-            public void onError( final Throwable throwable ) {
+            public void call( final Throwable throwable ) {
                 LOG.error( "Unable to process async event", throwable );
 
                 for ( ErrorListener listener : errorListeners ) {
                     listener.onError( event, throwable );
                 }
             }
-
-
+        } ).doOnCompleted( new Action0() {
+            @Override
+            public void call() {
+                queue.remove( event );
+            }
+        } ).subscribe( new Action1<AsynchronousMessage<T>>() {
             @Override
-            public void onNext( final Void args ) {
-                //nothing to do here
+            public void call( final AsynchronousMessage<T> tAsynchronousMessage ) {
+                //nothing to do here, completion and errors are handled in doOnCompleted and doOnError above
             }
         } );
-
     }
 
 
     @Override
-    public void addListener( final AsynchronousEventListener<T> listener ) {
+    public void addListener( final MessageListener<T, T> listener ) {
         this.listeners.add( listener );
     }
 
@@ -104,7 +103,4 @@ public class AsyncProcessorImpl<T> implements AsyncProcessor<T> {
     public void addErrorListener( ErrorListener<T> listener ) {
         this.errorListeners.add( listener );
     }
-
-
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/da4630f5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsynchronousEvent.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsynchronousEvent.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsynchronousEvent.java
deleted file mode 100644
index fe38a93..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsynchronousEvent.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package org.apache.usergrid.persistence.graph.consistency;
-
-
-/**
- * An interface for a timeout event
- */
-public interface AsynchronousEvent<T> {
-
-    /**
-     * @return The event to fire when our timeout is reached
-     */
-    T getEvent();
-
-    /**
-     * @return The time in epoch millis the event will time out
-     */
-    long getTimeout();
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/da4630f5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsynchronousEventListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsynchronousEventListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsynchronousEventListener.java
deleted file mode 100644
index 6cb93cb..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsynchronousEventListener.java
+++ /dev/null
@@ -1,22 +0,0 @@
-package org.apache.usergrid.persistence.graph.consistency;
-
-
-import rx.Observable;
-
-
-/**
- *
- *
- */
-public interface AsynchronousEventListener<T, R> {
-
-
-    /**
-     * The handler to receive the event.  Any exception that is thrown is considered
-     * a failure, and the event will be re-fired.
-     * @param event  The input event
-     * @return The observable that performs the operations
-     */
-    Observable<R> receive(T event);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/da4630f5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsynchronousMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsynchronousMessage.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsynchronousMessage.java
new file mode 100644
index 0000000..c9bd00a
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsynchronousMessage.java
@@ -0,0 +1,18 @@
+package org.apache.usergrid.persistence.graph.consistency;
+
+
+/**
+ * An interface for a timeout event
+ */
+public interface AsynchronousMessage<T> {
+
+    /**
+     * @return The event to fire when our timeout is reached
+     */
+    T getEvent();
+
+    /**
+     * @return The time in epoch millis the event will time out
+     */
+    long getTimeout();
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/da4630f5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/ErrorListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/ErrorListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/ErrorListener.java
index f6cef41..f736cdb 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/ErrorListener.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/ErrorListener.java
@@ -6,5 +6,5 @@ package org.apache.usergrid.persistence.graph.consistency;
  */
 public interface ErrorListener <T> {
 
-    void onError( AsynchronousEvent<T> event, Throwable t );
+    void onError( AsynchronousMessage<T> event, Throwable t );
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/da4630f5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/LocalTimeoutQueue.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/LocalTimeoutQueue.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/LocalTimeoutQueue.java
index 3a3b2ce..d2cc17f 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/LocalTimeoutQueue.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/LocalTimeoutQueue.java
@@ -23,7 +23,7 @@ public class LocalTimeoutQueue<T> implements TimeoutQueue<T> {
     /**
      * For in memory queueing
      */
-    private final PriorityBlockingQueue<AsynchronousEvent<T>> queue = new PriorityBlockingQueue<AsynchronousEvent<T>>( 1000, new TimeoutEventCompatator<T>() );
+    private final PriorityBlockingQueue<AsynchronousMessage<T>> queue = new PriorityBlockingQueue<AsynchronousMessage<T>>( 1000, new TimeoutEventCompatator<T>() );
 
     private final TimeService timeService;
 
@@ -35,9 +35,9 @@ public class LocalTimeoutQueue<T> implements TimeoutQueue<T> {
 
 
     @Override
-    public AsynchronousEvent<T> queue( final T event, final long timeout ) {
+    public AsynchronousMessage<T> queue( final T event, final long timeout ) {
         final long scheduledTimeout = timeService.getCurrentTime() + timeout;
-        final AsynchronousEvent<T> queuedEvent = new SimpleAsynchronousEvent<T>( event, scheduledTimeout );
+        final AsynchronousMessage<T> queuedEvent = new SimpleAsynchronousMessage<T>( event, scheduledTimeout );
 
         queue.add( queuedEvent );
 
@@ -46,16 +46,16 @@ public class LocalTimeoutQueue<T> implements TimeoutQueue<T> {
 
 
     @Override
-    public Collection<AsynchronousEvent<T>> take( final int maxSize, final long timeout ) {
+    public Collection<AsynchronousMessage<T>> take( final int maxSize, final long timeout ) {
 
         final long now = timeService.getCurrentTime();
         final long newTimeout = now+timeout;
 
-        List<AsynchronousEvent<T>> results = new ArrayList<AsynchronousEvent<T>>(maxSize);
+        List<AsynchronousMessage<T>> results = new ArrayList<AsynchronousMessage<T>>(maxSize);
 
         for(int i = 0; i < maxSize; i ++){
 
-            AsynchronousEvent<T> queuedEvent = queue.peek();
+            AsynchronousMessage<T> queuedEvent = queue.peek();
 
             //nothing to do
             if(queuedEvent == null){
@@ -68,7 +68,8 @@ public class LocalTimeoutQueue<T> implements TimeoutQueue<T> {
                 break;
             }
 
-            final AsynchronousEvent<T> newEvent =  new SimpleAsynchronousEvent<T>( queuedEvent.getEvent(), newTimeout );
+            final AsynchronousMessage<T>
+                    newEvent =  new SimpleAsynchronousMessage<T>( queuedEvent.getEvent(), newTimeout );
 
             //re schedule a new event to replace this one
             queue.add(newEvent);
@@ -85,16 +86,16 @@ public class LocalTimeoutQueue<T> implements TimeoutQueue<T> {
 
 
     @Override
-    public boolean remove( final AsynchronousEvent<T> event ) {
+    public boolean remove( final AsynchronousMessage<T> event ) {
         return queue.remove( event );
     }
 
 
-    private static class TimeoutEventCompatator<T> implements Comparator<AsynchronousEvent<T>> {
+    private static class TimeoutEventCompatator<T> implements Comparator<AsynchronousMessage<T>> {
 
 
         @Override
-        public int compare( final AsynchronousEvent<T> o1, final AsynchronousEvent<T> o2 ) {
+        public int compare( final AsynchronousMessage<T> o1, final AsynchronousMessage<T> o2 ) {
             return Long.compare( o1.getTimeout(), o2.getTimeout() );
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/da4630f5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/MessageListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/MessageListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/MessageListener.java
new file mode 100644
index 0000000..8a21dbb
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/MessageListener.java
@@ -0,0 +1,22 @@
+package org.apache.usergrid.persistence.graph.consistency;
+
+
+import rx.Observable;
+
+
+/**
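+ * Listener that receives an event and returns an observable that performs
+ * the operations for that event.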
+ */
+public interface MessageListener<T, R> {
+
+
+    /**
+     * The handler to receive the event.  Any exception that is thrown is considered
+     * a failure, and the event will be re-fired.
+     * @param event  The input event
+     * @return The observable that performs the operations
+     */
+    Observable<T> receive(T event);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/da4630f5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/SimpleAsynchronousEvent.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/SimpleAsynchronousEvent.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/SimpleAsynchronousEvent.java
deleted file mode 100644
index fdfcd60..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/SimpleAsynchronousEvent.java
+++ /dev/null
@@ -1,30 +0,0 @@
-package org.apache.usergrid.persistence.graph.consistency;
-
-
-/**
- *
- *
- */
-public class SimpleAsynchronousEvent<T> implements AsynchronousEvent<T> {
-
-    private final T event;
-    private final long timeout;
-
-
-    public SimpleAsynchronousEvent( final T event, final long timeout ) {
-        this.event = event;
-        this.timeout = timeout;
-    }
-
-
-    @Override
-    public T getEvent() {
-       return event;
-    }
-
-
-    @Override
-    public long getTimeout() {
-        return timeout;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/da4630f5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/SimpleAsynchronousMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/SimpleAsynchronousMessage.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/SimpleAsynchronousMessage.java
new file mode 100644
index 0000000..1e7a04b
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/SimpleAsynchronousMessage.java
@@ -0,0 +1,30 @@
+package org.apache.usergrid.persistence.graph.consistency;
+
+
+/**
+ *
+ *
+ */
+public class SimpleAsynchronousMessage<T> implements AsynchronousMessage<T> {
+
+    private final T event;
+    private final long timeout;
+
+
+    public SimpleAsynchronousMessage( final T event, final long timeout ) {
+        this.event = event;
+        this.timeout = timeout;
+    }
+
+
+    @Override
+    public T getEvent() {
+       return event;
+    }
+
+
+    @Override
+    public long getTimeout() {
+        return timeout;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/da4630f5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeoutQueue.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeoutQueue.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeoutQueue.java
index 556aa88..0caed9c 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeoutQueue.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeoutQueue.java
@@ -14,9 +14,9 @@ public interface TimeoutQueue<T> {
      *
      * @param event The event to queue
      * @param timeout The timeout to set on the queue element
-     * @return The AsynchronousEvent that has been queued
+     * @return The AsynchronousMessage that has been queued
      */
-    public AsynchronousEvent<T> queue( T event, long timeout );
+    public AsynchronousMessage<T> queue( T event, long timeout );
 
 
     /**
@@ -29,7 +29,7 @@ public interface TimeoutQueue<T> {
      *
      * @return A collection of events.
      */
-    public Collection<AsynchronousEvent<T>> take( int maxSize, long timeout );
+    public Collection<AsynchronousMessage<T>> take( int maxSize, long timeout );
 
 
     /**
@@ -39,5 +39,5 @@ public interface TimeoutQueue<T> {
      *
      * @return True if the element was removed.  False otherwise
      */
-    public boolean remove( AsynchronousEvent<T> event );
+    public boolean remove( AsynchronousMessage<T> event );
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/da4630f5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
index 7fd9246..1cc2fce 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
@@ -36,8 +36,8 @@ import org.apache.usergrid.persistence.graph.SearchByIdType;
 import org.apache.usergrid.persistence.graph.SearchEdgeType;
 import org.apache.usergrid.persistence.graph.SearchIdType;
 import org.apache.usergrid.persistence.graph.consistency.AsyncProcessor;
-import org.apache.usergrid.persistence.graph.consistency.AsynchronousEvent;
-import org.apache.usergrid.persistence.graph.consistency.AsynchronousEventListener;
+import org.apache.usergrid.persistence.graph.consistency.AsynchronousMessage;
+import org.apache.usergrid.persistence.graph.consistency.MessageListener;
 import org.apache.usergrid.persistence.graph.guice.EdgeDelete;
 import org.apache.usergrid.persistence.graph.guice.EdgeWrite;
 import org.apache.usergrid.persistence.graph.guice.NodeDelete;
@@ -135,7 +135,7 @@ public class EdgeManagerImpl implements EdgeManager {
 
                 mutation.mergeShallow( edgeMutation );
 
-                final AsynchronousEvent<Edge> event = edgeWriteAsyncProcessor.setVerification( edge, getTimeout() );
+                final AsynchronousMessage<Edge> event = edgeWriteAsyncProcessor.setVerification( edge, getTimeout() );
 
                 try {
                     mutation.execute();
@@ -161,7 +161,7 @@ public class EdgeManagerImpl implements EdgeManager {
             public Edge call( final Edge edge ) {
                 final MutationBatch edgeMutation = edgeSerialization.markEdge( scope, edge );
 
-                final AsynchronousEvent<Edge> event = edgeDeleteAsyncProcessor.setVerification( edge, getTimeout() );
+                final AsynchronousMessage<Edge> event = edgeDeleteAsyncProcessor.setVerification( edge, getTimeout() );
 
 
                 try {
@@ -191,7 +191,7 @@ public class EdgeManagerImpl implements EdgeManager {
 
                 final MutationBatch nodeMutation = nodeSerialization.mark( scope, id, deleteTime );
 
-                final AsynchronousEvent<Id> event = nodeDeleteAsyncProcessor.setVerification( node, getTimeout() );
+                final AsynchronousMessage<Id> event = nodeDeleteAsyncProcessor.setVerification( node, getTimeout() );
 
 
                 try {
@@ -420,10 +420,10 @@ public class EdgeManagerImpl implements EdgeManager {
     /**
      * Construct the asynchronous edge listener for the repair operation.
      */
-    public class EdgeWriteListener implements AsynchronousEventListener<Edge, Integer> {
+    public class EdgeWriteListener implements MessageListener<Edge, Edge> {
 
         @Override
-        public Observable<Integer> receive( final Edge write ) {
+        public Observable<Edge> receive( final Edge write ) {
 
             final UUID maxVersion = write.getVersion();
 
@@ -452,9 +452,9 @@ public class EdgeManagerImpl implements EdgeManager {
                     return UUIDComparator.staticCompare( markedEdge.getVersion(), maxVersion ) < 0;
                 }
                 //buffer the deletes and issue them in a single mutation
-            } ).buffer( graphFig.getScanPageSize() ).map( new Func1<List<MarkedEdge>, Integer>() {
+            } ).buffer( graphFig.getScanPageSize() ).map( new Func1<List<MarkedEdge>, Edge>() {
                 @Override
-                public Integer call( final List<MarkedEdge> markedEdges ) {
+                public Edge call( final List<MarkedEdge> markedEdges ) {
 
                     final int size = markedEdges.size();
 
@@ -473,7 +473,7 @@ public class EdgeManagerImpl implements EdgeManager {
                         throw new RuntimeException( "Unable to issue write to cassandra", e );
                     }
 
-                    return size;
+                    return write;
                 }
             } );
         }
@@ -483,10 +483,10 @@ public class EdgeManagerImpl implements EdgeManager {
     /**
      * Construct the asynchronous delete operation from the listener
      */
-    public class EdgeDeleteListener implements AsynchronousEventListener<Edge, Integer> {
+    public class EdgeDeleteListener implements MessageListener<Edge, Edge> {
 
         @Override
-        public Observable<Integer> receive( final Edge delete ) {
+        public Observable<Edge> receive( final Edge delete ) {
 
             final UUID maxVersion = delete.getVersion();
 
@@ -496,76 +496,57 @@ public class EdgeManagerImpl implements EdgeManager {
 
                     //search by edge type and target type.  If any other edges with this target type exist,
                     // we can't delete it
-                    Observable<Integer> sourceIdType= loadEdgesFromSourceByType(
+                    Observable<MutationBatch> sourceIdType = loadEdgesFromSourceByType(
                             new SimpleSearchByIdType( edge.getSourceNode(), edge.getType(), maxVersion,
-                                    edge.getTargetNode().getType(), null ) ).take( 2 ).count().doOnEach(
-                            new Action1<Integer>() {
+                                    edge.getTargetNode().getType(), null ) ).take( 2 ).count()
+                            .map( new Func1<Integer, MutationBatch>() {
                                 @Override
-                                public void call( final Integer count ) {
+                                public MutationBatch call( final Integer count ) {
                                     //There's nothing to do, we have 2 different edges with the same edge type and
                                     // target type.  Don't delete meta data
                                     if ( count == 2 ) {
-                                        return;
+                                        return null;
                                     }
 
-                                    try {
-                                        edgeMetadataSerialization.removeEdgeTypeFromSource( scope, delete ).execute();
-                                    }
-                                    catch ( ConnectionException e ) {
-                                        throw new RuntimeException( "Unable to execute mutation", e );
-                                    }
+                                    return edgeMetadataSerialization.removeEdgeTypeFromSource( scope, delete );
                                 }
                             } );
 
 
-                    Observable<Integer> targetIdType = loadEdgesToTargetByType(
+                    Observable<MutationBatch> targetIdType = loadEdgesToTargetByType(
                             new SimpleSearchByIdType( edge.getTargetNode(), edge.getType(), maxVersion,
                                     edge.getSourceNode().getType(), null ) ).take( 2 ).count()
-                            .doOnEach( new Action1<Integer>() {
-
-
+                            .map( new Func1<Integer, MutationBatch>() {
                                 @Override
-                                public void call( final Integer count ) {
-
-
+                                public MutationBatch call( final Integer count ) {
                                     //There's nothing to do, we have 2 different edges with the same edge type and
                                     // target type.  Don't delete meta data
                                     if ( count == 2 ) {
-                                        return;
+                                        return null;
                                     }
 
-                                    try {
-                                        edgeMetadataSerialization.removeEdgeTypeToTarget( scope, delete ).execute();
-                                    }
-                                    catch ( ConnectionException e ) {
-                                        throw new RuntimeException( "Unable to execute mutation", e );
-                                    }
+
+                                    return edgeMetadataSerialization.removeEdgeTypeToTarget( scope, delete );
                                 }
                             } );
 
                     //search by edge type and target type.  If any other edges with this target type exist,
                     // we can't delete it
-                    Observable<Integer> sourceType = loadEdgesFromSource(
+                    Observable<MutationBatch> sourceType = loadEdgesFromSource(
                             new SimpleSearchByEdgeType( edge.getSourceNode(), edge.getType(), maxVersion, null ) )
-                            .take( 2 ).count().doOnEach( new Action1<Integer>() {
-
-
+                            .take( 2 ).count().map( new Func1<Integer, MutationBatch>() {
                                 @Override
-                                public void call( final Integer count ) {
+                                public MutationBatch call( final Integer count ) {
 
 
                                     //There's nothing to do, we have 2 different edges with the same edge type and
                                     // target type.  Don't delete meta data
                                     if ( count == 2 ) {
-                                        return;
+                                        return null;
                                     }
 
-                                    try {
-                                        edgeMetadataSerialization.removeEdgeTypeFromSource( scope, delete ).execute();
-                                    }
-                                    catch ( ConnectionException e ) {
-                                        throw new RuntimeException( "Unable to execute mutation", e );
-                                    }
+
+                                    return edgeMetadataSerialization.removeEdgeTypeFromSource( scope, delete );
                                 }
                             } );
 
@@ -625,12 +606,19 @@ public class EdgeManagerImpl implements EdgeManager {
                                 }
                             } );
                 }
-            } ).map( new Func1<MutationBatch, MutationBatch>() {
+            } ).map( new Func1<MutationBatch, Edge>() {
                 @Override
-                public MutationBatch call( final MutationBatch mutationBatch ) {
-                    return mutationBatch;
+                public Edge call( final MutationBatch mutationBatch ) {
+                    try {
+                        mutationBatch.execute();
+                    }
+                    catch ( ConnectionException e ) {
+                        throw new RuntimeException( "Unable to execute mutation", e );
+                    }
+
+                    return delete;
                 }
-            } ).count();
+            } );
         }
     }
 
@@ -638,35 +626,44 @@ public class EdgeManagerImpl implements EdgeManager {
     /**
      * Construct the asynchronous node delete from the q
      */
-    public class NodeDeleteListener implements AsynchronousEventListener<Id, Integer> {
+    public class NodeDeleteListener implements MessageListener<Id, Id> {
 
         @Override
-        public Observable<Integer> receive( final Id node ) {
+        public Observable<Id> receive( final Id node ) {
 
-            final Optional<UUID> mark = nodeSerialization.getMaxVersion( scope, node );
 
-            //Nothing to do, just exist with 0 count
-            if ( !mark.isPresent() ) {
-                return Observable.just( 0 );
-            }
-
-            final UUID maxVersion = mark.get();
-
-            return getEdgeTypesToTarget( new SimpleSearchEdgeType( node, null ) )
-                    .flatMap( new Func1<String, Observable<Edge>>() {
-                        @Override
-                        public Observable<Edge> call( final String edgeType ) {
-
-                           //for each edge type, we want to search all edges < this version to the node and delete them. We might want to batch this up for efficiency
-                            return loadEdgesToTarget( new SimpleSearchByEdgeType( node, edgeType, maxVersion, null ) )
-                                    .doOnEach( new Action1<Edge>() {
-                                        @Override
-                                        public void call( final Edge edge ) {
-                                            deleteEdge( edge );
-                                        }
-                                    } );
-                        }
-                    } ).count();
+            return Observable.from( node ).map( new Func1<Id, Optional<UUID>>() {
+                @Override
+                public Optional<UUID> call( final Id id ) {
+                    return nodeSerialization.getMaxVersion( scope, node );
+                }
+            } ).flatMap( new Func1<Optional<UUID>, Observable<Edge>>() {
+                @Override
+                public Observable<Edge> call( final Optional<UUID> uuidOptional ) {
+                    return getEdgeTypesToTarget( new SimpleSearchEdgeType( node, null ) )
+                            .flatMap( new Func1<String, Observable<Edge>>() {
+                                @Override
+                                public Observable<Edge> call( final String edgeType ) {
+
+                                    //for each edge type, we want to search all edges < this version to the node and
+                                    // delete them. We might want to batch this up for efficiency
+                                    return loadEdgesToTarget(
+                                            new SimpleSearchByEdgeType( node, edgeType, uuidOptional.get(), null ) )
+                                            .doOnEach( new Action1<Edge>() {
+                                                @Override
+                                                public void call( final Edge edge ) {
+                                                    deleteEdge( edge );
+                                                }
+                                            } );
+                                }
+                            } );
+                }
+            } ).map( new Func1<Edge, Id>() {
+                @Override
+                public Id call( final Edge edge ) {
+                    return node;
+                }
+            } );
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/da4630f5/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java
index 6224b0a..fc53705 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java
@@ -27,7 +27,9 @@ import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import rx.Observable;
 import rx.concurrency.Schedulers;
+import rx.util.functions.Action1;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -52,7 +54,7 @@ public class AsyncProcessorTest {
         final TestEvent event = new TestEvent();
 
 
-        final AsynchronousEvent<TestEvent> asynchronousEvent = new AsynchronousEvent<TestEvent>() {
+        final AsynchronousMessage<TestEvent> asynchronousMessage = new AsynchronousMessage<TestEvent>() {
             @Override
             public TestEvent getEvent() {
                 return event;
@@ -72,13 +74,13 @@ public class AsyncProcessorTest {
 
 
         //mock up the queue
-        when( queue.queue( event, timeout ) ).thenReturn( asynchronousEvent );
+        when( queue.queue( event, timeout ) ).thenReturn( asynchronousMessage );
 
 
-        AsynchronousEvent<TestEvent> returned = asyncProcessor.setVerification( event, timeout );
+        AsynchronousMessage<TestEvent> returned = asyncProcessor.setVerification( event, timeout );
 
         //ensure the timeouts are returned from the Queue subsystem
-        assertSame( asynchronousEvent, returned );
+        assertSame( asynchronousMessage, returned );
     }
 
 
@@ -90,7 +92,7 @@ public class AsyncProcessorTest {
         final TestEvent event = new TestEvent();
 
 
-        final AsynchronousEvent<TestEvent> asynchronousEvent = new AsynchronousEvent<TestEvent>() {
+        final AsynchronousMessage<TestEvent> asynchronousMessage = new AsynchronousMessage<TestEvent>() {
             @Override
             public TestEvent getEvent() {
                 return event;
@@ -111,7 +113,7 @@ public class AsyncProcessorTest {
         final CountDownLatch latch = new CountDownLatch( 1 );
 
         //mock up the ack to allow us to block the test until the async confirm fires
-        when( queue.remove( asynchronousEvent ) ).thenAnswer( new Answer<Boolean>() {
+        when( queue.remove( asynchronousMessage ) ).thenAnswer( new Answer<Boolean>() {
             @Override
             public Boolean answer( final InvocationOnMock invocation ) throws Throwable {
                 latch.countDown();
@@ -120,7 +122,7 @@ public class AsyncProcessorTest {
         } );
 
 
-        asyncProcessor.start( asynchronousEvent );
+        asyncProcessor.start( asynchronousMessage );
 
 
         //block until the event is fired.  The correct invocation is implicitly verified by the remove mock
@@ -145,7 +147,7 @@ public class AsyncProcessorTest {
         final boolean[] invoked = new boolean[] { false, false };
 
 
-        final AsynchronousEvent<TestEvent> asynchronousEvent = new AsynchronousEvent<TestEvent>() {
+        final AsynchronousMessage<TestEvent> asynchronousMessage = new AsynchronousMessage<TestEvent>() {
             @Override
             public TestEvent getEvent() {
                 return event;
@@ -165,12 +167,12 @@ public class AsyncProcessorTest {
 
         final CountDownLatch latch = new CountDownLatch( 1 );
 
-        final AsynchronousEvent<?>[] errorEvents = { null };
+        final AsynchronousMessage<?>[] errorEvents = { null };
 
         //countdown the latch so the test can proceed
         asyncProcessor.addErrorListener( new ErrorListener<TestEvent>() {
             @Override
-            public void onError( final AsynchronousEvent<TestEvent> event, final Throwable t ) {
+            public void onError( final AsynchronousMessage<TestEvent> event, final Throwable t ) {
                 errorEvents[0] = event;
                 invoked[1] = true;
                 latch.countDown();
@@ -179,7 +181,7 @@ public class AsyncProcessorTest {
         } );
 
         //throw an error if remove is called.  This shouldn't happen
-        when( queue.remove( asynchronousEvent ) ).then( new Answer<Boolean>() {
+        when( queue.remove( asynchronousMessage ) ).then( new Answer<Boolean>() {
             @Override
             public Boolean answer( final InvocationOnMock invocation ) throws Throwable {
                 invoked[0] = true;
@@ -189,7 +191,7 @@ public class AsyncProcessorTest {
 
 
         //fire the event
-        asyncProcessor.start( asynchronousEvent );
+        asyncProcessor.start( asynchronousMessage );
 
 
         //block until the event is fired.  The invocation verification is part of the error listener unlocking
@@ -210,7 +212,7 @@ public class AsyncProcessorTest {
     /**
      * Construct the async processor
      */
-    public <T> AsyncProcessorImpl<T> constructProcessor( TimeoutQueue<T> queue , AsynchronousEventListener<T> listener) {
+    public <T> AsyncProcessorImpl<T> constructProcessor( TimeoutQueue<T> queue , MessageListener<T, T> listener) {
 
         AsyncProcessorImpl<T> processor =  new AsyncProcessorImpl( queue,Schedulers.threadPoolForIO() );
         processor.addListener( listener );
@@ -230,7 +232,7 @@ public class AsyncProcessorTest {
     }
 
 
-    public static class TestListener implements AsynchronousEventListener<TestEvent> {
+    public static class TestListener implements MessageListener<TestEvent, TestEvent> {
 
         public final Stack<TestEvent> events = new Stack<TestEvent>();
 
@@ -239,9 +241,16 @@ public class AsyncProcessorTest {
 
         }
 
+
         @Override
-        public void receive( final TestEvent event ) {
-            events.push( event );
+        public Observable<TestEvent> receive( final TestEvent event ) {
+
+            return Observable.from( event ).doOnEach(new Action1<TestEvent>() {
+               @Override
+               public void call( final TestEvent testEvent ) {
+                   events.push( testEvent );
+               }
+           });
         }
     }
 
@@ -249,15 +258,22 @@ public class AsyncProcessorTest {
     /**
      * Throw error after the event is fired
      */
-    public static class AsynchronousErrorListener implements AsynchronousEventListener<TestEvent> {
+    public static class AsynchronousErrorListener implements MessageListener<TestEvent, TestEvent> {
 
         public final Stack<TestEvent> events = new Stack<TestEvent>();
 
 
         @Override
-        public void receive( final TestEvent event ) {
-            events.push( event );
-            throw new RuntimeException( "Test Exception thrown.  Failed to process event" );
+        public Observable<TestEvent> receive( final TestEvent event ) {
+            return Observable.from( event ).doOnEach(new Action1<TestEvent>() {
+                          @Override
+                          public void call( final TestEvent testEvent ) {
+                              events.push( testEvent );
+                              throw new RuntimeException( "Test Exception thrown.  Failed to process event" );
+                          }
+                      });
         }
+
+
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/da4630f5/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/LocalTimeoutQueueTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/LocalTimeoutQueueTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/LocalTimeoutQueueTest.java
index 76e09f6..49ae789 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/LocalTimeoutQueueTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/LocalTimeoutQueueTest.java
@@ -34,14 +34,14 @@ public class LocalTimeoutQueueTest {
 
         final TestEvent event = new TestEvent();
 
-        AsynchronousEvent<TestEvent> asynchronousEvent = queue.queue( event, timeout );
+        AsynchronousMessage<TestEvent> asynchronousMessage = queue.queue( event, timeout );
 
-        assertNotNull( asynchronousEvent );
+        assertNotNull( asynchronousMessage );
 
-        assertEquals( event, asynchronousEvent.getEvent() );
-        assertEquals( time + timeout, asynchronousEvent.getTimeout() );
+        assertEquals( event, asynchronousMessage.getEvent() );
+        assertEquals( time + timeout, asynchronousMessage.getTimeout() );
 
-        Collection<AsynchronousEvent<TestEvent>> results = queue.take( 100, timeout );
+        Collection<AsynchronousMessage<TestEvent>> results = queue.take( 100, timeout );
 
         assertEquals( "Time not yet elapsed", 0, results.size() );
 
@@ -55,9 +55,9 @@ public class LocalTimeoutQueueTest {
         assertEquals( "Time elapsed", 1, results.size() );
 
         //validate we get a new timeout event since the old one was re-scheduled
-        Iterator<AsynchronousEvent<TestEvent>> events = results.iterator();
+        Iterator<AsynchronousMessage<TestEvent>> events = results.iterator();
 
-        AsynchronousEvent<TestEvent> message = events.next();
+        AsynchronousMessage<TestEvent> message = events.next();
 
         assertEquals( event, message.getEvent() );
 
@@ -97,18 +97,18 @@ public class LocalTimeoutQueueTest {
 
             final TestEvent event = new TestEvent();
 
-            AsynchronousEvent<TestEvent> asynchronousEvent = queue.queue( event, timeout );
+            AsynchronousMessage<TestEvent> asynchronousMessage = queue.queue( event, timeout );
 
             events.add( event );
 
-            assertNotNull( asynchronousEvent );
+            assertNotNull( asynchronousMessage );
 
-            assertEquals( event, asynchronousEvent.getEvent() );
-            assertEquals( time + timeout, asynchronousEvent.getTimeout() );
+            assertEquals( event, asynchronousMessage.getEvent() );
+            assertEquals( time + timeout, asynchronousMessage.getTimeout() );
         }
 
 
-        Collection<AsynchronousEvent<TestEvent>> results = queue.take( 100, timeout );
+        Collection<AsynchronousMessage<TestEvent>> results = queue.take( 100, timeout );
 
         assertEquals( "Time not yet elapsed", 0, results.size() );
 
@@ -133,11 +133,11 @@ public class LocalTimeoutQueueTest {
             assertEquals( "Time elapsed", 100, results.size() );
 
             //validate we get a new timeout event since the old one was re-scheduled
-            Iterator<AsynchronousEvent<TestEvent>> eventIterator = results.iterator();
+            Iterator<AsynchronousMessage<TestEvent>> eventIterator = results.iterator();
 
             while(eventIterator.hasNext()){
 
-                AsynchronousEvent<TestEvent> message = eventIterator.next();
+                AsynchronousMessage<TestEvent> message = eventIterator.next();
 
                 assertTrue( events.remove( message.getEvent() ) );
 


[3/3] git commit: Upgraded queue system and processing

Posted by sn...@apache.org.
Upgraded queue system and processing

Updated RxJava (rxjava-core) from 0.16.1 to 0.17.0-RC7

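Rolled a custom Hystrix build (hystrix-core 1.3.14-SNAPSHOT) to test RxJava integration; to be removed once https://github.com/Netflix/Hystrix/pull/209 is complete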


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/51a9ffd9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/51a9ffd9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/51a9ffd9

Branch: refs/heads/asyncqueue
Commit: 51a9ffd9218bdaf4884709fd04bae9c046dc67ec
Parents: c45f7ee
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Mar 7 16:59:06 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Mar 7 16:59:06 2014 -0700

----------------------------------------------------------------------
 stack/corepersistence/collection/pom.xml        |  21 +--
 .../collection/hystrix/CassandraCommand.java    |   4 +-
 .../impl/EntityCollectionManagerImpl.java       |   6 +-
 .../mvcc/stage/delete/MarkCommit.java           |   2 +-
 .../collection/mvcc/stage/delete/MarkStart.java |   2 +-
 .../collection/mvcc/stage/load/Load.java        |   2 +-
 .../mvcc/stage/write/WriteCommit.java           |   2 +-
 .../mvcc/stage/write/WriteOptimisticVerify.java |   2 +-
 .../collection/mvcc/stage/write/WriteStart.java |   2 +-
 .../mvcc/stage/write/WriteUniqueVerify.java     |   5 +-
 .../collection/rx/CassandraThreadScheduler.java |   2 +-
 .../rx/CassandraThreadSchedulerTest.java        |  44 +++----
 .../persistence/collection/rx/ParallelTest.java |   8 +-
 .../usergrid/persistence/graph/GraphFig.java    |  14 ++
 .../graph/consistency/AsyncProcessor.java       |  50 +++++---
 .../graph/consistency/AsyncProcessorImpl.java   |  60 ++++++---
 .../graph/consistency/CompleteListener.java     |  14 ++
 .../graph/consistency/TimeoutTask.java          |  61 +++++++++
 .../graph/impl/EdgeDeleteListener.java          |   6 +-
 .../persistence/graph/impl/EdgeManagerImpl.java |   2 +-
 .../graph/impl/EdgeWriteListener.java           |   2 +-
 .../graph/impl/NodeDeleteListener.java          |   6 +-
 .../impl/parse/ObservableIterator.java          |  18 ++-
 .../graph/consistency/AsyncProcessorTest.java   | 127 +++++++++++++++----
 24 files changed, 337 insertions(+), 125 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/collection/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/pom.xml b/stack/corepersistence/collection/pom.xml
index db3ce3a..00db576 100644
--- a/stack/corepersistence/collection/pom.xml
+++ b/stack/corepersistence/collection/pom.xml
@@ -159,17 +159,17 @@
     <!-- RX java -->
 
     <dependency>
-      <groupId>com.netflix.rxjava</groupId>
-      <artifactId>rxjava-core</artifactId>
-      <version>0.16.1</version>
+        <groupId>com.netflix.rxjava</groupId>
+        <artifactId>rxjava-core</artifactId>
+        <version>0.17.0-RC7</version>
     </dependency>
 
 
-    <dependency>
-      <groupId>com.netflix.rxjava</groupId>
-      <artifactId>rxjava-contrib</artifactId>
-      <version>0.14.6</version>
-    </dependency>
+    <!--<dependency>-->
+      <!--<groupId>com.netflix.rxjava</groupId>-->
+      <!--<artifactId>rxjava-contrib</artifactId>-->
+      <!--<version>0.14.6</version>-->
+    <!--</dependency>-->
 
     <dependency>
       <groupId>com.google.inject.extensions</groupId>
@@ -201,10 +201,13 @@
       <version>${log4j.version}</version>
     </dependency>
 
+    <!--Remove custom build once this patch is complete
+    https://github.com/Netflix/Hystrix/pull/209-->
+
     <dependency>
       <groupId>com.netflix.hystrix</groupId>
       <artifactId>hystrix-core</artifactId>
-      <version>1.3.8</version>
+      <version>1.3.14-SNAPSHOT</version>
     </dependency>
 
   </dependencies>

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/hystrix/CassandraCommand.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/hystrix/CassandraCommand.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/hystrix/CassandraCommand.java
index c87bf33..731933a 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/hystrix/CassandraCommand.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/hystrix/CassandraCommand.java
@@ -24,7 +24,7 @@ import com.netflix.hystrix.HystrixCommand;
 import com.netflix.hystrix.HystrixCommandGroupKey;
 
 import rx.Observable;
-import rx.concurrency.Schedulers;
+import rx.schedulers.Schedulers;
 
 
 /**
@@ -69,6 +69,6 @@ public class CassandraCommand<R> extends HystrixCommand<R> {
      */
     private static <R> Observable<R> toObservable( R readValue ) {
         //create a new command and ensure it's observed on the correct thread scheduler
-        return new CassandraCommand<R>( readValue ).toObservable( Schedulers.threadPoolForIO() );
+        return new CassandraCommand<R>( readValue ).toObservable( Schedulers.io() );
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index 4391d09..ef95a75 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@ -47,9 +47,9 @@ import com.google.inject.assistedinject.Assisted;
 
 import rx.Observable;
 import rx.Scheduler;
-import rx.util.functions.Func1;
-import rx.util.functions.Func2;
-import rx.util.functions.FuncN;
+import rx.functions.Func1;
+import rx.functions.Func2;
+import rx.functions.FuncN;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java
index 9c1a432..06c875e 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java
@@ -41,7 +41,7 @@ import com.google.inject.Singleton;
 import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
-import rx.util.functions.Func1;
+import rx.functions.Func1;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkStart.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkStart.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkStart.java
index 4b0fae5..e718f4b 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkStart.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkStart.java
@@ -45,7 +45,7 @@ import com.google.inject.Singleton;
 import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
-import rx.util.functions.Func1;
+import rx.functions.Func1;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/load/Load.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/load/Load.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/load/Load.java
index 20abc5a..e0d0f77 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/load/Load.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/load/Load.java
@@ -39,7 +39,7 @@ import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 
-import rx.util.functions.Func1;
+import rx.functions.Func1;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
index bcec305..4c90e69 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
@@ -41,7 +41,7 @@ import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 import org.apache.usergrid.persistence.model.field.Field;
 
-import rx.util.functions.Func1;
+import rx.functions.Func1;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerify.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerify.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerify.java
index e42019c..7a70dfc 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerify.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerify.java
@@ -8,7 +8,7 @@ import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 
-import rx.util.functions.Func1;
+import rx.functions.Func1;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteStart.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteStart.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteStart.java
index 01fee57..a0ff1b1 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteStart.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteStart.java
@@ -24,7 +24,7 @@ import com.google.inject.Singleton;
 import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
-import rx.util.functions.Func1;
+import rx.functions.Func1;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
index 55280c8..fe53b13 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
@@ -40,8 +40,8 @@ import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
 import rx.Observable;
 import rx.Scheduler;
-import rx.util.functions.Func1;
-import rx.util.functions.FuncN;
+import rx.functions.Func1;
+import rx.functions.FuncN;
 
 
 /**
@@ -168,6 +168,7 @@ public class WriteUniqueVerify implements Func1<CollectionIoEvent<MvccEntity>, O
         };
 
 
+
         return Observable.zip( fields, zipFunction );
 
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadScheduler.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadScheduler.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadScheduler.java
index 90f3b2d..a5baf76 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadScheduler.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadScheduler.java
@@ -35,7 +35,7 @@ import com.google.inject.Provider;
 import com.google.inject.name.Named;
 
 import rx.Scheduler;
-import rx.concurrency.Schedulers;
+import rx.schedulers.Schedulers;
 
 
 public class CassandraThreadScheduler implements Provider<Scheduler> {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadSchedulerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadSchedulerTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadSchedulerTest.java
index 45a175e..cf399c3 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadSchedulerTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadSchedulerTest.java
@@ -18,6 +18,7 @@ import org.apache.usergrid.persistence.collection.guice.TestCollectionModule;
 import com.google.inject.Inject;
 
 import rx.Scheduler;
+import rx.functions.Action1;
 import rx.util.functions.Action0;
 
 import static org.junit.Assert.assertTrue;
@@ -63,13 +64,11 @@ public class CassandraThreadSchedulerTest {
         //schedule and we should fail
 
         try {
-
-            rxScheduler.schedule( new Action0() {
-                @Override
-                public void call() {
-                    //no op
-                }
-            } );
+            rxScheduler.schedule( new Action1<Scheduler.Inner>() {
+                            @Override
+                            public void call( final Scheduler.Inner inner ) {
+                            }
+                            });
 
             fail( "This should have thrown an exception" );
         }
@@ -127,9 +126,9 @@ public class CassandraThreadSchedulerTest {
 
         try {
 
-            rxScheduler.schedule( new Action0() {
-                @Override
-                public void call() {
+            rxScheduler.schedule(  new Action1<Scheduler.Inner>() {
+                            @Override
+                            public void call( final Scheduler.Inner inner ) {
                     //no op
                 }
             } );
@@ -158,9 +157,9 @@ public class CassandraThreadSchedulerTest {
 
         try {
 
-            rxScheduler.schedule( new Action0() {
-                @Override
-                public void call() {
+            rxScheduler.schedule(  new Action1<Scheduler.Inner>() {
+                            @Override
+                            public void call( final Scheduler.Inner inner ) {
                     //no op
                 }
             } );
@@ -215,9 +214,9 @@ public class CassandraThreadSchedulerTest {
 
         try {
 
-            rxScheduler.schedule( new Action0() {
-                @Override
-                public void call() {
+            rxScheduler.schedule(  new Action1<Scheduler.Inner>() {
+                            @Override
+                            public void call( final Scheduler.Inner inner ) {
                     //no op
                 }
             } );
@@ -244,9 +243,9 @@ public class CassandraThreadSchedulerTest {
 
         try {
 
-            rxScheduler.schedule( new Action0() {
-                @Override
-                public void call() {
+            rxScheduler.schedule(  new Action1<Scheduler.Inner>() {
+                            @Override
+                            public void call( final Scheduler.Inner inner ) {
                     //no op
                 }
             } );
@@ -295,9 +294,10 @@ public class CassandraThreadSchedulerTest {
         final CountDownLatch latch = new CountDownLatch( totalCount );
 
         for ( int i = 0; i < totalCount; i++ ) {
-            final Action0 action = new Action0() {
-                @Override
-                public void call() {
+
+            final Action1<Scheduler.Inner> action = new  Action1<Scheduler.Inner>() {
+                                       @Override
+                                       public void call( final Scheduler.Inner inner ) {
                     try {
                         final String threadName = Thread.currentThread().getName();
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java
index ac252fb..434ea26 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java
@@ -38,9 +38,9 @@ import com.netflix.hystrix.HystrixCommandGroupKey;
 
 import rx.Observable;
 import rx.Scheduler;
-import rx.concurrency.Schedulers;
-import rx.util.functions.Func1;
-import rx.util.functions.FuncN;
+import rx.functions.Func1;
+import rx.functions.FuncN;
+import rx.schedulers.Schedulers;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -82,7 +82,7 @@ public class ParallelTest {
         //        final Scheduler scheduler = Schedulers.threadPoolForComputation();
 
         //use the I/O scheduler to allow enough thread, otherwise our pool will be the same size as the # of cores
-        final Scheduler scheduler = Schedulers.threadPoolForIO();
+        final Scheduler scheduler = Schedulers.io();
 
         //set our size equal
         ConfigurationManager.getConfigInstance().setProperty( THREAD_POOL_SIZE, size );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
index bbf83be..3f87d14 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
@@ -15,6 +15,11 @@ public interface GraphFig extends GuicyFig {
 
     public static final String SCAN_PAGE_SIZE = "usergrid.graph.scan.page.size";
 
+
+    public static final String TIMEOUT_SIZE = "usergrid.graph.timeout.page.size";
+
+    public static final String TIMEOUT_TASK_TIME = "usergrid.graph.timeout.task.time";
+
     public static final String READ_CL = "usergrid.graph.read.cl";
 
     public static final String WRITE_CL = "usergrid.graph.write.cl";
@@ -37,6 +42,15 @@ public interface GraphFig extends GuicyFig {
     @Key( WRITE_TIMEOUT )
     long getWriteTimeout();
 
+    @Default( "100" )
+    @Key( TIMEOUT_SIZE )
+    int getTimeoutReadSize();
+
+    @Default("500")
+    @Key( TIMEOUT_TASK_TIME )
+    long getTaskLoopTime();
+
+
 
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java
index 150aa6b..016b45d 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java
@@ -1,48 +1,62 @@
 package org.apache.usergrid.persistence.graph.consistency;
 
 
+import java.util.Collection;
+
+
 /**
- *  Used to fork lazy repair and other types of operations.
- *
+ * Used to fork lazy repair and other types of operations.
  */
 public interface AsyncProcessor<T> {
 
 
     /**
-     * The processor implementation is responsible for guaranteeing the events fire in the runtime environment.
-     * This could be local or clustered, consult the documentation on the implementation.  Note that events published
-     * here could possibly be double published if the operation reaches it's timeout before completion.  As a result, every
-     * receiver of the event should operate in an idempotent way.  Note that the event will fire at a time >= the timeout time.
-     * Firing immediately should not be assumed.
+     * The processor implementation is responsible for guaranteeing the events fire in the runtime environment. This
+     * could be local or clustered, consult the documentation on the implementation.  Note that events published here
+     * could possibly be double published if the operation reaches its timeout before completion.  As a result, every
+     * receiver of the event should operate in an idempotent way.  Note that the event will fire at a time >= the
+     * timeout time. Firing immediately should not be assumed.
      *
      * @param event The event to be scheduled for verification
-     * @param timeout  The time in milliseconds we should wait before the event should fire
+     * @param timeout The time in milliseconds we should wait before the event should fire
      */
     public AsynchronousMessage<T> setVerification( T event, long timeout );
 
 
     /**
-     * Start processing the event immediately asynchronously.  In the event an exception is thrown, the AsynchronousMessage should be re-tried.
-     * It is up to the implementer to commit the event so that it does not fire again.  This should never throw exceptions.
+     * Start processing the event immediately asynchronously.  In the event an exception is thrown, the
+     * AsynchronousMessage should be re-tried. It is up to the implementer to commit the event so that it does not fire
+     * again.  This should never throw exceptions.
      *
      * @param event The event to start
      */
-    public void start(AsynchronousMessage<T> event);
+    public void start( AsynchronousMessage<T> event );
+
+
+    /**
+     * Get all events that have passed their timeout
+     *
+     * @param maxCount The maximum count
+     * @param timeout The timeout to set when retrieving these timeouts to ensure they aren't lost
+     *
+     * @return A collection of asynchronous messages that have passed their timeout.  This could be due to process
+     *         failure, node loss, etc.  No assumptions should be made about the state of the messages when they are
+     *         returned.
+     */
+    public Collection<AsynchronousMessage<T>> getTimeouts( int maxCount, long timeout );
 
     /**
      * Add the error listener to the list of listeners
-     * @param listener
      */
     public void addErrorListener( ErrorListener<T> listener );
 
     /**
      * Add the listener to this instance
-     * @param listener
      */
-    public void addListener(MessageListener<T, T> listener);
-
-
-
-
+    public void addListener( MessageListener<T, T> listener );
 
+    /**
+     * Add a complete listener that is invoked once processing of an event has completed
+     */
+    public void addCompleteListener( CompleteListener<T> listener );
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
index 20c40e2..d962a88 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
@@ -2,45 +2,59 @@ package org.apache.usergrid.persistence.graph.consistency;
 
 
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.usergrid.persistence.graph.GraphFig;
+
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
-import com.netflix.hystrix.HystrixCommandGroupKey;
 
 import rx.Observable;
 import rx.Scheduler;
-import rx.util.functions.Action0;
-import rx.util.functions.Action1;
-import rx.util.functions.FuncN;
+import rx.functions.Action0;
+import rx.functions.Action1;
+import rx.functions.FuncN;
+import rx.schedulers.Schedulers;
 
 
 /**
- * The implementation of asynchronous processing.
- * This is intentionally kept as a 1 processor to 1 event type mapping
+ * The implementation of asynchronous processing. This is intentionally kept as a 1 processor to 1 event type mapping
  * This way reflection is not used, event dispatching is easier, and has compile time checking
  */
 @Singleton
 public class AsyncProcessorImpl<T> implements AsyncProcessor<T> {
 
-    private static final HystrixCommandGroupKey GRAPH_REPAIR = HystrixCommandGroupKey.Factory.asKey( "Graph_Repair" );
-
-    private final TimeoutQueue<T> queue;
-    private final Scheduler scheduler;
-    private final List<MessageListener<T, T>> listeners = new ArrayList<MessageListener<T, T>>();
+    /**
+     * TODO, run this with hystrix
+     */
 
     private static final Logger LOG = LoggerFactory.getLogger( AsyncProcessor.class );
 
-    private List<ErrorListener> errorListeners = new ArrayList<ErrorListener>();
+    protected final TimeoutQueue<T> queue;
+    protected final Scheduler scheduler;
+    protected final GraphFig graphFig;
+    protected final List<MessageListener<T, T>> listeners = new ArrayList<MessageListener<T, T>>();
+
+
+    protected List<ErrorListener<T>> errorListeners = new ArrayList<ErrorListener<T>>();
+    protected List<CompleteListener<T>> completeListeners = new ArrayList<CompleteListener<T>>();
 
 
     @Inject
-    public AsyncProcessorImpl( final TimeoutQueue<T> queue, final Scheduler scheduler ) {
+    public AsyncProcessorImpl( final TimeoutQueue<T> queue, final Scheduler scheduler, final GraphFig graphFig ) {
         this.queue = queue;
         this.scheduler = scheduler;
+        this.graphFig = graphFig;
+
+        //we purposefully use a new thread.  We don't want to use one of the I/O threads to run this task:
+        //if the I/O scheduler is full, it would end up rejecting the reschedule of this task
+        Schedulers.newThread().schedulePeriodically( new TimeoutTask<T>(this, graphFig), graphFig.getTaskLoopTime(),  graphFig.getTaskLoopTime(), TimeUnit.MILLISECONDS );
     }
 
 
@@ -52,8 +66,6 @@ public class AsyncProcessorImpl<T> implements AsyncProcessor<T> {
 
     @Override
     public void start( final AsynchronousMessage<T> event ) {
-
-
         final T data = event.getEvent();
         /**
          * Execute all listeners in parallel
@@ -83,6 +95,10 @@ public class AsyncProcessorImpl<T> implements AsyncProcessor<T> {
             @Override
             public void call() {
                 queue.remove( event );
+
+                for ( CompleteListener<T> listener : completeListeners ) {
+                    listener.onComplete( event );
+                }
             }
         } ).subscribe( new Action1<AsynchronousMessage<T>>() {
             @Override
@@ -94,6 +110,12 @@ public class AsyncProcessorImpl<T> implements AsyncProcessor<T> {
 
 
     @Override
+    public Collection<AsynchronousMessage<T>> getTimeouts( final int maxCount, final long timeout ) {
+        return queue.take( maxCount, timeout );
+    }
+
+
+    @Override
     public void addListener( final MessageListener<T, T> listener ) {
         this.listeners.add( listener );
     }
@@ -105,4 +127,12 @@ public class AsyncProcessorImpl<T> implements AsyncProcessor<T> {
     public void addErrorListener( ErrorListener<T> listener ) {
         this.errorListeners.add( listener );
     }
+
+
+    @Override
+    public void addCompleteListener( final CompleteListener<T> listener ) {
+        this.completeListeners.add( listener );
+    }
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/CompleteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/CompleteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/CompleteListener.java
new file mode 100644
index 0000000..c415005
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/CompleteListener.java
@@ -0,0 +1,14 @@
+package org.apache.usergrid.persistence.graph.consistency;
+
+
+/**
+ * Internal listener for event completion, really only used for testing.  Can be used to hook into the completed state
+ */
+public interface CompleteListener<T> {
+
+    /**
+     * Invoked when an event is complete
+     * @param event The message wrapping the event that completed
+     */
+    void onComplete( AsynchronousMessage<T> event );
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeoutTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeoutTask.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeoutTask.java
new file mode 100644
index 0000000..6607724
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeoutTask.java
@@ -0,0 +1,61 @@
+package org.apache.usergrid.persistence.graph.consistency;
+
+
+import java.util.Iterator;
+
+import org.apache.usergrid.persistence.graph.GraphFig;
+
+import rx.Scheduler;
+import rx.functions.Action1;
+
+
+/**
+ * Task that pulls messages whose timeout has passed from the AsyncProcessor and starts them again.
+ * Each call keeps fetching batches until none are returned or the scheduler is unsubscribed.
+ */
+public class TimeoutTask<T> implements Action1<Scheduler.Inner> {
+
+    private final AsyncProcessor<T> processor;
+    private final GraphFig graphFig;
+
+
+    public TimeoutTask( final AsyncProcessor<T> processor, final GraphFig graphFig ) {
+        this.processor = processor;
+        this.graphFig = graphFig;
+    }
+
+
+    @Override
+    public void call( final Scheduler.Inner inner ) {
+
+        /**
+         * We purposefully run a tight loop: while there is anything to process, we keep processing it.
+         * Once we run out of items to process we return and wait for the scheduler to invoke this task again
+         */
+        while(!inner.isUnsubscribed()) {
+
+            Iterator<AsynchronousMessage<T>> timeouts = getTimeouts();
+
+            /**
+             * Nothing was returned, so we're done.  Just exit
+             */
+            if(!timeouts.hasNext()){
+                return;
+            }
+
+            while ( timeouts.hasNext() ) {
+                processor.start( timeouts.next() );
+            }
+
+        }
+    }
+
+
+    /**
+     * Get the next batch of timed out messages (at most the configured read size, with a new timeout of 2x the write timeout)
+     * @return An iterator over the messages returned by the processor
+     */
+    private Iterator<AsynchronousMessage<T>> getTimeouts() {
+        return processor.getTimeouts( graphFig.getTimeoutReadSize(), graphFig.getWriteTimeout() * 2 ).iterator();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
index 824b05c..6feded3 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
@@ -23,9 +23,9 @@ import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
 import rx.Observable;
-import rx.util.functions.Action1;
-import rx.util.functions.Func1;
-import rx.util.functions.Func5;
+import rx.functions.Action1;
+import rx.functions.Func1;
+import rx.functions.Func5;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
index 663215a..b84bd8e 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
@@ -56,7 +56,7 @@ import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
 import rx.Observable;
 import rx.Scheduler;
-import rx.util.functions.Func1;
+import rx.functions.Func1;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeWriteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeWriteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeWriteListener.java
index 428085e..271375a 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeWriteListener.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeWriteListener.java
@@ -22,7 +22,7 @@ import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
 import rx.Observable;
-import rx.util.functions.Func1;
+import rx.functions.Func1;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
index 08cc942..aabc572 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
@@ -20,9 +20,10 @@ import org.apache.usergrid.persistence.model.entity.Id;
 import com.google.common.base.Optional;
 import com.google.inject.Inject;
 
+import rx.Notification;
 import rx.Observable;
-import rx.util.functions.Action1;
-import rx.util.functions.Func1;
+import rx.functions.Action1;
+import rx.functions.Func1;
 
 
 /**
@@ -75,6 +76,7 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, EdgeEv
                                 // delete them. We might want to batch this up for efficiency
                                 return loadEdgesToTarget( scope,
                                         new SimpleSearchByEdgeType( node, edgeType, uuidOptional.get(), null ) )
+
                                         .doOnEach( new Action1<MarkedEdge>() {
                                             @Override
                                             public void call( final MarkedEdge markedEdge ) {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ObservableIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ObservableIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ObservableIterator.java
index 58ff6c7..2274868 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ObservableIterator.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ObservableIterator.java
@@ -5,6 +5,7 @@ import java.util.Iterator;
 
 import rx.Observable;
 import rx.Observer;
+import rx.Subscriber;
 import rx.Subscription;
 import rx.subscriptions.Subscriptions;
 
@@ -14,11 +15,11 @@ import rx.subscriptions.Subscriptions;
  * This is used in favor of "Observable.just" when the initial fetch of the iterator will require I/O.  This allows
  * us to wrap the iterator in a deferred invocation to avoid the blocking on construction.
  */
-public abstract class ObservableIterator<T> implements Observable.OnSubscribeFunc<T> {
+public abstract class ObservableIterator<T> implements Observable.OnSubscribe<T> {
 
 
     @Override
-    public Subscription onSubscribe( final Observer<? super T> observer ) {
+    public void call( final Subscriber<? super T> subscriber ) {
 
 
         try {
@@ -26,25 +27,22 @@ public abstract class ObservableIterator<T> implements Observable.OnSubscribeFun
             Iterator<T> itr = getIterator();
 
 
-            //TODO T.N. when > 0.17 comes out, we need to implement the check with each loop as described here https://github.com/Netflix/RxJava/issues/802
-            while ( itr.hasNext()) {
-                observer.onNext( itr.next() );
+            //while we have items to emit and our subscriber is subscribed, we want to keep emitting items
+            while ( itr.hasNext() && !subscriber.isUnsubscribed()) {
+                subscriber.onNext( itr.next() );
             }
 
-            observer.onCompleted();
+            subscriber.onCompleted();
         }
 
         //if any error occurs, we need to notify the observer so it can perform it's own error handling
         catch ( Throwable t ) {
-            observer.onError( t );
+            subscriber.onError( t );
         }
 
-        return Subscriptions.empty();
     }
 
 
-
-
     /**
      * Return the iterator to feed data to
      */

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java
index fc53705..9822cc5 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java
@@ -20,6 +20,8 @@
 package org.apache.usergrid.persistence.graph.consistency;
 
 
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Stack;
 import java.util.concurrent.CountDownLatch;
 
@@ -29,7 +31,7 @@ import org.mockito.stubbing.Answer;
 
 import rx.Observable;
 import rx.concurrency.Schedulers;
-import rx.util.functions.Action1;
+import rx.functions.Action1;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -70,7 +72,7 @@ public class AsyncProcessorTest {
         final TimeoutQueue queue = mock( TimeoutQueue.class );
 
 
-        AsyncProcessor asyncProcessor = constructProcessor( queue, null );
+        AsyncProcessor asyncProcessor = constructProcessor( queue );
 
 
         //mock up the queue
@@ -89,6 +91,8 @@ public class AsyncProcessorTest {
 
         final TestListener listener = new TestListener();
 
+
+
         final TestEvent event = new TestEvent();
 
 
@@ -108,9 +112,17 @@ public class AsyncProcessorTest {
         final TimeoutQueue queue = mock( TimeoutQueue.class );
 
 
-        final AsyncProcessor asyncProcessor = constructProcessor( queue, listener );
+        final AsyncProcessor asyncProcessor = constructProcessor( queue );
+
+        asyncProcessor.addListener( listener );
+
+
+        final CountDownLatch latch = new CountDownLatch( 2 );
+
+        final TestCompleteListener completeListener = new TestCompleteListener(latch);
+
+        asyncProcessor.addCompleteListener( completeListener );
 
-        final CountDownLatch latch = new CountDownLatch( 1 );
 
         //mock up the ack to allow us to block the test until the async confirm fires
         when( queue.remove( asynchronousMessage ) ).thenAnswer( new Answer<Boolean>() {
@@ -132,11 +144,15 @@ public class AsyncProcessorTest {
         final TestEvent firedEvent = listener.events.peek();
 
         assertSame( event, firedEvent );
+
+        final TestEvent completeEvent = completeListener.events.peek();
+
+        assertSame(event, completeEvent);
     }
 
 
-//    @Test( timeout = 5000 )
-        @Test
+    //    @Test( timeout = 5000 )
+    @Test
     public void verifyErrorExecution() throws InterruptedException {
 
         final AsynchronousErrorListener listener = new AsynchronousErrorListener();
@@ -163,7 +179,9 @@ public class AsyncProcessorTest {
         final TimeoutQueue queue = mock( TimeoutQueue.class );
 
 
-        final AsyncProcessorImpl asyncProcessor = constructProcessor( queue, listener );
+        final AsyncProcessorImpl asyncProcessor = constructProcessor( queue );
+
+        asyncProcessor.addListener( listener );
 
         final CountDownLatch latch = new CountDownLatch( 1 );
 
@@ -177,7 +195,6 @@ public class AsyncProcessorTest {
                 invoked[1] = true;
                 latch.countDown();
             }
-
         } );
 
         //throw an error if remove is called.  This shouldn't happen
@@ -209,13 +226,56 @@ public class AsyncProcessorTest {
     }
 
 
+    @Test
+    public void verifyTimeout() {
+
+
+        final long timeout = 500;
+        final TestEvent event = new TestEvent();
+
+
+        final AsynchronousMessage<TestEvent> asynchronousMessage = new AsynchronousMessage<TestEvent>() {
+            @Override
+            public TestEvent getEvent() {
+                return event;
+            }
+
+
+            @Override
+            public long getTimeout() {
+                return timeout;
+            }
+        };
+
+        final TimeoutQueue queue = mock( TimeoutQueue.class );
+
+
+        when(queue.take( 1, 10000l )).thenReturn( Collections.singletonList(asynchronousMessage ));
+
+        AsyncProcessor<TestEvent> processor = constructProcessor( queue );
+
+
+        Collection<AsynchronousMessage<TestEvent>> timeouts =  processor.getTimeouts( 1, 10000l );
+
+        assertEquals(1, timeouts.size());
+
+        AsynchronousMessage<TestEvent> returned = timeouts.iterator().next();
+
+        assertSame(asynchronousMessage, returned);
+
+
+
+    }
+
+
+
     /**
      * Construct the async processor
      */
-    public <T> AsyncProcessorImpl<T> constructProcessor( TimeoutQueue<T> queue , MessageListener<T, T> listener) {
+    public <T> AsyncProcessorImpl<T> constructProcessor( TimeoutQueue<T> queue ) {
+
+        AsyncProcessorImpl<T> processor = new AsyncProcessorImpl( queue, Schedulers.threadPoolForIO() );
 
-        AsyncProcessorImpl<T> processor =  new AsyncProcessorImpl( queue,Schedulers.threadPoolForIO() );
-        processor.addListener( listener );
 
         return processor;
     }
@@ -245,12 +305,29 @@ public class AsyncProcessorTest {
         @Override
         public Observable<TestEvent> receive( final TestEvent event ) {
 
-            return Observable.from( event ).doOnEach(new Action1<TestEvent>() {
-               @Override
-               public void call( final TestEvent testEvent ) {
-                   events.push( testEvent );
-               }
-           });
+            return Observable.from( event ).doOnEach( new Action1<TestEvent>() {
+                @Override
+                public void call( final TestEvent testEvent ) {
+                    events.push( testEvent );
+                }
+            } );
+        }
+    }
+
+
+    public static class TestCompleteListener implements CompleteListener<TestEvent> {
+        //Pushes the event of every completed message, then counts the latch down once per call
+        private final Stack<TestEvent> events = new Stack<TestEvent>();
+        private final CountDownLatch latch;
+
+        public TestCompleteListener( final CountDownLatch latch ) {
+            this.latch = latch;
+        }
+
+        @Override
+        public void onComplete( final AsynchronousMessage<TestEvent> event ) {
+            this.events.push( event.getEvent() );
+            this.latch.countDown();
+        }
+    }
 
@@ -265,15 +342,13 @@ public class AsyncProcessorTest {
 
         @Override
         public Observable<TestEvent> receive( final TestEvent event ) {
-            return Observable.from( event ).doOnEach(new Action1<TestEvent>() {
-                          @Override
-                          public void call( final TestEvent testEvent ) {
-                              events.push( testEvent );
-                              throw new RuntimeException( "Test Exception thrown.  Failed to process event" );
-                          }
-                      });
+            return Observable.from( event ).doOnEach( new Action1<TestEvent>() {
+                @Override
+                public void call( final TestEvent testEvent ) {
+                    events.push( testEvent );
+                    throw new RuntimeException( "Test Exception thrown.  Failed to process event" );
+                }
+            } );
         }
-
-
     }
 }


[2/3] git commit: Initial refactor of listeners and event system

Posted by sn...@apache.org.
Initial refactor of listeners and event system


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/c45f7ee9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/c45f7ee9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/c45f7ee9

Branch: refs/heads/asyncqueue
Commit: c45f7ee9d7e625fbe7ad89ffc42441a3453ecad0
Parents: da4630f
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Mar 7 12:07:35 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Mar 7 12:07:35 2014 -0700

----------------------------------------------------------------------
 .../graph/consistency/AsyncProcessor.java       |   3 +-
 .../graph/consistency/AsyncProcessorImpl.java   |   6 +-
 .../graph/consistency/ErrorListener.java        |   5 +
 .../graph/consistency/MessageListener.java      |   4 +-
 .../consistency/SimpleAsynchronousMessage.java  |   3 +-
 .../graph/impl/EdgeDeleteListener.java          | 199 ++++++++++++++
 .../persistence/graph/impl/EdgeEvent.java       |  31 +++
 .../persistence/graph/impl/EdgeManagerImpl.java | 261 -------------------
 .../graph/impl/EdgeWriteListener.java           | 103 ++++++++
 .../graph/impl/NodeDeleteListener.java          | 128 +++++++++
 10 files changed, 476 insertions(+), 267 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c45f7ee9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java
index f6013d9..150aa6b 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java
@@ -2,8 +2,7 @@ package org.apache.usergrid.persistence.graph.consistency;
 
 
 /**
- *  Used to fork lazy repair and other types of operations.  This can be implemented
- *  across multiple environments.
+ *  Used to fork lazy repair and other types of operations.
  *
  */
 public interface AsyncProcessor<T> {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c45f7ee9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
index fbac644..20c40e2 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
@@ -19,7 +19,9 @@ import rx.util.functions.FuncN;
 
 
 /**
- * The implementation of asynchronous processing
+ * The implementation of asynchronous processing.
+ * This is intentionally kept as a 1 processor to 1 event type mapping.
+ * This way reflection is not used, event dispatching is easier, and there is compile time checking.
  */
 @Singleton
 public class AsyncProcessorImpl<T> implements AsyncProcessor<T> {
@@ -84,7 +86,7 @@ public class AsyncProcessorImpl<T> implements AsyncProcessor<T> {
             }
         } ).subscribe( new Action1<AsynchronousMessage<T>>() {
             @Override
-            public void call( final AsynchronousMessage<T> tAsynchronousMessage ) {
+            public void call( final AsynchronousMessage<T> asynchronousMessage ) {
                 //To change body of implemented methods use File | Settings | File Templates.
             }
         } );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c45f7ee9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/ErrorListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/ErrorListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/ErrorListener.java
index f736cdb..9d21304 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/ErrorListener.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/ErrorListener.java
@@ -6,5 +6,10 @@ package org.apache.usergrid.persistence.graph.consistency;
  */
 public interface ErrorListener <T> {
 
+    /**
+     * Invoked when an error occurs during asynchronous processing
+     * @param event The message whose asynchronous processing failed
+     * @param t The error that occurred
+     */
     void onError( AsynchronousMessage<T> event, Throwable t );
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c45f7ee9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/MessageListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/MessageListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/MessageListener.java
index 8a21dbb..466e4ed 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/MessageListener.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/MessageListener.java
@@ -1,6 +1,8 @@
 package org.apache.usergrid.persistence.graph.consistency;
 
 
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+
 import rx.Observable;
 
 
@@ -17,6 +19,6 @@ public interface MessageListener<T, R> {
      * @param event  The input event
      * @return The observable that performs the operations
      */
-    Observable<T> receive(T event);
+    Observable<T> receive(final T event);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c45f7ee9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/SimpleAsynchronousMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/SimpleAsynchronousMessage.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/SimpleAsynchronousMessage.java
index 1e7a04b..0a8651c 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/SimpleAsynchronousMessage.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/SimpleAsynchronousMessage.java
@@ -2,7 +2,8 @@ package org.apache.usergrid.persistence.graph.consistency;
 
 
 /**
- *
+ *  Simple message that just contains the event and the timeout.  More advanced queue implementations
+ *  will most likely subclass this class.
  *
  */
 public class SimpleAsynchronousMessage<T> implements AsynchronousMessage<T> {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c45f7ee9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
new file mode 100644
index 0000000..824b05c
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
@@ -0,0 +1,199 @@
+package org.apache.usergrid.persistence.graph.impl;
+
+
+import java.util.Iterator;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.EdgeManager;
+import org.apache.usergrid.persistence.graph.EdgeManagerFactory;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.consistency.AsyncProcessor;
+import org.apache.usergrid.persistence.graph.consistency.MessageListener;
+import org.apache.usergrid.persistence.graph.guice.EdgeDelete;
+import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
+import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
+import org.apache.usergrid.persistence.graph.serialization.impl.parse.ObservableIterator;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import rx.Observable;
+import rx.util.functions.Action1;
+import rx.util.functions.Func1;
+import rx.util.functions.Func5;
+
+
+/**
+ * Listener for asynchronous edge deletes.  For each event it gathers the deletes for the edge versions returned
+ * by the serialization, plus the edge type meta data removals that apply, into one mutation and executes it.
+ * The listener adds itself to the injected processor when it is constructed.
+ */
+@Singleton
+public class EdgeDeleteListener implements MessageListener<EdgeEvent<Edge>, EdgeEvent<Edge>> {
+
+    private final EdgeSerialization edgeSerialization;
+    private final EdgeMetadataSerialization edgeMetadataSerialization;
+    private final EdgeManagerFactory edgeManagerFactory;
+    private final Keyspace keyspace;
+
+
+    @Inject
+    public EdgeDeleteListener( final EdgeSerialization edgeSerialization,
+                               final EdgeMetadataSerialization edgeMetadataSerialization,
+                               final EdgeManagerFactory edgeManagerFactory, final Keyspace keyspace, @EdgeDelete
+                                   final AsyncProcessor edgeDelete ) {
+        this.edgeSerialization = edgeSerialization;
+        this.edgeMetadataSerialization = edgeMetadataSerialization;
+        this.edgeManagerFactory = edgeManagerFactory;
+        this.keyspace = keyspace;
+
+        edgeDelete.addListener( this );
+    }
+
+
+    /**
+     * Build the observable that performs the delete for the given event.
+     *
+     * @param delete The event carrying the edge to remove and the organization scope to remove it in
+     * @return An observable that emits the input event after the mutation has been executed
+     */
+    @Override
+    public Observable<EdgeEvent<Edge>> receive( final EdgeEvent<Edge> delete ) {
+
+        final Edge edge = delete.getData();
+        final OrganizationScope scope = delete.getOrganizationScope();
+        final UUID maxVersion = edge.getVersion();
+        final EdgeManager edgeManager = edgeManagerFactory.createEdgeManager( scope );
+
+        return Observable.from( edge ).flatMap( new Func1<Edge, Observable<MutationBatch>>() {
+            @Override
+            public Observable<MutationBatch> call( final Edge edge ) {
+                //every observable below merges its deletes into this one batch as a side effect
+                final MutationBatch batch = keyspace.prepareMutationBatch();
+
+                //go through every version of this edge <= the current version and remove it
+                Observable<MarkedEdge> edges = Observable.create( new ObservableIterator<MarkedEdge>() {
+                    @Override
+                    protected Iterator<MarkedEdge> getIterator() {
+                        return edgeSerialization.getEdgeToTarget( scope,
+                                new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(),
+                                        edge.getVersion(), null ) );
+                    }
+                } ).doOnEach( new Action1<MarkedEdge>() {
+                    @Override
+                    public void call( final MarkedEdge markedEdge ) {
+                        final MutationBatch delete = edgeSerialization.deleteEdge( scope, markedEdge );
+                        batch.mergeShallow( delete );
+                    }
+                } );
+
+                //search by edge type and target type.  If any other edges with this target type exist,
+                // we can't delete it
+                //NOTE(review): this merges the same removeEdgeTypeFromSource mutation as the edge type only
+                // search further down - confirm an id type specific removal was not intended here
+                Observable<Integer> sourceIdType = edgeManager.loadEdgesFromSourceByType(
+                        new SimpleSearchByIdType( edge.getSourceNode(), edge.getType(), maxVersion,
+                                edge.getTargetNode().getType(), null ) ).take( 2 ).count()
+                                                              .doOnEach( new Action1<Integer>() {
+                                                                  @Override
+                                                                  public void call( final Integer count ) {
+                                                                      //With 2 matches another edge still has
+                                                                      // this type and the meta data must stay;
+                                                                      // only a single match removes it
+                                                                      if ( count == 1 ) {
+                                                                          final MutationBatch delete =
+                                                                                  edgeMetadataSerialization
+                                                                                          .removeEdgeTypeFromSource(
+                                                                                                  scope, edge );
+                                                                          batch.mergeShallow( delete );
+                                                                      }
+                                                                  }
+                                                              } );
+
+                //the same check in the other direction, from the target node by the source node's type
+                Observable<Integer> targetIdType = edgeManager.loadEdgesToTargetByType(
+                        new SimpleSearchByIdType( edge.getTargetNode(), edge.getType(), maxVersion,
+                                edge.getSourceNode().getType(), null ) ).take( 2 ).count()
+                                                              .doOnEach( new Action1<Integer>() {
+                                                                  @Override
+                                                                  public void call( final Integer count ) {
+                                                                      //With 2 matches another edge still has
+                                                                      // this type and the meta data must stay;
+                                                                      // only a single match removes it
+                                                                      if ( count == 1 ) {
+                                                                          final MutationBatch delete =
+                                                                                  edgeMetadataSerialization
+                                                                                          .removeEdgeTypeToTarget(
+                                                                                                  scope, edge );
+                                                                          batch.mergeShallow( delete );
+                                                                      }
+                                                                  }
+                                                              } );
+
+                //search by edge type only.  If any other edges with this edge type exist,
+                // we can't delete it
+                Observable<Integer> sourceType = edgeManager.loadEdgesFromSource(
+                        new SimpleSearchByEdgeType( edge.getSourceNode(), edge.getType(), maxVersion, null ) ).take( 2 )
+                                                            .count().doOnEach( new Action1<Integer>() {
+                            @Override
+                            public void call( final Integer count ) {
+                                //With 2 matches another edge still has this edge type and the
+                                // meta data must stay; only a single match removes it.  The removal
+                                // has to be merged into the shared batch or it is never executed
+                                if ( count == 1 ) {
+                                    final MutationBatch delete =
+                                            edgeMetadataSerialization.removeEdgeTypeFromSource( scope, edge );
+                                    batch.mergeShallow( delete );
+                                }
+                            }
+                        } );
+
+                //the same check in the other direction, from the target node
+                Observable<Integer> targetType = edgeManager.loadEdgesToTarget(
+                        new SimpleSearchByEdgeType( edge.getTargetNode(), edge.getType(), maxVersion, null ) ).take( 2 )
+                                                            .count().doOnEach( new Action1<Integer>() {
+                            @Override
+                            public void call( final Integer count ) {
+                                //With 2 matches another edge still has this edge type and the
+                                // meta data must stay; only a single match removes it.  The removal
+                                // has to be merged into the shared batch or it is never executed
+                                if ( count == 1 ) {
+                                    final MutationBatch delete =
+                                            edgeMetadataSerialization.removeEdgeTypeToTarget( scope, edge );
+                                    batch.mergeShallow( delete );
+                                }
+                            }
+                        } );
+
+                //no op, just wait for each observable to populate the mutation before returning it
+                //NOTE(review): zip emits nothing if 'edges' is empty, so the batch is never executed - confirm
+                return Observable.zip( edges, sourceIdType, targetIdType, sourceType, targetType,
+                        new Func5<MarkedEdge, Integer, Integer, Integer, Integer, MutationBatch>() {
+                            @Override
+                            public MutationBatch call( final MarkedEdge markedEdge, final Integer integer,
+                                                       final Integer integer2, final Integer integer3,
+                                                       final Integer integer4 ) {
+                                return batch;
+                            }
+                        } );
+            }
+        } ).map( new Func1<MutationBatch, EdgeEvent<Edge>>() {
+            @Override
+            public EdgeEvent<Edge> call( final MutationBatch mutationBatch ) {
+                try {
+                    mutationBatch.execute();
+                }
+                catch ( ConnectionException e ) {
+                    throw new RuntimeException( "Unable to execute mutation", e );
+                }
+
+                return delete;
+            }
+        } );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c45f7ee9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeEvent.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeEvent.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeEvent.java
new file mode 100644
index 0000000..d4b6f91
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeEvent.java
@@ -0,0 +1,31 @@
+package org.apache.usergrid.persistence.graph.impl;
+
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+
+
+/**
+ * Holder that pairs an event payload with an organization scope.  Both fields are final.
+ * @param <T> The type of the payload returned by {@link #getData()}
+ */
+public class EdgeEvent<T> {
+
+    private final T data;
+    private final OrganizationScope organizationScope;
+
+    /** Stores both arguments as given; neither is validated or copied. */
+    public EdgeEvent( final OrganizationScope organizationScope, final T data ) {
+        this.data = data;
+        this.organizationScope = organizationScope;
+    }
+
+    /** @return the scope passed to the constructor */
+    public OrganizationScope getOrganizationScope() {
+        return this.organizationScope;
+    }
+
+    /** @return the payload passed to the constructor */
+    public T getData() {
+        return this.data;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c45f7ee9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
index 1cc2fce..663215a 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
@@ -37,7 +37,6 @@ import org.apache.usergrid.persistence.graph.SearchEdgeType;
 import org.apache.usergrid.persistence.graph.SearchIdType;
 import org.apache.usergrid.persistence.graph.consistency.AsyncProcessor;
 import org.apache.usergrid.persistence.graph.consistency.AsynchronousMessage;
-import org.apache.usergrid.persistence.graph.consistency.MessageListener;
 import org.apache.usergrid.persistence.graph.guice.EdgeDelete;
 import org.apache.usergrid.persistence.graph.guice.EdgeWrite;
 import org.apache.usergrid.persistence.graph.guice.NodeDelete;
@@ -50,7 +49,6 @@ import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 
 import com.fasterxml.uuid.UUIDComparator;
-import com.google.common.base.Optional;
 import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
 import com.netflix.astyanax.MutationBatch;
@@ -58,9 +56,7 @@ import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
 import rx.Observable;
 import rx.Scheduler;
-import rx.util.functions.Action1;
 import rx.util.functions.Func1;
-import rx.util.functions.Func4;
 
 
 /**
@@ -109,16 +105,10 @@ public class EdgeManagerImpl implements EdgeManager {
         this.edgeWriteAsyncProcessor = edgeWrite;
 
 
-        this.edgeWriteAsyncProcessor.addListener( new EdgeWriteListener() );
-
-
         this.edgeDeleteAsyncProcessor = edgeDelete;
 
-        this.edgeDeleteAsyncProcessor.addListener( new EdgeDeleteListener() );
 
         this.nodeDeleteAsyncProcessor = nodeDelete;
-
-        this.nodeDeleteAsyncProcessor.addListener( new NodeDeleteListener() );
     }
 
 
@@ -415,255 +405,4 @@ public class EdgeManagerImpl implements EdgeManager {
             return true;
         }
     }
-
-
-    /**
-     * Construct the asynchronous edge lister for the repair operation.
-     */
-    public class EdgeWriteListener implements MessageListener<Edge, Edge> {
-
-        @Override
-        public Observable<Edge> receive( final Edge write ) {
-
-            final UUID maxVersion = write.getVersion();
-
-            return Observable.create( new ObservableIterator<MarkedEdge>() {
-                @Override
-                protected Iterator<MarkedEdge> getIterator() {
-
-                    final SimpleSearchByEdge search =
-                            new SimpleSearchByEdge( write.getSourceNode(), write.getType(), write.getTargetNode(),
-                                    maxVersion, null );
-
-                    return edgeSerialization.getEdgeFromSource( scope, search );
-                }
-            } ).filter( new Func1<MarkedEdge, Boolean>() {
-
-                //TODO, reuse this for delete operation
-
-
-                /**
-                 * We only want to return edges < this version so we remove them
-                 * @param markedEdge
-                 * @return
-                 */
-                @Override
-                public Boolean call( final MarkedEdge markedEdge ) {
-                    return UUIDComparator.staticCompare( markedEdge.getVersion(), maxVersion ) < 0;
-                }
-                //buffer the deletes and issue them in a single mutation
-            } ).buffer( graphFig.getScanPageSize() ).map( new Func1<List<MarkedEdge>, Edge>() {
-                @Override
-                public Edge call( final List<MarkedEdge> markedEdges ) {
-
-                    final int size = markedEdges.size();
-
-                    final MutationBatch batch = edgeSerialization.deleteEdge( scope, markedEdges.get( 0 ) );
-
-                    for ( int i = 1; i < size; i++ ) {
-                        final MutationBatch delete = edgeSerialization.deleteEdge( scope, markedEdges.get( i ) );
-
-                        batch.mergeShallow( delete );
-                    }
-
-                    try {
-                        batch.execute();
-                    }
-                    catch ( ConnectionException e ) {
-                        throw new RuntimeException( "Unable to issue write to cassandra", e );
-                    }
-
-                    return write;
-                }
-            } );
-        }
-    }
-
-
-    /**
-     * Construct the asynchronous delete operation from the listener
-     */
-    public class EdgeDeleteListener implements MessageListener<Edge, Edge> {
-
-        @Override
-        public Observable<Edge> receive( final Edge delete ) {
-
-            final UUID maxVersion = delete.getVersion();
-
-            return Observable.from( delete ).flatMap( new Func1<Edge, Observable<MutationBatch>>() {
-                @Override
-                public Observable<MutationBatch> call( final Edge edge ) {
-
-                    //search by edge type and target type.  If any other edges with this target type exist,
-                    // we can't delete it
-                    Observable<MutationBatch> sourceIdType = loadEdgesFromSourceByType(
-                            new SimpleSearchByIdType( edge.getSourceNode(), edge.getType(), maxVersion,
-                                    edge.getTargetNode().getType(), null ) ).take( 2 ).count()
-                            .map( new Func1<Integer, MutationBatch>() {
-                                @Override
-                                public MutationBatch call( final Integer count ) {
-                                    //There's nothing to do, we have 2 different edges with the same edge type and
-                                    // target type.  Don't delete meta data
-                                    if ( count == 2 ) {
-                                        return null;
-                                    }
-
-                                    return edgeMetadataSerialization.removeEdgeTypeFromSource( scope, delete );
-                                }
-                            } );
-
-
-                    Observable<MutationBatch> targetIdType = loadEdgesToTargetByType(
-                            new SimpleSearchByIdType( edge.getTargetNode(), edge.getType(), maxVersion,
-                                    edge.getSourceNode().getType(), null ) ).take( 2 ).count()
-                            .map( new Func1<Integer, MutationBatch>() {
-                                @Override
-                                public MutationBatch call( final Integer count ) {
-                                    //There's nothing to do, we have 2 different edges with the same edge type and
-                                    // target type.  Don't delete meta data
-                                    if ( count == 2 ) {
-                                        return null;
-                                    }
-
-
-                                    return edgeMetadataSerialization.removeEdgeTypeToTarget( scope, delete );
-                                }
-                            } );
-
-                    //search by edge type and target type.  If any other edges with this target type exist,
-                    // we can't delete it
-                    Observable<MutationBatch> sourceType = loadEdgesFromSource(
-                            new SimpleSearchByEdgeType( edge.getSourceNode(), edge.getType(), maxVersion, null ) )
-                            .take( 2 ).count().map( new Func1<Integer, MutationBatch>() {
-                                @Override
-                                public MutationBatch call( final Integer count ) {
-
-
-                                    //There's nothing to do, we have 2 different edges with the same edge type and
-                                    // target type.  Don't delete meta data
-                                    if ( count == 2 ) {
-                                        return null;
-                                    }
-
-
-                                    return edgeMetadataSerialization.removeEdgeTypeFromSource( scope, delete );
-                                }
-                            } );
-
-
-                    Observable<MutationBatch> targetType = loadEdgesToTarget(
-                            new SimpleSearchByEdgeType( edge.getTargetNode(), edge.getType(), maxVersion, null ) )
-                            .take( 2 ).count().map( new Func1<Integer, MutationBatch>() {
-
-
-                                @Override
-                                public MutationBatch call( final Integer count ) {
-
-
-                                    //There's nothing to do, we have 2 different edges with the same edge type and
-                                    // target type.  Don't delete meta data
-                                    if ( count == 2 ) {
-                                        return null;
-                                    }
-
-                                    return edgeMetadataSerialization.removeEdgeTypeToTarget( scope, delete );
-                                }
-                            } );
-
-
-                    return Observable.zip( sourceIdType, targetIdType, sourceType, targetType,
-                            new Func4<MutationBatch, MutationBatch, MutationBatch, MutationBatch, MutationBatch>() {
-
-
-                                @Override
-                                public MutationBatch call( final MutationBatch mutationBatch,
-                                                           final MutationBatch mutationBatch2,
-                                                           final MutationBatch mutationBatch3,
-                                                           final MutationBatch mutationBatch4 ) {
-
-                                    return join( join( join( mutationBatch, mutationBatch2 ), mutationBatch3 ),
-                                            mutationBatch4 );
-                                }
-
-
-                                private MutationBatch join( MutationBatch first, MutationBatch second ) {
-                                    if ( first == null ) {
-                                        if ( second == null ) {
-                                            return null;
-                                        }
-
-                                        return second;
-                                    }
-
-
-                                    else if ( second == null ) {
-                                        return first;
-                                    }
-
-                                    first.mergeShallow( second );
-
-                                    return first;
-                                }
-                            } );
-                }
-            } ).map( new Func1<MutationBatch, Edge>() {
-                @Override
-                public Edge call( final MutationBatch mutationBatch ) {
-                    try {
-                        mutationBatch.execute();
-                    }
-                    catch ( ConnectionException e ) {
-                        throw new RuntimeException( "Unable to execute mutation", e );
-                    }
-
-                    return delete;
-                }
-            } );
-        }
-    }
-
-
-    /**
-     * Construct the asynchronous node delete from the q
-     */
-    public class NodeDeleteListener implements MessageListener<Id, Id> {
-
-        @Override
-        public Observable<Id> receive( final Id node ) {
-
-
-            return Observable.from( node ).map( new Func1<Id, Optional<UUID>>() {
-                @Override
-                public Optional<UUID> call( final Id id ) {
-                    return nodeSerialization.getMaxVersion( scope, node );
-                }
-            } ).flatMap( new Func1<Optional<UUID>, Observable<Edge>>() {
-                @Override
-                public Observable<Edge> call( final Optional<UUID> uuidOptional ) {
-                    return getEdgeTypesToTarget( new SimpleSearchEdgeType( node, null ) )
-                            .flatMap( new Func1<String, Observable<Edge>>() {
-                                @Override
-                                public Observable<Edge> call( final String edgeType ) {
-
-                                    //for each edge type, we want to search all edges < this version to the node and
-                                    // delete them. We might want to batch this up for efficiency
-                                    return loadEdgesToTarget(
-                                            new SimpleSearchByEdgeType( node, edgeType, uuidOptional.get(), null ) )
-                                            .doOnEach( new Action1<Edge>() {
-                                                @Override
-                                                public void call( final Edge edge ) {
-                                                    deleteEdge( edge );
-                                                }
-                                            } );
-                                }
-                            } );
-                }
-            } ).map( new Func1<Edge, Id>() {
-                @Override
-                public Id call( final Edge edge ) {
-                    return node;
-                }
-            } );
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c45f7ee9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeWriteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeWriteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeWriteListener.java
new file mode 100644
index 0000000..428085e
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeWriteListener.java
@@ -0,0 +1,103 @@
+package org.apache.usergrid.persistence.graph.impl;
+
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.consistency.AsyncProcessor;
+import org.apache.usergrid.persistence.graph.consistency.MessageListener;
+import org.apache.usergrid.persistence.graph.guice.EdgeWrite;
+import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
+import org.apache.usergrid.persistence.graph.serialization.impl.parse.ObservableIterator;
+
+import com.fasterxml.uuid.UUIDComparator;
+import com.google.inject.Singleton;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import rx.Observable;
+import rx.util.functions.Func1;
+
+
+/**
+ * Asynchronous edge listener for the repair operation.  For each edge write event it loads the stored versions
+ * of that edge (same source, type and target) and deletes the ones older than the written version.
+ */
+@Singleton
+public class EdgeWriteListener implements MessageListener<EdgeEvent<Edge>, EdgeEvent<Edge>> {
+
+    private final EdgeSerialization edgeSerialization;
+    private final GraphFig graphFig;
+    private final Keyspace keyspace;
+
+    /** Injection constructor: wires the dependencies and registers this listener on the edge write processor. */
+    @com.google.inject.Inject
+    public EdgeWriteListener( final EdgeSerialization edgeSerialization, final GraphFig graphFig,
+                              final Keyspace keyspace, @EdgeWrite final AsyncProcessor edgeWrite ) {
+        this.edgeSerialization = edgeSerialization;
+        this.graphFig = graphFig;
+        this.keyspace = keyspace;
+        edgeWrite.addListener( this );
+    }
+
+    /**
+     * Remove all versions of the written edge that are older than the written version.
+     *
+     * @param write the edge write event; its data is the edge whose older versions are removed
+     * @return an observable that emits the original event once for every buffer of deletes that was executed
+     */
+    @Override
+    public Observable<EdgeEvent<Edge>> receive( final EdgeEvent<Edge> write ) {
+
+        final Edge edge = write.getData();
+        final OrganizationScope scope = write.getOrganizationScope();
+        final UUID maxVersion = edge.getVersion();
+
+        return Observable.create( new ObservableIterator<MarkedEdge>() {
+            @Override
+            protected Iterator<MarkedEdge> getIterator() {
+                final SimpleSearchByEdge search =
+                        new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(), maxVersion,
+                                null );
+
+                return edgeSerialization.getEdgeFromSource( scope, search );
+            }
+        } ).filter( new Func1<MarkedEdge, Boolean>() {
+            //TODO, reuse this for delete operation
+
+            /**
+             * Keep only the edges with a version < the written version; these are the ones that get removed
+             */
+            @Override
+            public Boolean call( final MarkedEdge markedEdge ) {
+                return UUIDComparator.staticCompare( markedEdge.getVersion(), maxVersion ) < 0;
+            }
+            //buffer the deletes and issue them in a single mutation
+        } ).buffer( graphFig.getScanPageSize() ).map( new Func1<List<MarkedEdge>, EdgeEvent<Edge>>() {
+            @Override
+            public EdgeEvent<Edge> call( final List<MarkedEdge> markedEdges ) {
+                final MutationBatch batch = keyspace.prepareMutationBatch();
+
+                //named oldVersion so the written edge captured from receive() is not shadowed
+                for ( final MarkedEdge oldVersion : markedEdges ) {
+                    batch.mergeShallow( edgeSerialization.deleteEdge( scope, oldVersion ) );
+                }
+
+                try {
+                    batch.execute();
+                }
+                catch ( ConnectionException e ) {
+                    throw new RuntimeException( "Unable to issue write to cassandra", e );
+                }
+
+                return write;
+            }
+        } );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c45f7ee9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
new file mode 100644
index 0000000..08cc942
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
@@ -0,0 +1,128 @@
+package org.apache.usergrid.persistence.graph.impl;
+
+
+import java.util.Iterator;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+import org.apache.usergrid.persistence.graph.SearchEdgeType;
+import org.apache.usergrid.persistence.graph.consistency.AsyncProcessor;
+import org.apache.usergrid.persistence.graph.consistency.MessageListener;
+import org.apache.usergrid.persistence.graph.guice.NodeDelete;
+import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
+import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
+import org.apache.usergrid.persistence.graph.serialization.NodeSerialization;
+import org.apache.usergrid.persistence.graph.serialization.impl.parse.ObservableIterator;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+
+import rx.Observable;
+import rx.util.functions.Action1;
+import rx.util.functions.Func1;
+
+
+/**
+ * Construct the asynchronous node delete from the q
+ */
+public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, EdgeEvent<Id>> {
+
+    private final NodeSerialization nodeSerialization;
+    private final EdgeSerialization edgeSerialization;
+    private final EdgeMetadataSerialization edgeMetadataSerialization;
+
+
+    /**
+     * Wire the serialization dependencies
+     */
+    @Inject
+    public NodeDeleteListener( final NodeSerialization nodeSerialization, final EdgeSerialization edgeSerialization,
+                               final EdgeMetadataSerialization edgeMetadataSerialization, @NodeDelete final AsyncProcessor nodeDelete) {
+
+
+        this.nodeSerialization = nodeSerialization;
+        this.edgeSerialization = edgeSerialization;
+        this.edgeMetadataSerialization = edgeMetadataSerialization;
+        nodeDelete.addListener( this );
+    }
+
+
+    @Override
+    public Observable<EdgeEvent<Id>> receive( final EdgeEvent<Id> edgeEvent ) {
+
+        final Id node = edgeEvent.getData();
+        final OrganizationScope scope = edgeEvent.getOrganizationScope();
+
+
+        return Observable.from( node ).map( new Func1<Id, Optional<UUID>>() {
+            @Override
+            public Optional<UUID> call( final Id id ) {
+                return nodeSerialization.getMaxVersion( scope, node );
+            }
+        } ).flatMap( new Func1<Optional<UUID>, Observable<MarkedEdge>>() {
+            @Override
+            public Observable<MarkedEdge> call( final Optional<UUID> uuidOptional ) {
+
+                return getEdgesTypesToTarget( scope, new SimpleSearchEdgeType( node, null ) )
+                        .flatMap( new Func1<String, Observable<MarkedEdge>>() {
+                            @Override
+                            public Observable<MarkedEdge> call( final String edgeType ) {
+
+                                //for each edge type, we want to search all edges < this version to the node and
+                                // delete them. We might want to batch this up for efficiency
+                                return loadEdgesToTarget( scope,
+                                        new SimpleSearchByEdgeType( node, edgeType, uuidOptional.get(), null ) )
+                                        .doOnEach( new Action1<MarkedEdge>() {
+                                            @Override
+                                            public void call( final MarkedEdge markedEdge ) {
+                                                edgeSerialization.deleteEdge( scope, markedEdge );
+                                            }
+                                        } );
+                            }
+                        } );
+            }
+        } ).map( new Func1<MarkedEdge, EdgeEvent<Id>>() {
+            @Override
+            public EdgeEvent<Id> call( final MarkedEdge edge ) {
+                return edgeEvent;
+            }
+        } );
+    }
+
+
+    /**
+     * Get all existing edge types to the target node
+     * @param scope
+     * @param search
+     * @return
+     */
+    private Observable<String> getEdgesTypesToTarget( final OrganizationScope scope, final SearchEdgeType search ) {
+
+        return Observable.create( new ObservableIterator<String>() {
+            @Override
+            protected Iterator<String> getIterator() {
+                return edgeMetadataSerialization.getEdgeTypesToTarget( scope, search );
+            }
+        } );
+    }
+
+
+    /**
+     * Load all edges pointing to this target
+     * @param scope
+     * @param search
+     * @return
+     */
+    private Observable<MarkedEdge> loadEdgesToTarget( final OrganizationScope scope, final SearchByEdgeType search ) {
+
+        return Observable.create( new ObservableIterator<MarkedEdge>() {
+            @Override
+            protected Iterator<MarkedEdge> getIterator() {
+                return edgeSerialization.getEdgesToTarget( scope, search );
+            }
+        } );
+    }
+}