You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/03/08 01:20:51 UTC

[2/3] git commit: Initial refactor of listeners and event system

Initial refactor of listeners and event system


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/c45f7ee9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/c45f7ee9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/c45f7ee9

Branch: refs/heads/asyncqueue
Commit: c45f7ee9d7e625fbe7ad89ffc42441a3453ecad0
Parents: da4630f
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Mar 7 12:07:35 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Mar 7 12:07:35 2014 -0700

----------------------------------------------------------------------
 .../graph/consistency/AsyncProcessor.java       |   3 +-
 .../graph/consistency/AsyncProcessorImpl.java   |   6 +-
 .../graph/consistency/ErrorListener.java        |   5 +
 .../graph/consistency/MessageListener.java      |   4 +-
 .../consistency/SimpleAsynchronousMessage.java  |   3 +-
 .../graph/impl/EdgeDeleteListener.java          | 199 ++++++++++++++
 .../persistence/graph/impl/EdgeEvent.java       |  31 +++
 .../persistence/graph/impl/EdgeManagerImpl.java | 261 -------------------
 .../graph/impl/EdgeWriteListener.java           | 103 ++++++++
 .../graph/impl/NodeDeleteListener.java          | 128 +++++++++
 10 files changed, 476 insertions(+), 267 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c45f7ee9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java
