You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/03/08 01:20:51 UTC
[2/3] git commit: Initial refactor of listeners and event system
Initial refactor of listeners and event system
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/c45f7ee9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/c45f7ee9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/c45f7ee9
Branch: refs/heads/asyncqueue
Commit: c45f7ee9d7e625fbe7ad89ffc42441a3453ecad0
Parents: da4630f
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Mar 7 12:07:35 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Mar 7 12:07:35 2014 -0700
----------------------------------------------------------------------
.../graph/consistency/AsyncProcessor.java | 3 +-
.../graph/consistency/AsyncProcessorImpl.java | 6 +-
.../graph/consistency/ErrorListener.java | 5 +
.../graph/consistency/MessageListener.java | 4 +-
.../consistency/SimpleAsynchronousMessage.java | 3 +-
.../graph/impl/EdgeDeleteListener.java | 199 ++++++++++++++
.../persistence/graph/impl/EdgeEvent.java | 31 +++
.../persistence/graph/impl/EdgeManagerImpl.java | 261 -------------------
.../graph/impl/EdgeWriteListener.java | 103 ++++++++
.../graph/impl/NodeDeleteListener.java | 128 +++++++++
10 files changed, 476 insertions(+), 267 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c45f7ee9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java
index f6013d9..150aa6b 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java
@@ -2,8 +2,7 @@ package org.apache.usergrid.persistence.graph.consistency;
/**
- * Used to fork lazy repair and other types of operations. This can be implemented
- * across multiple environments.
+ * Used to fork lazy repair and other types of operations.
*
*/
public interface AsyncProcessor<T> {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c45f7ee9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
index fbac644..20c40e2 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
@@ -19,7 +19,9 @@ import rx.util.functions.FuncN;
/**
- * The implementation of asynchronous processing
+ * The implementation of asynchronous processing.
+ * This is intentionally kept as a 1 processor to 1 event type mapping
+ * This way reflection is not used, event dispatching is easier, and has compile time checking
*/
@Singleton
public class AsyncProcessorImpl<T> implements AsyncProcessor<T> {
@@ -84,7 +86,7 @@ public class AsyncProcessorImpl<T> implements AsyncProcessor<T> {
}
} ).subscribe( new Action1<AsynchronousMessage<T>>() {
@Override
- public void call( final AsynchronousMessage<T> tAsynchronousMessage ) {
+ public void call( final AsynchronousMessage<T> asynchronousMessage ) {
//To change body of implemented methods use File | Settings | File Templates.
}
} );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c45f7ee9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/ErrorListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/ErrorListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/ErrorListener.java
index f736cdb..9d21304 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/ErrorListener.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/ErrorListener.java
@@ -6,5 +6,10 @@ package org.apache.usergrid.persistence.graph.consistency;
*/
public interface ErrorListener <T> {
+ /**
+ * Invoked when an error occurs during asynchronous processing
+ * @param event
+ * @param t
+ */
void onError( AsynchronousMessage<T> event, Throwable t );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c45f7ee9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/MessageListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/MessageListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/MessageListener.java
index 8a21dbb..466e4ed 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/MessageListener.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/MessageListener.java
@@ -1,6 +1,8 @@
package org.apache.usergrid.persistence.graph.consistency;
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+
import rx.Observable;
@@ -17,6 +19,6 @@ public interface MessageListener<T, R> {
* @param event The input event
* @return The observable that performs the operations
*/
- Observable<T> receive(T event);
+ Observable<T> receive(final T event);
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c45f7ee9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/SimpleAsynchronousMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/SimpleAsynchronousMessage.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/SimpleAsynchronousMessage.java
index 1e7a04b..0a8651c 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/SimpleAsynchronousMessage.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/SimpleAsynchronousMessage.java
@@ -2,7 +2,8 @@ package org.apache.usergrid.persistence.graph.consistency;
/**
- *
+ * Simple message that just contains the event and the timeout. More advanced queue implementations
+ * will most likely subclass this class.
*
*/
public class SimpleAsynchronousMessage<T> implements AsynchronousMessage<T> {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c45f7ee9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
new file mode 100644
index 0000000..824b05c
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
@@ -0,0 +1,199 @@
+package org.apache.usergrid.persistence.graph.impl;
+
+
+import java.util.Iterator;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.EdgeManager;
+import org.apache.usergrid.persistence.graph.EdgeManagerFactory;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.consistency.AsyncProcessor;
+import org.apache.usergrid.persistence.graph.consistency.MessageListener;
+import org.apache.usergrid.persistence.graph.guice.EdgeDelete;
+import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
+import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
+import org.apache.usergrid.persistence.graph.serialization.impl.parse.ObservableIterator;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import rx.Observable;
+import rx.util.functions.Action1;
+import rx.util.functions.Func1;
+import rx.util.functions.Func5;
+
+
+/**
+ * Construct the asynchronous delete operation from the listener
+ */
+@Singleton
+public class EdgeDeleteListener implements MessageListener<EdgeEvent<Edge>, EdgeEvent<Edge>> {
+
+
+ private final EdgeSerialization edgeSerialization;
+ private final EdgeMetadataSerialization edgeMetadataSerialization;
+ private final EdgeManagerFactory edgeManagerFactory;
+ private final Keyspace keyspace;
+
+
+ @Inject
+ public EdgeDeleteListener( final EdgeSerialization edgeSerialization,
+ final EdgeMetadataSerialization edgeMetadataSerialization,
+ final EdgeManagerFactory edgeManagerFactory, final Keyspace keyspace, @EdgeDelete
+ final AsyncProcessor edgeDelete ) {
+ this.edgeSerialization = edgeSerialization;
+ this.edgeMetadataSerialization = edgeMetadataSerialization;
+ this.edgeManagerFactory = edgeManagerFactory;
+ this.keyspace = keyspace;
+
+ edgeDelete.addListener( this );
+ }
+
+
+ @Override
+ public Observable<EdgeEvent<Edge>> receive( final EdgeEvent<Edge> delete ) {
+
+ final Edge edge = delete.getData();
+ final OrganizationScope scope = delete.getOrganizationScope();
+ final UUID maxVersion = edge.getVersion();
+ final EdgeManager edgeManager = edgeManagerFactory.createEdgeManager( scope );
+
+
+ return Observable.from( edge ).flatMap( new Func1<Edge, Observable<MutationBatch>>() {
+ @Override
+ public Observable<MutationBatch> call( final Edge edge ) {
+
+ final MutationBatch batch = keyspace.prepareMutationBatch();
+
+
+ //go through every version of this edge <= the current version and remove it
+ Observable<MarkedEdge> edges = Observable.create( new ObservableIterator<MarkedEdge>() {
+ @Override
+ protected Iterator<MarkedEdge> getIterator() {
+ return edgeSerialization.getEdgeToTarget( scope,
+ new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(),
+ edge.getVersion(), null ) );
+ }
+ } ).doOnEach( new Action1<MarkedEdge>() {
+ @Override
+ public void call( final MarkedEdge markedEdge ) {
+ final MutationBatch delete = edgeSerialization.deleteEdge( scope, markedEdge );
+ batch.mergeShallow( delete );
+ }
+ } );
+
+
+ //search by edge type and target type. If any other edges with this target type exist,
+ // we can't delete it
+ Observable<Integer> sourceIdType = edgeManager.loadEdgesFromSourceByType(
+ new SimpleSearchByIdType( edge.getSourceNode(), edge.getType(), maxVersion,
+ edge.getTargetNode().getType(), null ) ).take( 2 ).count()
+ .doOnEach( new Action1<Integer>() {
+ @Override
+ public void call( final Integer count ) {
+ //There's nothing to do,
+ // we have 2 different edges with the
+ // same edge type and
+ // target type. Don't delete meta data
+ if ( count == 1 ) {
+ final MutationBatch delete =
+ edgeMetadataSerialization
+ .removeEdgeTypeFromSource(
+ scope, edge );
+ batch.mergeShallow( delete );
+ }
+ }
+ } );
+
+
+ Observable<Integer> targetIdType = edgeManager.loadEdgesToTargetByType(
+ new SimpleSearchByIdType( edge.getTargetNode(), edge.getType(), maxVersion,
+ edge.getSourceNode().getType(), null ) ).take( 2 ).count()
+ .doOnEach( new Action1<Integer>() {
+ @Override
+ public void call( final Integer count ) {
+ //There's nothing to do,
+ // we have 2 different edges with the
+ // same edge type and
+ // target type. Don't delete meta data
+ if ( count == 1 ) {
+ final MutationBatch delete =
+ edgeMetadataSerialization
+ .removeEdgeTypeToTarget(
+ scope, edge );
+ batch.mergeShallow( delete );
+ }
+ }
+ } );
+
+
+ //search by edge type and target type. If any other edges with this target type exist,
+ // we can't delete it
+ Observable<Integer> sourceType = edgeManager.loadEdgesFromSource(
+ new SimpleSearchByEdgeType( edge.getSourceNode(), edge.getType(), maxVersion, null ) ).take( 2 )
+ .count().doOnEach( new Action1<Integer>() {
+ @Override
+ public void call( final Integer count ) {
+ //There's nothing to do,
+ // we have 2 different edges with the
+ // same edge type and
+ // target type. Don't delete meta data
+ if ( count == 1 ) {
+ final MutationBatch delete =
+ edgeMetadataSerialization.removeEdgeTypeFromSource( scope, edge );
+ }
+ }
+ } );
+
+
+ Observable<Integer> targetType = edgeManager.loadEdgesToTarget(
+ new SimpleSearchByEdgeType( edge.getTargetNode(), edge.getType(), maxVersion, null ) ).take( 2 )
+ .count().doOnEach( new Action1<Integer>() {
+ @Override
+ public void call( final Integer count ) {
+ //There's nothing to do,
+ // we have 2 different edges with the
+ // same edge type and
+ // target type. Don't delete meta data
+ if ( count == 1 ) {
+ final MutationBatch delete =
+ edgeMetadataSerialization.removeEdgeTypeToTarget( scope, edge );
+ }
+ }
+ } );
+
+
+ //no op, just wait for each observable to populate the mutation before returning it
+ return Observable.zip( edges, sourceIdType, targetIdType, sourceType, targetType,
+ new Func5<MarkedEdge, Integer, Integer, Integer, Integer, MutationBatch>() {
+ @Override
+ public MutationBatch call( final MarkedEdge markedEdge, final Integer integer,
+ final Integer integer2, final Integer integer3,
+ final Integer integer4 ) {
+ return batch;
+ }
+ } );
+ }
+ }
+
+
+ ).map( new Func1<MutationBatch, EdgeEvent<Edge>>() {
+ @Override
+ public EdgeEvent<Edge> call( final MutationBatch mutationBatch ) {
+ try {
+ mutationBatch.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to execute mutation", e );
+ }
+
+ return delete;
+ }
+ } );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c45f7ee9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeEvent.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeEvent.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeEvent.java
new file mode 100644
index 0000000..d4b6f91
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeEvent.java
@@ -0,0 +1,31 @@
+package org.apache.usergrid.persistence.graph.impl;
+
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+
+
+/**
+ * Get the edge event in the organizational scope
+ *
+ */
+public class EdgeEvent<T> {
+
+ private final OrganizationScope organizationScope;
+ private final T data;
+
+
+ public EdgeEvent( final OrganizationScope organizationScope, final T data ) {
+ this.organizationScope = organizationScope;
+ this.data = data;
+ }
+
+
+ public OrganizationScope getOrganizationScope() {
+ return organizationScope;
+ }
+
+
+ public T getData() {
+ return data;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c45f7ee9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
index 1cc2fce..663215a 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
@@ -37,7 +37,6 @@ import org.apache.usergrid.persistence.graph.SearchEdgeType;
import org.apache.usergrid.persistence.graph.SearchIdType;
import org.apache.usergrid.persistence.graph.consistency.AsyncProcessor;
import org.apache.usergrid.persistence.graph.consistency.AsynchronousMessage;
-import org.apache.usergrid.persistence.graph.consistency.MessageListener;
import org.apache.usergrid.persistence.graph.guice.EdgeDelete;
import org.apache.usergrid.persistence.graph.guice.EdgeWrite;
import org.apache.usergrid.persistence.graph.guice.NodeDelete;
@@ -50,7 +49,6 @@ import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
import com.fasterxml.uuid.UUIDComparator;
-import com.google.common.base.Optional;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import com.netflix.astyanax.MutationBatch;
@@ -58,9 +56,7 @@ import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import rx.Observable;
import rx.Scheduler;
-import rx.util.functions.Action1;
import rx.util.functions.Func1;
-import rx.util.functions.Func4;
/**
@@ -109,16 +105,10 @@ public class EdgeManagerImpl implements EdgeManager {
this.edgeWriteAsyncProcessor = edgeWrite;
- this.edgeWriteAsyncProcessor.addListener( new EdgeWriteListener() );
-
-
this.edgeDeleteAsyncProcessor = edgeDelete;
- this.edgeDeleteAsyncProcessor.addListener( new EdgeDeleteListener() );
this.nodeDeleteAsyncProcessor = nodeDelete;
-
- this.nodeDeleteAsyncProcessor.addListener( new NodeDeleteListener() );
}
@@ -415,255 +405,4 @@ public class EdgeManagerImpl implements EdgeManager {
return true;
}
}
-
-
- /**
- * Construct the asynchronous edge lister for the repair operation.
- */
- public class EdgeWriteListener implements MessageListener<Edge, Edge> {
-
- @Override
- public Observable<Edge> receive( final Edge write ) {
-
- final UUID maxVersion = write.getVersion();
-
- return Observable.create( new ObservableIterator<MarkedEdge>() {
- @Override
- protected Iterator<MarkedEdge> getIterator() {
-
- final SimpleSearchByEdge search =
- new SimpleSearchByEdge( write.getSourceNode(), write.getType(), write.getTargetNode(),
- maxVersion, null );
-
- return edgeSerialization.getEdgeFromSource( scope, search );
- }
- } ).filter( new Func1<MarkedEdge, Boolean>() {
-
- //TODO, reuse this for delete operation
-
-
- /**
- * We only want to return edges < this version so we remove them
- * @param markedEdge
- * @return
- */
- @Override
- public Boolean call( final MarkedEdge markedEdge ) {
- return UUIDComparator.staticCompare( markedEdge.getVersion(), maxVersion ) < 0;
- }
- //buffer the deletes and issue them in a single mutation
- } ).buffer( graphFig.getScanPageSize() ).map( new Func1<List<MarkedEdge>, Edge>() {
- @Override
- public Edge call( final List<MarkedEdge> markedEdges ) {
-
- final int size = markedEdges.size();
-
- final MutationBatch batch = edgeSerialization.deleteEdge( scope, markedEdges.get( 0 ) );
-
- for ( int i = 1; i < size; i++ ) {
- final MutationBatch delete = edgeSerialization.deleteEdge( scope, markedEdges.get( i ) );
-
- batch.mergeShallow( delete );
- }
-
- try {
- batch.execute();
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( "Unable to issue write to cassandra", e );
- }
-
- return write;
- }
- } );
- }
- }
-
-
- /**
- * Construct the asynchronous delete operation from the listener
- */
- public class EdgeDeleteListener implements MessageListener<Edge, Edge> {
-
- @Override
- public Observable<Edge> receive( final Edge delete ) {
-
- final UUID maxVersion = delete.getVersion();
-
- return Observable.from( delete ).flatMap( new Func1<Edge, Observable<MutationBatch>>() {
- @Override
- public Observable<MutationBatch> call( final Edge edge ) {
-
- //search by edge type and target type. If any other edges with this target type exist,
- // we can't delete it
- Observable<MutationBatch> sourceIdType = loadEdgesFromSourceByType(
- new SimpleSearchByIdType( edge.getSourceNode(), edge.getType(), maxVersion,
- edge.getTargetNode().getType(), null ) ).take( 2 ).count()
- .map( new Func1<Integer, MutationBatch>() {
- @Override
- public MutationBatch call( final Integer count ) {
- //There's nothing to do, we have 2 different edges with the same edge type and
- // target type. Don't delete meta data
- if ( count == 2 ) {
- return null;
- }
-
- return edgeMetadataSerialization.removeEdgeTypeFromSource( scope, delete );
- }
- } );
-
-
- Observable<MutationBatch> targetIdType = loadEdgesToTargetByType(
- new SimpleSearchByIdType( edge.getTargetNode(), edge.getType(), maxVersion,
- edge.getSourceNode().getType(), null ) ).take( 2 ).count()
- .map( new Func1<Integer, MutationBatch>() {
- @Override
- public MutationBatch call( final Integer count ) {
- //There's nothing to do, we have 2 different edges with the same edge type and
- // target type. Don't delete meta data
- if ( count == 2 ) {
- return null;
- }
-
-
- return edgeMetadataSerialization.removeEdgeTypeToTarget( scope, delete );
- }
- } );
-
- //search by edge type and target type. If any other edges with this target type exist,
- // we can't delete it
- Observable<MutationBatch> sourceType = loadEdgesFromSource(
- new SimpleSearchByEdgeType( edge.getSourceNode(), edge.getType(), maxVersion, null ) )
- .take( 2 ).count().map( new Func1<Integer, MutationBatch>() {
- @Override
- public MutationBatch call( final Integer count ) {
-
-
- //There's nothing to do, we have 2 different edges with the same edge type and
- // target type. Don't delete meta data
- if ( count == 2 ) {
- return null;
- }
-
-
- return edgeMetadataSerialization.removeEdgeTypeFromSource( scope, delete );
- }
- } );
-
-
- Observable<MutationBatch> targetType = loadEdgesToTarget(
- new SimpleSearchByEdgeType( edge.getTargetNode(), edge.getType(), maxVersion, null ) )
- .take( 2 ).count().map( new Func1<Integer, MutationBatch>() {
-
-
- @Override
- public MutationBatch call( final Integer count ) {
-
-
- //There's nothing to do, we have 2 different edges with the same edge type and
- // target type. Don't delete meta data
- if ( count == 2 ) {
- return null;
- }
-
- return edgeMetadataSerialization.removeEdgeTypeToTarget( scope, delete );
- }
- } );
-
-
- return Observable.zip( sourceIdType, targetIdType, sourceType, targetType,
- new Func4<MutationBatch, MutationBatch, MutationBatch, MutationBatch, MutationBatch>() {
-
-
- @Override
- public MutationBatch call( final MutationBatch mutationBatch,
- final MutationBatch mutationBatch2,
- final MutationBatch mutationBatch3,
- final MutationBatch mutationBatch4 ) {
-
- return join( join( join( mutationBatch, mutationBatch2 ), mutationBatch3 ),
- mutationBatch4 );
- }
-
-
- private MutationBatch join( MutationBatch first, MutationBatch second ) {
- if ( first == null ) {
- if ( second == null ) {
- return null;
- }
-
- return second;
- }
-
-
- else if ( second == null ) {
- return first;
- }
-
- first.mergeShallow( second );
-
- return first;
- }
- } );
- }
- } ).map( new Func1<MutationBatch, Edge>() {
- @Override
- public Edge call( final MutationBatch mutationBatch ) {
- try {
- mutationBatch.execute();
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( "Unable to execute mutation", e );
- }
-
- return delete;
- }
- } );
- }
- }
-
-
- /**
- * Construct the asynchronous node delete from the q
- */
- public class NodeDeleteListener implements MessageListener<Id, Id> {
-
- @Override
- public Observable<Id> receive( final Id node ) {
-
-
- return Observable.from( node ).map( new Func1<Id, Optional<UUID>>() {
- @Override
- public Optional<UUID> call( final Id id ) {
- return nodeSerialization.getMaxVersion( scope, node );
- }
- } ).flatMap( new Func1<Optional<UUID>, Observable<Edge>>() {
- @Override
- public Observable<Edge> call( final Optional<UUID> uuidOptional ) {
- return getEdgeTypesToTarget( new SimpleSearchEdgeType( node, null ) )
- .flatMap( new Func1<String, Observable<Edge>>() {
- @Override
- public Observable<Edge> call( final String edgeType ) {
-
- //for each edge type, we want to search all edges < this version to the node and
- // delete them. We might want to batch this up for efficiency
- return loadEdgesToTarget(
- new SimpleSearchByEdgeType( node, edgeType, uuidOptional.get(), null ) )
- .doOnEach( new Action1<Edge>() {
- @Override
- public void call( final Edge edge ) {
- deleteEdge( edge );
- }
- } );
- }
- } );
- }
- } ).map( new Func1<Edge, Id>() {
- @Override
- public Id call( final Edge edge ) {
- return node;
- }
- } );
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c45f7ee9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeWriteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeWriteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeWriteListener.java
new file mode 100644
index 0000000..428085e
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeWriteListener.java
@@ -0,0 +1,103 @@
+package org.apache.usergrid.persistence.graph.impl;
+
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.consistency.AsyncProcessor;
+import org.apache.usergrid.persistence.graph.consistency.MessageListener;
+import org.apache.usergrid.persistence.graph.guice.EdgeWrite;
+import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
+import org.apache.usergrid.persistence.graph.serialization.impl.parse.ObservableIterator;
+
+import com.fasterxml.uuid.UUIDComparator;
+import com.google.inject.Singleton;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import rx.Observable;
+import rx.util.functions.Func1;
+
+
+/**
+ * Construct the asynchronous edge lister for the repair operation.
+ */
+@Singleton
+public class EdgeWriteListener implements MessageListener<EdgeEvent<Edge>, EdgeEvent<Edge>> {
+
+ private final EdgeSerialization edgeSerialization;
+ private final GraphFig graphFig;
+ private final Keyspace keyspace;
+
+
+ public EdgeWriteListener( final EdgeSerialization edgeSerialization, final GraphFig graphFig,
+ final Keyspace keyspace, @EdgeWrite final AsyncProcessor edgeWrite ) {
+ this.edgeSerialization = edgeSerialization;
+ this.graphFig = graphFig;
+ this.keyspace = keyspace;
+ edgeWrite.addListener( this );
+ }
+
+
+ @Override
+ public Observable<EdgeEvent<Edge>> receive( final EdgeEvent<Edge> write ) {
+
+ final Edge edge = write.getData();
+ final OrganizationScope scope = write.getOrganizationScope();
+ final UUID maxVersion = edge.getVersion();
+
+ return Observable.create( new ObservableIterator<MarkedEdge>() {
+ @Override
+ protected Iterator<MarkedEdge> getIterator() {
+
+ final SimpleSearchByEdge search =
+ new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(), maxVersion,
+ null );
+
+ return edgeSerialization.getEdgeFromSource( scope, search );
+ }
+ } ).filter( new Func1<MarkedEdge, Boolean>() {
+
+ //TODO, reuse this for delete operation
+
+
+ /**
+ * We only want to return edges < this version so we remove them
+ * @param markedEdge
+ * @return
+ */
+ @Override
+ public Boolean call( final MarkedEdge markedEdge ) {
+ return UUIDComparator.staticCompare( markedEdge.getVersion(), maxVersion ) < 0;
+ }
+ //buffer the deletes and issue them in a single mutation
+ } ).buffer( graphFig.getScanPageSize() ).map( new Func1<List<MarkedEdge>, EdgeEvent<Edge>>() {
+ @Override
+ public EdgeEvent<Edge> call( final List<MarkedEdge> markedEdges ) {
+
+ final MutationBatch batch = keyspace.prepareMutationBatch();
+
+ for ( MarkedEdge edge : markedEdges ) {
+ final MutationBatch delete = edgeSerialization.deleteEdge( scope, edge );
+
+ batch.mergeShallow( delete );
+ }
+
+ try {
+ batch.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to issue write to cassandra", e );
+ }
+
+ return write;
+ }
+ } );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c45f7ee9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
new file mode 100644
index 0000000..08cc942
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
@@ -0,0 +1,128 @@
+package org.apache.usergrid.persistence.graph.impl;
+
+
+import java.util.Iterator;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+import org.apache.usergrid.persistence.graph.SearchEdgeType;
+import org.apache.usergrid.persistence.graph.consistency.AsyncProcessor;
+import org.apache.usergrid.persistence.graph.consistency.MessageListener;
+import org.apache.usergrid.persistence.graph.guice.NodeDelete;
+import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
+import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
+import org.apache.usergrid.persistence.graph.serialization.NodeSerialization;
+import org.apache.usergrid.persistence.graph.serialization.impl.parse.ObservableIterator;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+
+import rx.Observable;
+import rx.util.functions.Action1;
+import rx.util.functions.Func1;
+
+
+/**
+ * Construct the asynchronous node delete from the q
+ */
+public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, EdgeEvent<Id>> {
+
+ private final NodeSerialization nodeSerialization;
+ private final EdgeSerialization edgeSerialization;
+ private final EdgeMetadataSerialization edgeMetadataSerialization;
+
+
+ /**
+ * Wire the serialization dependencies
+ */
+ @Inject
+ public NodeDeleteListener( final NodeSerialization nodeSerialization, final EdgeSerialization edgeSerialization,
+ final EdgeMetadataSerialization edgeMetadataSerialization, @NodeDelete final AsyncProcessor nodeDelete) {
+
+
+ this.nodeSerialization = nodeSerialization;
+ this.edgeSerialization = edgeSerialization;
+ this.edgeMetadataSerialization = edgeMetadataSerialization;
+ nodeDelete.addListener( this );
+ }
+
+
+ @Override
+ public Observable<EdgeEvent<Id>> receive( final EdgeEvent<Id> edgeEvent ) {
+
+ final Id node = edgeEvent.getData();
+ final OrganizationScope scope = edgeEvent.getOrganizationScope();
+
+
+ return Observable.from( node ).map( new Func1<Id, Optional<UUID>>() {
+ @Override
+ public Optional<UUID> call( final Id id ) {
+ return nodeSerialization.getMaxVersion( scope, node );
+ }
+ } ).flatMap( new Func1<Optional<UUID>, Observable<MarkedEdge>>() {
+ @Override
+ public Observable<MarkedEdge> call( final Optional<UUID> uuidOptional ) {
+
+ return getEdgesTypesToTarget( scope, new SimpleSearchEdgeType( node, null ) )
+ .flatMap( new Func1<String, Observable<MarkedEdge>>() {
+ @Override
+ public Observable<MarkedEdge> call( final String edgeType ) {
+
+ //for each edge type, we want to search all edges < this version to the node and
+ // delete them. We might want to batch this up for efficiency
+ return loadEdgesToTarget( scope,
+ new SimpleSearchByEdgeType( node, edgeType, uuidOptional.get(), null ) )
+ .doOnEach( new Action1<MarkedEdge>() {
+ @Override
+ public void call( final MarkedEdge markedEdge ) {
+ edgeSerialization.deleteEdge( scope, markedEdge );
+ }
+ } );
+ }
+ } );
+ }
+ } ).map( new Func1<MarkedEdge, EdgeEvent<Id>>() {
+ @Override
+ public EdgeEvent<Id> call( final MarkedEdge edge ) {
+ return edgeEvent;
+ }
+ } );
+ }
+
+
+ /**
+ * Get all existing edge types to the target node
+ * @param scope
+ * @param search
+ * @return
+ */
+ private Observable<String> getEdgesTypesToTarget( final OrganizationScope scope, final SearchEdgeType search ) {
+
+ return Observable.create( new ObservableIterator<String>() {
+ @Override
+ protected Iterator<String> getIterator() {
+ return edgeMetadataSerialization.getEdgeTypesToTarget( scope, search );
+ }
+ } );
+ }
+
+
+ /**
+ * Load all edges pointing to this target
+ * @param scope
+ * @param search
+ * @return
+ */
+ private Observable<MarkedEdge> loadEdgesToTarget( final OrganizationScope scope, final SearchByEdgeType search ) {
+
+ return Observable.create( new ObservableIterator<MarkedEdge>() {
+ @Override
+ protected Iterator<MarkedEdge> getIterator() {
+ return edgeSerialization.getEdgesToTarget( scope, search );
+ }
+ } );
+ }
+}