You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/03/12 21:22:25 UTC
[2/4] git commit: Checkpoint before refactor
Checkpoint before refactor
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/e8568ba4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/e8568ba4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/e8568ba4
Branch: refs/heads/asyncqueue
Commit: e8568ba48cb45251eab41723d53ac57a5844bc3f
Parents: 0a256bb
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Mar 11 10:42:35 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Mar 11 10:42:35 2014 -0700
----------------------------------------------------------------------
.../graph/impl/EdgeDeleteListener.java | 4 +-
.../persistence/graph/impl/EdgeEvent.java | 13 +-
.../graph/impl/NodeDeleteListener.java | 175 +++++++++++++++----
.../EdgeMetadataSerialization.java | 76 +++++++-
.../impl/EdgeMetadataSerializationImpl.java | 44 ++++-
.../graph/impl/NodeDeleteListenerTest.java | 8 +-
6 files changed, 261 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e8568ba4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
index 85e4b4f..e0ceafc 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
@@ -44,8 +44,8 @@ public class EdgeDeleteListener implements MessageListener<EdgeEvent<Edge>, Edge
@Inject
public EdgeDeleteListener( final EdgeSerialization edgeSerialization,
final EdgeMetadataSerialization edgeMetadataSerialization,
- final EdgeManagerFactory edgeManagerFactory, final Keyspace keyspace, @EdgeDelete
- final AsyncProcessor edgeDelete ) {
+ final EdgeManagerFactory edgeManagerFactory, final Keyspace keyspace,
+ @EdgeDelete final AsyncProcessor edgeDelete ) {
this.edgeSerialization = edgeSerialization;
this.edgeMetadataSerialization = edgeMetadataSerialization;
this.edgeManagerFactory = edgeManagerFactory;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e8568ba4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeEvent.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeEvent.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeEvent.java
index d4b6f91..b2ba686 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeEvent.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeEvent.java
@@ -1,6 +1,8 @@
package org.apache.usergrid.persistence.graph.impl;
+import java.util.UUID;
+
import org.apache.usergrid.persistence.collection.OrganizationScope;
@@ -12,11 +14,13 @@ public class EdgeEvent<T> {
private final OrganizationScope organizationScope;
private final T data;
+ private final UUID version;
- public EdgeEvent( final OrganizationScope organizationScope, final T data ) {
+ public EdgeEvent( final OrganizationScope organizationScope, final UUID version, final T data ) {
this.organizationScope = organizationScope;
this.data = data;
+ this.version = version;
}
@@ -25,7 +29,14 @@ public class EdgeEvent<T> {
}
+ public UUID getVersion() {
+ return version;
+ }
+
+
public T getData() {
return data;
}
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e8568ba4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
index fbd5d08..1d2124a 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
@@ -6,6 +6,7 @@ import java.util.List;
import java.util.UUID;
import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.graph.EdgeManagerFactory;
import org.apache.usergrid.persistence.graph.GraphFig;
import org.apache.usergrid.persistence.graph.MarkedEdge;
import org.apache.usergrid.persistence.graph.SearchByEdgeType;
@@ -26,6 +27,8 @@ import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import rx.Observable;
+import rx.Scheduler;
+import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
@@ -35,11 +38,15 @@ import rx.functions.Func1;
*/
public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, EdgeEvent<Id>> {
+
private final NodeSerialization nodeSerialization;
- private final EdgeSerialization edgeSerialization;
- private final EdgeMetadataSerialization edgeMetadataSerialization;
+// private final EdgeSerialization edgeSerialization;
+// private final EdgeMetadataSerialization edgeMetadataSerialization;
private final GraphFig graphFig;
- private final Keyspace keyspace;
+// private final Keyspace keyspace;
+
+ private final Scheduler scheduler;
+ private final EdgeManagerFactory edgeManagerFactory;
/**
@@ -48,14 +55,19 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, EdgeEv
@Inject
public NodeDeleteListener( final NodeSerialization nodeSerialization, final EdgeSerialization edgeSerialization,
final EdgeMetadataSerialization edgeMetadataSerialization, final GraphFig graphFig,
- final Keyspace keyspace, @NodeDelete final AsyncProcessor nodeDelete ) {
+ final Keyspace keyspace, final Scheduler scheduler,
+ final EdgeManagerFactory edgeManagerFactory,
+ @NodeDelete final AsyncProcessor nodeDelete ) {
this.nodeSerialization = nodeSerialization;
- this.edgeSerialization = edgeSerialization;
- this.edgeMetadataSerialization = edgeMetadataSerialization;
+// this.edgeSerialization = edgeSerialization;
+// this.edgeMetadataSerialization = edgeMetadataSerialization;
this.graphFig = graphFig;
- this.keyspace = keyspace;
+// this.keyspace = keyspace;
+ this.scheduler = scheduler;
+ this.edgeManagerFactory = edgeManagerFactory;
+
nodeDelete.addListener( this );
}
@@ -65,9 +77,10 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, EdgeEv
final Id node = edgeEvent.getData();
final OrganizationScope scope = edgeEvent.getOrganizationScope();
+ final UUID version = edgeEvent.getVersion();
- return Observable.from( node ).map( new Func1<Id, Optional<UUID>>() {
+ return Observable.from( node ).subscribeOn( scheduler ).map( new Func1<Id, Optional<UUID>>() {
@Override
public Optional<UUID> call( final Id id ) {
return nodeSerialization.getMaxVersion( scope, node );
@@ -80,60 +93,148 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, EdgeEv
}
} )
- //delete source and targets in parallel and merge them into a single observable
+ //delete source and targets in parallel and merge them into a single observable
.flatMap( new Func1<Optional<UUID>, Observable<List<MarkedEdge>>>() {
@Override
public Observable<List<MarkedEdge>> call( final Optional<UUID> uuidOptional ) {
//get all edges pointing to the target node and buffer them into groups for deletion
- Observable<List<MarkedEdge>> targetEdges =
+ Observable<MarkedEdge> targetEdges =
getEdgesTypesToTarget( scope, new SimpleSearchEdgeType( node, null ) )
- .flatMap( new Func1<String, Observable<List<MarkedEdge>>>() {
+ .flatMap( new Func1<String, Observable<MarkedEdge>>() {
@Override
- public Observable<List<MarkedEdge>> call( final String edgeType ) {
+ public Observable<MarkedEdge> call( final String edgeType ) {
return loadEdgesToTarget( scope,
new SimpleSearchByEdgeType( node, edgeType, uuidOptional.get(),
- null ) ).buffer( graphFig.getScanPageSize() );
+ null ) );
}
} );
//get all edges pointing to the source node and buffer them into groups for deletion
- Observable<List<MarkedEdge>> sourceEdges =
+ Observable<MarkedEdge> sourceEdges =
getEdgesTypesFromSource( scope, new SimpleSearchEdgeType( node, null ) )
- .flatMap( new Func1<String, Observable<List<MarkedEdge>>>() {
+ .flatMap( new Func1<String, Observable<MarkedEdge>>() {
@Override
- public Observable<List<MarkedEdge>> call( final String edgeType ) {
+ public Observable<MarkedEdge> call( final String edgeType ) {
return loadEdgesFromSource( scope,
new SimpleSearchByEdgeType( node, edgeType, uuidOptional.get(),
- null ) ).buffer( graphFig.getScanPageSize() );
+ null ) );
}
} );
- return Observable.merge( targetEdges, sourceEdges );
+ //each time an edge is emitted, delete it via batch mutation since we'll already be buffered
+ return Observable.merge( targetEdges, sourceEdges ).buffer( graphFig.getScanPageSize() )
+ .doOnNext( new Action1<List<MarkedEdge>>() {
+ @Override
+ public void call( final List<MarkedEdge> markedEdges ) {
+ MutationBatch batch = keyspace.prepareMutationBatch();
+
+
+ for ( MarkedEdge marked : markedEdges ) {
+ batch.mergeShallow(
+ edgeSerialization.deleteEdge( scope, marked ) );
+ }
+
+ try {
+ batch.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to execute mutation" );
+ }
+ }
+ } );
}
- } ).doOnNext( new Action1<List<MarkedEdge>>() {
+ } ).last().flatMap( new Func1<List<MarkedEdge>, Observable<String>>() {
@Override
- public void call( final List<MarkedEdge> markedEdges ) {
- MutationBatch batch = keyspace.prepareMutationBatch();
+ public Observable<String> call( final List<MarkedEdge> markedEdges ) {
+
+ //delete all meta for edges to this node
+ Observable<String> targets =
+ getEdgesTypesToTarget( scope, new SimpleSearchEdgeType( node, null ) )
+ .doOnNext( new Action1<String>() {
+ @Override
+ public void call( final String type ) {
+
+ final MutationBatch batch = keyspace.prepareMutationBatch();
+
+ final Iterator<String> types = edgeMetadataSerialization
+ .getIdTypesToTarget( scope,
+ new SimpleSearchIdType( node, type, null ) );
+ while ( types.hasNext() ) {
+ batch.mergeShallow( edgeMetadataSerialization
+ .removeIdTypeToTarget( scope, node, type, types.next(),
+ version ) );
+ }
- for ( MarkedEdge marked : markedEdges ) {
- final MutationBatch edgeBatch = edgeSerialization.deleteEdge( scope, marked );
- batch.mergeShallow( edgeBatch );
- }
+ batch.mergeShallow( edgeMetadataSerialization
+ .removeEdgeTypeToTarget( scope, node, type, version ) );
+
+ try {
+ batch.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to execute cassandra call" );
+ }
+ }
+ } );
- try {
- batch.execute();
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( "Unable to execute mutation" );
- }
+ //delete all meta for edges from this node
+ Observable<String> sources =
+ getEdgesTypesFromSource( scope, new SimpleSearchEdgeType( node, null ) )
+ .doOnNext( new Action1<String>() {
+ @Override
+ public void call( final String type ) {
+
+ final MutationBatch batch = keyspace.prepareMutationBatch();
+
+ final Iterator<String> types = edgeMetadataSerialization
+ .getIdTypesFromSource( scope,
+ new SimpleSearchIdType( node, type, null ) );
+
+ while ( types.hasNext() ) {
+ batch.mergeShallow( edgeMetadataSerialization
+ .removeIdTypeFromSource( scope, node, type, types.next(),
+ version ) );
+ }
+
+ batch.mergeShallow( edgeMetadataSerialization
+ .removeEdgeTypeFromSource( scope, node, type, version ) );
+
+ try {
+ batch.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to execute cassandra call" );
+ }
+ }
+ } );
+
+
+ //no op, just zip them up so we can do our action on our last
+ return Observable.merge( targets, sources )
+ //when we're done emitting, merge them
+ .doOnCompleted( new Action0() {
+ @Override
+ public void call() {
+ try {
+ nodeSerialization.delete( scope, node, edgeEvent.getVersion() ).execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to execute mutation" );
+ }
+ }
+ } );
}
- } ).map( new Func1<List<MarkedEdge>, EdgeEvent<Id>>() {
+ } )
+ //when we're completed, we need to delete our id from types, then remove our mark
+
+ //return our event to the caller
+ .map( new Func1<String, EdgeEvent<Id>>() {
@Override
- public EdgeEvent<Id> call( final List<MarkedEdge> markedEdges ) {
+ public EdgeEvent<Id> call( final String mark ) {
return edgeEvent;
}
} );
@@ -150,7 +251,7 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, EdgeEv
protected Iterator<String> getIterator() {
return edgeMetadataSerialization.getEdgeTypesToTarget( scope, search );
}
- } );
+ } ).subscribeOn( scheduler );
}
@@ -164,7 +265,7 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, EdgeEv
protected Iterator<String> getIterator() {
return edgeMetadataSerialization.getEdgeTypesFromSource( scope, search );
}
- } );
+ } ).subscribeOn( scheduler );
}
@@ -178,7 +279,7 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, EdgeEv
protected Iterator<MarkedEdge> getIterator() {
return edgeSerialization.getEdgesToTarget( scope, search );
}
- } );
+ } ).subscribeOn( scheduler );
}
@@ -192,7 +293,7 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, EdgeEv
protected Iterator<MarkedEdge> getIterator() {
return edgeSerialization.getEdgesFromSource( scope, search );
}
- } );
+ } ).subscribeOn( scheduler );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e8568ba4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeMetadataSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeMetadataSerialization.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeMetadataSerialization.java
index 8806b92..5fa6c7c 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeMetadataSerialization.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeMetadataSerialization.java
@@ -21,11 +21,13 @@ package org.apache.usergrid.persistence.graph.serialization;
import java.util.Iterator;
+import java.util.UUID;
import org.apache.usergrid.persistence.collection.OrganizationScope;
import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.graph.SearchEdgeType;
import org.apache.usergrid.persistence.graph.SearchIdType;
+import org.apache.usergrid.persistence.model.entity.Id;
import com.netflix.astyanax.MutationBatch;
@@ -42,8 +44,8 @@ public interface EdgeMetadataSerialization {
MutationBatch writeEdge( OrganizationScope scope, Edge edge );
/**
- * Remove all meta data from the source to the target type. The caller must ensure that this is the last
- * edge with this type at version <= edge version
+ * Remove all meta data from the source to the target type. The caller must ensure that this is the last edge with
+ * this type at version <= edge version
*
* @param scope The org scope
* @param edge The edge to remove
@@ -54,8 +56,20 @@ public interface EdgeMetadataSerialization {
/**
- * Remove all meta data from the source to the target type. The caller must ensure that this is the last
- * edge with this type at version <= edge version
+ * Remove the edge type from the source with the specified version
+ *
+ * @param scope Organization scope
+ * @param sourceNode Source node
+ * @param type The edge type
+ * @param version The version to use on the delete
+ *
+ * @return A mutation batch to use on issuing the delete
+ */
+ MutationBatch removeEdgeTypeFromSource( OrganizationScope scope, Id sourceNode, String type, UUID version );
+
+ /**
+ * Remove all meta data from the source to the target type. The caller must ensure that this is the last edge with
+ * this type at version <= edge version
*
* @param scope The org scope
* @param edge The edge to remove
@@ -64,9 +78,25 @@ public interface EdgeMetadataSerialization {
*/
MutationBatch removeIdTypeFromSource( OrganizationScope scope, Edge edge );
+
/**
- * Remove all meta data from the target to the source type. The caller must ensure that this is the last
- * edge with this type at version <= edge version
+ * Remove all meta data from the source to the target type. The caller must ensure that this is the last edge with
+ * this type at version <= edge version
+ *
+ * @param scope Organization scope
+ * @param sourceNode Source node
+ * @param type The edge type
+ * @param idType The idType to use
+ * @param version The version to use on the delete
+ *
+ * @return a mutation batch with the delete operations
+ */
+ MutationBatch removeIdTypeFromSource( OrganizationScope scope, Id sourceNode, String type, String idType,
+ UUID version );
+
+ /**
+ * Remove all meta data from the target to the source type. The caller must ensure that this is the last edge with
+ * this type at version <= edge version
*
* @param scope The org scope
* @param edge The edge to remove
@@ -77,8 +107,22 @@ public interface EdgeMetadataSerialization {
/**
- * Remove all meta data from the target to the source type. The caller must ensure that this is the last
- * edge with this type at version <= edge version
+ * Remove all meta data from the target to the source type. The caller must ensure that this is the last edge with
+ * this type at version <= edge version
+ *
+ * @param scope Organization scope
+ * @param targetNode Target node
+ * @param type The edge type
+ * @param version The version to use on the delete
+ *
+ * @return A mutation batch to use on issuing the delete
+ */
+ MutationBatch removeEdgeTypeToTarget( OrganizationScope scope, Id targetNode, String type, UUID version );
+
+
+ /**
+ * Remove all meta data from the target to the source type. The caller must ensure that this is the last edge with
+ * this type at version <= edge version
*
* @param scope The org scope
* @param edge The edge to remove
@@ -87,6 +131,22 @@ public interface EdgeMetadataSerialization {
*/
MutationBatch removeIdTypeToTarget( OrganizationScope scope, Edge edge );
+
+ /**
+ * Remove all meta data from the target to the source type. The caller must ensure that this is the last edge with
+ * this type at version <= edge version
+ *
+ * @param scope Organization scope
+ * @param targetNode Target node
+ * @param type The edge type
+ * @param idType The idType to use
+ * @param version The version to use on the delete
+ *
+ * @return a mutation batch with the delete operations
+ */
+ MutationBatch removeIdTypeToTarget( OrganizationScope scope, Id targetNode, String type, String idType,
+ UUID version );
+
/**
* Get all edge types from the given source node
*
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e8568ba4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationImpl.java
index b6028ea..49a8be6 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationImpl.java
@@ -175,27 +175,55 @@ public class EdgeMetadataSerializationImpl implements EdgeMetadataSerialization,
@Override
public MutationBatch removeEdgeTypeFromSource( final OrganizationScope scope, final Edge edge ) {
- return removeEdgeType( scope, edge.getSourceNode(), edge.getType(), edge.getVersion(), CF_SOURCE_EDGE_TYPES );
+ return removeEdgeTypeFromSource( scope, edge.getSourceNode(), edge.getType(), edge.getVersion() );
+ }
+
+
+ @Override
+ public MutationBatch removeEdgeTypeFromSource( final OrganizationScope scope, final Id sourceNode,
+ final String type, final UUID version ) {
+ return removeEdgeType( scope, sourceNode, type, version, CF_SOURCE_EDGE_TYPES );
}
@Override
public MutationBatch removeIdTypeFromSource( final OrganizationScope scope, final Edge edge ) {
- return removeIdType( scope, edge.getSourceNode(), edge.getTargetNode(), edge.getType(), edge.getVersion(),
- CF_SOURCE_EDGE_ID_TYPES );
+ return removeIdTypeFromSource( scope, edge.getSourceNode(), edge.getType(), edge.getTargetNode().getType(),
+ edge.getVersion() );
+ }
+
+
+ @Override
+ public MutationBatch removeIdTypeFromSource( final OrganizationScope scope, final Id sourceNode, final String type,
+ final String idType, final UUID version ) {
+ return removeIdType( scope, sourceNode, idType, type, version,
+ CF_SOURCE_EDGE_ID_TYPES );
}
@Override
public MutationBatch removeEdgeTypeToTarget( final OrganizationScope scope, final Edge edge ) {
- return removeEdgeType( scope, edge.getTargetNode(), edge.getType(), edge.getVersion(), CF_TARGET_EDGE_TYPES );
+ return removeEdgeTypeToTarget( scope, edge.getTargetNode(), edge.getType(), edge.getVersion() );
}
+ @Override
+ public MutationBatch removeEdgeTypeToTarget( final OrganizationScope scope, final Id targetNode, final String type,
+ final UUID version ) {
+ return removeEdgeType( scope, targetNode,type, version, CF_TARGET_EDGE_TYPES );
+ }
+
@Override
public MutationBatch removeIdTypeToTarget( final OrganizationScope scope, final Edge edge ) {
- return removeIdType( scope, edge.getTargetNode(), edge.getSourceNode(), edge.getType(), edge.getVersion(),
+ return removeIdTypeToTarget(scope, edge.getTargetNode(), edge.getType(), edge.getSourceNode().getType(), edge.getVersion());
+ }
+
+
+ @Override
+ public MutationBatch removeIdTypeToTarget( final OrganizationScope scope, final Id targetNode, final String type,
+ final String idType, final UUID version ) {
+ return removeIdType( scope, targetNode, idType,type, version,
CF_TARGET_EDGE_ID_TYPES );
}
@@ -234,14 +262,14 @@ public class EdgeMetadataSerializationImpl implements EdgeMetadataSerialization,
*
* @param scope The scope to use
* @param rowId The id to use in the row key
- * @param colId The id type to use in the column
+ * @param idType The id type to use in the column
* @param edgeType The edge type to use in the column
* @param version The version to use on the column
* @param cf The column family to use
*
* @return A populated mutation with the remove operations
*/
- private MutationBatch removeIdType( final OrganizationScope scope, final Id rowId, final Id colId,
+ private MutationBatch removeIdType( final OrganizationScope scope, final Id rowId, final String idType,
final String edgeType, final UUID version,
final MultiTennantColumnFamily<OrganizationScope, EdgeIdTypeKey, String> cf ) {
@@ -256,7 +284,7 @@ public class EdgeMetadataSerializationImpl implements EdgeMetadataSerialization,
new ScopedRowKey<OrganizationScope, EdgeIdTypeKey>( scope, new EdgeIdTypeKey( rowId, edgeType ) );
- batch.withRow( cf, rowKey ).setTimestamp( timestamp ).deleteColumn( colId.getType() );
+ batch.withRow( cf, rowKey ).setTimestamp( timestamp ).deleteColumn( idType );
return batch;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e8568ba4/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
index 2d55698..5b4899a 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
@@ -109,15 +109,17 @@ public class NodeDeleteListenerTest {
Id targetNode = edge.getTargetNode();
+ UUID version = UUIDGenerator.newTimeUUID();
- EdgeEvent<Id> deleteEvent = new EdgeEvent<Id>( scope, sourceNode );
+
+ EdgeEvent<Id> deleteEvent = new EdgeEvent<Id>( scope, version, sourceNode );
EdgeEvent<Id> event = deleteListener.receive( deleteEvent ).toBlockingObservable().lastOrDefault( null );
assertNull( "Mark was not set, no delete should be executed", event );
- deleteEvent = new EdgeEvent<Id>( scope, targetNode );
+ deleteEvent = new EdgeEvent<Id>( scope, version, targetNode );
event = deleteListener.receive( deleteEvent ).toBlockingObservable().lastOrDefault( null );
@@ -152,7 +154,7 @@ public class NodeDeleteListenerTest {
nodeSerialization.mark( scope, sourceNode, deleteVersion ).execute();
- EdgeEvent<Id> deleteEvent = new EdgeEvent<Id>( scope, sourceNode );
+ EdgeEvent<Id> deleteEvent = new EdgeEvent<Id>( scope, deleteVersion, sourceNode );
EdgeEvent<Id> event = deleteListener.receive( deleteEvent ).toBlockingObservable().last();