You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/03/24 18:20:50 UTC

[02/26] git commit: Initial commit of async processing framework. Need to evaluate it a bit more on the async processing.

Initial commit of async processing framework.  Need to evaluate it a bit more on the async processing.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/1dc3973f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/1dc3973f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/1dc3973f

Branch: refs/pull/77/merge
Commit: 1dc3973fa9eaeee62c24ad49616e240017c656ab
Parents: 0f5ad33
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Feb 21 16:46:11 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Feb 21 16:46:11 2014 -0700

----------------------------------------------------------------------
 .../graph/consistency/AsyncProcessor.java       |  14 +-
 .../graph/consistency/AsyncProcessorImpl.java   |  94 ++++++-
 .../graph/consistency/TimeService.java          |  33 +++
 .../persistence/graph/guice/GraphModule.java    |   7 +-
 .../graph/consistency/AsyncProcessorTest.java   | 273 +++++++++++++++++++
 5 files changed, 408 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/1dc3973f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java
index 948ef2b..d5efc52 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessor.java
@@ -13,21 +13,23 @@ public interface AsyncProcessor {
      * The processor implementation is responsible for guaranteeing the events fire in the runtime environment.
      * This could be local or clustered, consult the documentation on the implementation.  Note that events published
      * here could possibly be double published if the operation reaches it's timeout before completion.  As a result, every
-     * receiver of the event should operate in an idempotent way.
+     * receiver of the event should operate in an idempotent way.  Note that the event will fire at a time >= the timeout time.
+     * Firing immediately should not be assumed.
      *
      * @param event The event to be scheduled for verification
-     * @param timeout  The epoch time in milliseconds the event should fire
+     * @param timeout  The time in milliseconds we should wait before the event should fire
      */
-    public <T> TimeoutEvent<T> verify(T event, long timeout);
+    public <T> TimeoutEvent<T> setVerification( T event, long timeout );
 
 
     /**
-     * Start processing the event immediately asynchronously.
+     * Start processing the event immediately asynchronously.  In the event an exception is thrown, the TimeoutEvent should be re-tried.
+     * It is up to the implementer to commit the event so that it does not fire again.  This should never throw exceptions.
      *
-     * @param event
+     * @param event The event to start
      * @param <T>
      */
-    public <T> void start(T event);
+    public <T> void start(TimeoutEvent<T> event);
 
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/1dc3973f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
index 2ee308a..807b2b4 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
@@ -1,9 +1,20 @@
 package org.apache.usergrid.persistence.graph.consistency;
 
 
+import java.util.ArrayList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.google.common.eventbus.EventBus;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
+import com.netflix.hystrix.HystrixCommand;
+import com.netflix.hystrix.HystrixCommandGroupKey;
+
+import rx.Observer;
+import rx.Scheduler;
 
 
 /**
@@ -12,26 +23,97 @@ import com.google.inject.Singleton;
 @Singleton
 public class AsyncProcessorImpl implements AsyncProcessor {
 
+    private static final HystrixCommandGroupKey GRAPH_REPAIR = HystrixCommandGroupKey.Factory.asKey( "Graph_Repair" );
+
     private final EventBus bus;
     private final TimeoutQueue queue;
+    private final Scheduler scheduler;
+
+    private static final Logger LOG = LoggerFactory.getLogger( AsyncProcessor.class );
+
+    private List<ErrorListener> listeners = new ArrayList<ErrorListener>();
 
 
     @Inject
-    public AsyncProcessorImpl( final EventBus bus, final TimeoutQueue queue ) {
+    public AsyncProcessorImpl( final EventBus bus, final TimeoutQueue queue, final Scheduler scheduler ) {
         this.bus = bus;
         this.queue = queue;
+        this.scheduler = scheduler;
     }
 
 
     @Override
-    public <T> TimeoutEvent<T> verify( final T event, final long timeout ) {
-        return queue.queue( event, timeout  );
+    public <T> TimeoutEvent<T> setVerification( final T event, final long timeout ) {
+        return queue.queue( event, timeout );
     }
 
 
     @Override
-    public <T> void start( final T event ) {
-        //TODO, wrap this in hystrix for timeouts and capacity
-        bus.post( event );
+    public <T> void start( final TimeoutEvent<T> event ) {
+
+
+        //run this in a timeout command so it doesn't run forever. If it times out, it will simply resume later
+        new HystrixCommand<Void>( GRAPH_REPAIR ) {
+
+            @Override
+            protected Void run() throws Exception {
+                final T busEvent = event.getEvent();
+                bus.post( busEvent );
+                return null;
+            }
+        }.toObservable( scheduler ).subscribe( new Observer<Void>() {
+            @Override
+            public void onCompleted() {
+                queue.remove( event );
+            }
+
+
+            @Override
+            public void onError( final Throwable throwable ) {
+                LOG.error( "Unable to process async event", throwable );
+
+                for ( ErrorListener listener : listeners ) {
+                    listener.onError( event, throwable );
+                }
+            }
+
+
+            @Override
+            public void onNext( final Void args ) {
+                //nothing to do here
+                System.out.print( "next" );
+                //To change body of implemented methods use File | Settings | File Templates.
+            }
+        } );
+
+        //                new Action1<Void>() {
+        //                                                   @Override
+        //                                                   public void call( final Void timeoutEvent ) {
+        //
+        //                                                   }
+        //                                               }, new Action1<Throwable>() {
+        //                                                   @Override
+        //                                                   public void call( final Throwable throwable ) {
+        //
+        //                                                   }
+        //                                               }
+        //                                             );
+    }
+
+
+    /**
+     * Add an error listener
+     */
+    public void addListener( ErrorListener listener ) {
+        this.listeners.add( listener );
+    }
+
+
+    /**
+     * Internal listener for errors, really only used for testing.  Can be used to hook into error state
+     */
+    public static interface ErrorListener {
+
+        public <T> void onError( TimeoutEvent<T> event, Throwable t );
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/1dc3973f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeService.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeService.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeService.java
new file mode 100644
index 0000000..1625550
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/TimeService.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.consistency;
+
+
+/**
+ * Simple time service to get the current system time.  Useful in mocking
+ */
+public interface TimeService {
+
+    /**
+     * Get the current time
+     * @return
+     */
+    public long getCurrentTime();
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/1dc3973f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
index 00b1c41..a2506a3 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
@@ -28,6 +28,8 @@ import org.apache.usergrid.persistence.collection.mvcc.event.PostProcessObserver
 import org.apache.usergrid.persistence.graph.EdgeManager;
 import org.apache.usergrid.persistence.graph.EdgeManagerFactory;
 import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.graph.consistency.AsyncProcessor;
+import org.apache.usergrid.persistence.graph.consistency.AsyncProcessorImpl;
 import org.apache.usergrid.persistence.graph.impl.CollectionIndexObserver;
 import org.apache.usergrid.persistence.graph.impl.EdgeManagerImpl;
 import org.apache.usergrid.persistence.graph.serialization.CassandraConfig;
@@ -87,11 +89,12 @@ public class GraphModule extends AbstractModule {
         migrationBinding.addBinding().to( NodeSerializationImpl.class );
 
 
+
         /**
          * Graph event bus, will need to be refactored into it's own classes
          */
 
-        final EventBus eventBus = new EventBus("asyncCleanup");
+        final EventBus eventBus = new EventBus("AsyncProcessorBus");
         bind(EventBus.class).toInstance(eventBus);
 
         //auto register every impl on the event bus
@@ -106,6 +109,8 @@ public class GraphModule extends AbstractModule {
            }
         });
 
+        bind(AsyncProcessor.class).to(AsyncProcessorImpl.class);
+
 
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/1dc3973f/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java
new file mode 100644
index 0000000..c58b2c0
--- /dev/null
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.consistency;
+
+
+import java.util.Stack;
+import java.util.concurrent.CountDownLatch;
+
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+
+import rx.concurrency.Schedulers;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ *
+ *
+ */
+public class AsyncProcessorTest {
+
+
+    @Test
+    public void verificationSchedule() {
+
+
+        final long timeout = 500;
+        final TestEvent event = new TestEvent();
+
+
+        final TimeoutEvent<TestEvent> timeoutEvent = new TimeoutEvent<TestEvent>() {
+            @Override
+            public TestEvent getEvent() {
+                return event;
+            }
+
+
+            @Override
+            public long getTimeout() {
+                return timeout;
+            }
+        };
+
+        final TimeoutQueue queue = mock( TimeoutQueue.class );
+
+
+        AsyncProcessor asyncProcessor = constructProcessor( null, queue );
+
+
+        //mock up the queue
+        when( queue.queue( event, timeout ) ).thenReturn( timeoutEvent );
+
+
+        TimeoutEvent<TestEvent> returned = asyncProcessor.setVerification( event, timeout );
+
+        //ensure the timeouts are returned from the Queue subsystem
+        assertSame( timeoutEvent, returned );
+    }
+
+
+    @Test( timeout = 5000 )
+    public void verifyAsyncExecution() throws InterruptedException {
+
+        final TestListener listener = new TestListener();
+
+        final EventBus testBus = new EventBus( "test" );
+
+        testBus.register( listener );
+
+
+        final TestEvent event = new TestEvent();
+
+
+        final TimeoutEvent<TestEvent> timeoutEvent = new TimeoutEvent<TestEvent>() {
+            @Override
+            public TestEvent getEvent() {
+                return event;
+            }
+
+
+            @Override
+            public long getTimeout() {
+                return 500;
+            }
+        };
+
+        final TimeoutQueue queue = mock( TimeoutQueue.class );
+
+
+        final AsyncProcessor asyncProcessor = constructProcessor( testBus, queue );
+
+        final CountDownLatch latch = new CountDownLatch( 1 );
+
+        //mock up the ack to allow us to block the test until the async confirm fires
+        when( queue.remove( timeoutEvent ) ).thenAnswer( new Answer<Boolean>() {
+            @Override
+            public Boolean answer( final InvocationOnMock invocation ) throws Throwable {
+                latch.countDown();
+                return true;
+            }
+        } );
+
+
+        asyncProcessor.start( timeoutEvent );
+
+
+        //block until the event is fired.  The correct invocation is implicitly verified by the remove mock
+
+        latch.await();
+
+        final TestEvent firedEvent = listener.events.peek();
+
+        assertSame( event, firedEvent );
+    }
+
+
+    @Test( timeout = 5000 )
+//    @Test
+    public void verifyErrorExecution() throws InterruptedException {
+
+        final ErrorListener listener = new ErrorListener();
+
+        final EventBus testBus = new EventBus( "test" );
+
+        testBus.register( listener );
+
+
+        final TestEvent event = new TestEvent();
+
+        final boolean[] invoked = new boolean[]{false, false};
+
+
+        final TimeoutEvent<TestEvent> timeoutEvent = new TimeoutEvent<TestEvent>() {
+            @Override
+            public TestEvent getEvent() {
+                return event;
+            }
+
+
+            @Override
+            public long getTimeout() {
+                return 500;
+            }
+        };
+
+        final TimeoutQueue queue = mock( TimeoutQueue.class );
+
+
+        final AsyncProcessorImpl asyncProcessor = constructProcessor( testBus, queue );
+
+        final CountDownLatch latch = new CountDownLatch( 1 );
+
+        final TimeoutEvent<?>[] errorEvents = { null };
+
+        //countdown the latch so the test can proceed
+        asyncProcessor.addListener( new AsyncProcessorImpl.ErrorListener() {
+            @Override
+            public <T> void onError( final TimeoutEvent<T> event, final Throwable t ) {
+                errorEvents[0] = event;
+                invoked[1] = true;
+                latch.countDown();
+            }
+        } );
+
+        //throw an error if remove is called.  This shouldn't happen
+        when( queue.remove( timeoutEvent ) ).then( new Answer<Boolean>() {
+            @Override
+            public Boolean answer( final InvocationOnMock invocation ) throws Throwable {
+                invoked[0] = true;
+                return false;
+            }
+        } );
+
+
+
+        //fire the event
+        asyncProcessor.start( timeoutEvent );
+
+
+        //block until the event is fired.  The invocation verification is part of the error listener unlocking
+        latch.await();
+
+        final TestEvent firedEvent = listener.events.peek();
+
+        assertSame( event, firedEvent );
+
+        assertFalse("Queue remove should not be invoked", invoked[0]);
+
+        assertTrue("Error listener should be invoked", invoked[1]);
+
+        assertEquals( event, errorEvents[0] );
+    }
+
+
+    /**
+     * Construct the async processor
+     */
+    public AsyncProcessorImpl constructProcessor( EventBus eventBus, TimeoutQueue queue ) {
+
+        return new AsyncProcessorImpl( eventBus, queue, Schedulers.threadPoolForIO() );
+    }
+
+
+    /**
+     * Marked class for events, does nothing
+     */
+    public static class TestEvent {
+
+        public boolean equals(Object other){
+            return other == this;
+        }
+    }
+
+
+    public static class TestListener {
+
+        public final Stack<TestEvent> events = new Stack<TestEvent>();
+
+
+        public TestListener() {
+
+        }
+
+
+        @Subscribe
+        public void fireTestEvent( TestEvent e ) {
+            events.push( e );
+        }
+    }
+
+
+    /**
+     * Throw error after the event is fired
+     */
+    public static class ErrorListener  {
+
+        public final Stack<TestEvent> events = new Stack<TestEvent>();
+
+
+        @Subscribe
+        public void fireTestEvent( final TestEvent e ) {
+            events.push( e );
+            throw new RuntimeException( "Test Exception thrown.  Failed to process event" );
+        }
+    }
+}