You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/03/12 21:22:25 UTC

[2/4] git commit: Checkpoint before refactor

Checkpoint before refactor


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/e8568ba4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/e8568ba4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/e8568ba4

Branch: refs/heads/asyncqueue
Commit: e8568ba48cb45251eab41723d53ac57a5844bc3f
Parents: 0a256bb
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Mar 11 10:42:35 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Mar 11 10:42:35 2014 -0700

----------------------------------------------------------------------
 .../graph/impl/EdgeDeleteListener.java          |   4 +-
 .../persistence/graph/impl/EdgeEvent.java       |  13 +-
 .../graph/impl/NodeDeleteListener.java          | 175 +++++++++++++++----
 .../EdgeMetadataSerialization.java              |  76 +++++++-
 .../impl/EdgeMetadataSerializationImpl.java     |  44 ++++-
 .../graph/impl/NodeDeleteListenerTest.java      |   8 +-
 6 files changed, 261 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e8568ba4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
index 85e4b4f..e0ceafc 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
@@ -44,8 +44,8 @@ public class EdgeDeleteListener implements MessageListener<EdgeEvent<Edge>, Edge
     @Inject
     public EdgeDeleteListener( final EdgeSerialization edgeSerialization,
                                final EdgeMetadataSerialization edgeMetadataSerialization,
-                               final EdgeManagerFactory edgeManagerFactory, final Keyspace keyspace, @EdgeDelete
-                                   final AsyncProcessor edgeDelete ) {
+                               final EdgeManagerFactory edgeManagerFactory, final Keyspace keyspace,
+                               @EdgeDelete final AsyncProcessor edgeDelete ) {
         this.edgeSerialization = edgeSerialization;
         this.edgeMetadataSerialization = edgeMetadataSerialization;
         this.edgeManagerFactory = edgeManagerFactory;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e8568ba4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeEvent.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeEvent.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeEvent.java
index d4b6f91..b2ba686 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeEvent.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeEvent.java
@@ -1,6 +1,8 @@
 package org.apache.usergrid.persistence.graph.impl;
 
 
+import java.util.UUID;
+
 import org.apache.usergrid.persistence.collection.OrganizationScope;
 
 
@@ -12,11 +14,13 @@ public class EdgeEvent<T> {
 
     private final OrganizationScope organizationScope;
     private final T data;
+    private final UUID version;
 
 
-    public EdgeEvent( final OrganizationScope organizationScope, final T data ) {
+    public EdgeEvent( final OrganizationScope organizationScope, final UUID version, final T data ) {
         this.organizationScope = organizationScope;
         this.data = data;
+        this.version = version;
     }
 
 
@@ -25,7 +29,14 @@ public class EdgeEvent<T> {
     }
 
 
+    public UUID getVersion() {
+        return version;
+    }
+
+
     public T getData() {
         return data;
     }
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e8568ba4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
index fbd5d08..1d2124a 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
@@ -6,6 +6,7 @@ import java.util.List;
 import java.util.UUID;
 
 import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.graph.EdgeManagerFactory;
 import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.MarkedEdge;
 import org.apache.usergrid.persistence.graph.SearchByEdgeType;
@@ -26,6 +27,8 @@ import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
 import rx.Observable;
+import rx.Scheduler;
+import rx.functions.Action0;
 import rx.functions.Action1;
 import rx.functions.Func1;
 
@@ -35,11 +38,15 @@ import rx.functions.Func1;
  */
 public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, EdgeEvent<Id>> {
 
+
     private final NodeSerialization nodeSerialization;
-    private final EdgeSerialization edgeSerialization;
-    private final EdgeMetadataSerialization edgeMetadataSerialization;
+//    private final EdgeSerialization edgeSerialization;
+//    private final EdgeMetadataSerialization edgeMetadataSerialization;
     private final GraphFig graphFig;
-    private final Keyspace keyspace;
+//    private final Keyspace keyspace;
+
+    private final Scheduler scheduler;
+    private final EdgeManagerFactory edgeManagerFactory;
 
 
     /**
@@ -48,14 +55,19 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, EdgeEv
     @Inject
     public NodeDeleteListener( final NodeSerialization nodeSerialization, final EdgeSerialization edgeSerialization,
                                final EdgeMetadataSerialization edgeMetadataSerialization, final GraphFig graphFig,
-                               final Keyspace keyspace, @NodeDelete final AsyncProcessor nodeDelete ) {
+                               final Keyspace keyspace, final Scheduler scheduler,
+                               final EdgeManagerFactory edgeManagerFactory,
+                               @NodeDelete final AsyncProcessor nodeDelete ) {
 
 
         this.nodeSerialization = nodeSerialization;
-        this.edgeSerialization = edgeSerialization;
-        this.edgeMetadataSerialization = edgeMetadataSerialization;
+//        this.edgeSerialization = edgeSerialization;
+//        this.edgeMetadataSerialization = edgeMetadataSerialization;
         this.graphFig = graphFig;
-        this.keyspace = keyspace;
+//        this.keyspace = keyspace;
+        this.scheduler = scheduler;
+        this.edgeManagerFactory = edgeManagerFactory;
+
         nodeDelete.addListener( this );
     }
 
@@ -65,9 +77,10 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, EdgeEv
 
         final Id node = edgeEvent.getData();
         final OrganizationScope scope = edgeEvent.getOrganizationScope();
+        final UUID version = edgeEvent.getVersion();
 
 
-        return Observable.from( node ).map( new Func1<Id, Optional<UUID>>() {
+        return Observable.from( node ).subscribeOn( scheduler ).map( new Func1<Id, Optional<UUID>>() {
             @Override
             public Optional<UUID> call( final Id id ) {
                 return nodeSerialization.getMaxVersion( scope, node );
@@ -80,60 +93,148 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, EdgeEv
                     }
                 } )
 
-                //delete source and targets in parallel and merge them into a single observable
+                        //delete source and targets in parallel and merge them into a single observable
                 .flatMap( new Func1<Optional<UUID>, Observable<List<MarkedEdge>>>() {
                     @Override
                     public Observable<List<MarkedEdge>> call( final Optional<UUID> uuidOptional ) {
 
                         //get all edges pointing to the target node and buffer then into groups for deletion
-                        Observable<List<MarkedEdge>> targetEdges =
+                        Observable<MarkedEdge> targetEdges =
                                 getEdgesTypesToTarget( scope, new SimpleSearchEdgeType( node, null ) )
-                                        .flatMap( new Func1<String, Observable<List<MarkedEdge>>>() {
+                                        .flatMap( new Func1<String, Observable<MarkedEdge>>() {
                                             @Override
-                                            public Observable<List<MarkedEdge>> call( final String edgeType ) {
+                                            public Observable<MarkedEdge> call( final String edgeType ) {
                                                 return loadEdgesToTarget( scope,
                                                         new SimpleSearchByEdgeType( node, edgeType, uuidOptional.get(),
-                                                                null ) ).buffer( graphFig.getScanPageSize() );
+                                                                null ) );
                                             }
                                         } );
 
 
                         //get all edges pointing to the source node and buffer them into groups for deletion
-                        Observable<List<MarkedEdge>> sourceEdges =
+                        Observable<MarkedEdge> sourceEdges =
                                 getEdgesTypesFromSource( scope, new SimpleSearchEdgeType( node, null ) )
-                                        .flatMap( new Func1<String, Observable<List<MarkedEdge>>>() {
+                                        .flatMap( new Func1<String, Observable<MarkedEdge>>() {
                                             @Override
-                                            public Observable<List<MarkedEdge>> call( final String edgeType ) {
+                                            public Observable<MarkedEdge> call( final String edgeType ) {
                                                 return loadEdgesFromSource( scope,
                                                         new SimpleSearchByEdgeType( node, edgeType, uuidOptional.get(),
-                                                                null ) ).buffer( graphFig.getScanPageSize() );
+                                                                null ) );
                                             }
                                         } );
 
 
-                        return Observable.merge( targetEdges, sourceEdges );
+                        //each time an edge is emitted, delete it via batch mutation since we'll already be buffered
+                        return Observable.merge( targetEdges, sourceEdges ).buffer( graphFig.getScanPageSize() )
+                                         .doOnNext( new Action1<List<MarkedEdge>>() {
+                                             @Override
+                                             public void call( final List<MarkedEdge> markedEdges ) {
+                                                 MutationBatch batch = keyspace.prepareMutationBatch();
+
+
+                                                 for ( MarkedEdge marked : markedEdges ) {
+                                                     batch.mergeShallow(
+                                                             edgeSerialization.deleteEdge( scope, marked ) );
+                                                 }
+
+                                                 try {
+                                                     batch.execute();
+                                                 }
+                                                 catch ( ConnectionException e ) {
+                                                     throw new RuntimeException( "Unable to execute mutation" );
+                                                 }
+                                             }
+                                         } );
                     }
-                } ).doOnNext( new Action1<List<MarkedEdge>>() {
+                } ).last().flatMap( new Func1<List<MarkedEdge>, Observable<String>>() {
                     @Override
-                    public void call( final List<MarkedEdge> markedEdges ) {
-                        MutationBatch batch = keyspace.prepareMutationBatch();
+                    public Observable<String> call( final List<MarkedEdge> markedEdges ) {
+
+                        //delete all meta for edges to this node
+                        Observable<String> targets =
+                                getEdgesTypesToTarget( scope, new SimpleSearchEdgeType( node, null ) )
+                                        .doOnNext( new Action1<String>() {
+                                            @Override
+                                            public void call( final String type ) {
+
+                                                final MutationBatch batch = keyspace.prepareMutationBatch();
+
+                                                final Iterator<String> types = edgeMetadataSerialization
+                                                        .getIdTypesToTarget( scope,
+                                                                new SimpleSearchIdType( node, type, null ) );
 
+                                                while ( types.hasNext() ) {
+                                                    batch.mergeShallow( edgeMetadataSerialization
+                                                            .removeIdTypeToTarget( scope, node, type, types.next(),
+                                                                    version ) );
+                                                }
 
-                        for ( MarkedEdge marked : markedEdges ) {
-                            final MutationBatch edgeBatch = edgeSerialization.deleteEdge( scope, marked );
-                            batch.mergeShallow( edgeBatch );
-                        }
+                                                batch.mergeShallow( edgeMetadataSerialization
+                                                        .removeEdgeTypeToTarget( scope, node, type, version ) );
+
+                                                try {
+                                                    batch.execute();
+                                                }
+                                                catch ( ConnectionException e ) {
+                                                    throw new RuntimeException( "Unable to execute cassandra call" );
+                                                }
+                                            }
+                                        } );
 
-                        try {
-                            batch.execute();
-                        }
-                        catch ( ConnectionException e ) {
-                            throw new RuntimeException( "Unable to execute mutation" );
-                        }
+                        //delete all meta for edges from this node
+                        Observable<String> sources =
+                                getEdgesTypesFromSource( scope, new SimpleSearchEdgeType( node, null ) )
+                                        .doOnNext( new Action1<String>() {
+                                            @Override
+                                            public void call( final String type ) {
+
+                                                final MutationBatch batch = keyspace.prepareMutationBatch();
+
+                                                final Iterator<String> types = edgeMetadataSerialization
+                                                        .getIdTypesFromSource( scope,
+                                                                new SimpleSearchIdType( node, type, null ) );
+
+                                                while ( types.hasNext() ) {
+                                                    batch.mergeShallow( edgeMetadataSerialization
+                                                            .removeIdTypeFromSource( scope, node, type, types.next(),
+                                                                    version ) );
+                                                }
+
+                                                batch.mergeShallow( edgeMetadataSerialization
+                                                        .removeEdgeTypeFromSource( scope, node, type, version ) );
+
+                                                try {
+                                                    batch.execute();
+                                                }
+                                                catch ( ConnectionException e ) {
+                                                    throw new RuntimeException( "Unable to execute cassandra call" );
+                                                }
+                                            }
+                                        } );
+
+
+                        //no op, just zip them up  so we can do our action on our last
+                        return Observable.merge( targets, sources )
+                                //when we're done emitting, merge them
+                                .doOnCompleted( new Action0() {
+                                    @Override
+                                    public void call() {
+                                        try {
+                                            nodeSerialization.delete( scope, node, edgeEvent.getVersion() ).execute();
+                                        }
+                                        catch ( ConnectionException e ) {
+                                            throw new RuntimeException( "Unable to execute mutation" );
+                                        }
+                                    }
+                                } );
                     }
-                } ).map( new Func1<List<MarkedEdge>, EdgeEvent<Id>>() {
+                } )
+                        //when we're completed, we need to delete our id from types, then remove our mark
+
+                        //return our event to the caller
+                .map( new Func1<String, EdgeEvent<Id>>() {
                     @Override
-                    public EdgeEvent<Id> call( final List<MarkedEdge> markedEdges ) {
+                    public EdgeEvent<Id> call( final String mark ) {
                         return edgeEvent;
                     }
                 } );
@@ -150,7 +251,7 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, EdgeEv
             protected Iterator<String> getIterator() {
                 return edgeMetadataSerialization.getEdgeTypesToTarget( scope, search );
             }
-        } );
+        } ).subscribeOn( scheduler );
     }
 
 
@@ -164,7 +265,7 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, EdgeEv
             protected Iterator<String> getIterator() {
                 return edgeMetadataSerialization.getEdgeTypesFromSource( scope, search );
             }
-        } );
+        } ).subscribeOn( scheduler );
     }
 
 
@@ -178,7 +279,7 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, EdgeEv
             protected Iterator<MarkedEdge> getIterator() {
                 return edgeSerialization.getEdgesToTarget( scope, search );
             }
-        } );
+        } ).subscribeOn( scheduler );
     }
 
 
@@ -192,7 +293,7 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, EdgeEv
             protected Iterator<MarkedEdge> getIterator() {
                 return edgeSerialization.getEdgesFromSource( scope, search );
             }
-        } );
+        } ).subscribeOn( scheduler );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e8568ba4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeMetadataSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeMetadataSerialization.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeMetadataSerialization.java
index 8806b92..5fa6c7c 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeMetadataSerialization.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeMetadataSerialization.java
@@ -21,11 +21,13 @@ package org.apache.usergrid.persistence.graph.serialization;
 
 
 import java.util.Iterator;
+import java.util.UUID;
 
 import org.apache.usergrid.persistence.collection.OrganizationScope;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.SearchEdgeType;
 import org.apache.usergrid.persistence.graph.SearchIdType;
+import org.apache.usergrid.persistence.model.entity.Id;
 
 import com.netflix.astyanax.MutationBatch;
 
@@ -42,8 +44,8 @@ public interface EdgeMetadataSerialization {
     MutationBatch writeEdge( OrganizationScope scope, Edge edge );
 
     /**
-     * Remove all meta data from the source to the target type.  The caller must ensure that this is the last
-     * edge with this type at version <= edge version
+     * Remove all meta data from the source to the target type.  The caller must ensure that this is the last edge with
+     * this type at version <= edge version
      *
      * @param scope The org scope
      * @param edge The edge to remove
@@ -54,8 +56,20 @@ public interface EdgeMetadataSerialization {
 
 
     /**
-     * Remove all meta data from the source to the target type.  The caller must ensure that this is the last
-          * edge with this type at version <= edge version
+     * Remove the edge type from the source with the specified version
+     *
+     * @param scope Organization scope
+     * @param sourceNode Source node
+     * @param type The edge type
+     * @param version The version to use on the delete
+     *
+     * @return A mutation batch to use on issuing the delete
+     */
+    MutationBatch removeEdgeTypeFromSource( OrganizationScope scope, Id sourceNode, String type, UUID version );
+
+    /**
+     * Remove all meta data from the source to the target type.  The caller must ensure that this is the last edge with
+     * this type at version <= edge version
      *
      * @param scope The org scope
      * @param edge The edge to remove
@@ -64,9 +78,25 @@ public interface EdgeMetadataSerialization {
      */
     MutationBatch removeIdTypeFromSource( OrganizationScope scope, Edge edge );
 
+
     /**
-     * Remove all meta data from the target to the source type.  The caller must ensure that this is the last
-          * edge with this type at version <= edge version
+     * Remove all meta data from the source to the target type.  The caller must ensure that this is the last edge with
+     * this type at version <= edge version
+     *
+     * @param scope Organization scope
+     * @param sourceNode Source node
+     * @param type The edge type
+     * @param idType The idType to use
+     * @param version The version to use on the delete
+     *
+     * @return a mutation batch with the delete operations
+     */
+    MutationBatch removeIdTypeFromSource( OrganizationScope scope, Id sourceNode, String type, String idType,
+                                          UUID version );
+
+    /**
+     * Remove all meta data from the target to the source type.  The caller must ensure that this is the last edge with
+     * this type at version <= edge version
      *
      * @param scope The org scope
      * @param edge The edge to remove
@@ -77,8 +107,22 @@ public interface EdgeMetadataSerialization {
 
 
     /**
-     * Remove all meta data from the target to the source type.  The caller must ensure that this is the last
-          * edge with this type at version <= edge version
+     * Remove all meta data from the target to the source type.  The caller must ensure that this is the last edge with
+     * this type at version <= edge version
+     *
+     * @param scope Organization scope
+     * @param targetNode Target node
+     * @param type The edge type
+     * @param version The version to use on the delete
+     *
+     * @return A mutation batch to use on issuing the delete
+     */
+    MutationBatch removeEdgeTypeToTarget( OrganizationScope scope, Id targetNode, String type, UUID version );
+
+
+    /**
+     * Remove all meta data from the target to the source type.  The caller must ensure that this is the last edge with
+     * this type at version <= edge version
      *
      * @param scope The org scope
      * @param edge The edge to remove
@@ -87,6 +131,22 @@ public interface EdgeMetadataSerialization {
      */
     MutationBatch removeIdTypeToTarget( OrganizationScope scope, Edge edge );
 
+
+    /**
+     * Remove all meta data from the target to the source type.  The caller must ensure that this is the last edge with
+     * this type at version <= edge version
+     *
+     * @param scope Organization scope
+     * @param targetNode Target node
+     * @param type The edge type
+     * @param idType The idType to use
+     * @param version The version to use on the delete
+     *
+     * @return a mutation batch with the delete operations
+     */
+    MutationBatch removeIdTypeToTarget( OrganizationScope scope, Id targetNode, String type, String idType,
+                                        UUID version );
+
     /**
      * Get all edge types from the given source node
      *

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e8568ba4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationImpl.java
index b6028ea..49a8be6 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationImpl.java
@@ -175,27 +175,55 @@ public class EdgeMetadataSerializationImpl implements EdgeMetadataSerialization,
 
     @Override
     public MutationBatch removeEdgeTypeFromSource( final OrganizationScope scope, final Edge edge ) {
-        return removeEdgeType( scope, edge.getSourceNode(), edge.getType(), edge.getVersion(), CF_SOURCE_EDGE_TYPES );
+        return removeEdgeTypeFromSource( scope, edge.getSourceNode(), edge.getType(), edge.getVersion() );
+    }
+
+
+    @Override
+    public MutationBatch removeEdgeTypeFromSource( final OrganizationScope scope, final Id sourceNode,
+                                                   final String type, final UUID version ) {
+        return removeEdgeType( scope, sourceNode, type, version, CF_SOURCE_EDGE_TYPES );
     }
 
 
     @Override
     public MutationBatch removeIdTypeFromSource( final OrganizationScope scope, final Edge edge ) {
-        return removeIdType( scope, edge.getSourceNode(), edge.getTargetNode(), edge.getType(), edge.getVersion(),
-                CF_SOURCE_EDGE_ID_TYPES );
+        return removeIdTypeFromSource( scope, edge.getSourceNode(), edge.getType(),  edge.getTargetNode().getType(),
+                edge.getVersion() );
+    }
+
+
+    @Override
+    public MutationBatch removeIdTypeFromSource( final OrganizationScope scope, final Id sourceNode, final String type,
+                                                 final String idType, final UUID version ) {
+        return removeIdType( scope, sourceNode, idType, type, version,
+                        CF_SOURCE_EDGE_ID_TYPES );
     }
 
 
     @Override
     public MutationBatch removeEdgeTypeToTarget( final OrganizationScope scope, final Edge edge ) {
-        return removeEdgeType( scope, edge.getTargetNode(), edge.getType(), edge.getVersion(), CF_TARGET_EDGE_TYPES );
+        return removeEdgeTypeToTarget( scope, edge.getTargetNode(), edge.getType(), edge.getVersion() );
     }
 
 
+    @Override
+    public MutationBatch removeEdgeTypeToTarget( final OrganizationScope scope, final Id targetNode, final String type,
+                                                 final UUID version ) {
+        return removeEdgeType( scope, targetNode,type, version, CF_TARGET_EDGE_TYPES );
+    }
+
 
     @Override
     public MutationBatch removeIdTypeToTarget( final OrganizationScope scope, final Edge edge ) {
-        return removeIdType( scope, edge.getTargetNode(), edge.getSourceNode(), edge.getType(), edge.getVersion(),
+        return removeIdTypeToTarget(scope, edge.getTargetNode(), edge.getType(),  edge.getSourceNode().getType(), edge.getVersion());
+    }
+
+
+    @Override
+    public MutationBatch removeIdTypeToTarget( final OrganizationScope scope, final Id targetNode, final String type,
+                                               final String idType, final UUID version ) {
+        return removeIdType( scope, targetNode, idType,type, version,
                 CF_TARGET_EDGE_ID_TYPES );
     }
 
@@ -234,14 +262,14 @@ public class EdgeMetadataSerializationImpl implements EdgeMetadataSerialization,
      *
      * @param scope The scope to use
      * @param rowId The id to use in the row key
-     * @param colId The id type to use in the column
+     * @param idType The id type to use in the column
      * @param edgeType The edge type to use in the column
      * @param version The version to use on the column
      * @param cf The column family to use
      *
      * @return A populated mutation with the remove operations
      */
-    private MutationBatch removeIdType( final OrganizationScope scope, final Id rowId, final Id colId,
+    private MutationBatch removeIdType( final OrganizationScope scope, final Id rowId, final String idType,
                                         final String edgeType, final UUID version,
                                         final MultiTennantColumnFamily<OrganizationScope, EdgeIdTypeKey, String> cf ) {
 
@@ -256,7 +284,7 @@ public class EdgeMetadataSerializationImpl implements EdgeMetadataSerialization,
                 new ScopedRowKey<OrganizationScope, EdgeIdTypeKey>( scope, new EdgeIdTypeKey( rowId, edgeType ) );
 
 
-        batch.withRow( cf, rowKey ).setTimestamp( timestamp ).deleteColumn( colId.getType() );
+        batch.withRow( cf, rowKey ).setTimestamp( timestamp ).deleteColumn( idType );
 
         return batch;
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e8568ba4/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
index 2d55698..5b4899a 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
@@ -109,15 +109,17 @@ public class NodeDeleteListenerTest {
 
         Id targetNode = edge.getTargetNode();
 
+        UUID version = UUIDGenerator.newTimeUUID();
 
-        EdgeEvent<Id> deleteEvent = new EdgeEvent<Id>( scope, sourceNode );
+
+        EdgeEvent<Id> deleteEvent = new EdgeEvent<Id>( scope, version, sourceNode );
 
         EdgeEvent<Id> event = deleteListener.receive( deleteEvent ).toBlockingObservable().lastOrDefault( null );
 
         assertNull( "Mark was not set, no delete should be executed", event );
 
 
-        deleteEvent = new EdgeEvent<Id>( scope, targetNode );
+        deleteEvent = new EdgeEvent<Id>( scope, version, targetNode );
 
         event = deleteListener.receive( deleteEvent ).toBlockingObservable().lastOrDefault( null );
 
@@ -152,7 +154,7 @@ public class NodeDeleteListenerTest {
 
         nodeSerialization.mark( scope, sourceNode, deleteVersion ).execute();
 
-        EdgeEvent<Id> deleteEvent = new EdgeEvent<Id>( scope, sourceNode );
+        EdgeEvent<Id> deleteEvent = new EdgeEvent<Id>( scope, deleteVersion, sourceNode );
 
 
         EdgeEvent<Id> event = deleteListener.receive( deleteEvent ).toBlockingObservable().last();