You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/03/25 16:21:02 UTC
[03/43] git commit: Added basic Q logic for in memory queue as proof
of concept. Need to test and add reserved capacity for writes.
Added basic Q logic for in memory queue as proof of concept. Need to test and add reserved capacity for writes.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/a586a342
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/a586a342
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/a586a342
Branch: refs/heads/two-dot-o
Commit: a586a342e176a20d76db501b36a50feeb8655f16
Parents: 0133aef
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Feb 26 22:01:22 2014 -0800
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Feb 26 22:01:22 2014 -0800
----------------------------------------------------------------------
.../collection/EntityCollectionManager.java | 3 +
.../usergrid/persistence/graph/EdgeManager.java | 2 +-
.../usergrid/persistence/graph/GraphFig.java | 6 ++
.../graph/consistency/AsyncProcessor.java | 19 +++-
.../graph/consistency/AsyncProcessorImpl.java | 39 +++----
.../graph/consistency/AsynchonrousEvent.java | 18 ++++
.../consistency/AsynchronousEventListener.java | 18 ++++
.../graph/consistency/ErrorListener.java | 10 ++
.../graph/consistency/LocalTimeoutQueue.java | 23 +++--
.../consistency/SimpleAsynchonrousEvent.java | 30 ++++++
.../graph/consistency/SimpleTimeoutEvent.java | 30 ------
.../graph/consistency/TimeoutEvent.java | 18 ----
.../graph/consistency/TimeoutEventListener.java | 18 ----
.../graph/consistency/TimeoutQueue.java | 8 +-
.../persistence/graph/guice/EdgeDelete.java | 37 +++++++
.../persistence/graph/guice/EdgeWrite.java | 37 +++++++
.../persistence/graph/guice/GraphModule.java | 29 +++---
.../persistence/graph/guice/NodeDelete.java | 37 +++++++
.../persistence/graph/impl/EdgeManagerImpl.java | 103 +++++++++++++++++--
.../EdgeMetadataSerialization.java | 2 +-
.../graph/serialization/EdgeSerialization.java | 4 +-
.../graph/serialization/NodeSerialization.java | 2 +-
.../impl/EdgeSerializationImpl.java | 4 +-
.../graph/consistency/AsyncProcessorTest.java | 44 ++++----
.../consistency/LocalTimeoutQueueTest.java | 28 ++---
25 files changed, 399 insertions(+), 170 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a586a342/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
index d91a9b3..381c651 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
@@ -50,4 +50,7 @@ public interface EntityCollectionManager {
* Load the entity with the given entity Id
*/
public Observable<Entity> load( Id entityId );
+
+
+ //TODO add partial update
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a586a342/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/EdgeManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/EdgeManager.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/EdgeManager.java
index 6a6a092..9f9c510 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/EdgeManager.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/EdgeManager.java
@@ -66,7 +66,7 @@ public interface EdgeManager {
* @param edge The edge to delete
*
*
- * Delete the edge. Implementation should also delete the incoming (reversed) edge.
+ * EdgeDelete the edge. Implementation should also delete the incoming (reversed) edge.
*/
Observable<Edge> deleteEdge( Edge edge );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a586a342/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
index c5a2d5a..bbf83be 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
@@ -19,6 +19,8 @@ public interface GraphFig extends GuicyFig {
public static final String WRITE_CL = "usergrid.graph.write.cl";
+ public static final String WRITE_TIMEOUT = "usergrid.graph.write.timeout";
+
@Default( "1000" )
@Key( SCAN_PAGE_SIZE )
int getScanPageSize();
@@ -31,6 +33,10 @@ public interface GraphFig extends GuicyFig {
@Key( WRITE_CL )
String getWriteCL();
+ @Default("10000")
+ @Key( WRITE_TIMEOUT )
+ long getWriteTimeout();
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a586a342/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java
index 4ea3ef8..8d603b3 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java
@@ -19,16 +19,29 @@ public interface AsyncProcessor<T> {
* @param event The event to be scheduled for verification
* @param timeout The time in milliseconds we should wait before the event should fire
*/
- public TimeoutEvent<T> setVerification( T event, long timeout );
+ public AsynchonrousEvent<T> setVerification( T event, long timeout );
/**
- * Start processing the event immediately asynchronously. In the event an exception is thrown, the TimeoutEvent should be re-tried.
+ * Start processing the event immediately asynchronously. In the event an exception is thrown, the AsynchonrousEvent should be re-tried.
* It is up to the implementer to commit the event so that it does not fire again. This should never throw exceptions.
*
* @param event The event to start
*/
- public void start(TimeoutEvent<T> event);
+ public void start(AsynchonrousEvent<T> event);
+
+ /**
+ * Add the error listener to the list of listeners
+ * @param listener
+ */
+ public void addErrorListener( ErrorListener<T> listener );
+
+ /**
+ * Add the listener to this instance
+ * @param listener
+ */
+ public void addListener(AsynchronousEventListener<T> listener);
+
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a586a342/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
index 031eb87..3a00436 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
@@ -7,9 +7,9 @@ import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.eventbus.EventBus;
import com.google.inject.Inject;
import com.google.inject.Singleton;
+import com.google.inject.assistedinject.Assisted;
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
@@ -27,24 +27,23 @@ public class AsyncProcessorImpl<T> implements AsyncProcessor<T> {
private final TimeoutQueue<T> queue;
private final Scheduler scheduler;
- private final TimeoutEventListener<T> listener;
+ private final List<AsynchronousEventListener<T>> listeners = new ArrayList<AsynchronousEventListener<T>>( );
private static final Logger LOG = LoggerFactory.getLogger( AsyncProcessor.class );
- private List<ErrorListener> listeners = new ArrayList<ErrorListener>();
+ private List<ErrorListener> errorListeners = new ArrayList<ErrorListener>();
@Inject
- public AsyncProcessorImpl( final TimeoutQueue<T> queue, final TimeoutEventListener<T> listener, final Scheduler scheduler ) {
+ public AsyncProcessorImpl( final TimeoutQueue<T> queue, final Scheduler scheduler ) {
this.queue = queue;
- this.listener = listener;
this.scheduler = scheduler;
}
@Override
- public TimeoutEvent<T> setVerification( final T event, final long timeout ) {
+ public AsynchonrousEvent<T> setVerification( final T event, final long timeout ) {
return queue.queue( event, timeout );
}
@@ -52,7 +51,7 @@ public class AsyncProcessorImpl<T> implements AsyncProcessor<T> {
@Override
- public void start( final TimeoutEvent<T> event ) {
+ public void start( final AsynchonrousEvent<T> event ) {
//run this in a timeout command so it doesn't run forever. If it times out, it will simply resume later
@@ -60,8 +59,12 @@ public class AsyncProcessorImpl<T> implements AsyncProcessor<T> {
@Override
protected Void run() throws Exception {
- final T busEvent = event.getEvent();
- listener.receive( busEvent );
+ final T rootEvent = event.getEvent();
+
+ for(AsynchronousEventListener<T> listener: listeners){
+ listener.receive( rootEvent );
+ }
+
return null;
}
}.toObservable( scheduler ).subscribe( new Observer<Void>() {
@@ -75,7 +78,7 @@ public class AsyncProcessorImpl<T> implements AsyncProcessor<T> {
public void onError( final Throwable throwable ) {
LOG.error( "Unable to process async event", throwable );
- for ( ErrorListener listener : listeners ) {
+ for ( ErrorListener listener : errorListeners ) {
listener.onError( event, throwable );
}
}
@@ -90,19 +93,19 @@ public class AsyncProcessorImpl<T> implements AsyncProcessor<T> {
}
- /**
- * Add an error listener
- */
- public void addListener( ErrorListener listener ) {
+ @Override
+ public void addListener( final AsynchronousEventListener<T> listener ) {
this.listeners.add( listener );
}
/**
- * Internal listener for errors, really only used for testing. Can be used to hook into error state
+ * Add an error listeners
*/
- public static interface ErrorListener {
-
- public <T> void onError( TimeoutEvent<T> event, Throwable t );
+ public void addErrorListener( ErrorListener<T> listener ) {
+ this.errorListeners.add( listener );
}
+
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a586a342/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsynchonrousEvent.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsynchonrousEvent.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsynchonrousEvent.java
new file mode 100644
index 0000000..6bb3937
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsynchonrousEvent.java
@@ -0,0 +1,18 @@
+package org.apache.usergrid.persistence.graph.consistency;
+
+
+/**
+ * An interface for a timeout event
+ */
+public interface AsynchonrousEvent<T> {
+
+ /**
+ * @return The event to fire when our timeout is reached
+ */
+ T getEvent();
+
+ /**
+ * @return The time in epoch millis the event will time out
+ */
+ long getTimeout();
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a586a342/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsynchronousEventListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsynchronousEventListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsynchronousEventListener.java
new file mode 100644
index 0000000..428d850
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsynchronousEventListener.java
@@ -0,0 +1,18 @@
+package org.apache.usergrid.persistence.graph.consistency;
+
+
+/**
+ *
+ *
+ */
+public interface AsynchronousEventListener<T> {
+
+
+ /**
+ * The handler to receive the event. Any exception that is thrown is considered
+ * a failure, and the event will be re-fired.
+ * @param event
+ */
+ void receive(T event);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a586a342/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/ErrorListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/ErrorListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/ErrorListener.java
new file mode 100644
index 0000000..6c7c360
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/ErrorListener.java
@@ -0,0 +1,10 @@
+package org.apache.usergrid.persistence.graph.consistency;
+
+
+/**
+ * Internal listener for errors, really only used for testing. Can be used to hook into error state
+ */
+public interface ErrorListener <T> {
+
+ void onError( AsynchonrousEvent<T> event, Throwable t );
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a586a342/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/LocalTimeoutQueue.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/LocalTimeoutQueue.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/LocalTimeoutQueue.java
index 03b7863..6159b93 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/LocalTimeoutQueue.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/LocalTimeoutQueue.java
@@ -8,19 +8,22 @@ import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
import com.google.inject.Inject;
+import com.google.inject.Singleton;
/**
* Simple implementation of our timeout queue using an in memory PriorityBlockingQueue.
*
* This SHOULD NOT be used in a production environment. This is for development/testing runtimes only.
+ * This should not be a singleton, we can have multiple instances of
*/
+@Singleton
public class LocalTimeoutQueue<T> implements TimeoutQueue<T> {
/**
* For in memory queueing
*/
- private final PriorityBlockingQueue<TimeoutEvent<T>> queue = new PriorityBlockingQueue<TimeoutEvent<T>>( 1000, new TimeoutEventCompatator<T>() );
+ private final PriorityBlockingQueue<AsynchonrousEvent<T>> queue = new PriorityBlockingQueue<AsynchonrousEvent<T>>( 1000, new TimeoutEventCompatator<T>() );
private final TimeService timeService;
@@ -32,9 +35,9 @@ public class LocalTimeoutQueue<T> implements TimeoutQueue<T> {
@Override
- public TimeoutEvent<T> queue( final T event, final long timeout ) {
+ public AsynchonrousEvent<T> queue( final T event, final long timeout ) {
final long scheduledTimeout = timeService.getCurrentTime() + timeout;
- final TimeoutEvent<T> queuedEvent = new SimpleTimeoutEvent<T>( event, scheduledTimeout );
+ final AsynchonrousEvent<T> queuedEvent = new SimpleAsynchonrousEvent<T>( event, scheduledTimeout );
queue.add( queuedEvent );
@@ -43,16 +46,16 @@ public class LocalTimeoutQueue<T> implements TimeoutQueue<T> {
@Override
- public Collection<TimeoutEvent<T>> take( final int maxSize, final long timeout ) {
+ public Collection<AsynchonrousEvent<T>> take( final int maxSize, final long timeout ) {
final long now = timeService.getCurrentTime();
final long newTimeout = now+timeout;
- List<TimeoutEvent<T>> results = new ArrayList<TimeoutEvent<T>>(maxSize);
+ List<AsynchonrousEvent<T>> results = new ArrayList<AsynchonrousEvent<T>>(maxSize);
for(int i = 0; i < maxSize; i ++){
- TimeoutEvent<T> queuedEvent = queue.peek();
+ AsynchonrousEvent<T> queuedEvent = queue.peek();
//nothing to do
if(queuedEvent == null){
@@ -65,7 +68,7 @@ public class LocalTimeoutQueue<T> implements TimeoutQueue<T> {
break;
}
- final TimeoutEvent<T> newEvent = new SimpleTimeoutEvent<T>( queuedEvent.getEvent(), newTimeout );
+ final AsynchonrousEvent<T> newEvent = new SimpleAsynchonrousEvent<T>( queuedEvent.getEvent(), newTimeout );
//re schedule a new event to replace this one
queue.add(newEvent);
@@ -82,16 +85,16 @@ public class LocalTimeoutQueue<T> implements TimeoutQueue<T> {
@Override
- public boolean remove( final TimeoutEvent<T> event ) {
+ public boolean remove( final AsynchonrousEvent<T> event ) {
return queue.remove( event );
}
- private static class TimeoutEventCompatator<T> implements Comparator<TimeoutEvent<T>> {
+ private static class TimeoutEventCompatator<T> implements Comparator<AsynchonrousEvent<T>> {
@Override
- public int compare( final TimeoutEvent<T> o1, final TimeoutEvent<T> o2 ) {
+ public int compare( final AsynchonrousEvent<T> o1, final AsynchonrousEvent<T> o2 ) {
return Long.compare( o1.getTimeout(), o2.getTimeout() );
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a586a342/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/SimpleAsynchonrousEvent.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/SimpleAsynchonrousEvent.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/SimpleAsynchonrousEvent.java
new file mode 100644
index 0000000..4f1de81
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/SimpleAsynchonrousEvent.java
@@ -0,0 +1,30 @@
+package org.apache.usergrid.persistence.graph.consistency;
+
+
+/**
+ *
+ *
+ */
+public class SimpleAsynchonrousEvent<T> implements AsynchonrousEvent<T> {
+
+ private final T event;
+ private final long timeout;
+
+
+ public SimpleAsynchonrousEvent( final T event, final long timeout ) {
+ this.event = event;
+ this.timeout = timeout;
+ }
+
+
+ @Override
+ public T getEvent() {
+ return event;
+ }
+
+
+ @Override
+ public long getTimeout() {
+ return timeout;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a586a342/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/SimpleTimeoutEvent.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/SimpleTimeoutEvent.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/SimpleTimeoutEvent.java
deleted file mode 100644
index 73ef653..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/SimpleTimeoutEvent.java
+++ /dev/null
@@ -1,30 +0,0 @@
-package org.apache.usergrid.persistence.graph.consistency;
-
-
-/**
- *
- *
- */
-public class SimpleTimeoutEvent<T> implements TimeoutEvent<T> {
-
- private final T event;
- private final long timeout;
-
-
- public SimpleTimeoutEvent( final T event, final long timeout ) {
- this.event = event;
- this.timeout = timeout;
- }
-
-
- @Override
- public T getEvent() {
- return event;
- }
-
-
- @Override
- public long getTimeout() {
- return timeout;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a586a342/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeoutEvent.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeoutEvent.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeoutEvent.java
deleted file mode 100644
index c482730..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeoutEvent.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package org.apache.usergrid.persistence.graph.consistency;
-
-
-/**
- * An interface for a timeout event
- */
-public interface TimeoutEvent<T> {
-
- /**
- * @return The event to fire when our timeout is reached
- */
- T getEvent();
-
- /**
- * @return The time in epoch millis the event will time out
- */
- long getTimeout();
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a586a342/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeoutEventListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeoutEventListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeoutEventListener.java
deleted file mode 100644
index 0bd60e7..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeoutEventListener.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package org.apache.usergrid.persistence.graph.consistency;
-
-
-/**
- *
- *
- */
-public interface TimeoutEventListener<T> {
-
-
- /**
- * The handler to receive the event. Any exception that is thrown is considered
- * a failure, and the event will be re-fired.
- * @param event
- */
- void receive(T event);
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a586a342/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeoutQueue.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeoutQueue.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeoutQueue.java
index 3cb5001..2d16960 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeoutQueue.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeoutQueue.java
@@ -14,9 +14,9 @@ public interface TimeoutQueue<T> {
*
* @param event The event to queue
* @param timeout The timeout to set on the queue element
- * @return The TimeoutEvent that has been queued
+ * @return The AsynchonrousEvent that has been queued
*/
- public TimeoutEvent<T> queue( T event, long timeout );
+ public AsynchonrousEvent<T> queue( T event, long timeout );
/**
@@ -29,7 +29,7 @@ public interface TimeoutQueue<T> {
*
* @return A collection of events.
*/
- public Collection<TimeoutEvent<T>> take( int maxSize, long timeout );
+ public Collection<AsynchonrousEvent<T>> take( int maxSize, long timeout );
/**
@@ -39,5 +39,5 @@ public interface TimeoutQueue<T> {
*
* @return True if the element was removed. False otherwise
*/
- public boolean remove( TimeoutEvent<T> event );
+ public boolean remove( AsynchonrousEvent<T> event );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a586a342/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/EdgeDelete.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/EdgeDelete.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/EdgeDelete.java
new file mode 100644
index 0000000..adf9157
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/EdgeDelete.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.guice;
+
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import com.google.inject.BindingAnnotation;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+
+@BindingAnnotation
+@Target( { FIELD, PARAMETER, METHOD } )
+@Retention( RUNTIME )
+public @interface EdgeDelete {}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a586a342/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/EdgeWrite.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/EdgeWrite.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/EdgeWrite.java
new file mode 100644
index 0000000..b8afb13
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/EdgeWrite.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.guice;
+
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import com.google.inject.BindingAnnotation;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+
+@BindingAnnotation
+@Target( { FIELD, PARAMETER, METHOD } )
+@Retention( RUNTIME )
+public @interface EdgeWrite {}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a586a342/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
index fd56fcc..ed9448b 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
@@ -30,6 +30,8 @@ import org.apache.usergrid.persistence.graph.EdgeManagerFactory;
import org.apache.usergrid.persistence.graph.GraphFig;
import org.apache.usergrid.persistence.graph.consistency.AsyncProcessor;
import org.apache.usergrid.persistence.graph.consistency.AsyncProcessorImpl;
+import org.apache.usergrid.persistence.graph.consistency.LocalTimeoutQueue;
+import org.apache.usergrid.persistence.graph.consistency.TimeoutQueue;
import org.apache.usergrid.persistence.graph.impl.CollectionIndexObserver;
import org.apache.usergrid.persistence.graph.impl.EdgeManagerImpl;
import org.apache.usergrid.persistence.graph.serialization.CassandraConfig;
@@ -43,6 +45,7 @@ import org.apache.usergrid.persistence.graph.serialization.impl.NodeSerializatio
import com.google.common.eventbus.EventBus;
import com.google.inject.AbstractModule;
+import com.google.inject.Provides;
import com.google.inject.TypeLiteral;
import com.google.inject.assistedinject.FactoryModuleBuilder;
import com.google.inject.matcher.Matchers;
@@ -94,23 +97,19 @@ public class GraphModule extends AbstractModule {
* Graph event bus, will need to be refactored into it's own classes
*/
-// final EventBus eventBus = new EventBus("AsyncProcessorBus");
-// bind(EventBus.class).toInstance(eventBus);
+ // create a guice factor for getting our collection manager
- //auto register every impl on the event bus
-// bindListener( Matchers.any(), new TypeListener() {
-// @Override
-// public <I> void hear(@SuppressWarnings("unused") final TypeLiteral<I> typeLiteral, final TypeEncounter<I> typeEncounter) {
-// typeEncounter.register(new InjectionListener<I>() {
-// @Override public void afterInjection(final I instance) {
-// eventBus.register(instance);
-// }
-// });
-// }
-// });
+ //local queue. Need to
+ bind(TimeoutQueue.class).to( LocalTimeoutQueue.class );
+
+ bind(AsyncProcessor.class).annotatedWith( EdgeDelete.class ).to( AsyncProcessorImpl.class );
+ bind(AsyncProcessor.class).annotatedWith( EdgeWrite.class ).to( AsyncProcessorImpl.class );
+ bind(AsyncProcessor.class).annotatedWith( NodeDelete.class ).to( AsyncProcessorImpl.class );
+ }
- bind(AsyncProcessor.class).to(AsyncProcessorImpl.class);
- }
}
+
+
+
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a586a342/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/NodeDelete.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/NodeDelete.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/NodeDelete.java
new file mode 100644
index 0000000..eadbb1e
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/NodeDelete.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.guice;
+
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import com.google.inject.BindingAnnotation;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+
+@BindingAnnotation
+@Target( { FIELD, PARAMETER, METHOD } )
+@Retention( RUNTIME )
+public @interface NodeDelete {}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a586a342/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
index 03d6390..6d49591 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
@@ -20,11 +20,9 @@
package org.apache.usergrid.persistence.graph.impl;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.UUID;
import org.apache.usergrid.persistence.collection.OrganizationScope;
@@ -37,6 +35,9 @@ import org.apache.usergrid.persistence.graph.SearchByEdgeType;
import org.apache.usergrid.persistence.graph.SearchByIdType;
import org.apache.usergrid.persistence.graph.SearchEdgeType;
import org.apache.usergrid.persistence.graph.SearchIdType;
+import org.apache.usergrid.persistence.graph.consistency.AsyncProcessor;
+import org.apache.usergrid.persistence.graph.consistency.AsynchonrousEvent;
+import org.apache.usergrid.persistence.graph.consistency.AsynchronousEventListener;
import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
import org.apache.usergrid.persistence.graph.serialization.NodeSerialization;
@@ -50,6 +51,7 @@ import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+import org.apache.usergrid.persistence.graph.guice.*;
import rx.Observable;
import rx.Scheduler;
@@ -74,13 +76,27 @@ public class EdgeManagerImpl implements EdgeManager {
private final NodeSerialization nodeSerialization;
+ private final AsyncProcessor<Edge> edgeWriteAsyncProcessor;
+ private final AsyncProcessor<Edge> edgeDeleteAsyncProcessor;
+ private final AsyncProcessor<Id> nodeDeleteAsyncProcessor;
+
private final GraphFig graphFig;
@Inject
public EdgeManagerImpl( final Scheduler scheduler, final EdgeMetadataSerialization edgeMetadataSerialization,
final EdgeSerialization edgeSerialization, final NodeSerialization nodeSerialization,
- final GraphFig graphFig, @Assisted final OrganizationScope scope ) {
+ final GraphFig graphFig,
+ @EdgeWrite final AsyncProcessor edgeWrite,
+ @EdgeDelete final AsyncProcessor edgeDelete,
+ @NodeDelete final AsyncProcessor nodeDelete,
+
+
+ @Assisted final OrganizationScope scope ) {
+ ValidationUtils.validateOrganizationScope( scope );
+
+
+ this.scope = scope;
this.scheduler = scheduler;
this.edgeMetadataSerialization = edgeMetadataSerialization;
this.edgeSerialization = edgeSerialization;
@@ -88,10 +104,34 @@ public class EdgeManagerImpl implements EdgeManager {
this.graphFig = graphFig;
- ValidationUtils.validateOrganizationScope( scope );
+ this.edgeWriteAsyncProcessor = edgeWrite;
- this.scope = scope;
+ this.edgeWriteAsyncProcessor.addListener( new AsynchronousEventListener<Edge>() {
+ @Override
+ public void receive( final Edge edge ) {
+ repairEdgeAsync( edge );
+ }
+ } );
+
+
+ this.edgeDeleteAsyncProcessor = edgeDelete;
+
+ this.edgeDeleteAsyncProcessor.addListener( new AsynchronousEventListener<Edge>() {
+ @Override
+ public void receive( final Edge edge ) {
+ deleteEdgeAsync( edge );
+ }
+ } );
+
+ this.nodeDeleteAsyncProcessor = nodeDelete;
+
+ this.nodeDeleteAsyncProcessor.addListener( new AsynchronousEventListener<Id>() {
+ @Override
+ public void receive( final Id event ) {
+ deleteNodeAsync( event );
+ }
+ } );
}
@@ -108,6 +148,9 @@ public class EdgeManagerImpl implements EdgeManager {
mutation.mergeShallow( edgeMutation );
+ final AsynchonrousEvent<Edge> event =
+ edgeWriteAsyncProcessor.setVerification( edge , getTimeout() );
+
try {
mutation.execute();
}
@@ -115,6 +158,8 @@ public class EdgeManagerImpl implements EdgeManager {
throw new RuntimeException( "Unable to connect to cassandra", e );
}
+ edgeWriteAsyncProcessor.start( event );
+
return edge;
}
} );
@@ -130,6 +175,10 @@ public class EdgeManagerImpl implements EdgeManager {
public Edge call( final Edge edge ) {
final MutationBatch edgeMutation = edgeSerialization.markEdge( scope, edge );
+ final AsynchonrousEvent<Edge> event =
+ edgeDeleteAsyncProcessor.setVerification( edge , getTimeout() );
+
+
try {
edgeMutation.execute();
}
@@ -137,6 +186,9 @@ public class EdgeManagerImpl implements EdgeManager {
throw new RuntimeException( "Unable to connect to cassandra", e );
}
+ edgeDeleteAsyncProcessor.start( event );
+
+
return edge;
}
} );
@@ -156,6 +208,10 @@ public class EdgeManagerImpl implements EdgeManager {
final MutationBatch nodeMutation = nodeSerialization.mark( scope, id, deleteTime );
+ final AsynchonrousEvent<Id> event =
+ nodeDeleteAsyncProcessor.setVerification( node , getTimeout() );
+
+
try {
nodeMutation.execute();
}
@@ -163,6 +219,8 @@ public class EdgeManagerImpl implements EdgeManager {
throw new RuntimeException( "Unable to connect to cassandra", e );
}
+ nodeDeleteAsyncProcessor.start( event );
+
return id;
}
} );
@@ -290,6 +348,14 @@ public class EdgeManagerImpl implements EdgeManager {
/**
+ * Get our timeout for write consistency
+ */
+ private long getTimeout() {
+ return graphFig.getWriteTimeout() * 2;
+ }
+
+
+ /**
* Helper filter to perform mapping and return an observable of pre-filtered edges
*/
private class EdgeBufferFilter implements Func1<List<MarkedEdge>, Observable<MarkedEdge>> {
@@ -303,12 +369,11 @@ public class EdgeManagerImpl implements EdgeManager {
/**
- * Takes a buffered list of marked edges. It then does a single round trip to fetch marked ids
- * These are then used in conjunction with the max version filter to filter any edges that should
- * not be returned
- * @param markedEdges
- * @return An observable that emits only edges that can be consumed. There could be multiple versions
- * of the same edge so those need de-duped.
+ * Takes a buffered list of marked edges. It then does a single round trip to fetch marked ids These are then
+ * used in conjunction with the max version filter to filter any edges that should not be returned
+ *
+ * @return An observable that emits only edges that can be consumed. There could be multiple versions of the
+ * same edge so those need de-duped.
*/
@Override
public Observable<MarkedEdge> call( final List<MarkedEdge> markedEdges ) {
@@ -368,4 +433,20 @@ public class EdgeManagerImpl implements EdgeManager {
return true;
}
}
+
+
+ public void repairEdgeAsync( Edge write ) {
+
+ }
+
+
+ public void deleteEdgeAsync( Edge delete ) {
+
+ }
+
+
+ public void deleteNodeAsync( Id delete ) {
+
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a586a342/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeMetadataSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeMetadataSerialization.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeMetadataSerialization.java
index bb36bff..8806b92 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeMetadataSerialization.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeMetadataSerialization.java
@@ -37,7 +37,7 @@ public interface EdgeMetadataSerialization {
/**
- * Write both the source--->Target edge and the target <----- source edge into the mutation
+ * EdgeWrite both the source--->Target edge and the target <----- source edge into the mutation
*/
MutationBatch writeEdge( OrganizationScope scope, Edge edge );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a586a342/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeSerialization.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeSerialization.java
index 1575121..548d821 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeSerialization.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeSerialization.java
@@ -39,7 +39,7 @@ public interface EdgeSerialization {
/**
- * Write both the source--->Target edge and the target <----- source edge into the mutation
+ * EdgeWrite both the source--->Target edge and the target <----- source edge into the mutation
*
* @param scope The org scope of the graph
* @param edge The edge to write
@@ -56,7 +56,7 @@ public interface EdgeSerialization {
MutationBatch markEdge( OrganizationScope scope, Edge edge);
/**
- * Write both the source -->target edge and the target<--- source edge into the mutation
+ * EdgeWrite both the source -->target edge and the target<--- source edge into the mutation
*
* @param scope The org scope of the graph
* @param edge The edge to write
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a586a342/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/NodeSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/NodeSerialization.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/NodeSerialization.java
index 9853bce..396cd58 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/NodeSerialization.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/NodeSerialization.java
@@ -50,7 +50,7 @@ public interface NodeSerialization {
/**
- * Delete the mark entry, signaling a delete is complete
+ * EdgeDelete the mark entry, signaling a delete is complete
* @param scope
* @param node
* @return
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a586a342/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
index 4691973..1dbfbae 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
@@ -193,7 +193,7 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
/**
- * Write the edges internally
+ * EdgeWrite the edges internally
* @param scope The scope to encapsulate
* @param edge The edge to write
* @param op The row operation to invoke
@@ -227,7 +227,7 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
/**
- * Write edges from target<-source
+ * EdgeWrite edges from target<-source
*/
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a586a342/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java
index b6ef52e..c041e91 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java
@@ -21,16 +21,12 @@ package org.apache.usergrid.persistence.graph.consistency;
import java.util.Stack;
-import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
-import com.google.common.eventbus.EventBus;
-import com.google.common.eventbus.Subscribe;
-
import rx.concurrency.Schedulers;
import static org.junit.Assert.assertEquals;
@@ -56,7 +52,7 @@ public class AsyncProcessorTest {
final TestEvent event = new TestEvent();
- final TimeoutEvent<TestEvent> timeoutEvent = new TimeoutEvent<TestEvent>() {
+ final AsynchonrousEvent<TestEvent> asynchonrousEvent = new AsynchonrousEvent<TestEvent>() {
@Override
public TestEvent getEvent() {
return event;
@@ -76,13 +72,13 @@ public class AsyncProcessorTest {
//mock up the queue
- when( queue.queue( event, timeout ) ).thenReturn( timeoutEvent );
+ when( queue.queue( event, timeout ) ).thenReturn( asynchonrousEvent );
- TimeoutEvent<TestEvent> returned = asyncProcessor.setVerification( event, timeout );
+ AsynchonrousEvent<TestEvent> returned = asyncProcessor.setVerification( event, timeout );
//ensure the timeouts are returned from the Queue subsystem
- assertSame( timeoutEvent, returned );
+ assertSame( asynchonrousEvent, returned );
}
@@ -94,7 +90,7 @@ public class AsyncProcessorTest {
final TestEvent event = new TestEvent();
- final TimeoutEvent<TestEvent> timeoutEvent = new TimeoutEvent<TestEvent>() {
+ final AsynchonrousEvent<TestEvent> asynchonrousEvent = new AsynchonrousEvent<TestEvent>() {
@Override
public TestEvent getEvent() {
return event;
@@ -115,7 +111,7 @@ public class AsyncProcessorTest {
final CountDownLatch latch = new CountDownLatch( 1 );
//mock up the ack to allow us to block the test until the async confirm fires
- when( queue.remove( timeoutEvent ) ).thenAnswer( new Answer<Boolean>() {
+ when( queue.remove( asynchonrousEvent ) ).thenAnswer( new Answer<Boolean>() {
@Override
public Boolean answer( final InvocationOnMock invocation ) throws Throwable {
latch.countDown();
@@ -124,7 +120,7 @@ public class AsyncProcessorTest {
} );
- asyncProcessor.start( timeoutEvent );
+ asyncProcessor.start( asynchonrousEvent );
//block until the event is fired. The correct invocation is implicitly verified by the remove mock
@@ -141,7 +137,7 @@ public class AsyncProcessorTest {
@Test
public void verifyErrorExecution() throws InterruptedException {
- final ErrorListener listener = new ErrorListener();
+ final AsynchronousErrorListener listener = new AsynchronousErrorListener();
final TestEvent event = new TestEvent();
@@ -149,7 +145,7 @@ public class AsyncProcessorTest {
final boolean[] invoked = new boolean[] { false, false };
- final TimeoutEvent<TestEvent> timeoutEvent = new TimeoutEvent<TestEvent>() {
+ final AsynchonrousEvent<TestEvent> asynchonrousEvent = new AsynchonrousEvent<TestEvent>() {
@Override
public TestEvent getEvent() {
return event;
@@ -169,20 +165,21 @@ public class AsyncProcessorTest {
final CountDownLatch latch = new CountDownLatch( 1 );
- final TimeoutEvent<?>[] errorEvents = { null };
+ final AsynchonrousEvent<?>[] errorEvents = { null };
//countdown the latch so the test can proceed
- asyncProcessor.addListener( new AsyncProcessorImpl.ErrorListener() {
+ asyncProcessor.addErrorListener( new ErrorListener<TestEvent>() {
@Override
- public <T> void onError( final TimeoutEvent<T> event, final Throwable t ) {
+ public void onError( final AsynchonrousEvent<TestEvent> event, final Throwable t ) {
errorEvents[0] = event;
invoked[1] = true;
latch.countDown();
}
+
} );
//throw an error if remove is called. This shouldn't happen
- when( queue.remove( timeoutEvent ) ).then( new Answer<Boolean>() {
+ when( queue.remove( asynchonrousEvent ) ).then( new Answer<Boolean>() {
@Override
public Boolean answer( final InvocationOnMock invocation ) throws Throwable {
invoked[0] = true;
@@ -192,7 +189,7 @@ public class AsyncProcessorTest {
//fire the event
- asyncProcessor.start( timeoutEvent );
+ asyncProcessor.start( asynchonrousEvent );
//block until the event is fired. The invocation verification is part of the error listener unlocking
@@ -213,9 +210,12 @@ public class AsyncProcessorTest {
/**
* Construct the async processor
*/
- public <T> AsyncProcessorImpl<T> constructProcessor( TimeoutQueue<T> queue , TimeoutEventListener<T> listener) {
+ public <T> AsyncProcessorImpl<T> constructProcessor( TimeoutQueue<T> queue , AsynchronousEventListener<T> listener) {
+
+ AsyncProcessorImpl<T> processor = new AsyncProcessorImpl( queue,Schedulers.threadPoolForIO() );
+ processor.addListener( listener );
- return new AsyncProcessorImpl( queue, listener, Schedulers.threadPoolForIO() );
+ return processor;
}
@@ -230,7 +230,7 @@ public class AsyncProcessorTest {
}
- public static class TestListener implements TimeoutEventListener<TestEvent> {
+ public static class TestListener implements AsynchronousEventListener<TestEvent> {
public final Stack<TestEvent> events = new Stack<TestEvent>();
@@ -249,7 +249,7 @@ public class AsyncProcessorTest {
/**
* Throw error after the event is fired
*/
- public static class ErrorListener implements TimeoutEventListener<TestEvent> {
+ public static class AsynchronousErrorListener implements AsynchronousEventListener<TestEvent> {
public final Stack<TestEvent> events = new Stack<TestEvent>();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a586a342/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/LocalTimeoutQueueTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/LocalTimeoutQueueTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/LocalTimeoutQueueTest.java
index f3107d2..0b741c8 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/LocalTimeoutQueueTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/LocalTimeoutQueueTest.java
@@ -34,14 +34,14 @@ public class LocalTimeoutQueueTest {
final TestEvent event = new TestEvent();
- TimeoutEvent<TestEvent> timeoutEvent = queue.queue( event, timeout );
+ AsynchonrousEvent<TestEvent> asynchonrousEvent = queue.queue( event, timeout );
- assertNotNull( timeoutEvent );
+ assertNotNull( asynchonrousEvent );
- assertEquals( event, timeoutEvent.getEvent() );
- assertEquals( time + timeout, timeoutEvent.getTimeout() );
+ assertEquals( event, asynchonrousEvent.getEvent() );
+ assertEquals( time + timeout, asynchonrousEvent.getTimeout() );
- Collection<TimeoutEvent<TestEvent>> results = queue.take( 100, timeout );
+ Collection<AsynchonrousEvent<TestEvent>> results = queue.take( 100, timeout );
assertEquals( "Time not yet elapsed", 0, results.size() );
@@ -55,9 +55,9 @@ public class LocalTimeoutQueueTest {
assertEquals( "Time elapsed", 1, results.size() );
//validate we get a new timeout event since the old one was re-scheduled
- Iterator<TimeoutEvent<TestEvent>> events = results.iterator();
+ Iterator<AsynchonrousEvent<TestEvent>> events = results.iterator();
- TimeoutEvent<TestEvent> message = events.next();
+ AsynchonrousEvent<TestEvent> message = events.next();
assertEquals( event, message.getEvent() );
@@ -97,18 +97,18 @@ public class LocalTimeoutQueueTest {
final TestEvent event = new TestEvent();
- TimeoutEvent<TestEvent> timeoutEvent = queue.queue( event, timeout );
+ AsynchonrousEvent<TestEvent> asynchonrousEvent = queue.queue( event, timeout );
events.add( event );
- assertNotNull( timeoutEvent );
+ assertNotNull( asynchonrousEvent );
- assertEquals( event, timeoutEvent.getEvent() );
- assertEquals( time + timeout, timeoutEvent.getTimeout() );
+ assertEquals( event, asynchonrousEvent.getEvent() );
+ assertEquals( time + timeout, asynchonrousEvent.getTimeout() );
}
- Collection<TimeoutEvent<TestEvent>> results = queue.take( 100, timeout );
+ Collection<AsynchonrousEvent<TestEvent>> results = queue.take( 100, timeout );
assertEquals( "Time not yet elapsed", 0, results.size() );
@@ -133,11 +133,11 @@ public class LocalTimeoutQueueTest {
assertEquals( "Time elapsed", 100, results.size() );
//validate we get a new timeout event since the old one was re-scheduled
- Iterator<TimeoutEvent<TestEvent>> eventIterator = results.iterator();
+ Iterator<AsynchonrousEvent<TestEvent>> eventIterator = results.iterator();
while(eventIterator.hasNext()){
- TimeoutEvent<TestEvent> message = eventIterator.next();
+ AsynchonrousEvent<TestEvent> message = eventIterator.next();
assertTrue( events.remove( message.getEvent() ) );