You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/03/24 18:20:50 UTC
[02/26] git commit: Initial commit of async processing framework.
Need to evaluate it a bit more on the async processing.
Initial commit of async processing framework. Need to evaluate it a bit more on the async processing.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/1dc3973f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/1dc3973f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/1dc3973f
Branch: refs/pull/77/merge
Commit: 1dc3973fa9eaeee62c24ad49616e240017c656ab
Parents: 0f5ad33
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Feb 21 16:46:11 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Feb 21 16:46:11 2014 -0700
----------------------------------------------------------------------
.../graph/consistency/AsyncProcessor.java | 14 +-
.../graph/consistency/AsyncProcessorImpl.java | 94 ++++++-
.../graph/consistency/TimeService.java | 33 +++
.../persistence/graph/guice/GraphModule.java | 7 +-
.../graph/consistency/AsyncProcessorTest.java | 273 +++++++++++++++++++
5 files changed, 408 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/1dc3973f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java
index 948ef2b..d5efc52 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java
@@ -13,21 +13,23 @@ public interface AsyncProcessor {
* The processor implementation is responsible for guaranteeing the events fire in the runtime environment.
* This could be local or clustered, consult the documentation on the implementation. Note that events published
* here could possibly be double published if the operation reaches its timeout before completion. As a result, every
- * receiver of the event should operate in an idempotent way.
+ * receiver of the event should operate in an idempotent way. Note that the event will fire at a time >= the timeout time.
+ * Firing immediately should not be assumed.
*
* @param event The event to be scheduled for verification
- * @param timeout The epoch time in milliseconds the event should fire
+ * @param timeout The time in milliseconds we should wait before the event should fire
*/
- public <T> TimeoutEvent<T> verify(T event, long timeout);
+ public <T> TimeoutEvent<T> setVerification( T event, long timeout );
/**
- * Start processing the event immediately asynchronously.
+ * Start processing the event immediately asynchronously. In the event an exception is thrown, the TimeoutEvent should be re-tried.
+ * It is up to the implementer to commit the event so that it does not fire again. This should never throw exceptions.
*
- * @param event
+ * @param event The event to start
* @param <T>
*/
- public <T> void start(T event);
+ public <T> void start(TimeoutEvent<T> event);
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/1dc3973f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
index 2ee308a..807b2b4 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
@@ -1,9 +1,20 @@
package org.apache.usergrid.persistence.graph.consistency;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import com.google.common.eventbus.EventBus;
import com.google.inject.Inject;
import com.google.inject.Singleton;
+import com.netflix.hystrix.HystrixCommand;
+import com.netflix.hystrix.HystrixCommandGroupKey;
+
+import rx.Observer;
+import rx.Scheduler;
/**
@@ -12,26 +23,97 @@ import com.google.inject.Singleton;
@Singleton
public class AsyncProcessorImpl implements AsyncProcessor {
+ private static final HystrixCommandGroupKey GRAPH_REPAIR = HystrixCommandGroupKey.Factory.asKey( "Graph_Repair" );
+
private final EventBus bus;
private final TimeoutQueue queue;
+ private final Scheduler scheduler;
+
+ private static final Logger LOG = LoggerFactory.getLogger( AsyncProcessor.class );
+
+ private List<ErrorListener> listeners = new ArrayList<ErrorListener>();
@Inject
- public AsyncProcessorImpl( final EventBus bus, final TimeoutQueue queue ) {
+ public AsyncProcessorImpl( final EventBus bus, final TimeoutQueue queue, final Scheduler scheduler ) {
this.bus = bus;
this.queue = queue;
+ this.scheduler = scheduler;
}
@Override
- public <T> TimeoutEvent<T> verify( final T event, final long timeout ) {
- return queue.queue( event, timeout );
+ public <T> TimeoutEvent<T> setVerification( final T event, final long timeout ) {
+ return queue.queue( event, timeout );
}
@Override
- public <T> void start( final T event ) {
- //TODO, wrap this in hystrix for timeouts and capacity
- bus.post( event );
+    public <T> void start( final TimeoutEvent<T> event ) {
+
+        // Post the wrapped event from inside a Hystrix command so the work cannot run forever.
+        // On completion the event is removed from the queue; on error it is NOT removed, only
+        // logged and reported to the registered error listeners.
+        final HystrixCommand<Void> postCommand = new HystrixCommand<Void>( GRAPH_REPAIR ) {
+
+            @Override
+            protected Void run() throws Exception {
+                bus.post( event.getEvent() );
+                return null;
+            }
+        };
+
+        postCommand.toObservable( scheduler ).subscribe( new Observer<Void>() {
+            @Override
+            public void onCompleted() {
+                //the event was posted, remove it from the queue
+                queue.remove( event );
+            }
+
+
+            @Override
+            public void onError( final Throwable throwable ) {
+                LOG.error( "Unable to process async event", throwable );
+
+                for ( ErrorListener listener : listeners ) {
+                    listener.onError( event, throwable );
+                }
+            }
+
+
+            @Override
+            public void onNext( final Void args ) {
+                //the command returns no value, nothing to do here
+            }
+        } );
+    }
+
+
+    /**
+     * Registers a listener that is called back when processing of a started event fails.
+     *
+     * @param listener The listener to append to this processor's error listeners
+     */
+    public void addListener( ErrorListener listener ) {
+        listeners.add( listener );
+    }
+
+
+    /**
+     * Hook for observing failures while an event is processed. Mainly useful in tests that need to
+     * react to the error state.
+     */
+    public interface ErrorListener {
+
+        /**
+         * Called when processing of an event failed.
+         *
+         * @param event The timeout event that was being processed
+         * @param t The error that was raised
+         */
+        <T> void onError( TimeoutEvent<T> event, Throwable t );
+    }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/1dc3973f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeService.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeService.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeService.java
new file mode 100644
index 0000000..1625550
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeService.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.consistency;
+
+
+/**
+ * Abstraction for reading the current system time. Keeping it behind an interface makes it easy to mock.
+ */
+public interface TimeService {
+
+    /**
+     * Returns the current time.
+     *
+     * @return The current time. NOTE(review): the unit is not stated here, presumably epoch milliseconds - confirm
+     */
+    long getCurrentTime();
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/1dc3973f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
index 00b1c41..a2506a3 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
@@ -28,6 +28,8 @@ import org.apache.usergrid.persistence.collection.mvcc.event.PostProcessObserver
import org.apache.usergrid.persistence.graph.EdgeManager;
import org.apache.usergrid.persistence.graph.EdgeManagerFactory;
import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.graph.consistency.AsyncProcessor;
+import org.apache.usergrid.persistence.graph.consistency.AsyncProcessorImpl;
import org.apache.usergrid.persistence.graph.impl.CollectionIndexObserver;
import org.apache.usergrid.persistence.graph.impl.EdgeManagerImpl;
import org.apache.usergrid.persistence.graph.serialization.CassandraConfig;
@@ -87,11 +89,12 @@ public class GraphModule extends AbstractModule {
migrationBinding.addBinding().to( NodeSerializationImpl.class );
+
/**
* Graph event bus, will need to be refactored into its own classes
*/
- final EventBus eventBus = new EventBus("asyncCleanup");
+ final EventBus eventBus = new EventBus("AsyncProcessorBus");
bind(EventBus.class).toInstance(eventBus);
//auto register every impl on the event bus
@@ -106,6 +109,8 @@ public class GraphModule extends AbstractModule {
}
});
+ bind(AsyncProcessor.class).to(AsyncProcessorImpl.class);
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/1dc3973f/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java
new file mode 100644
index 0000000..c58b2c0
--- /dev/null
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.consistency;
+
+
+import java.util.Stack;
+import java.util.concurrent.CountDownLatch;
+
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+
+import rx.concurrency.Schedulers;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
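+/**
+ * Tests for {@link AsyncProcessorImpl}: scheduling a verification, asynchronous execution and error handling.
+ */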
+public class AsyncProcessorTest {
+
+
+    /**
+     * Scheduling a verification must hand back exactly the timeout event produced by the queue.
+     */
+    @Test
+    public void verificationSchedule() {
+
+        final long waitMillis = 500;
+        final TestEvent payload = new TestEvent();
+
+        final TimeoutEvent<TestEvent> expected = new TimeoutEvent<TestEvent>() {
+            @Override
+            public TestEvent getEvent() {
+                return payload;
+            }
+
+
+            @Override
+            public long getTimeout() {
+                return waitMillis;
+            }
+        };
+
+        //the queue is mocked and always answers with our prepared timeout event
+        final TimeoutQueue mockQueue = mock( TimeoutQueue.class );
+        when( mockQueue.queue( payload, waitMillis ) ).thenReturn( expected );
+
+        //no bus is required, scheduling only talks to the queue
+        final AsyncProcessor processor = constructProcessor( null, mockQueue );
+
+        //whatever the queue subsystem returned must be passed through untouched
+        assertSame( expected, processor.setVerification( payload, waitMillis ) );
+    }
+
+
+    /**
+     * Starts an event and checks that the subscriber on the bus received it. The mocked queue removal
+     * releases a latch, so the assertions only run after the processor acknowledged the event.
+     */
+    @Test( timeout = 5000 )
+    public void verifyAsyncExecution() throws InterruptedException {
+
+        final TestEvent payload = new TestEvent();
+
+        final TimeoutEvent<TestEvent> wrapped = new TimeoutEvent<TestEvent>() {
+            @Override
+            public TestEvent getEvent() {
+                return payload;
+            }
+
+
+            @Override
+            public long getTimeout() {
+                return 500;
+            }
+        };
+
+        //subscriber that records everything posted on the bus
+        final TestListener subscriber = new TestListener();
+        final EventBus bus = new EventBus( "test" );
+        bus.register( subscriber );
+
+        final TimeoutQueue mockQueue = mock( TimeoutQueue.class );
+        final AsyncProcessor processor = constructProcessor( bus, mockQueue );
+
+        final CountDownLatch removed = new CountDownLatch( 1 );
+
+        //the remove call doubles as the signal that the async work has finished
+        when( mockQueue.remove( wrapped ) ).thenAnswer( new Answer<Boolean>() {
+            @Override
+            public Boolean answer( final InvocationOnMock invocation ) throws Throwable {
+                removed.countDown();
+                return true;
+            }
+        } );
+
+        processor.start( wrapped );
+
+        //wait until the event was removed from the queue, which implies it was posted first
+        removed.await();
+
+        assertSame( payload, subscriber.events.peek() );
+    }
+
+
+    /**
+     * Starts an event whose subscriber throws, and expects the processor to report the failure to its
+     * error listener without removing the event from the queue.
+     *
+     * NOTE(review): Guava's EventBus is documented to catch and log exceptions thrown by subscribers
+     * instead of propagating them out of post(). If that holds for the Guava version in use, the command
+     * completes normally, the error listener never fires and this test runs into its timeout - confirm.
+     */
+    @Test( timeout = 5000 )
+    public void verifyErrorExecution() throws InterruptedException {
+
+        final ErrorListener listener = new ErrorListener();
+
+        final EventBus testBus = new EventBus( "test" );
+
+        testBus.register( listener );
+
+
+        final TestEvent event = new TestEvent();
+
+        //invoked[0] tracks queue removal, invoked[1] tracks the error listener callback
+        final boolean[] invoked = new boolean[]{false, false};
+
+
+        final TimeoutEvent<TestEvent> timeoutEvent = new TimeoutEvent<TestEvent>() {
+            @Override
+            public TestEvent getEvent() {
+                return event;
+            }
+
+
+            @Override
+            public long getTimeout() {
+                return 500;
+            }
+        };
+
+        final TimeoutQueue queue = mock( TimeoutQueue.class );
+
+
+        final AsyncProcessorImpl asyncProcessor = constructProcessor( testBus, queue );
+
+        final CountDownLatch latch = new CountDownLatch( 1 );
+
+        final TimeoutEvent<?>[] errorEvents = { null };
+
+        //countdown the latch so the test can proceed
+        asyncProcessor.addListener( new AsyncProcessorImpl.ErrorListener() {
+            @Override
+            public <T> void onError( final TimeoutEvent<T> event, final Throwable t ) {
+                errorEvents[0] = event;
+                invoked[1] = true;
+                latch.countDown();
+            }
+        } );
+
+        //record whether remove is called. This shouldn't happen
+        when( queue.remove( timeoutEvent ) ).then( new Answer<Boolean>() {
+            @Override
+            public Boolean answer( final InvocationOnMock invocation ) throws Throwable {
+                invoked[0] = true;
+                return false;
+            }
+        } );
+
+
+        //fire the event
+        asyncProcessor.start( timeoutEvent );
+
+
+        //block until the event is fired. The invocation verification is part of the error listener unlocking
+        latch.await();
+
+        final TestEvent firedEvent = listener.events.peek();
+
+        assertSame( event, firedEvent );
+
+        assertFalse("Queue remove should not be invoked", invoked[0]);
+
+        assertTrue("Error listener should be invoked", invoked[1]);
+
+        //the listener receives the wrapping timeout event that was started, not the raw bus event
+        assertSame( timeoutEvent, errorEvents[0] );
+    }
+
+
+    /**
+     * Builds the processor under test from the given bus and queue, using the IO thread pool scheduler.
+     *
+     * @param eventBus The bus events are posted to
+     * @param queue The timeout queue the processor delegates to
+     */
+    public AsyncProcessorImpl constructProcessor( EventBus eventBus, TimeoutQueue queue ) {
+        final AsyncProcessorImpl processor = new AsyncProcessorImpl( eventBus, queue, Schedulers.threadPoolForIO() );
+
+        return processor;
+    }
+
+
+    /**
+     * Marker class for events, carries no data. Equality is identity based.
+     */
+    public static class TestEvent {
+
+        @Override
+        public boolean equals( final Object other ) {
+            return this == other;
+        }
+
+
+        /**
+         * Identity hash, kept next to the identity based equals so the pair stays consistent
+         */
+        @Override
+        public int hashCode() {
+            return System.identityHashCode( this );
+        }
+    }
+
+
+    /**
+     * Bus subscriber that records every TestEvent it receives
+     */
+    public static class TestListener {
+
+        public final Stack<TestEvent> events = new Stack<TestEvent>();
+
+
+        @Subscribe
+        public void fireTestEvent( final TestEvent received ) {
+            events.push( received );
+        }
+    }
+
+
+    /**
+     * Bus subscriber that records the event it receives and then always fails with a RuntimeException
+     */
+    public static class ErrorListener {
+
+        public final Stack<TestEvent> events = new Stack<TestEvent>();
+
+
+        @Subscribe
+        public void fireTestEvent( final TestEvent e ) {
+            //record first, so the test can still see which event arrived
+            events.push( e );
+
+            final String message = "Test Exception thrown. Failed to process event";
+            throw new RuntimeException( message );
+        }
+    }
+}