index f6013d9..150aa6b 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java
@@ -2,8 +2,7 @@ package org.apache.usergrid.persistence.graph.consistency;
 
 
 /**
- *  Used to fork lazy repair and other types of operations.  This can be implemented
- *  across multiple environments.
+ *  Used to fork lazy repair and other types of operations.
  *
  */
 public interface AsyncProcessor<T> {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c45f7ee9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
index fbac644..20c40e2 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
@@ -19,7 +19,9 @@ import rx.util.functions.FuncN;
 
 
 /**
- * The implementation of asynchronous processing
+ * The implementation of asynchronous processing.
+ * This is intentionally kept as a 1 processor to 1 event type mapping
+ * This way reflection is not used, event dispatching is easier, and has compile time checking
  */
 @Singleton
 public class AsyncProcessorImpl<T> implements AsyncProcessor<T> {
@@ -84,7 +86,7 @@ public class AsyncProcessorImpl<T> implements AsyncProcessor<T> {
             }
         } ).subscribe( new Action1<AsynchronousMessage<T>>() {
             @Override
-            public void call( final AsynchronousMessage<T> tAsynchronousMessage ) {
+            public void call( final AsynchronousMessage<T> asynchronousMessage ) {
                 //To change body of implemented methods use File | Settings | File Templates.
             }
         } );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c45f7ee9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/ErrorListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/ErrorListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/ErrorListener.java
index f736cdb..9d21304 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/ErrorListener.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/ErrorListener.java
@@ -6,5 +6,10 @@ package org.apache.usergrid.persistence.graph.consistency;
  */
 public interface ErrorListener <T> {
 
+    /**
+     * Invoked when an error occurs during asynchronous processing
+     * @param event
+     * @param t
+     */
     void onError( AsynchronousMessage<T> event, Throwable t );
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c45f7ee9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/MessageListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/MessageListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/MessageListener.java
index 8a21dbb..466e4ed 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/MessageListener.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/MessageListener.java
@@ -1,6 +1,8 @@
 package org.apache.usergrid.persistence.graph.consistency;
 
 
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+
 import rx.Observable;
 
 
@@ -17,6 +19,6 @@ public interface MessageListener<T, R> {
      * @param event  The input event
      * @return The observable that performs the operations
      */
-    Observable<T> receive(T event);
+    Observable<T> receive(final T event);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c45f7ee9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/SimpleAsynchronousMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/SimpleAsynchronousMessage.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/SimpleAsynchronousMessage.java
index 1e7a04b..0a8651c 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/SimpleAsynchronousMessage.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/SimpleAsynchronousMessage.java
@@ -2,7 +2,8 @@ package org.apache.usergrid.persistence.graph.consistency;
 
 
 /**
- *
+ *  Simple message that just contains the event and the timeout.  More advanced queue implementations
+ *  will most likely subclass this class.
  *
  */
 public class SimpleAsynchronousMessage<T> implements AsynchronousMessage<T> {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c45f7ee9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
new file mode 100644
index 0000000..824b05c
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
@@ -0,0 +1,199 @@
+package org.apache.usergrid.persistence.graph.impl;
+
+
+import java.util.Iterator;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.EdgeManager;
+import org.apache.usergrid.persistence.graph.EdgeManagerFactory;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.consistency.AsyncProcessor;
+import org.apache.usergrid.persistence.graph.consistency.MessageListener;
+import org.apache.usergrid.persistence.graph.guice.EdgeDelete;
+import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
+import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
+import org.apache.usergrid.persistence.graph.serialization.impl.parse.ObservableIterator;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import rx.Observable;
+import rx.util.functions.Action1;
+import rx.util.functions.Func1;
+import rx.util.functions.Func5;
+
+
+/**
+ * Construct the asynchronous delete operation from the listener
+ */
+@Singleton
+public class EdgeDeleteListener implements MessageListener<EdgeEvent<Edge>, EdgeEvent<Edge>> {
+
+
+    private final EdgeSerialization edgeSerialization;
+    private final EdgeMetadataSerialization edgeMetadataSerialization;
+    private final EdgeManagerFactory edgeManagerFactory;
+    private final Keyspace keyspace;
+
+
+    @Inject
+    public EdgeDeleteListener( final EdgeSerialization edgeSerialization,
+                               final EdgeMetadataSerialization edgeMetadataSerialization,
+                               final EdgeManagerFactory edgeManagerFactory, final Keyspace keyspace, @EdgeDelete
+                                   final AsyncProcessor edgeDelete ) {
+        this.edgeSerialization = edgeSerialization;
+        this.edgeMetadataSerialization = edgeMetadataSerialization;
+        this.edgeManagerFactory = edgeManagerFactory;
+        this.keyspace = keyspace;
+
+        edgeDelete.addListener( this );
+    }
+
+
+    @Override
+    public Observable<EdgeEvent<Edge>> receive( final EdgeEvent<Edge> delete ) {
+
+        final Edge edge = delete.getData();
+        final OrganizationScope scope = delete.getOrganizationScope();
+        final UUID maxVersion = edge.getVersion();
+        final EdgeManager edgeManager = edgeManagerFactory.createEdgeManager( scope );
+
+
+        return Observable.from( edge ).flatMap( new Func1<Edge, Observable<MutationBatch>>() {
+            @Override
+            public Observable<MutationBatch> call( final Edge edge ) {
+
+                final MutationBatch batch = keyspace.prepareMutationBatch();
+
+
+                //go through every version of this edge <= the current version and remove it
+                Observable<MarkedEdge> edges = Observable.create( new ObservableIterator<MarkedEdge>() {
+                    @Override
+                    protected Iterator<MarkedEdge> getIterator() {
+                        return edgeSerialization.getEdgeToTarget( scope,
+                                new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(),
+                                        edge.getVersion(), null ) );
+                    }
+                } ).doOnEach( new Action1<MarkedEdge>() {
+                    @Override
+                    public void call( final MarkedEdge markedEdge ) {
+                        final MutationBatch delete = edgeSerialization.deleteEdge( scope, markedEdge );
+                        batch.mergeShallow( delete );
+                    }
+                } );
+
+
+                //search by edge type and target type.  If any other edges with this target type exist,
+                // we can't delete it
+                Observable<Integer> sourceIdType = edgeManager.loadEdgesFromSourceByType(
+                        new SimpleSearchByIdType( edge.getSourceNode(), edge.getType(), maxVersion,
+                                edge.getTargetNode().getType(), null ) ).take( 2 ).count()
+                                                              .doOnEach( new Action1<Integer>() {
+                                                                  @Override
+                                                                  public void call( final Integer count ) {
+                                                                      //There's nothing to do,
+                                                                      // we have 2 different edges with the
+                                                                      // same edge type and
+                                                                      // target type.  Don't delete meta data
+                                                                      if ( count == 1 ) {
+                                                                          final MutationBatch delete =
+                                                                                  edgeMetadataSerialization
+                                                                                          .removeEdgeTypeFromSource(
+                                                                                                  scope, edge );
+                                                                          batch.mergeShallow( delete );
+                                                                      }
+                                                                  }
+                                                              } );
+
+
+                Observable<Integer> targetIdType = edgeManager.loadEdgesToTargetByType(
+                        new SimpleSearchByIdType( edge.getTargetNode(), edge.getType(), maxVersion,
+                                edge.getSourceNode().getType(), null ) ).take( 2 ).count()
+                                                              .doOnEach( new Action1<Integer>() {
+                                                                  @Override
+                                                                  public void call( final Integer count ) {
+                                                                      //There's nothing to do,
+                                                                      // we have 2 different edges with the
+                                                                      // same edge type and
+                                                                      // target type.  Don't delete meta data
+                                                                      if ( count == 1 ) {
+                                                                          final MutationBatch delete =
+                                                                                  edgeMetadataSerialization
+                                                                                          .removeEdgeTypeToTarget(
+                                                                                                  scope, edge );
+                                                                          batch.mergeShallow( delete );
+                                                                      }
+                                                                  }
+                                                              } );
+
+
+                //search by edge type and target type.  If any other edges with this target type exist,
+                // we can't delete it
+                Observable<Integer> sourceType = edgeManager.loadEdgesFromSource(
+                        new SimpleSearchByEdgeType( edge.getSourceNode(), edge.getType(), maxVersion, null ) ).take( 2 )
+                                                            .count().doOnEach( new Action1<Integer>() {
+                            @Override
+                            public void call( final Integer count ) {
+                                //There's nothing to do,
+                                // we have 2 different edges with the
+                                // same edge type and
+                                // target type.  Don't delete meta data
+                                if ( count == 1 ) {
+                                    final MutationBatch delete =
+                                            edgeMetadataSerialization.removeEdgeTypeFromSource( scope, edge );
+                                }
+                            }
+                        } );
+
+
+                Observable<Integer> targetType = edgeManager.loadEdgesToTarget(
+                        new SimpleSearchByEdgeType( edge.getTargetNode(), edge.getType(), maxVersion, null ) ).take( 2 )
+                                                            .count().doOnEach( new Action1<Integer>() {
+                            @Override
+                            public void call( final Integer count ) {
+                                //There's nothing to do,
+                                // we have 2 different edges with the
+                                // same edge type and
+                                // target type.  Don't delete meta data
+                                if ( count == 1 ) {
+                                    final MutationBatch delete =
+                                            edgeMetadataSerialization.removeEdgeTypeToTarget( scope, edge );
+                                }
+                            }
+                        } );
+
+
+                //no op, just wait for each observable to populate the mutation before returning it
+                return Observable.zip( edges, sourceIdType, targetIdType, sourceType, targetType,
+                        new Func5<MarkedEdge, Integer, Integer, Integer, Integer, MutationBatch>() {
+                            @Override
+                            public MutationBatch call( final MarkedEdge markedEdge, final Integer integer,
+                                                       final Integer integer2, final Integer integer3,
+                                                       final Integer integer4 ) {
+                                return batch;
+                            }
+                        } );
+            }
+        }
+
+
+                                              ).map( new Func1<MutationBatch, EdgeEvent<Edge>>() {
+            @Override
+            public EdgeEvent<Edge> call( final MutationBatch mutationBatch ) {
+                try {
+                    mutationBatch.execute();
+                }
+                catch ( ConnectionException e ) {
+                    throw new RuntimeException( "Unable to execute mutation", e );
+                }
+
+                return delete;
+            }
+        } );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c45f7ee9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeEvent.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeEvent.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeEvent.java
new file mode 100644
index 0000000..d4b6f91
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeEvent.java
@@ -0,0 +1,31 @@
+package org.apache.usergrid.persistence.graph.impl;
+
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+
+
+/**
+ * Get the edge event in the organizational scope
+ *
+ */
+public class EdgeEvent<T> {
+
+    private final OrganizationScope organizationScope;
+    private final T data;
+
+
+    public EdgeEvent( final OrganizationScope organizationScope, final T data ) {
+        this.organizationScope = organizationScope;
+        this.data = data;
+    }
+
+
+    public OrganizationScope getOrganizationScope() {
+        return organizationScope;
+    }
+
+
+    public T getData() {
+        return data;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c45f7ee9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
index 1cc2fce..663215a 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
@@ -37,7 +37,6 @@ import org.apache.usergrid.persistence.graph.SearchEdgeType;
 import org.apache.usergrid.persistence.graph.SearchIdType;
 import org.apache.usergrid.persistence.graph.consistency.AsyncProcessor;
 import org.apache.usergrid.persistence.graph.consistency.AsynchronousMessage;
-import org.apache.usergrid.persistence.graph.consistency.MessageListener;
 import org.apache.usergrid.persistence.graph.guice.EdgeDelete;
 import org.apache.usergrid.persistence.graph.guice.EdgeWrite;
 import org.apache.usergrid.persistence.graph.guice.NodeDelete;
@@ -50,7 +49,6 @@ import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 
 import com.fasterxml.uuid.UUIDComparator;
-import com.google.common.base.Optional;
 import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
 import com.netflix.astyanax.MutationBatch;
@@ -58,9 +56,7 @@ import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
 import rx.Observable;
 import rx.Scheduler;
-import rx.util.functions.Action1;
 import rx.util.functions.Func1;
-import rx.util.functions.Func4;
 
 
 /**
@@ -109,16 +105,10 @@ public class EdgeManagerImpl implements EdgeManager {
         this.edgeWriteAsyncProcessor = edgeWrite;
 
 
-        this.edgeWriteAsyncProcessor.addListener( new EdgeWriteListener() );
-
-
         this.edgeDeleteAsyncProcessor = edgeDelete;
 
-        this.edgeDeleteAsyncProcessor.addListener( new EdgeDeleteListener() );
 
         this.nodeDeleteAsyncProcessor = nodeDelete;
-
-        this.nodeDeleteAsyncProcessor.addListener( new NodeDeleteListener() );
     }
 
 
@@ -415,255 +405,4 @@ public class EdgeManagerImpl implements EdgeManager {
             return true;
         }
     }
-
-
-    /**
-     * Construct the asynchronous edge lister for the repair operation.
-     */
-    public class EdgeWriteListener implements MessageListener<Edge, Edge> {
-
-        @Override
-        public Observable<Edge> receive( final Edge write ) {
-
-            final UUID maxVersion = write.getVersion();
-
-            return Observable.create( new ObservableIterator<MarkedEdge>() {
-                @Override
-                protected Iterator<MarkedEdge> getIterator() {
-
-                    final SimpleSearchByEdge search =
-                            new SimpleSearchByEdge( write.getSourceNode(), write.getType(), write.getTargetNode(),
-                                    maxVersion, null );
-
-                    return edgeSerialization.getEdgeFromSource( scope, search );
-                }
-            } ).filter( new Func1<MarkedEdge, Boolean>() {
-
-                //TODO, reuse this for delete operation
-
-
-                /**
-                 * We only want to return edges < this version so we remove them
-                 * @param markedEdge
-                 * @return
-                 */
-                @Override
-                public Boolean call( final MarkedEdge markedEdge ) {
-                    return UUIDComparator.staticCompare( markedEdge.getVersion(), maxVersion ) < 0;
-                }
-                //buffer the deletes and issue them in a single mutation
-            } ).buffer( graphFig.getScanPageSize() ).map( new Func1<List<MarkedEdge>, Edge>() {
-                @Override
-                public Edge call( final List<MarkedEdge> markedEdges ) {
-
-                    final int size = markedEdges.size();
-
-                    final MutationBatch batch = edgeSerialization.deleteEdge( scope, markedEdges.get( 0 ) );
-
-                    for ( int i = 1; i < size; i++ ) {
-                        final MutationBatch delete = edgeSerialization.deleteEdge( scope, markedEdges.get( i ) );
-
-                        batch.mergeShallow( delete );
-                    }
-
-                    try {
-                        batch.execute();
-                    }
-                    catch ( ConnectionException e ) {
-                        throw new RuntimeException( "Unable to issue write to cassandra", e );
-                    }
-
-                    return write;
-                }
-            } );
-        }
-    }
-
-
-    /**
-     * Construct the asynchronous delete operation from the listener
-     */
-    public class EdgeDeleteListener implements MessageListener<Edge, Edge> {
-
-        @Override
-        public Observable<Edge> receive( final Edge delete ) {
-
-            final UUID maxVersion = delete.getVersion();
-
-            return Observable.from( delete ).flatMap( new Func1<Edge, Observable<MutationBatch>>() {
-                @Override
-                public Observable<MutationBatch> call( final Edge edge ) {
-
-                    //search by edge type and target type.  If any other edges with this target type exist,
-                    // we can't delete it
-                    Observable<MutationBatch> sourceIdType = loadEdgesFromSourceByType(
-                            new SimpleSearchByIdType( edge.getSourceNode(), edge.getType(), maxVersion,
-                                    edge.getTargetNode().getType(), null ) ).take( 2 ).count()
-                            .map( new Func1<Integer, MutationBatch>() {
-                                @Override
-                                public MutationBatch call( final Integer count ) {
-                                    //There's nothing to do, we have 2 different edges with the same edge type and
-                                    // target type.  Don't delete meta data
-                                    if ( count == 2 ) {
-                                        return null;
-                                    }
-
-                                    return edgeMetadataSerialization.removeEdgeTypeFromSource( scope, delete );
-                                }
-                            } );
-
-
-                    Observable<MutationBatch> targetIdType = loadEdgesToTargetByType(
-                            new SimpleSearchByIdType( edge.getTargetNode(), edge.getType(), maxVersion,
-                                    edge.getSourceNode().getType(), null ) ).take( 2 ).count()
-                            .map( new Func1<Integer, MutationBatch>() {
-                                @Override
-                                public MutationBatch call( final Integer count ) {
-                                    //There's nothing to do, we have 2 different edges with the same edge type and
-                                    // target type.  Don't delete meta data
-                                    if ( count == 2 ) {
-                                        return null;
-                                    }
-
-
-                                    return edgeMetadataSerialization.removeEdgeTypeToTarget( scope, delete );
-                                }
-                            } );
-
-                    //search by edge type and target type.  If any other edges with this target type exist,
-                    // we can't delete it
-                    Observable<MutationBatch> sourceType = loadEdgesFromSource(
-                            new SimpleSearchByEdgeType( edge.getSourceNode(), edge.getType(), maxVersion, null ) )
-                            .take( 2 ).count().map( new Func1<Integer, MutationBatch>() {
-                                @Override
-                                public MutationBatch call( final Integer count ) {
-
-
-                                    //There's nothing to do, we have 2 different edges with the same edge type and
-                                    // target type.  Don't delete meta data
-                                    if ( count == 2 ) {
-                                        return null;
-                                    }
-
-
-                                    return edgeMetadataSerialization.removeEdgeTypeFromSource( scope, delete );
-                                }
-                            } );
-
-
-                    Observable<MutationBatch> targetType = loadEdgesToTarget(
-                            new SimpleSearchByEdgeType( edge.getTargetNode(), edge.getType(), maxVersion, null ) )
-                            .take( 2 ).count().map( new Func1<Integer, MutationBatch>() {
-
-
-                                @Override
-                                public MutationBatch call( final Integer count ) {
-
-
-                                    //There's nothing to do, we have 2 different edges with the same edge type and
-                                    // target type.  Don't delete meta data
-                                    if ( count == 2 ) {
-                                        return null;
-                                    }
-
-                                    return edgeMetadataSerialization.removeEdgeTypeToTarget( scope, delete );
-                                }
-                            } );
-
-
-                    return Observable.zip( sourceIdType, targetIdType, sourceType, targetType,
-                            new Func4<MutationBatch, MutationBatch, MutationBatch, MutationBatch, MutationBatch>() {
-
-
-                                @Override
-                                public MutationBatch call( final MutationBatch mutationBatch,
-                                                           final MutationBatch mutationBatch2,
-                                                           final MutationBatch mutationBatch3,
-                                                           final MutationBatch mutationBatch4 ) {
-
-                                    return join( join( join( mutationBatch, mutationBatch2 ), mutationBatch3 ),
-                                            mutationBatch4 );
-                                }
-
-
-                                private MutationBatch join( MutationBatch first, MutationBatch second ) {
-                                    if ( first == null ) {
-                                        if ( second == null ) {
-                                            return null;
-                                        }
-
-                                        return second;
-                                    }
-
-
-                                    else if ( second == null ) {
-                                        return first;
-                                    }
-
-                                    first.mergeShallow( second );
-
-                                    return first;
-                                }
-                            } );
-                }
-            } ).map( new Func1<MutationBatch, Edge>() {
-                @Override
-                public Edge call( final MutationBatch mutationBatch ) {
-                    try {
-                        mutationBatch.execute();
-                    }
-                    catch ( ConnectionException e ) {
-                        throw new RuntimeException( "Unable to execute mutation", e );
-                    }
-
-                    return delete;
-                }
-            } );
-        }
-    }
-
-
-    /**
-     * Construct the asynchronous node delete from the q
-     */
-    public class NodeDeleteListener implements MessageListener<Id, Id> {
-
-        @Override
-        public Observable<Id> receive( final Id node ) {
-
-
-            return Observable.from( node ).map( new Func1<Id, Optional<UUID>>() {
-                @Override
-                public Optional<UUID> call( final Id id ) {
-                    return nodeSerialization.getMaxVersion( scope, node );
-                }
-            } ).flatMap( new Func1<Optional<UUID>, Observable<Edge>>() {
-                @Override
-                public Observable<Edge> call( final Optional<UUID> uuidOptional ) {
-                    return getEdgeTypesToTarget( new SimpleSearchEdgeType( node, null ) )
-                            .flatMap( new Func1<String, Observable<Edge>>() {
-                                @Override
-                                public Observable<Edge> call( final String edgeType ) {
-
-                                    //for each edge type, we want to search all edges < this version to the node and
-                                    // delete them. We might want to batch this up for efficiency
-                                    return loadEdgesToTarget(
-                                            new SimpleSearchByEdgeType( node, edgeType, uuidOptional.get(), null ) )
-                                            .doOnEach( new Action1<Edge>() {
-                                                @Override
-                                                public void call( final Edge edge ) {
-                                                    deleteEdge( edge );
-                                                }
-                                            } );
-                                }
-                            } );
-                }
-            } ).map( new Func1<Edge, Id>() {
-                @Override
-                public Id call( final Edge edge ) {
-                    return node;
-                }
-            } );
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c45f7ee9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeWriteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeWriteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeWriteListener.java
new file mode 100644
index 0000000..428085e
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeWriteListener.java
@@ -0,0 +1,103 @@
+package org.apache.usergrid.persistence.graph.impl;
+
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.consistency.AsyncProcessor;
+import org.apache.usergrid.persistence.graph.consistency.MessageListener;
+import org.apache.usergrid.persistence.graph.guice.EdgeWrite;
+import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
+import org.apache.usergrid.persistence.graph.serialization.impl.parse.ObservableIterator;
+
+import com.fasterxml.uuid.UUIDComparator;
+import com.google.inject.Singleton;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import rx.Observable;
+import rx.util.functions.Func1;
+
+
+/**
+ * Construct the asynchronous edge lister for the repair operation.
+ */
+@Singleton
+public class EdgeWriteListener implements MessageListener<EdgeEvent<Edge>, EdgeEvent<Edge>> {
+
+    private final EdgeSerialization edgeSerialization;
+    private final GraphFig graphFig;
+    private final Keyspace keyspace;
+
+
+    public EdgeWriteListener( final EdgeSerialization edgeSerialization, final GraphFig graphFig,
+                              final Keyspace keyspace, @EdgeWrite final AsyncProcessor edgeWrite ) {
+        this.edgeSerialization = edgeSerialization;
+        this.graphFig = graphFig;
+        this.keyspace = keyspace;
+        edgeWrite.addListener( this );
+    }
+
+
+    @Override
+    public Observable<EdgeEvent<Edge>> receive( final EdgeEvent<Edge> write ) {
+
+        final Edge edge = write.getData();
+        final OrganizationScope scope = write.getOrganizationScope();
+        final UUID maxVersion = edge.getVersion();
+
+        return Observable.create( new ObservableIterator<MarkedEdge>() {
+            @Override
+            protected Iterator<MarkedEdge> getIterator() {
+
+                final SimpleSearchByEdge search =
+                        new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(), maxVersion,
+                                null );
+
+                return edgeSerialization.getEdgeFromSource( scope, search );
+            }
+        } ).filter( new Func1<MarkedEdge, Boolean>() {
+
+            //TODO, reuse this for delete operation
+
+
+            /**
+             * We only want to return edges < this version so we remove them
+             * @param markedEdge
+             * @return
+             */
+            @Override
+            public Boolean call( final MarkedEdge markedEdge ) {
+                return UUIDComparator.staticCompare( markedEdge.getVersion(), maxVersion ) < 0;
+            }
+            //buffer the deletes and issue them in a single mutation
+        } ).buffer( graphFig.getScanPageSize() ).map( new Func1<List<MarkedEdge>, EdgeEvent<Edge>>() {
+            @Override
+            public EdgeEvent<Edge> call( final List<MarkedEdge> markedEdges ) {
+
+                final MutationBatch batch = keyspace.prepareMutationBatch();
+
+                for ( MarkedEdge edge : markedEdges ) {
+                    final MutationBatch delete = edgeSerialization.deleteEdge( scope, edge );
+
+                    batch.mergeShallow( delete );
+                }
+
+                try {
+                    batch.execute();
+                }
+                catch ( ConnectionException e ) {
+                    throw new RuntimeException( "Unable to issue write to cassandra", e );
+                }
+
+                return write;
+            }
+        } );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c45f7ee9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
new file mode 100644
index 0000000..08cc942
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
@@ -0,0 +1,128 @@
+package org.apache.usergrid.persistence.graph.impl;
+
+
+import java.util.Iterator;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+import org.apache.usergrid.persistence.graph.SearchEdgeType;
+import org.apache.usergrid.persistence.graph.consistency.AsyncProcessor;
+import org.apache.usergrid.persistence.graph.consistency.MessageListener;
+import org.apache.usergrid.persistence.graph.guice.NodeDelete;
+import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
+import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
+import org.apache.usergrid.persistence.graph.serialization.NodeSerialization;
+import org.apache.usergrid.persistence.graph.serialization.impl.parse.ObservableIterator;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+
+import rx.Observable;
+import rx.util.functions.Action1;
+import rx.util.functions.Func1;
+
+
+/**
+ * Construct the asynchronous node delete from the q
+ */
+public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, EdgeEvent<Id>> {
+
+    private final NodeSerialization nodeSerialization;
+    private final EdgeSerialization edgeSerialization;
+    private final EdgeMetadataSerialization edgeMetadataSerialization;
+
+
+    /**
+     * Wire the serialization dependencies
+     */
+    @Inject
+    public NodeDeleteListener( final NodeSerialization nodeSerialization, final EdgeSerialization edgeSerialization,
+                               final EdgeMetadataSerialization edgeMetadataSerialization, @NodeDelete final AsyncProcessor nodeDelete) {
+
+
+        this.nodeSerialization = nodeSerialization;
+        this.edgeSerialization = edgeSerialization;
+        this.edgeMetadataSerialization = edgeMetadataSerialization;
+        nodeDelete.addListener( this );
+    }
+
+
+    @Override
+    public Observable<EdgeEvent<Id>> receive( final EdgeEvent<Id> edgeEvent ) {
+
+        final Id node = edgeEvent.getData();
+        final OrganizationScope scope = edgeEvent.getOrganizationScope();
+
+
+        return Observable.from( node ).map( new Func1<Id, Optional<UUID>>() {
+            @Override
+            public Optional<UUID> call( final Id id ) {
+                return nodeSerialization.getMaxVersion( scope, node );
+            }
+        } ).flatMap( new Func1<Optional<UUID>, Observable<MarkedEdge>>() {
+            @Override
+            public Observable<MarkedEdge> call( final Optional<UUID> uuidOptional ) {
+
+                return getEdgesTypesToTarget( scope, new SimpleSearchEdgeType( node, null ) )
+                        .flatMap( new Func1<String, Observable<MarkedEdge>>() {
+                            @Override
+                            public Observable<MarkedEdge> call( final String edgeType ) {
+
+                                //for each edge type, we want to search all edges < this version to the node and
+                                // delete them. We might want to batch this up for efficiency
+                                return loadEdgesToTarget( scope,
+                                        new SimpleSearchByEdgeType( node, edgeType, uuidOptional.get(), null ) )
+                                        .doOnEach( new Action1<MarkedEdge>() {
+                                            @Override
+                                            public void call( final MarkedEdge markedEdge ) {
+                                                edgeSerialization.deleteEdge( scope, markedEdge );
+                                            }
+                                        } );
+                            }
+                        } );
+            }
+        } ).map( new Func1<MarkedEdge, EdgeEvent<Id>>() {
+            @Override
+            public EdgeEvent<Id> call( final MarkedEdge edge ) {
+                return edgeEvent;
+            }
+        } );
+    }
+
+
+    /**
+     * Get all existing edge types to the target node
+     * @param scope
+     * @param search
+     * @return
+     */
+    private Observable<String> getEdgesTypesToTarget( final OrganizationScope scope, final SearchEdgeType search ) {
+
+        return Observable.create( new ObservableIterator<String>() {
+            @Override
+            protected Iterator<String> getIterator() {
+                return edgeMetadataSerialization.getEdgeTypesToTarget( scope, search );
+            }
+        } );
+    }
+
+
+    /**
+     * Load all edges pointing to this target
+     * @param scope
+     * @param search
+     * @return
+     */
+    private Observable<MarkedEdge> loadEdgesToTarget( final OrganizationScope scope, final SearchByEdgeType search ) {
+
+        return Observable.create( new ObservableIterator<MarkedEdge>() {
+            @Override
+            protected Iterator<MarkedEdge> getIterator() {
+                return edgeSerialization.getEdgesToTarget( scope, search );
+            }
+        } );
+    }
+}