You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/03/08 01:20:50 UTC
[1/3] git commit: Refactor to use the observables. Evaluating hazelcast
Repository: incubator-usergrid
Updated Branches:
refs/heads/asyncqueue 4012eb73f -> 51a9ffd92
Refactor to use the observables. Evaluating hazelcast
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/da4630f5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/da4630f5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/da4630f5
Branch: refs/heads/asyncqueue
Commit: da4630f543fab7eb73fdbfab9535c82c04568179
Parents: 4012eb7
Author: Todd Nine <tn...@apigee.com>
Authored: Mon Mar 3 11:16:30 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon Mar 3 11:16:30 2014 -0700
----------------------------------------------------------------------
.../graph/consistency/AsyncProcessor.java | 8 +-
.../graph/consistency/AsyncProcessorImpl.java | 64 ++++----
.../graph/consistency/AsynchronousEvent.java | 18 ---
.../consistency/AsynchronousEventListener.java | 22 ---
.../graph/consistency/AsynchronousMessage.java | 18 +++
.../graph/consistency/ErrorListener.java | 2 +-
.../graph/consistency/LocalTimeoutQueue.java | 21 +--
.../graph/consistency/MessageListener.java | 22 +++
.../consistency/SimpleAsynchronousEvent.java | 30 ----
.../consistency/SimpleAsynchronousMessage.java | 30 ++++
.../graph/consistency/TimeoutQueue.java | 8 +-
.../persistence/graph/impl/EdgeManagerImpl.java | 153 +++++++++----------
.../graph/consistency/AsyncProcessorTest.java | 56 ++++---
.../consistency/LocalTimeoutQueueTest.java | 28 ++--
14 files changed, 245 insertions(+), 235 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/da4630f5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java
index 3b3e0b9..f6013d9 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java
@@ -19,16 +19,16 @@ public interface AsyncProcessor<T> {
* @param event The event to be scheduled for verification
* @param timeout The time in milliseconds we should wait before the event should fire
*/
- public AsynchronousEvent<T> setVerification( T event, long timeout );
+ public AsynchronousMessage<T> setVerification( T event, long timeout );
/**
- * Start processing the event immediately asynchronously. In the event an exception is thrown, the AsynchronousEvent should be re-tried.
+ * Start processing the event immediately asynchronously. In the event an exception is thrown, the AsynchronousMessage should be re-tried.
* It is up to the implementer to commit the event so that it does not fire again. This should never throw exceptions.
*
* @param event The event to start
*/
- public void start(AsynchronousEvent<T> event);
+ public void start(AsynchronousMessage<T> event);
/**
* Add the error listener to the list of listeners
@@ -40,7 +40,7 @@ public interface AsyncProcessor<T> {
* Add the listener to this instance
* @param listener
*/
- public void addListener(AsynchronousEventListener<T> listener);
+ public void addListener(MessageListener<T, T> listener);
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/da4630f5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
index 5acf728..fbac644 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
@@ -9,11 +9,13 @@ import org.slf4j.LoggerFactory;
import com.google.inject.Inject;
import com.google.inject.Singleton;
-import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
-import rx.Observer;
+import rx.Observable;
import rx.Scheduler;
+import rx.util.functions.Action0;
+import rx.util.functions.Action1;
+import rx.util.functions.FuncN;
/**
@@ -26,7 +28,7 @@ public class AsyncProcessorImpl<T> implements AsyncProcessor<T> {
private final TimeoutQueue<T> queue;
private final Scheduler scheduler;
- private final List<AsynchronousEventListener<T>> listeners = new ArrayList<AsynchronousEventListener<T>>( );
+ private final List<MessageListener<T, T>> listeners = new ArrayList<MessageListener<T, T>>();
private static final Logger LOG = LoggerFactory.getLogger( AsyncProcessor.class );
@@ -40,60 +42,57 @@ public class AsyncProcessorImpl<T> implements AsyncProcessor<T> {
}
-
@Override
- public AsynchronousEvent<T> setVerification( final T event, final long timeout ) {
+ public AsynchronousMessage<T> setVerification( final T event, final long timeout ) {
return queue.queue( event, timeout );
}
-
-
@Override
- public void start( final AsynchronousEvent<T> event ) {
+ public void start( final AsynchronousMessage<T> event ) {
- //run this in a timeout command so it doesn't run forever. If it times out, it will simply resume later
- new HystrixCommand<Void>( GRAPH_REPAIR ) {
-
- @Override
- protected Void run() throws Exception {
- final T rootEvent = event.getEvent();
+ final T data = event.getEvent();
+ /**
+ * Execute all listeners in parallel
+ */
+ List<Observable<?>> observables = new ArrayList<Observable<?>>( listeners.size() );
- for(AsynchronousEventListener<T> listener: listeners){
- listener.receive( rootEvent );
- }
+ for ( MessageListener<T, T> listener : listeners ) {
+ observables.add( listener.receive( data ).subscribeOn( scheduler ) );
+ }
- return null;
- }
- }.toObservable( scheduler ).subscribe( new Observer<Void>() {
+ //run everything in parallel and zip it up
+ Observable.zip( observables, new FuncN<AsynchronousMessage<T>>() {
@Override
- public void onCompleted() {
- queue.remove( event );
+ public AsynchronousMessage<T> call( final Object... args ) {
+ return event;
}
-
-
+ } ).doOnError( new Action1<Throwable>() {
@Override
- public void onError( final Throwable throwable ) {
+ public void call( final Throwable throwable ) {
LOG.error( "Unable to process async event", throwable );
for ( ErrorListener listener : errorListeners ) {
listener.onError( event, throwable );
}
}
-
-
+ } ).doOnCompleted( new Action0() {
+ @Override
+ public void call() {
+ queue.remove( event );
+ }
+ } ).subscribe( new Action1<AsynchronousMessage<T>>() {
@Override
- public void onNext( final Void args ) {
- //nothing to do here
+ public void call( final AsynchronousMessage<T> tAsynchronousMessage ) {
+ //To change body of implemented methods use File | Settings | File Templates.
}
} );
-
}
@Override
- public void addListener( final AsynchronousEventListener<T> listener ) {
+ public void addListener( final MessageListener<T, T> listener ) {
this.listeners.add( listener );
}
@@ -104,7 +103,4 @@ public class AsyncProcessorImpl<T> implements AsyncProcessor<T> {
public void addErrorListener( ErrorListener<T> listener ) {
this.errorListeners.add( listener );
}
-
-
-
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/da4630f5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsynchronousEvent.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsynchronousEvent.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsynchronousEvent.java
deleted file mode 100644
index fe38a93..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsynchronousEvent.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package org.apache.usergrid.persistence.graph.consistency;
-
-
-/**
- * An interface for a timeout event
- */
-public interface AsynchronousEvent<T> {
-
- /**
- * @return The event to fire when our timeout is reached
- */
- T getEvent();
-
- /**
- * @return The time in epoch millis the event will time out
- */
- long getTimeout();
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/da4630f5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsynchronousEventListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsynchronousEventListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsynchronousEventListener.java
deleted file mode 100644
index 6cb93cb..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsynchronousEventListener.java
+++ /dev/null
@@ -1,22 +0,0 @@
-package org.apache.usergrid.persistence.graph.consistency;
-
-
-import rx.Observable;
-
-
-/**
- *
- *
- */
-public interface AsynchronousEventListener<T, R> {
-
-
- /**
- * The handler to receive the event. Any exception that is thrown is considered
- * a failure, and the event will be re-fired.
- * @param event The input event
- * @return The observable that performs the operations
- */
- Observable<R> receive(T event);
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/da4630f5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsynchronousMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsynchronousMessage.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsynchronousMessage.java
new file mode 100644
index 0000000..c9bd00a
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsynchronousMessage.java
@@ -0,0 +1,18 @@
+package org.apache.usergrid.persistence.graph.consistency;
+
+
+/**
+ * An interface for a timeout event
+ */
+public interface AsynchronousMessage<T> {
+
+ /**
+ * @return The event to fire when our timeout is reached
+ */
+ T getEvent();
+
+ /**
+ * @return The time in epoch millis the event will time out
+ */
+ long getTimeout();
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/da4630f5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/ErrorListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/ErrorListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/ErrorListener.java
index f6cef41..f736cdb 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/ErrorListener.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/ErrorListener.java
@@ -6,5 +6,5 @@ package org.apache.usergrid.persistence.graph.consistency;
*/
public interface ErrorListener <T> {
- void onError( AsynchronousEvent<T> event, Throwable t );
+ void onError( AsynchronousMessage<T> event, Throwable t );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/da4630f5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/LocalTimeoutQueue.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/LocalTimeoutQueue.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/LocalTimeoutQueue.java
index 3a3b2ce..d2cc17f 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/LocalTimeoutQueue.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/LocalTimeoutQueue.java
@@ -23,7 +23,7 @@ public class LocalTimeoutQueue<T> implements TimeoutQueue<T> {
/**
* For in memory queueing
*/
- private final PriorityBlockingQueue<AsynchronousEvent<T>> queue = new PriorityBlockingQueue<AsynchronousEvent<T>>( 1000, new TimeoutEventCompatator<T>() );
+ private final PriorityBlockingQueue<AsynchronousMessage<T>> queue = new PriorityBlockingQueue<AsynchronousMessage<T>>( 1000, new TimeoutEventCompatator<T>() );
private final TimeService timeService;
@@ -35,9 +35,9 @@ public class LocalTimeoutQueue<T> implements TimeoutQueue<T> {
@Override
- public AsynchronousEvent<T> queue( final T event, final long timeout ) {
+ public AsynchronousMessage<T> queue( final T event, final long timeout ) {
final long scheduledTimeout = timeService.getCurrentTime() + timeout;
- final AsynchronousEvent<T> queuedEvent = new SimpleAsynchronousEvent<T>( event, scheduledTimeout );
+ final AsynchronousMessage<T> queuedEvent = new SimpleAsynchronousMessage<T>( event, scheduledTimeout );
queue.add( queuedEvent );
@@ -46,16 +46,16 @@ public class LocalTimeoutQueue<T> implements TimeoutQueue<T> {
@Override
- public Collection<AsynchronousEvent<T>> take( final int maxSize, final long timeout ) {
+ public Collection<AsynchronousMessage<T>> take( final int maxSize, final long timeout ) {
final long now = timeService.getCurrentTime();
final long newTimeout = now+timeout;
- List<AsynchronousEvent<T>> results = new ArrayList<AsynchronousEvent<T>>(maxSize);
+ List<AsynchronousMessage<T>> results = new ArrayList<AsynchronousMessage<T>>(maxSize);
for(int i = 0; i < maxSize; i ++){
- AsynchronousEvent<T> queuedEvent = queue.peek();
+ AsynchronousMessage<T> queuedEvent = queue.peek();
//nothing to do
if(queuedEvent == null){
@@ -68,7 +68,8 @@ public class LocalTimeoutQueue<T> implements TimeoutQueue<T> {
break;
}
- final AsynchronousEvent<T> newEvent = new SimpleAsynchronousEvent<T>( queuedEvent.getEvent(), newTimeout );
+ final AsynchronousMessage<T>
+ newEvent = new SimpleAsynchronousMessage<T>( queuedEvent.getEvent(), newTimeout );
//re schedule a new event to replace this one
queue.add(newEvent);
@@ -85,16 +86,16 @@ public class LocalTimeoutQueue<T> implements TimeoutQueue<T> {
@Override
- public boolean remove( final AsynchronousEvent<T> event ) {
+ public boolean remove( final AsynchronousMessage<T> event ) {
return queue.remove( event );
}
- private static class TimeoutEventCompatator<T> implements Comparator<AsynchronousEvent<T>> {
+ private static class TimeoutEventCompatator<T> implements Comparator<AsynchronousMessage<T>> {
@Override
- public int compare( final AsynchronousEvent<T> o1, final AsynchronousEvent<T> o2 ) {
+ public int compare( final AsynchronousMessage<T> o1, final AsynchronousMessage<T> o2 ) {
return Long.compare( o1.getTimeout(), o2.getTimeout() );
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/da4630f5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/MessageListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/MessageListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/MessageListener.java
new file mode 100644
index 0000000..8a21dbb
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/MessageListener.java
@@ -0,0 +1,22 @@
+package org.apache.usergrid.persistence.graph.consistency;
+
+
+import rx.Observable;
+
+
+/**
+ *
+ *
+ */
+public interface MessageListener<T, R> {
+
+
+ /**
+ * The handler to receive the event. Any exception that is thrown is considered
+ * a failure, and the event will be re-fired.
+ * @param event The input event
+ * @return The observable that performs the operations
+ */
+ Observable<T> receive(T event);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/da4630f5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/SimpleAsynchronousEvent.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/SimpleAsynchronousEvent.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/SimpleAsynchronousEvent.java
deleted file mode 100644
index fdfcd60..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/SimpleAsynchronousEvent.java
+++ /dev/null
@@ -1,30 +0,0 @@
-package org.apache.usergrid.persistence.graph.consistency;
-
-
-/**
- *
- *
- */
-public class SimpleAsynchronousEvent<T> implements AsynchronousEvent<T> {
-
- private final T event;
- private final long timeout;
-
-
- public SimpleAsynchronousEvent( final T event, final long timeout ) {
- this.event = event;
- this.timeout = timeout;
- }
-
-
- @Override
- public T getEvent() {
- return event;
- }
-
-
- @Override
- public long getTimeout() {
- return timeout;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/da4630f5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/SimpleAsynchronousMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/SimpleAsynchronousMessage.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/SimpleAsynchronousMessage.java
new file mode 100644
index 0000000..1e7a04b
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/SimpleAsynchronousMessage.java
@@ -0,0 +1,30 @@
+package org.apache.usergrid.persistence.graph.consistency;
+
+
+/**
+ *
+ *
+ */
+public class SimpleAsynchronousMessage<T> implements AsynchronousMessage<T> {
+
+ private final T event;
+ private final long timeout;
+
+
+ public SimpleAsynchronousMessage( final T event, final long timeout ) {
+ this.event = event;
+ this.timeout = timeout;
+ }
+
+
+ @Override
+ public T getEvent() {
+ return event;
+ }
+
+
+ @Override
+ public long getTimeout() {
+ return timeout;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/da4630f5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeoutQueue.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeoutQueue.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeoutQueue.java
index 556aa88..0caed9c 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeoutQueue.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeoutQueue.java
@@ -14,9 +14,9 @@ public interface TimeoutQueue<T> {
*
* @param event The event to queue
* @param timeout The timeout to set on the queue element
- * @return The AsynchronousEvent that has been queued
+ * @return The AsynchronousMessage that has been queued
*/
- public AsynchronousEvent<T> queue( T event, long timeout );
+ public AsynchronousMessage<T> queue( T event, long timeout );
/**
@@ -29,7 +29,7 @@ public interface TimeoutQueue<T> {
*
* @return A collection of events.
*/
- public Collection<AsynchronousEvent<T>> take( int maxSize, long timeout );
+ public Collection<AsynchronousMessage<T>> take( int maxSize, long timeout );
/**
@@ -39,5 +39,5 @@ public interface TimeoutQueue<T> {
*
* @return True if the element was removed. False otherwise
*/
- public boolean remove( AsynchronousEvent<T> event );
+ public boolean remove( AsynchronousMessage<T> event );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/da4630f5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
index 7fd9246..1cc2fce 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
@@ -36,8 +36,8 @@ import org.apache.usergrid.persistence.graph.SearchByIdType;
import org.apache.usergrid.persistence.graph.SearchEdgeType;
import org.apache.usergrid.persistence.graph.SearchIdType;
import org.apache.usergrid.persistence.graph.consistency.AsyncProcessor;
-import org.apache.usergrid.persistence.graph.consistency.AsynchronousEvent;
-import org.apache.usergrid.persistence.graph.consistency.AsynchronousEventListener;
+import org.apache.usergrid.persistence.graph.consistency.AsynchronousMessage;
+import org.apache.usergrid.persistence.graph.consistency.MessageListener;
import org.apache.usergrid.persistence.graph.guice.EdgeDelete;
import org.apache.usergrid.persistence.graph.guice.EdgeWrite;
import org.apache.usergrid.persistence.graph.guice.NodeDelete;
@@ -135,7 +135,7 @@ public class EdgeManagerImpl implements EdgeManager {
mutation.mergeShallow( edgeMutation );
- final AsynchronousEvent<Edge> event = edgeWriteAsyncProcessor.setVerification( edge, getTimeout() );
+ final AsynchronousMessage<Edge> event = edgeWriteAsyncProcessor.setVerification( edge, getTimeout() );
try {
mutation.execute();
@@ -161,7 +161,7 @@ public class EdgeManagerImpl implements EdgeManager {
public Edge call( final Edge edge ) {
final MutationBatch edgeMutation = edgeSerialization.markEdge( scope, edge );
- final AsynchronousEvent<Edge> event = edgeDeleteAsyncProcessor.setVerification( edge, getTimeout() );
+ final AsynchronousMessage<Edge> event = edgeDeleteAsyncProcessor.setVerification( edge, getTimeout() );
try {
@@ -191,7 +191,7 @@ public class EdgeManagerImpl implements EdgeManager {
final MutationBatch nodeMutation = nodeSerialization.mark( scope, id, deleteTime );
- final AsynchronousEvent<Id> event = nodeDeleteAsyncProcessor.setVerification( node, getTimeout() );
+ final AsynchronousMessage<Id> event = nodeDeleteAsyncProcessor.setVerification( node, getTimeout() );
try {
@@ -420,10 +420,10 @@ public class EdgeManagerImpl implements EdgeManager {
/**
* Construct the asynchronous edge lister for the repair operation.
*/
- public class EdgeWriteListener implements AsynchronousEventListener<Edge, Integer> {
+ public class EdgeWriteListener implements MessageListener<Edge, Edge> {
@Override
- public Observable<Integer> receive( final Edge write ) {
+ public Observable<Edge> receive( final Edge write ) {
final UUID maxVersion = write.getVersion();
@@ -452,9 +452,9 @@ public class EdgeManagerImpl implements EdgeManager {
return UUIDComparator.staticCompare( markedEdge.getVersion(), maxVersion ) < 0;
}
//buffer the deletes and issue them in a single mutation
- } ).buffer( graphFig.getScanPageSize() ).map( new Func1<List<MarkedEdge>, Integer>() {
+ } ).buffer( graphFig.getScanPageSize() ).map( new Func1<List<MarkedEdge>, Edge>() {
@Override
- public Integer call( final List<MarkedEdge> markedEdges ) {
+ public Edge call( final List<MarkedEdge> markedEdges ) {
final int size = markedEdges.size();
@@ -473,7 +473,7 @@ public class EdgeManagerImpl implements EdgeManager {
throw new RuntimeException( "Unable to issue write to cassandra", e );
}
- return size;
+ return write;
}
} );
}
@@ -483,10 +483,10 @@ public class EdgeManagerImpl implements EdgeManager {
/**
* Construct the asynchronous delete operation from the listener
*/
- public class EdgeDeleteListener implements AsynchronousEventListener<Edge, Integer> {
+ public class EdgeDeleteListener implements MessageListener<Edge, Edge> {
@Override
- public Observable<Integer> receive( final Edge delete ) {
+ public Observable<Edge> receive( final Edge delete ) {
final UUID maxVersion = delete.getVersion();
@@ -496,76 +496,57 @@ public class EdgeManagerImpl implements EdgeManager {
//search by edge type and target type. If any other edges with this target type exist,
// we can't delete it
- Observable<Integer> sourceIdType= loadEdgesFromSourceByType(
+ Observable<MutationBatch> sourceIdType = loadEdgesFromSourceByType(
new SimpleSearchByIdType( edge.getSourceNode(), edge.getType(), maxVersion,
- edge.getTargetNode().getType(), null ) ).take( 2 ).count().doOnEach(
- new Action1<Integer>() {
+ edge.getTargetNode().getType(), null ) ).take( 2 ).count()
+ .map( new Func1<Integer, MutationBatch>() {
@Override
- public void call( final Integer count ) {
+ public MutationBatch call( final Integer count ) {
//There's nothing to do, we have 2 different edges with the same edge type and
// target type. Don't delete meta data
if ( count == 2 ) {
- return;
+ return null;
}
- try {
- edgeMetadataSerialization.removeEdgeTypeFromSource( scope, delete ).execute();
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( "Unable to execute mutation", e );
- }
+ return edgeMetadataSerialization.removeEdgeTypeFromSource( scope, delete );
}
} );
- Observable<Integer> targetIdType = loadEdgesToTargetByType(
+ Observable<MutationBatch> targetIdType = loadEdgesToTargetByType(
new SimpleSearchByIdType( edge.getTargetNode(), edge.getType(), maxVersion,
edge.getSourceNode().getType(), null ) ).take( 2 ).count()
- .doOnEach( new Action1<Integer>() {
-
-
+ .map( new Func1<Integer, MutationBatch>() {
@Override
- public void call( final Integer count ) {
-
-
+ public MutationBatch call( final Integer count ) {
//There's nothing to do, we have 2 different edges with the same edge type and
// target type. Don't delete meta data
if ( count == 2 ) {
- return;
+ return null;
}
- try {
- edgeMetadataSerialization.removeEdgeTypeToTarget( scope, delete ).execute();
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( "Unable to execute mutation", e );
- }
+
+ return edgeMetadataSerialization.removeEdgeTypeToTarget( scope, delete );
}
} );
//search by edge type and target type. If any other edges with this target type exist,
// we can't delete it
- Observable<Integer> sourceType = loadEdgesFromSource(
+ Observable<MutationBatch> sourceType = loadEdgesFromSource(
new SimpleSearchByEdgeType( edge.getSourceNode(), edge.getType(), maxVersion, null ) )
- .take( 2 ).count().doOnEach( new Action1<Integer>() {
-
-
+ .take( 2 ).count().map( new Func1<Integer, MutationBatch>() {
@Override
- public void call( final Integer count ) {
+ public MutationBatch call( final Integer count ) {
//There's nothing to do, we have 2 different edges with the same edge type and
// target type. Don't delete meta data
if ( count == 2 ) {
- return;
+ return null;
}
- try {
- edgeMetadataSerialization.removeEdgeTypeFromSource( scope, delete ).execute();
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( "Unable to execute mutation", e );
- }
+
+ return edgeMetadataSerialization.removeEdgeTypeFromSource( scope, delete );
}
} );
@@ -625,12 +606,19 @@ public class EdgeManagerImpl implements EdgeManager {
}
} );
}
- } ).map( new Func1<MutationBatch, MutationBatch>() {
+ } ).map( new Func1<MutationBatch, Edge>() {
@Override
- public MutationBatch call( final MutationBatch mutationBatch ) {
- return mutationBatch;
+ public Edge call( final MutationBatch mutationBatch ) {
+ try {
+ mutationBatch.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to execute mutation", e );
+ }
+
+ return delete;
}
- } ).count();
+ } );
}
}
@@ -638,35 +626,44 @@ public class EdgeManagerImpl implements EdgeManager {
/**
* Construct the asynchronous node delete from the q
*/
- public class NodeDeleteListener implements AsynchronousEventListener<Id, Integer> {
+ public class NodeDeleteListener implements MessageListener<Id, Id> {
@Override
- public Observable<Integer> receive( final Id node ) {
+ public Observable<Id> receive( final Id node ) {
- final Optional<UUID> mark = nodeSerialization.getMaxVersion( scope, node );
- //Nothing to do, just exist with 0 count
- if ( !mark.isPresent() ) {
- return Observable.just( 0 );
- }
-
- final UUID maxVersion = mark.get();
-
- return getEdgeTypesToTarget( new SimpleSearchEdgeType( node, null ) )
- .flatMap( new Func1<String, Observable<Edge>>() {
- @Override
- public Observable<Edge> call( final String edgeType ) {
-
- //for each edge type, we want to search all edges < this version to the node and delete them. We might want to batch this up for efficiency
- return loadEdgesToTarget( new SimpleSearchByEdgeType( node, edgeType, maxVersion, null ) )
- .doOnEach( new Action1<Edge>() {
- @Override
- public void call( final Edge edge ) {
- deleteEdge( edge );
- }
- } );
- }
- } ).count();
+ return Observable.from( node ).map( new Func1<Id, Optional<UUID>>() {
+ @Override
+ public Optional<UUID> call( final Id id ) {
+ return nodeSerialization.getMaxVersion( scope, node );
+ }
+ } ).flatMap( new Func1<Optional<UUID>, Observable<Edge>>() {
+ @Override
+ public Observable<Edge> call( final Optional<UUID> uuidOptional ) {
+ return getEdgeTypesToTarget( new SimpleSearchEdgeType( node, null ) )
+ .flatMap( new Func1<String, Observable<Edge>>() {
+ @Override
+ public Observable<Edge> call( final String edgeType ) {
+
+ //for each edge type, we want to search all edges < this version to the node and
+ // delete them. We might want to batch this up for efficiency
+ return loadEdgesToTarget(
+ new SimpleSearchByEdgeType( node, edgeType, uuidOptional.get(), null ) )
+ .doOnEach( new Action1<Edge>() {
+ @Override
+ public void call( final Edge edge ) {
+ deleteEdge( edge );
+ }
+ } );
+ }
+ } );
+ }
+ } ).map( new Func1<Edge, Id>() {
+ @Override
+ public Id call( final Edge edge ) {
+ return node;
+ }
+ } );
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/da4630f5/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java
index 6224b0a..fc53705 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java
@@ -27,7 +27,9 @@ import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import rx.Observable;
import rx.concurrency.Schedulers;
+import rx.util.functions.Action1;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -52,7 +54,7 @@ public class AsyncProcessorTest {
final TestEvent event = new TestEvent();
- final AsynchronousEvent<TestEvent> asynchronousEvent = new AsynchronousEvent<TestEvent>() {
+ final AsynchronousMessage<TestEvent> asynchronousMessage = new AsynchronousMessage<TestEvent>() {
@Override
public TestEvent getEvent() {
return event;
@@ -72,13 +74,13 @@ public class AsyncProcessorTest {
//mock up the queue
- when( queue.queue( event, timeout ) ).thenReturn( asynchronousEvent );
+ when( queue.queue( event, timeout ) ).thenReturn( asynchronousMessage );
- AsynchronousEvent<TestEvent> returned = asyncProcessor.setVerification( event, timeout );
+ AsynchronousMessage<TestEvent> returned = asyncProcessor.setVerification( event, timeout );
//ensure the timeouts are returned from the Queue subsystem
- assertSame( asynchronousEvent, returned );
+ assertSame( asynchronousMessage, returned );
}
@@ -90,7 +92,7 @@ public class AsyncProcessorTest {
final TestEvent event = new TestEvent();
- final AsynchronousEvent<TestEvent> asynchronousEvent = new AsynchronousEvent<TestEvent>() {
+ final AsynchronousMessage<TestEvent> asynchronousMessage = new AsynchronousMessage<TestEvent>() {
@Override
public TestEvent getEvent() {
return event;
@@ -111,7 +113,7 @@ public class AsyncProcessorTest {
final CountDownLatch latch = new CountDownLatch( 1 );
//mock up the ack to allow us to block the test until the async confirm fires
- when( queue.remove( asynchronousEvent ) ).thenAnswer( new Answer<Boolean>() {
+ when( queue.remove( asynchronousMessage ) ).thenAnswer( new Answer<Boolean>() {
@Override
public Boolean answer( final InvocationOnMock invocation ) throws Throwable {
latch.countDown();
@@ -120,7 +122,7 @@ public class AsyncProcessorTest {
} );
- asyncProcessor.start( asynchronousEvent );
+ asyncProcessor.start( asynchronousMessage );
//block until the event is fired. The correct invocation is implicitly verified by the remove mock
@@ -145,7 +147,7 @@ public class AsyncProcessorTest {
final boolean[] invoked = new boolean[] { false, false };
- final AsynchronousEvent<TestEvent> asynchronousEvent = new AsynchronousEvent<TestEvent>() {
+ final AsynchronousMessage<TestEvent> asynchronousMessage = new AsynchronousMessage<TestEvent>() {
@Override
public TestEvent getEvent() {
return event;
@@ -165,12 +167,12 @@ public class AsyncProcessorTest {
final CountDownLatch latch = new CountDownLatch( 1 );
- final AsynchronousEvent<?>[] errorEvents = { null };
+ final AsynchronousMessage<?>[] errorEvents = { null };
//countdown the latch so the test can proceed
asyncProcessor.addErrorListener( new ErrorListener<TestEvent>() {
@Override
- public void onError( final AsynchronousEvent<TestEvent> event, final Throwable t ) {
+ public void onError( final AsynchronousMessage<TestEvent> event, final Throwable t ) {
errorEvents[0] = event;
invoked[1] = true;
latch.countDown();
@@ -179,7 +181,7 @@ public class AsyncProcessorTest {
} );
//throw an error if remove is called. This shouldn't happen
- when( queue.remove( asynchronousEvent ) ).then( new Answer<Boolean>() {
+ when( queue.remove( asynchronousMessage ) ).then( new Answer<Boolean>() {
@Override
public Boolean answer( final InvocationOnMock invocation ) throws Throwable {
invoked[0] = true;
@@ -189,7 +191,7 @@ public class AsyncProcessorTest {
//fire the event
- asyncProcessor.start( asynchronousEvent );
+ asyncProcessor.start( asynchronousMessage );
//block until the event is fired. The invocation verification is part of the error listener unlocking
@@ -210,7 +212,7 @@ public class AsyncProcessorTest {
/**
* Construct the async processor
*/
- public <T> AsyncProcessorImpl<T> constructProcessor( TimeoutQueue<T> queue , AsynchronousEventListener<T> listener) {
+ public <T> AsyncProcessorImpl<T> constructProcessor( TimeoutQueue<T> queue , MessageListener<T, T> listener) {
AsyncProcessorImpl<T> processor = new AsyncProcessorImpl( queue,Schedulers.threadPoolForIO() );
processor.addListener( listener );
@@ -230,7 +232,7 @@ public class AsyncProcessorTest {
}
- public static class TestListener implements AsynchronousEventListener<TestEvent> {
+ public static class TestListener implements MessageListener<TestEvent, TestEvent> {
public final Stack<TestEvent> events = new Stack<TestEvent>();
@@ -239,9 +241,16 @@ public class AsyncProcessorTest {
}
+
@Override
- public void receive( final TestEvent event ) {
- events.push( event );
+ public Observable<TestEvent> receive( final TestEvent event ) {
+
+ return Observable.from( event ).doOnEach(new Action1<TestEvent>() {
+ @Override
+ public void call( final TestEvent testEvent ) {
+ events.push( testEvent );
+ }
+ });
}
}
@@ -249,15 +258,22 @@ public class AsyncProcessorTest {
/**
* Throw error after the event is fired
*/
- public static class AsynchronousErrorListener implements AsynchronousEventListener<TestEvent> {
+ public static class AsynchronousErrorListener implements MessageListener<TestEvent, TestEvent> {
public final Stack<TestEvent> events = new Stack<TestEvent>();
@Override
- public void receive( final TestEvent event ) {
- events.push( event );
- throw new RuntimeException( "Test Exception thrown. Failed to process event" );
+ public Observable<TestEvent> receive( final TestEvent event ) {
+ return Observable.from( event ).doOnEach(new Action1<TestEvent>() {
+ @Override
+ public void call( final TestEvent testEvent ) {
+ events.push( testEvent );
+ throw new RuntimeException( "Test Exception thrown. Failed to process event" );
+ }
+ });
}
+
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/da4630f5/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/LocalTimeoutQueueTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/LocalTimeoutQueueTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/LocalTimeoutQueueTest.java
index 76e09f6..49ae789 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/LocalTimeoutQueueTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/LocalTimeoutQueueTest.java
@@ -34,14 +34,14 @@ public class LocalTimeoutQueueTest {
final TestEvent event = new TestEvent();
- AsynchronousEvent<TestEvent> asynchronousEvent = queue.queue( event, timeout );
+ AsynchronousMessage<TestEvent> asynchronousMessage = queue.queue( event, timeout );
- assertNotNull( asynchronousEvent );
+ assertNotNull( asynchronousMessage );
- assertEquals( event, asynchronousEvent.getEvent() );
- assertEquals( time + timeout, asynchronousEvent.getTimeout() );
+ assertEquals( event, asynchronousMessage.getEvent() );
+ assertEquals( time + timeout, asynchronousMessage.getTimeout() );
- Collection<AsynchronousEvent<TestEvent>> results = queue.take( 100, timeout );
+ Collection<AsynchronousMessage<TestEvent>> results = queue.take( 100, timeout );
assertEquals( "Time not yet elapsed", 0, results.size() );
@@ -55,9 +55,9 @@ public class LocalTimeoutQueueTest {
assertEquals( "Time elapsed", 1, results.size() );
//validate we get a new timeout event since the old one was re-scheduled
- Iterator<AsynchronousEvent<TestEvent>> events = results.iterator();
+ Iterator<AsynchronousMessage<TestEvent>> events = results.iterator();
- AsynchronousEvent<TestEvent> message = events.next();
+ AsynchronousMessage<TestEvent> message = events.next();
assertEquals( event, message.getEvent() );
@@ -97,18 +97,18 @@ public class LocalTimeoutQueueTest {
final TestEvent event = new TestEvent();
- AsynchronousEvent<TestEvent> asynchronousEvent = queue.queue( event, timeout );
+ AsynchronousMessage<TestEvent> asynchronousMessage = queue.queue( event, timeout );
events.add( event );
- assertNotNull( asynchronousEvent );
+ assertNotNull( asynchronousMessage );
- assertEquals( event, asynchronousEvent.getEvent() );
- assertEquals( time + timeout, asynchronousEvent.getTimeout() );
+ assertEquals( event, asynchronousMessage.getEvent() );
+ assertEquals( time + timeout, asynchronousMessage.getTimeout() );
}
- Collection<AsynchronousEvent<TestEvent>> results = queue.take( 100, timeout );
+ Collection<AsynchronousMessage<TestEvent>> results = queue.take( 100, timeout );
assertEquals( "Time not yet elapsed", 0, results.size() );
@@ -133,11 +133,11 @@ public class LocalTimeoutQueueTest {
assertEquals( "Time elapsed", 100, results.size() );
//validate we get a new timeout event since the old one was re-scheduled
- Iterator<AsynchronousEvent<TestEvent>> eventIterator = results.iterator();
+ Iterator<AsynchronousMessage<TestEvent>> eventIterator = results.iterator();
while(eventIterator.hasNext()){
- AsynchronousEvent<TestEvent> message = eventIterator.next();
+ AsynchronousMessage<TestEvent> message = eventIterator.next();
assertTrue( events.remove( message.getEvent() ) );
[3/3] git commit: Upgraded queue system and processing
Posted by sn...@apache.org.
Upgraded queue system and processing
Updated RX to 0.17-RC7
Rolled custom Hystrix to test RX integration
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/51a9ffd9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/51a9ffd9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/51a9ffd9
Branch: refs/heads/asyncqueue
Commit: 51a9ffd9218bdaf4884709fd04bae9c046dc67ec
Parents: c45f7ee
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Mar 7 16:59:06 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Mar 7 16:59:06 2014 -0700
----------------------------------------------------------------------
stack/corepersistence/collection/pom.xml | 21 +--
.../collection/hystrix/CassandraCommand.java | 4 +-
.../impl/EntityCollectionManagerImpl.java | 6 +-
.../mvcc/stage/delete/MarkCommit.java | 2 +-
.../collection/mvcc/stage/delete/MarkStart.java | 2 +-
.../collection/mvcc/stage/load/Load.java | 2 +-
.../mvcc/stage/write/WriteCommit.java | 2 +-
.../mvcc/stage/write/WriteOptimisticVerify.java | 2 +-
.../collection/mvcc/stage/write/WriteStart.java | 2 +-
.../mvcc/stage/write/WriteUniqueVerify.java | 5 +-
.../collection/rx/CassandraThreadScheduler.java | 2 +-
.../rx/CassandraThreadSchedulerTest.java | 44 +++----
.../persistence/collection/rx/ParallelTest.java | 8 +-
.../usergrid/persistence/graph/GraphFig.java | 14 ++
.../graph/consistency/AsyncProcessor.java | 50 +++++---
.../graph/consistency/AsyncProcessorImpl.java | 60 ++++++---
.../graph/consistency/CompleteListener.java | 14 ++
.../graph/consistency/TimeoutTask.java | 61 +++++++++
.../graph/impl/EdgeDeleteListener.java | 6 +-
.../persistence/graph/impl/EdgeManagerImpl.java | 2 +-
.../graph/impl/EdgeWriteListener.java | 2 +-
.../graph/impl/NodeDeleteListener.java | 6 +-
.../impl/parse/ObservableIterator.java | 18 ++-
.../graph/consistency/AsyncProcessorTest.java | 127 +++++++++++++++----
24 files changed, 337 insertions(+), 125 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/collection/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/pom.xml b/stack/corepersistence/collection/pom.xml
index db3ce3a..00db576 100644
--- a/stack/corepersistence/collection/pom.xml
+++ b/stack/corepersistence/collection/pom.xml
@@ -159,17 +159,17 @@
<!-- RX java -->
<dependency>
- <groupId>com.netflix.rxjava</groupId>
- <artifactId>rxjava-core</artifactId>
- <version>0.16.1</version>
+ <groupId>com.netflix.rxjava</groupId>
+ <artifactId>rxjava-core</artifactId>
+ <version>0.17.0-RC7</version>
</dependency>
- <dependency>
- <groupId>com.netflix.rxjava</groupId>
- <artifactId>rxjava-contrib</artifactId>
- <version>0.14.6</version>
- </dependency>
+ <!--<dependency>-->
+ <!--<groupId>com.netflix.rxjava</groupId>-->
+ <!--<artifactId>rxjava-contrib</artifactId>-->
+ <!--<version>0.14.6</version>-->
+ <!--</dependency>-->
<dependency>
<groupId>com.google.inject.extensions</groupId>
@@ -201,10 +201,13 @@
<version>${log4j.version}</version>
</dependency>
+ <!--Remove custom build once this patch is complete
+ https://github.com/Netflix/Hystrix/pull/209-->
+
<dependency>
<groupId>com.netflix.hystrix</groupId>
<artifactId>hystrix-core</artifactId>
- <version>1.3.8</version>
+ <version>1.3.14-SNAPSHOT</version>
</dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/hystrix/CassandraCommand.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/hystrix/CassandraCommand.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/hystrix/CassandraCommand.java
index c87bf33..731933a 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/hystrix/CassandraCommand.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/hystrix/CassandraCommand.java
@@ -24,7 +24,7 @@ import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import rx.Observable;
-import rx.concurrency.Schedulers;
+import rx.schedulers.Schedulers;
/**
@@ -69,6 +69,6 @@ public class CassandraCommand<R> extends HystrixCommand<R> {
*/
private static <R> Observable<R> toObservable( R readValue ) {
//create a new command and ensure it's observed on the correct thread scheduler
- return new CassandraCommand<R>( readValue ).toObservable( Schedulers.threadPoolForIO() );
+ return new CassandraCommand<R>( readValue ).toObservable( Schedulers.io() );
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index 4391d09..ef95a75 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@ -47,9 +47,9 @@ import com.google.inject.assistedinject.Assisted;
import rx.Observable;
import rx.Scheduler;
-import rx.util.functions.Func1;
-import rx.util.functions.Func2;
-import rx.util.functions.FuncN;
+import rx.functions.Func1;
+import rx.functions.Func2;
+import rx.functions.FuncN;
/**
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java
index 9c1a432..06c875e 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java
@@ -41,7 +41,7 @@ import com.google.inject.Singleton;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-import rx.util.functions.Func1;
+import rx.functions.Func1;
/**
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkStart.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkStart.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkStart.java
index 4b0fae5..e718f4b 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkStart.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkStart.java
@@ -45,7 +45,7 @@ import com.google.inject.Singleton;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-import rx.util.functions.Func1;
+import rx.functions.Func1;
/**
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/load/Load.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/load/Load.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/load/Load.java
index 20abc5a..e0d0f77 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/load/Load.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/load/Load.java
@@ -39,7 +39,7 @@ import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import com.google.inject.Singleton;
-import rx.util.functions.Func1;
+import rx.functions.Func1;
/**
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
index bcec305..4c90e69 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
@@ -41,7 +41,7 @@ import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import org.apache.usergrid.persistence.model.field.Field;
-import rx.util.functions.Func1;
+import rx.functions.Func1;
/**
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerify.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerify.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerify.java
index e42019c..7a70dfc 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerify.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerify.java
@@ -8,7 +8,7 @@ import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
import com.google.inject.Inject;
import com.google.inject.Singleton;
-import rx.util.functions.Func1;
+import rx.functions.Func1;
/**
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteStart.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteStart.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteStart.java
index 01fee57..a0ff1b1 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteStart.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteStart.java
@@ -24,7 +24,7 @@ import com.google.inject.Singleton;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-import rx.util.functions.Func1;
+import rx.functions.Func1;
/**
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
index 55280c8..fe53b13 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
@@ -40,8 +40,8 @@ import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import rx.Observable;
import rx.Scheduler;
-import rx.util.functions.Func1;
-import rx.util.functions.FuncN;
+import rx.functions.Func1;
+import rx.functions.FuncN;
/**
@@ -168,6 +168,7 @@ public class WriteUniqueVerify implements Func1<CollectionIoEvent<MvccEntity>, O
};
+
return Observable.zip( fields, zipFunction );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadScheduler.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadScheduler.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadScheduler.java
index 90f3b2d..a5baf76 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadScheduler.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadScheduler.java
@@ -35,7 +35,7 @@ import com.google.inject.Provider;
import com.google.inject.name.Named;
import rx.Scheduler;
-import rx.concurrency.Schedulers;
+import rx.schedulers.Schedulers;
public class CassandraThreadScheduler implements Provider<Scheduler> {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadSchedulerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadSchedulerTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadSchedulerTest.java
index 45a175e..cf399c3 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadSchedulerTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadSchedulerTest.java
@@ -18,6 +18,7 @@ import org.apache.usergrid.persistence.collection.guice.TestCollectionModule;
import com.google.inject.Inject;
import rx.Scheduler;
+import rx.functions.Action1;
import rx.util.functions.Action0;
import static org.junit.Assert.assertTrue;
@@ -63,13 +64,11 @@ public class CassandraThreadSchedulerTest {
//schedule and we should fail
try {
-
- rxScheduler.schedule( new Action0() {
- @Override
- public void call() {
- //no op
- }
- } );
+ rxScheduler.schedule( new Action1<Scheduler.Inner>() {
+ @Override
+ public void call( final Scheduler.Inner inner ) {
+ }
+ });
fail( "This should have thrown an exception" );
}
@@ -127,9 +126,9 @@ public class CassandraThreadSchedulerTest {
try {
- rxScheduler.schedule( new Action0() {
- @Override
- public void call() {
+ rxScheduler.schedule( new Action1<Scheduler.Inner>() {
+ @Override
+ public void call( final Scheduler.Inner inner ) {
//no op
}
} );
@@ -158,9 +157,9 @@ public class CassandraThreadSchedulerTest {
try {
- rxScheduler.schedule( new Action0() {
- @Override
- public void call() {
+ rxScheduler.schedule( new Action1<Scheduler.Inner>() {
+ @Override
+ public void call( final Scheduler.Inner inner ) {
//no op
}
} );
@@ -215,9 +214,9 @@ public class CassandraThreadSchedulerTest {
try {
- rxScheduler.schedule( new Action0() {
- @Override
- public void call() {
+ rxScheduler.schedule( new Action1<Scheduler.Inner>() {
+ @Override
+ public void call( final Scheduler.Inner inner ) {
//no op
}
} );
@@ -244,9 +243,9 @@ public class CassandraThreadSchedulerTest {
try {
- rxScheduler.schedule( new Action0() {
- @Override
- public void call() {
+ rxScheduler.schedule( new Action1<Scheduler.Inner>() {
+ @Override
+ public void call( final Scheduler.Inner inner ) {
//no op
}
} );
@@ -295,9 +294,10 @@ public class CassandraThreadSchedulerTest {
final CountDownLatch latch = new CountDownLatch( totalCount );
for ( int i = 0; i < totalCount; i++ ) {
- final Action0 action = new Action0() {
- @Override
- public void call() {
+
+ final Action1<Scheduler.Inner> action = new Action1<Scheduler.Inner>() {
+ @Override
+ public void call( final Scheduler.Inner inner ) {
try {
final String threadName = Thread.currentThread().getName();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java
index ac252fb..434ea26 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java
@@ -38,9 +38,9 @@ import com.netflix.hystrix.HystrixCommandGroupKey;
import rx.Observable;
import rx.Scheduler;
-import rx.concurrency.Schedulers;
-import rx.util.functions.Func1;
-import rx.util.functions.FuncN;
+import rx.functions.Func1;
+import rx.functions.FuncN;
+import rx.schedulers.Schedulers;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -82,7 +82,7 @@ public class ParallelTest {
// final Scheduler scheduler = Schedulers.threadPoolForComputation();
//use the I/O scheduler to allow enough thread, otherwise our pool will be the same size as the # of cores
- final Scheduler scheduler = Schedulers.threadPoolForIO();
+ final Scheduler scheduler = Schedulers.io();
//set our size equal
ConfigurationManager.getConfigInstance().setProperty( THREAD_POOL_SIZE, size );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
index bbf83be..3f87d14 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
@@ -15,6 +15,11 @@ public interface GraphFig extends GuicyFig {
public static final String SCAN_PAGE_SIZE = "usergrid.graph.scan.page.size";
+
+ public static final String TIMEOUT_SIZE = "usergrid.graph.timeout.page.size";
+
+ public static final String TIMEOUT_TASK_TIME = "usergrid.graph.timeout.task.time";
+
public static final String READ_CL = "usergrid.graph.read.cl";
public static final String WRITE_CL = "usergrid.graph.write.cl";
@@ -37,6 +42,15 @@ public interface GraphFig extends GuicyFig {
@Key( WRITE_TIMEOUT )
long getWriteTimeout();
+ @Default( "100" )
+ @Key( TIMEOUT_SIZE )
+ int getTimeoutReadSize();
+
+ @Default("500")
+ @Key( TIMEOUT_TASK_TIME )
+ long getTaskLoopTime();
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java
index 150aa6b..016b45d 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java
@@ -1,48 +1,62 @@
package org.apache.usergrid.persistence.graph.consistency;
+import java.util.Collection;
+
+
/**
- * Used to fork lazy repair and other types of operations.
- *
+ * Used to fork lazy repair and other types of operations.
*/
public interface AsyncProcessor<T> {
/**
- * The processor implementation is responsible for guaranteeing the events fire in the runtime environment.
- * This could be local or clustered, consult the documentation on the implementation. Note that events published
- * here could possibly be double published if the operation reaches it's timeout before completion. As a result, every
- * receiver of the event should operate in an idempotent way. Note that the event will fire at a time >= the timeout time.
- * Firing immediately should not be assumed.
+ * The processor implementation is responsible for guaranteeing the events fire in the runtime environment. This
+ * could be local or clustered, consult the documentation on the implementation. Note that events published here
+ * could possibly be double published if the operation reaches its timeout before completion. As a result, every
+ * receiver of the event should operate in an idempotent way. Note that the event will fire at a time >= the
+ * timeout time. Firing immediately should not be assumed.
*
* @param event The event to be scheduled for verification
- * @param timeout The time in milliseconds we should wait before the event should fire
+ * @param timeout The time in milliseconds we should wait before the event should fire
*/
public AsynchronousMessage<T> setVerification( T event, long timeout );
/**
- * Start processing the event immediately asynchronously. In the event an exception is thrown, the AsynchronousMessage should be re-tried.
- * It is up to the implementer to commit the event so that it does not fire again. This should never throw exceptions.
+ * Start processing the event immediately asynchronously. In the event an exception is thrown, the
+ * AsynchronousMessage should be re-tried. It is up to the implementer to commit the event so that it does not fire
+ * again. This should never throw exceptions.
*
* @param event The event to start
*/
- public void start(AsynchronousMessage<T> event);
+ public void start( AsynchronousMessage<T> event );
+
+
+ /**
+ * Get all events that have passed their timeout
+ *
+ * @param maxCount The maximum count
+ * @param timeout The timeout to set when retrieving these timeouts to ensure they aren't lost
+ *
+ * @return A collection of asynchronous messages that have passed their timeout. This could be due to process
+ * failure, node loss, etc. No assumptions should be made regarding the state of the messages when they are
+ * returned.
+ */
+ public Collection<AsynchronousMessage<T>> getTimeouts( int maxCount, long timeout );
/**
* Add the error listener to the list of listeners
- * @param listener
*/
public void addErrorListener( ErrorListener<T> listener );
/**
* Add the listener to this instance
- * @param listener
*/
- public void addListener(MessageListener<T, T> listener);
-
-
-
-
+ public void addListener( MessageListener<T, T> listener );
+ /**
+ * Add a complete listener that is invoked when processing of an event is complete
+ */
+ public void addCompleteListener( CompleteListener<T> listener );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
index 20c40e2..d962a88 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
@@ -2,45 +2,59 @@ package org.apache.usergrid.persistence.graph.consistency;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.usergrid.persistence.graph.GraphFig;
+
import com.google.inject.Inject;
import com.google.inject.Singleton;
-import com.netflix.hystrix.HystrixCommandGroupKey;
import rx.Observable;
import rx.Scheduler;
-import rx.util.functions.Action0;
-import rx.util.functions.Action1;
-import rx.util.functions.FuncN;
+import rx.functions.Action0;
+import rx.functions.Action1;
+import rx.functions.FuncN;
+import rx.schedulers.Schedulers;
/**
- * The implementation of asynchronous processing.
- * This is intentionally kept as a 1 processor to 1 event type mapping
+ * The implementation of asynchronous processing. This is intentionally kept as a 1 processor to 1 event type mapping
* This way reflection is not used, event dispatching is easier, and has compile time checking
*/
@Singleton
public class AsyncProcessorImpl<T> implements AsyncProcessor<T> {
- private static final HystrixCommandGroupKey GRAPH_REPAIR = HystrixCommandGroupKey.Factory.asKey( "Graph_Repair" );
-
- private final TimeoutQueue<T> queue;
- private final Scheduler scheduler;
- private final List<MessageListener<T, T>> listeners = new ArrayList<MessageListener<T, T>>();
+ /**
+ * TODO, run this with hystrix
+ */
private static final Logger LOG = LoggerFactory.getLogger( AsyncProcessor.class );
- private List<ErrorListener> errorListeners = new ArrayList<ErrorListener>();
+ protected final TimeoutQueue<T> queue;
+ protected final Scheduler scheduler;
+ protected final GraphFig graphFig;
+ protected final List<MessageListener<T, T>> listeners = new ArrayList<MessageListener<T, T>>();
+
+
+ protected List<ErrorListener<T>> errorListeners = new ArrayList<ErrorListener<T>>();
+ protected List<CompleteListener<T>> completeListeners = new ArrayList<CompleteListener<T>>();
@Inject
- public AsyncProcessorImpl( final TimeoutQueue<T> queue, final Scheduler scheduler ) {
+ public AsyncProcessorImpl( final TimeoutQueue<T> queue, final Scheduler scheduler, final GraphFig graphFig ) {
this.queue = queue;
this.scheduler = scheduler;
+ this.graphFig = graphFig;
+
+ //we purposefully use a new thread. We don't want to use one of the I/O threads to run this task
+ //in the event the scheduler is full, we'll end up rejecting the reschedule of this task
+ Schedulers.newThread().schedulePeriodically( new TimeoutTask<T>(this, graphFig), graphFig.getTaskLoopTime(), graphFig.getTaskLoopTime(), TimeUnit.MILLISECONDS );
}
@@ -52,8 +66,6 @@ public class AsyncProcessorImpl<T> implements AsyncProcessor<T> {
@Override
public void start( final AsynchronousMessage<T> event ) {
-
-
final T data = event.getEvent();
/**
* Execute all listeners in parallel
@@ -83,6 +95,10 @@ public class AsyncProcessorImpl<T> implements AsyncProcessor<T> {
@Override
public void call() {
queue.remove( event );
+
+ for ( CompleteListener<T> listener : completeListeners ) {
+ listener.onComplete( event );
+ }
}
} ).subscribe( new Action1<AsynchronousMessage<T>>() {
@Override
@@ -94,6 +110,12 @@ public class AsyncProcessorImpl<T> implements AsyncProcessor<T> {
@Override
+ public Collection<AsynchronousMessage<T>> getTimeouts( final int maxCount, final long timeout ) {
+ return queue.take( maxCount, timeout );
+ }
+
+
+ @Override
public void addListener( final MessageListener<T, T> listener ) {
this.listeners.add( listener );
}
@@ -105,4 +127,12 @@ public class AsyncProcessorImpl<T> implements AsyncProcessor<T> {
public void addErrorListener( ErrorListener<T> listener ) {
this.errorListeners.add( listener );
}
+
+
+ @Override
+ public void addCompleteListener( final CompleteListener<T> listener ) {
+ this.completeListeners.add( listener );
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/CompleteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/CompleteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/CompleteListener.java
new file mode 100644
index 0000000..c415005
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/CompleteListener.java
@@ -0,0 +1,14 @@
+package org.apache.usergrid.persistence.graph.consistency;
+
+
+/**
+ * Internal listener for completion, really only used for testing. Can be used to hook into the complete state
+ */
+public interface CompleteListener<T> {
+
+ /**
+ * Invoked when an event is complete
+ * @param event
+ */
+ void onComplete( AsynchronousMessage<T> event );
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeoutTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeoutTask.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeoutTask.java
new file mode 100644
index 0000000..6607724
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeoutTask.java
@@ -0,0 +1,61 @@
+package org.apache.usergrid.persistence.graph.consistency;
+
+
+import java.util.Iterator;
+
+import org.apache.usergrid.persistence.graph.GraphFig;
+
+import rx.Scheduler;
+import rx.functions.Action1;
+
+
+/**
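+ * Scheduled task that takes timed-out messages from the processor and starts them again,
+ * looping until no timed-out messages are returned or the task is unsubscribed.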
+ */
+public class TimeoutTask<T> implements Action1<Scheduler.Inner> {
+
+ private final AsyncProcessor<T> processor;
+ private final GraphFig graphFig;
+
+
+ public TimeoutTask( final AsyncProcessor<T> processor, final GraphFig graphFig ) {
+ this.processor = processor;
+ this.graphFig = graphFig;
+ }
+
+
+ @Override
+ public void call( final Scheduler.Inner inner ) {
+
+ /**
+ * We purposefully loop through a tight loop. If we have anything to process, we need to do so
+ * Once we run out of items to process, this thread will sleep and the timer will fire
+ */
+ while(!inner.isUnsubscribed()) {
+
+ Iterator<AsynchronousMessage<T>> timeouts = getTimeouts();
+
+ /**
+ * We're done, just exit
+ */
+ if(!timeouts.hasNext()){
+ return;
+ }
+
+ while ( timeouts.hasNext() ) {
+ processor.start( timeouts.next() );
+ }
+
+ }
+ }
+
+
+ /**
+ * Get the timeouts
+ * @return
+ */
+ private Iterator<AsynchronousMessage<T>> getTimeouts() {
+ return processor.getTimeouts( graphFig.getTimeoutReadSize(), graphFig.getWriteTimeout() * 2 ).iterator();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
index 824b05c..6feded3 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
@@ -23,9 +23,9 @@ import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import rx.Observable;
-import rx.util.functions.Action1;
-import rx.util.functions.Func1;
-import rx.util.functions.Func5;
+import rx.functions.Action1;
+import rx.functions.Func1;
+import rx.functions.Func5;
/**
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
index 663215a..b84bd8e 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
@@ -56,7 +56,7 @@ import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import rx.Observable;
import rx.Scheduler;
-import rx.util.functions.Func1;
+import rx.functions.Func1;
/**
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeWriteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeWriteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeWriteListener.java
index 428085e..271375a 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeWriteListener.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeWriteListener.java
@@ -22,7 +22,7 @@ import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import rx.Observable;
-import rx.util.functions.Func1;
+import rx.functions.Func1;
/**
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
index 08cc942..aabc572 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
@@ -20,9 +20,10 @@ import org.apache.usergrid.persistence.model.entity.Id;
import com.google.common.base.Optional;
import com.google.inject.Inject;
+import rx.Notification;
import rx.Observable;
-import rx.util.functions.Action1;
-import rx.util.functions.Func1;
+import rx.functions.Action1;
+import rx.functions.Func1;
/**
@@ -75,6 +76,7 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, EdgeEv
// delete them. We might want to batch this up for efficiency
return loadEdgesToTarget( scope,
new SimpleSearchByEdgeType( node, edgeType, uuidOptional.get(), null ) )
+
.doOnEach( new Action1<MarkedEdge>() {
@Override
public void call( final MarkedEdge markedEdge ) {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ObservableIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ObservableIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ObservableIterator.java
index 58ff6c7..2274868 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ObservableIterator.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ObservableIterator.java
@@ -5,6 +5,7 @@ import java.util.Iterator;
import rx.Observable;
import rx.Observer;
+import rx.Subscriber;
import rx.Subscription;
import rx.subscriptions.Subscriptions;
@@ -14,11 +15,11 @@ import rx.subscriptions.Subscriptions;
* This is used in favor of "Observable.just" when the initial fetch of the iterator will require I/O. This allows
* us to wrap the iterator in a deferred invocation to avoid the blocking on construction.
*/
-public abstract class ObservableIterator<T> implements Observable.OnSubscribeFunc<T> {
+public abstract class ObservableIterator<T> implements Observable.OnSubscribe<T> {
@Override
- public Subscription onSubscribe( final Observer<? super T> observer ) {
+ public void call( final Subscriber<? super T> subscriber ) {
try {
@@ -26,25 +27,22 @@ public abstract class ObservableIterator<T> implements Observable.OnSubscribeFun
Iterator<T> itr = getIterator();
- //TODO T.N. when > 0.17 comes out, we need to implement the check with each loop as described here https://github.com/Netflix/RxJava/issues/802
- while ( itr.hasNext()) {
- observer.onNext( itr.next() );
+ //while we have items to emit and our subscriber is subscribed, we want to keep emitting items
+ while ( itr.hasNext() && !subscriber.isUnsubscribed()) {
+ subscriber.onNext( itr.next() );
}
- observer.onCompleted();
+ subscriber.onCompleted();
}
//if any error occurs, we need to notify the observer so it can perform it's own error handling
catch ( Throwable t ) {
- observer.onError( t );
+ subscriber.onError( t );
}
- return Subscriptions.empty();
}
-
-
/**
* Return the iterator to feed data to
*/
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/51a9ffd9/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java
index fc53705..9822cc5 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java
@@ -20,6 +20,8 @@
package org.apache.usergrid.persistence.graph.consistency;
+import java.util.Collection;
+import java.util.Collections;
import java.util.Stack;
import java.util.concurrent.CountDownLatch;
@@ -29,7 +31,7 @@ import org.mockito.stubbing.Answer;
import rx.Observable;
import rx.concurrency.Schedulers;
-import rx.util.functions.Action1;
+import rx.functions.Action1;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -70,7 +72,7 @@ public class AsyncProcessorTest {
final TimeoutQueue queue = mock( TimeoutQueue.class );
- AsyncProcessor asyncProcessor = constructProcessor( queue, null );
+ AsyncProcessor asyncProcessor = constructProcessor( queue );
//mock up the queue
@@ -89,6 +91,8 @@ public class AsyncProcessorTest {
final TestListener listener = new TestListener();
+
+
final TestEvent event = new TestEvent();
@@ -108,9 +112,17 @@ public class AsyncProcessorTest {
final TimeoutQueue queue = mock( TimeoutQueue.class );
- final AsyncProcessor asyncProcessor = constructProcessor( queue, listener );
+ final AsyncProcessor asyncProcessor = constructProcessor( queue );
+
+ asyncProcessor.addListener( listener );
+
+
+ final CountDownLatch latch = new CountDownLatch( 2 );
+
+ final TestCompleteListener completeListener = new TestCompleteListener(latch);
+
+ asyncProcessor.addCompleteListener( completeListener );
- final CountDownLatch latch = new CountDownLatch( 1 );
//mock up the ack to allow us to block the test until the async confirm fires
when( queue.remove( asynchronousMessage ) ).thenAnswer( new Answer<Boolean>() {
@@ -132,11 +144,15 @@ public class AsyncProcessorTest {
final TestEvent firedEvent = listener.events.peek();
assertSame( event, firedEvent );
+
+ final TestEvent completeEvent = completeListener.events.peek();
+
+ assertSame(event, completeEvent);
}
-// @Test( timeout = 5000 )
- @Test
+ // @Test( timeout = 5000 )
+ @Test
public void verifyErrorExecution() throws InterruptedException {
final AsynchronousErrorListener listener = new AsynchronousErrorListener();
@@ -163,7 +179,9 @@ public class AsyncProcessorTest {
final TimeoutQueue queue = mock( TimeoutQueue.class );
- final AsyncProcessorImpl asyncProcessor = constructProcessor( queue, listener );
+ final AsyncProcessorImpl asyncProcessor = constructProcessor( queue );
+
+ asyncProcessor.addListener( listener );
final CountDownLatch latch = new CountDownLatch( 1 );
@@ -177,7 +195,6 @@ public class AsyncProcessorTest {
invoked[1] = true;
latch.countDown();
}
-
} );
//throw an error if remove is called. This shouldn't happen
@@ -209,13 +226,56 @@ public class AsyncProcessorTest {
}
+ @Test
+ public void verifyTimeout() {
+
+
+ final long timeout = 500;
+ final TestEvent event = new TestEvent();
+
+
+ final AsynchronousMessage<TestEvent> asynchronousMessage = new AsynchronousMessage<TestEvent>() {
+ @Override
+ public TestEvent getEvent() {
+ return event;
+ }
+
+
+ @Override
+ public long getTimeout() {
+ return timeout;
+ }
+ };
+
+ final TimeoutQueue queue = mock( TimeoutQueue.class );
+
+
+ when(queue.take( 1, 10000l )).thenReturn( Collections.singletonList(asynchronousMessage ));
+
+ AsyncProcessor<TestEvent> processor = constructProcessor( queue );
+
+
+ Collection<AsynchronousMessage<TestEvent>> timeouts = processor.getTimeouts( 1, 10000l );
+
+ assertEquals(1, timeouts.size());
+
+ AsynchronousMessage<TestEvent> returned = timeouts.iterator().next();
+
+ assertSame(asynchronousMessage, returned);
+
+
+
+ }
+
+
+
/**
* Construct the async processor
*/
- public <T> AsyncProcessorImpl<T> constructProcessor( TimeoutQueue<T> queue , MessageListener<T, T> listener) {
+ public <T> AsyncProcessorImpl<T> constructProcessor( TimeoutQueue<T> queue ) {
+
+ AsyncProcessorImpl<T> processor = new AsyncProcessorImpl( queue, Schedulers.threadPoolForIO() );
- AsyncProcessorImpl<T> processor = new AsyncProcessorImpl( queue,Schedulers.threadPoolForIO() );
- processor.addListener( listener );
return processor;
}
@@ -245,12 +305,29 @@ public class AsyncProcessorTest {
@Override
public Observable<TestEvent> receive( final TestEvent event ) {
- return Observable.from( event ).doOnEach(new Action1<TestEvent>() {
- @Override
- public void call( final TestEvent testEvent ) {
- events.push( testEvent );
- }
- });
+ return Observable.from( event ).doOnEach( new Action1<TestEvent>() {
+ @Override
+ public void call( final TestEvent testEvent ) {
+ events.push( testEvent );
+ }
+ } );
+ }
+ }
+
+
+ public static class TestCompleteListener implements CompleteListener<TestEvent> {
+
+ private final CountDownLatch latch;
+ private final Stack<TestEvent> events = new Stack<TestEvent>();
+
+
+ public TestCompleteListener( final CountDownLatch latch ) {this.latch = latch;}
+
+
+ @Override
+ public void onComplete( final AsynchronousMessage<TestEvent> event ) {
+ events.push( event.getEvent() );
+ latch.countDown();
}
}
@@ -265,15 +342,13 @@ public class AsyncProcessorTest {
@Override
public Observable<TestEvent> receive( final TestEvent event ) {
- return Observable.from( event ).doOnEach(new Action1<TestEvent>() {
- @Override
- public void call( final TestEvent testEvent ) {
- events.push( testEvent );
- throw new RuntimeException( "Test Exception thrown. Failed to process event" );
- }
- });
+ return Observable.from( event ).doOnEach( new Action1<TestEvent>() {
+ @Override
+ public void call( final TestEvent testEvent ) {
+ events.push( testEvent );
+ throw new RuntimeException( "Test Exception thrown. Failed to process event" );
+ }
+ } );
}
-
-
}
}
[2/3] git commit: Initial refactor of listeners and event system
Posted by sn...@apache.org.
Initial refactor of listeners and event system
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/c45f7ee9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/c45f7ee9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/c45f7ee9
Branch: refs/heads/asyncqueue
Commit: c45f7ee9d7e625fbe7ad89ffc42441a3453ecad0
Parents: da4630f
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Mar 7 12:07:35 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Mar 7 12:07:35 2014 -0700
----------------------------------------------------------------------
.../graph/consistency/AsyncProcessor.java | 3 +-
.../graph/consistency/AsyncProcessorImpl.java | 6 +-
.../graph/consistency/ErrorListener.java | 5 +
.../graph/consistency/MessageListener.java | 4 +-
.../consistency/SimpleAsynchronousMessage.java | 3 +-
.../graph/impl/EdgeDeleteListener.java | 199 ++++++++++++++
.../persistence/graph/impl/EdgeEvent.java | 31 +++
.../persistence/graph/impl/EdgeManagerImpl.java | 261 -------------------
.../graph/impl/EdgeWriteListener.java | 103 ++++++++
.../graph/impl/NodeDeleteListener.java | 128 +++++++++
10 files changed, 476 insertions(+), 267 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c45f7ee9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java
index f6013d9..150aa6b 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java
@@ -2,8 +2,7 @@ package org.apache.usergrid.persistence.graph.consistency;
/**
- * Used to fork lazy repair and other types of operations. This can be implemented
- * across multiple environments.
+ * Used to fork lazy repair and other types of operations.
*
*/
public interface AsyncProcessor<T> {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c45f7ee9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
index fbac644..20c40e2 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
@@ -19,7 +19,9 @@ import rx.util.functions.FuncN;
/**
- * The implementation of asynchronous processing
+ * The implementation of asynchronous processing.
+ * This is intentionally kept as a 1 processor to 1 event type mapping
+ * This way reflection is not used, event dispatching is easier, and has compile time checking
*/
@Singleton
public class AsyncProcessorImpl<T> implements AsyncProcessor<T> {
@@ -84,7 +86,7 @@ public class AsyncProcessorImpl<T> implements AsyncProcessor<T> {
}
} ).subscribe( new Action1<AsynchronousMessage<T>>() {
@Override
- public void call( final AsynchronousMessage<T> tAsynchronousMessage ) {
+ public void call( final AsynchronousMessage<T> asynchronousMessage ) {
//To change body of implemented methods use File | Settings | File Templates.
}
} );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c45f7ee9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/ErrorListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/ErrorListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/ErrorListener.java
index f736cdb..9d21304 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/ErrorListener.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/ErrorListener.java
@@ -6,5 +6,10 @@ package org.apache.usergrid.persistence.graph.consistency;
*/
public interface ErrorListener <T> {
+ /**
+ * Invoked when an error occurs during asynchronous processing
+ * @param event
+ * @param t
+ */
void onError( AsynchronousMessage<T> event, Throwable t );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c45f7ee9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/MessageListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/MessageListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/MessageListener.java
index 8a21dbb..466e4ed 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/MessageListener.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/MessageListener.java
@@ -1,6 +1,8 @@
package org.apache.usergrid.persistence.graph.consistency;
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+
import rx.Observable;
@@ -17,6 +19,6 @@ public interface MessageListener<T, R> {
* @param event The input event
* @return The observable that performs the operations
*/
- Observable<T> receive(T event);
+ Observable<T> receive(final T event);
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c45f7ee9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/SimpleAsynchronousMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/SimpleAsynchronousMessage.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/SimpleAsynchronousMessage.java
index 1e7a04b..0a8651c 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/SimpleAsynchronousMessage.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/SimpleAsynchronousMessage.java
@@ -2,7 +2,8 @@ package org.apache.usergrid.persistence.graph.consistency;
/**
- *
+ * Simple message that just contains the event and the timeout. More advanced queue implementations
+ * will most likely subclass this class.
*
*/
public class SimpleAsynchronousMessage<T> implements AsynchronousMessage<T> {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c45f7ee9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
new file mode 100644
index 0000000..824b05c
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
@@ -0,0 +1,193 @@
+package org.apache.usergrid.persistence.graph.impl;
+
+
+import java.util.Iterator;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.EdgeManager;
+import org.apache.usergrid.persistence.graph.EdgeManagerFactory;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.consistency.AsyncProcessor;
+import org.apache.usergrid.persistence.graph.consistency.MessageListener;
+import org.apache.usergrid.persistence.graph.guice.EdgeDelete;
+import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
+import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
+import org.apache.usergrid.persistence.graph.serialization.impl.parse.ObservableIterator;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import rx.Observable;
+import rx.util.functions.Action1;
+import rx.util.functions.Func1;
+import rx.util.functions.Func5;
+
+
+/**
+ * Listener for the asynchronous edge delete. For the deleted edge it collects the removal of every stored
+ * version up to the deleted one, plus the removal of edge type meta data that no other edge uses, into a
+ * single mutation batch and executes that batch once.
+ */
+@Singleton
+public class EdgeDeleteListener implements MessageListener<EdgeEvent<Edge>, EdgeEvent<Edge>> {
+
+
+    private final EdgeSerialization edgeSerialization;
+    private final EdgeMetadataSerialization edgeMetadataSerialization;
+    private final EdgeManagerFactory edgeManagerFactory;
+    private final Keyspace keyspace;
+
+
+    /**
+     * Wire the dependencies and register this listener on the edge delete processor.
+     *
+     * @param edgeSerialization Reads and removes the versions of an edge
+     * @param edgeMetadataSerialization Removes the edge type meta data
+     * @param edgeManagerFactory Creates the edge manager for the scope of each event
+     * @param keyspace Creates the mutation batch all removals are merged into
+     * @param edgeDelete The processor this listener registers itself on
+     */
+    @Inject
+    public EdgeDeleteListener( final EdgeSerialization edgeSerialization,
+                               final EdgeMetadataSerialization edgeMetadataSerialization,
+                               final EdgeManagerFactory edgeManagerFactory, final Keyspace keyspace,
+                               @EdgeDelete final AsyncProcessor edgeDelete ) {
+        this.edgeSerialization = edgeSerialization;
+        this.edgeMetadataSerialization = edgeMetadataSerialization;
+        this.edgeManagerFactory = edgeManagerFactory;
+        this.keyspace = keyspace;
+
+        edgeDelete.addListener( this );
+    }
+
+
+    /**
+     * Build the observable that performs the delete for the given event.
+     *
+     * @param delete The event holding the deleted edge and its organization scope
+     * @return An observable that executes the collected mutation and then emits the input event
+     */
+    @Override
+    public Observable<EdgeEvent<Edge>> receive( final EdgeEvent<Edge> delete ) {
+
+        final Edge edge = delete.getData();
+        final OrganizationScope scope = delete.getOrganizationScope();
+        final UUID maxVersion = edge.getVersion();
+        final EdgeManager edgeManager = edgeManagerFactory.createEdgeManager( scope );
+
+
+        return Observable.from( edge ).flatMap( new Func1<Edge, Observable<MutationBatch>>() {
+            @Override
+            public Observable<MutationBatch> call( final Edge edge ) {
+
+                final MutationBatch batch = keyspace.prepareMutationBatch();
+
+
+                //go through every version of this edge <= the current version and remove it. The stream is
+                //reduced to its count so the zip below waits for all versions instead of only the first one,
+                //and still emits when no version was found
+                final Observable<Integer> edges = Observable.create( new ObservableIterator<MarkedEdge>() {
+                    @Override
+                    protected Iterator<MarkedEdge> getIterator() {
+                        return edgeSerialization.getEdgeToTarget( scope,
+                                new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(),
+                                        edge.getVersion(), null ) );
+                    }
+                } ).doOnEach( new Action1<MarkedEdge>() {
+                    @Override
+                    public void call( final MarkedEdge markedEdge ) {
+                        batch.mergeShallow( edgeSerialization.deleteEdge( scope, markedEdge ) );
+                    }
+                } ).count();
+
+
+                //each search below reads at most 2 edges. A count of 1 means no second edge matched the
+                //search, so the meta data is removed. With 2 matches it is still in use and is kept.
+                //NOTE(review): the two id type searches remove the same meta data as the two edge type
+                //searches. Confirm whether they should remove id type specific meta data instead.
+                final Observable<Integer> sourceIdType = edgeManager.loadEdgesFromSourceByType(
+                        new SimpleSearchByIdType( edge.getSourceNode(), edge.getType(), maxVersion,
+                                edge.getTargetNode().getType(), null ) ).take( 2 ).count()
+                        .doOnEach( new Action1<Integer>() {
+                            @Override
+                            public void call( final Integer count ) {
+                                if ( count == 1 ) {
+                                    batch.mergeShallow(
+                                            edgeMetadataSerialization.removeEdgeTypeFromSource( scope, edge ) );
+                                }
+                            }
+                        } );
+
+
+                final Observable<Integer> targetIdType = edgeManager.loadEdgesToTargetByType(
+                        new SimpleSearchByIdType( edge.getTargetNode(), edge.getType(), maxVersion,
+                                edge.getSourceNode().getType(), null ) ).take( 2 ).count()
+                        .doOnEach( new Action1<Integer>() {
+                            @Override
+                            public void call( final Integer count ) {
+                                if ( count == 1 ) {
+                                    batch.mergeShallow(
+                                            edgeMetadataSerialization.removeEdgeTypeToTarget( scope, edge ) );
+                                }
+                            }
+                        } );
+
+
+                //same check as above, searching by edge type only
+                final Observable<Integer> sourceType = edgeManager.loadEdgesFromSource(
+                        new SimpleSearchByEdgeType( edge.getSourceNode(), edge.getType(), maxVersion, null ) )
+                        .take( 2 ).count().doOnEach( new Action1<Integer>() {
+                            @Override
+                            public void call( final Integer count ) {
+                                if ( count == 1 ) {
+                                    batch.mergeShallow(
+                                            edgeMetadataSerialization.removeEdgeTypeFromSource( scope, edge ) );
+                                }
+                            }
+                        } );
+
+
+                final Observable<Integer> targetType = edgeManager.loadEdgesToTarget(
+                        new SimpleSearchByEdgeType( edge.getTargetNode(), edge.getType(), maxVersion, null ) )
+                        .take( 2 ).count().doOnEach( new Action1<Integer>() {
+                            @Override
+                            public void call( final Integer count ) {
+                                if ( count == 1 ) {
+                                    batch.mergeShallow(
+                                            edgeMetadataSerialization.removeEdgeTypeToTarget( scope, edge ) );
+                                }
+                            }
+                        } );
+
+
+                //no op, just wait for each observable to populate the mutation before returning it
+                return Observable.zip( edges, sourceIdType, targetIdType, sourceType, targetType,
+                        new Func5<Integer, Integer, Integer, Integer, Integer, MutationBatch>() {
+                            @Override
+                            public MutationBatch call( final Integer edgeCount, final Integer sourceIdTypeCount,
+                                                       final Integer targetIdTypeCount, final Integer sourceTypeCount,
+                                                       final Integer targetTypeCount ) {
+                                return batch;
+                            }
+                        } );
+            }
+        } ).map( new Func1<MutationBatch, EdgeEvent<Edge>>() {
+            @Override
+            public EdgeEvent<Edge> call( final MutationBatch mutationBatch ) {
+                try {
+                    mutationBatch.execute();
+                }
+                catch ( ConnectionException e ) {
+                    throw new RuntimeException( "Unable to execute mutation", e );
+                }
+
+                return delete;
+            }
+        } );
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c45f7ee9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeEvent.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeEvent.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeEvent.java
new file mode 100644
index 0000000..d4b6f91
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeEvent.java
@@ -0,0 +1,42 @@
+package org.apache.usergrid.persistence.graph.impl;
+
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+
+
+/**
+ * Simple holder that pairs a payload with the organization scope it belongs to.
+ *
+ * @param <T> The type of the payload carried by the event
+ */
+public class EdgeEvent<T> {
+
+    private final T data;
+    private final OrganizationScope organizationScope;
+
+
+    /**
+     * @param organizationScope The scope the payload belongs to
+     * @param data The payload of the event
+     */
+    public EdgeEvent( final OrganizationScope organizationScope, final T data ) {
+        this.data = data;
+        this.organizationScope = organizationScope;
+    }
+
+
+    /**
+     * @return The payload handed to the constructor
+     */
+    public T getData() {
+        return data;
+    }
+
+
+    /**
+     * @return The organization scope handed to the constructor
+     */
+    public OrganizationScope getOrganizationScope() {
+        return organizationScope;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c45f7ee9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
index 1cc2fce..663215a 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
@@ -37,7 +37,6 @@ import org.apache.usergrid.persistence.graph.SearchEdgeType;
import org.apache.usergrid.persistence.graph.SearchIdType;
import org.apache.usergrid.persistence.graph.consistency.AsyncProcessor;
import org.apache.usergrid.persistence.graph.consistency.AsynchronousMessage;
-import org.apache.usergrid.persistence.graph.consistency.MessageListener;
import org.apache.usergrid.persistence.graph.guice.EdgeDelete;
import org.apache.usergrid.persistence.graph.guice.EdgeWrite;
import org.apache.usergrid.persistence.graph.guice.NodeDelete;
@@ -50,7 +49,6 @@ import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
import com.fasterxml.uuid.UUIDComparator;
-import com.google.common.base.Optional;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import com.netflix.astyanax.MutationBatch;
@@ -58,9 +56,7 @@ import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import rx.Observable;
import rx.Scheduler;
-import rx.util.functions.Action1;
import rx.util.functions.Func1;
-import rx.util.functions.Func4;
/**
@@ -109,16 +105,10 @@ public class EdgeManagerImpl implements EdgeManager {
this.edgeWriteAsyncProcessor = edgeWrite;
- this.edgeWriteAsyncProcessor.addListener( new EdgeWriteListener() );
-
-
this.edgeDeleteAsyncProcessor = edgeDelete;
- this.edgeDeleteAsyncProcessor.addListener( new EdgeDeleteListener() );
this.nodeDeleteAsyncProcessor = nodeDelete;
-
- this.nodeDeleteAsyncProcessor.addListener( new NodeDeleteListener() );
}
@@ -415,255 +405,4 @@ public class EdgeManagerImpl implements EdgeManager {
return true;
}
}
-
-
- /**
- * Construct the asynchronous edge lister for the repair operation.
- */
- public class EdgeWriteListener implements MessageListener<Edge, Edge> {
-
- @Override
- public Observable<Edge> receive( final Edge write ) {
-
- final UUID maxVersion = write.getVersion();
-
- return Observable.create( new ObservableIterator<MarkedEdge>() {
- @Override
- protected Iterator<MarkedEdge> getIterator() {
-
- final SimpleSearchByEdge search =
- new SimpleSearchByEdge( write.getSourceNode(), write.getType(), write.getTargetNode(),
- maxVersion, null );
-
- return edgeSerialization.getEdgeFromSource( scope, search );
- }
- } ).filter( new Func1<MarkedEdge, Boolean>() {
-
- //TODO, reuse this for delete operation
-
-
- /**
- * We only want to return edges < this version so we remove them
- * @param markedEdge
- * @return
- */
- @Override
- public Boolean call( final MarkedEdge markedEdge ) {
- return UUIDComparator.staticCompare( markedEdge.getVersion(), maxVersion ) < 0;
- }
- //buffer the deletes and issue them in a single mutation
- } ).buffer( graphFig.getScanPageSize() ).map( new Func1<List<MarkedEdge>, Edge>() {
- @Override
- public Edge call( final List<MarkedEdge> markedEdges ) {
-
- final int size = markedEdges.size();
-
- final MutationBatch batch = edgeSerialization.deleteEdge( scope, markedEdges.get( 0 ) );
-
- for ( int i = 1; i < size; i++ ) {
- final MutationBatch delete = edgeSerialization.deleteEdge( scope, markedEdges.get( i ) );
-
- batch.mergeShallow( delete );
- }
-
- try {
- batch.execute();
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( "Unable to issue write to cassandra", e );
- }
-
- return write;
- }
- } );
- }
- }
-
-
- /**
- * Construct the asynchronous delete operation from the listener
- */
- public class EdgeDeleteListener implements MessageListener<Edge, Edge> {
-
- @Override
- public Observable<Edge> receive( final Edge delete ) {
-
- final UUID maxVersion = delete.getVersion();
-
- return Observable.from( delete ).flatMap( new Func1<Edge, Observable<MutationBatch>>() {
- @Override
- public Observable<MutationBatch> call( final Edge edge ) {
-
- //search by edge type and target type. If any other edges with this target type exist,
- // we can't delete it
- Observable<MutationBatch> sourceIdType = loadEdgesFromSourceByType(
- new SimpleSearchByIdType( edge.getSourceNode(), edge.getType(), maxVersion,
- edge.getTargetNode().getType(), null ) ).take( 2 ).count()
- .map( new Func1<Integer, MutationBatch>() {
- @Override
- public MutationBatch call( final Integer count ) {
- //There's nothing to do, we have 2 different edges with the same edge type and
- // target type. Don't delete meta data
- if ( count == 2 ) {
- return null;
- }
-
- return edgeMetadataSerialization.removeEdgeTypeFromSource( scope, delete );
- }
- } );
-
-
- Observable<MutationBatch> targetIdType = loadEdgesToTargetByType(
- new SimpleSearchByIdType( edge.getTargetNode(), edge.getType(), maxVersion,
- edge.getSourceNode().getType(), null ) ).take( 2 ).count()
- .map( new Func1<Integer, MutationBatch>() {
- @Override
- public MutationBatch call( final Integer count ) {
- //There's nothing to do, we have 2 different edges with the same edge type and
- // target type. Don't delete meta data
- if ( count == 2 ) {
- return null;
- }
-
-
- return edgeMetadataSerialization.removeEdgeTypeToTarget( scope, delete );
- }
- } );
-
- //search by edge type and target type. If any other edges with this target type exist,
- // we can't delete it
- Observable<MutationBatch> sourceType = loadEdgesFromSource(
- new SimpleSearchByEdgeType( edge.getSourceNode(), edge.getType(), maxVersion, null ) )
- .take( 2 ).count().map( new Func1<Integer, MutationBatch>() {
- @Override
- public MutationBatch call( final Integer count ) {
-
-
- //There's nothing to do, we have 2 different edges with the same edge type and
- // target type. Don't delete meta data
- if ( count == 2 ) {
- return null;
- }
-
-
- return edgeMetadataSerialization.removeEdgeTypeFromSource( scope, delete );
- }
- } );
-
-
- Observable<MutationBatch> targetType = loadEdgesToTarget(
- new SimpleSearchByEdgeType( edge.getTargetNode(), edge.getType(), maxVersion, null ) )
- .take( 2 ).count().map( new Func1<Integer, MutationBatch>() {
-
-
- @Override
- public MutationBatch call( final Integer count ) {
-
-
- //There's nothing to do, we have 2 different edges with the same edge type and
- // target type. Don't delete meta data
- if ( count == 2 ) {
- return null;
- }
-
- return edgeMetadataSerialization.removeEdgeTypeToTarget( scope, delete );
- }
- } );
-
-
- return Observable.zip( sourceIdType, targetIdType, sourceType, targetType,
- new Func4<MutationBatch, MutationBatch, MutationBatch, MutationBatch, MutationBatch>() {
-
-
- @Override
- public MutationBatch call( final MutationBatch mutationBatch,
- final MutationBatch mutationBatch2,
- final MutationBatch mutationBatch3,
- final MutationBatch mutationBatch4 ) {
-
- return join( join( join( mutationBatch, mutationBatch2 ), mutationBatch3 ),
- mutationBatch4 );
- }
-
-
- private MutationBatch join( MutationBatch first, MutationBatch second ) {
- if ( first == null ) {
- if ( second == null ) {
- return null;
- }
-
- return second;
- }
-
-
- else if ( second == null ) {
- return first;
- }
-
- first.mergeShallow( second );
-
- return first;
- }
- } );
- }
- } ).map( new Func1<MutationBatch, Edge>() {
- @Override
- public Edge call( final MutationBatch mutationBatch ) {
- try {
- mutationBatch.execute();
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( "Unable to execute mutation", e );
- }
-
- return delete;
- }
- } );
- }
- }
-
-
- /**
- * Construct the asynchronous node delete from the q
- */
- public class NodeDeleteListener implements MessageListener<Id, Id> {
-
- @Override
- public Observable<Id> receive( final Id node ) {
-
-
- return Observable.from( node ).map( new Func1<Id, Optional<UUID>>() {
- @Override
- public Optional<UUID> call( final Id id ) {
- return nodeSerialization.getMaxVersion( scope, node );
- }
- } ).flatMap( new Func1<Optional<UUID>, Observable<Edge>>() {
- @Override
- public Observable<Edge> call( final Optional<UUID> uuidOptional ) {
- return getEdgeTypesToTarget( new SimpleSearchEdgeType( node, null ) )
- .flatMap( new Func1<String, Observable<Edge>>() {
- @Override
- public Observable<Edge> call( final String edgeType ) {
-
- //for each edge type, we want to search all edges < this version to the node and
- // delete them. We might want to batch this up for efficiency
- return loadEdgesToTarget(
- new SimpleSearchByEdgeType( node, edgeType, uuidOptional.get(), null ) )
- .doOnEach( new Action1<Edge>() {
- @Override
- public void call( final Edge edge ) {
- deleteEdge( edge );
- }
- } );
- }
- } );
- }
- } ).map( new Func1<Edge, Id>() {
- @Override
- public Id call( final Edge edge ) {
- return node;
- }
- } );
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c45f7ee9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeWriteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeWriteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeWriteListener.java
new file mode 100644
index 0000000..428085e
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeWriteListener.java
@@ -0,0 +1,120 @@
+package org.apache.usergrid.persistence.graph.impl;
+
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.consistency.AsyncProcessor;
+import org.apache.usergrid.persistence.graph.consistency.MessageListener;
+import org.apache.usergrid.persistence.graph.guice.EdgeWrite;
+import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
+import org.apache.usergrid.persistence.graph.serialization.impl.parse.ObservableIterator;
+
+import com.fasterxml.uuid.UUIDComparator;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import rx.Observable;
+import rx.util.functions.Func1;
+
+
+/**
+ * Listener for the asynchronous repair after an edge write. It removes the stored versions of the written
+ * edge whose version compares less than the written version.
+ */
+@Singleton
+public class EdgeWriteListener implements MessageListener<EdgeEvent<Edge>, EdgeEvent<Edge>> {
+
+    private final EdgeSerialization edgeSerialization;
+    private final GraphFig graphFig;
+    private final Keyspace keyspace;
+
+
+    /**
+     * Wire the dependencies and register this listener on the edge write processor.
+     *
+     * @param edgeSerialization Reads and removes the versions of an edge
+     * @param graphFig Supplies the scan page size used to size each batch of removals
+     * @param keyspace Creates the mutation batch the removals are merged into
+     * @param edgeWrite The processor this listener registers itself on
+     */
+    @Inject
+    public EdgeWriteListener( final EdgeSerialization edgeSerialization, final GraphFig graphFig,
+                              final Keyspace keyspace, @EdgeWrite final AsyncProcessor edgeWrite ) {
+        this.edgeSerialization = edgeSerialization;
+        this.graphFig = graphFig;
+        this.keyspace = keyspace;
+        edgeWrite.addListener( this );
+    }
+
+
+    /**
+     * Build the observable that removes the versions that compare less than the written one.
+     *
+     * @param write The event holding the written edge and its organization scope
+     * @return An observable that emits the input event once for every batch of removals it executed
+     */
+    @Override
+    public Observable<EdgeEvent<Edge>> receive( final EdgeEvent<Edge> write ) {
+
+        final Edge edge = write.getData();
+        final OrganizationScope scope = write.getOrganizationScope();
+        final UUID maxVersion = edge.getVersion();
+
+        return Observable.create( new ObservableIterator<MarkedEdge>() {
+            @Override
+            protected Iterator<MarkedEdge> getIterator() {
+
+                final SimpleSearchByEdge search =
+                        new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(),
+                                maxVersion, null );
+
+                return edgeSerialization.getEdgeFromSource( scope, search );
+            }
+        } ).filter( new Func1<MarkedEdge, Boolean>() {
+
+            //TODO, reuse this for delete operation
+
+
+            /**
+             * Keep only the edges with a version less than the written version, those are the ones to remove
+             *
+             * @param markedEdge The stored version to check
+             * @return true when the stored version compares less than the written one
+             */
+            @Override
+            public Boolean call( final MarkedEdge markedEdge ) {
+                return UUIDComparator.staticCompare( markedEdge.getVersion(), maxVersion ) < 0;
+            }
+            //buffer the deletes and issue them in a single mutation
+        } ).buffer( graphFig.getScanPageSize() ).map( new Func1<List<MarkedEdge>, EdgeEvent<Edge>>() {
+            @Override
+            public EdgeEvent<Edge> call( final List<MarkedEdge> markedEdges ) {
+
+                final MutationBatch batch = keyspace.prepareMutationBatch();
+
+                //named staleEdge so it does not shadow the written edge of the enclosing method
+                for ( final MarkedEdge staleEdge : markedEdges ) {
+                    batch.mergeShallow( edgeSerialization.deleteEdge( scope, staleEdge ) );
+                }
+
+                try {
+                    batch.execute();
+                }
+                catch ( ConnectionException e ) {
+                    throw new RuntimeException( "Unable to issue write to cassandra", e );
+                }
+
+                return write;
+            }
+        } );
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c45f7ee9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
new file mode 100644
index 0000000..08cc942
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
@@ -0,0 +1,170 @@
+package org.apache.usergrid.persistence.graph.impl;
+
+
+import java.util.Iterator;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+import org.apache.usergrid.persistence.graph.SearchEdgeType;
+import org.apache.usergrid.persistence.graph.consistency.AsyncProcessor;
+import org.apache.usergrid.persistence.graph.consistency.MessageListener;
+import org.apache.usergrid.persistence.graph.guice.NodeDelete;
+import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
+import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
+import org.apache.usergrid.persistence.graph.serialization.NodeSerialization;
+import org.apache.usergrid.persistence.graph.serialization.impl.parse.ObservableIterator;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import rx.Observable;
+import rx.util.functions.Action1;
+import rx.util.functions.Func1;
+
+
+/**
+ * Listener for the asynchronous node delete. It looks up the max version recorded for the node and removes
+ * the edges pointing to the node that are found by a search bounded by that version.
+ */
+@Singleton
+public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, EdgeEvent<Id>> {
+
+    private final NodeSerialization nodeSerialization;
+    private final EdgeSerialization edgeSerialization;
+    private final EdgeMetadataSerialization edgeMetadataSerialization;
+
+
+    /**
+     * Wire the serialization dependencies and register this listener on the node delete processor.
+     *
+     * @param nodeSerialization Supplies the max version recorded for a node
+     * @param edgeSerialization Reads and removes the edges
+     * @param edgeMetadataSerialization Supplies the edge types pointing to a node
+     * @param nodeDelete The processor this listener registers itself on
+     */
+    @Inject
+    public NodeDeleteListener( final NodeSerialization nodeSerialization, final EdgeSerialization edgeSerialization,
+                               final EdgeMetadataSerialization edgeMetadataSerialization,
+                               @NodeDelete final AsyncProcessor nodeDelete ) {
+
+        this.nodeSerialization = nodeSerialization;
+        this.edgeSerialization = edgeSerialization;
+        this.edgeMetadataSerialization = edgeMetadataSerialization;
+        nodeDelete.addListener( this );
+    }
+
+
+    /**
+     * Build the observable that removes the edges pointing to the deleted node.
+     *
+     * @param edgeEvent The event holding the id of the deleted node and its organization scope
+     * @return An observable that emits the input event once for every edge it removed. It emits nothing when
+     * no max version is recorded for the node
+     */
+    @Override
+    public Observable<EdgeEvent<Id>> receive( final EdgeEvent<Id> edgeEvent ) {
+
+        final Id node = edgeEvent.getData();
+        final OrganizationScope scope = edgeEvent.getOrganizationScope();
+
+
+        return Observable.from( node ).map( new Func1<Id, Optional<UUID>>() {
+            @Override
+            public Optional<UUID> call( final Id id ) {
+                return nodeSerialization.getMaxVersion( scope, node );
+            }
+        } ).flatMap( new Func1<Optional<UUID>, Observable<MarkedEdge>>() {
+            @Override
+            public Observable<MarkedEdge> call( final Optional<UUID> uuidOptional ) {
+
+                //without a recorded version there is no bound for the search, so there is nothing to remove
+                if ( !uuidOptional.isPresent() ) {
+                    return Observable.<MarkedEdge>empty();
+                }
+
+                final UUID maxVersion = uuidOptional.get();
+
+                return getEdgesTypesToTarget( scope, new SimpleSearchEdgeType( node, null ) )
+                        .flatMap( new Func1<String, Observable<MarkedEdge>>() {
+                            @Override
+                            public Observable<MarkedEdge> call( final String edgeType ) {
+
+                                //for each edge type, we want to search all edges < this version to the node and
+                                // delete them. We might want to batch this up for efficiency
+                                return loadEdgesToTarget( scope,
+                                        new SimpleSearchByEdgeType( node, edgeType, maxVersion, null ) )
+                                        .doOnEach( new Action1<MarkedEdge>() {
+                                            @Override
+                                            public void call( final MarkedEdge markedEdge ) {
+                                                removeEdge( scope, markedEdge );
+                                            }
+                                        } );
+                            }
+                        } );
+            }
+        } ).map( new Func1<MarkedEdge, EdgeEvent<Id>>() {
+            @Override
+            public EdgeEvent<Id> call( final MarkedEdge edge ) {
+                return edgeEvent;
+            }
+        } );
+    }
+
+
+    /**
+     * Execute the removal of a single edge. The serialization only builds the mutation, so it has to be
+     * executed here for the edge to be removed.
+     *
+     * @param scope The organization scope of the edge
+     * @param markedEdge The edge to remove
+     */
+    private void removeEdge( final OrganizationScope scope, final MarkedEdge markedEdge ) {
+        try {
+            edgeSerialization.deleteEdge( scope, markedEdge ).execute();
+        }
+        catch ( ConnectionException e ) {
+            throw new RuntimeException( "Unable to execute mutation", e );
+        }
+    }
+
+
+    /**
+     * Get all existing edge types to the target node
+     *
+     * @param scope The organization scope to search in
+     * @param search The search holding the target node
+     * @return An observable over the iterator returned by the edge meta data serialization
+     */
+    private Observable<String> getEdgesTypesToTarget( final OrganizationScope scope, final SearchEdgeType search ) {
+
+        return Observable.create( new ObservableIterator<String>() {
+            @Override
+            protected Iterator<String> getIterator() {
+                return edgeMetadataSerialization.getEdgeTypesToTarget( scope, search );
+            }
+        } );
+    }
+
+
+    /**
+     * Load all edges pointing to this target
+     *
+     * @param scope The organization scope to search in
+     * @param search The search holding the target node, the edge type and the max version
+     * @return An observable over the iterator returned by the edge serialization
+     */
+    private Observable<MarkedEdge> loadEdgesToTarget( final OrganizationScope scope, final SearchByEdgeType search ) {
+
+        return Observable.create( new ObservableIterator<MarkedEdge>() {
+            @Override
+            protected Iterator<MarkedEdge> getIterator() {
+                return edgeSerialization.getEdgesToTarget( scope, search );
+            }
+        } );
+    }
+}