You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/03/25 16:21:01 UTC

[02/43] git commit: Finished local queue for proof of concept

Finished local queue for proof of concept


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/0133aefd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/0133aefd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/0133aefd

Branch: refs/heads/two-dot-o
Commit: 0133aefd335f1d0bfa10087f834d4ed18b9a23bd
Parents: 3689a3c
Author: Todd Nine <tn...@apigee.com>
Authored: Mon Feb 24 17:57:28 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon Feb 24 17:57:28 2014 -0700

----------------------------------------------------------------------
 .../graph/consistency/LocalTimeoutQueue.java    |  98 +++++++++++
 .../consistency/LocalTimeoutQueueTest.java      | 164 +++++++++++++++++++
 2 files changed, 262 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0133aefd/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/LocalTimeoutQueue.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/LocalTimeoutQueue.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/LocalTimeoutQueue.java
new file mode 100644
index 0000000..03b7863
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/LocalTimeoutQueue.java
@@ -0,0 +1,98 @@
+package org.apache.usergrid.persistence.graph.consistency;
+
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.PriorityBlockingQueue;
+
+import com.google.inject.Inject;
+
+
+/**
+ * Simple implementation of our timeout queue using an in memory PriorityBlockingQueue ordered by timeout.
+ *
+ * This SHOULD NOT be used in a production environment.  This is for development/testing runtimes only.
+ */
+public class LocalTimeoutQueue<T> implements TimeoutQueue<T> {
+
+    /** Initial capacity of the backing queue.  The queue itself is unbounded and grows as needed. */
+    private static final int INITIAL_CAPACITY = 1000;
+
+    /** For in memory queueing.  The head is always the event with the smallest timeout. */
+    private final PriorityBlockingQueue<TimeoutEvent<T>> queue =
+            new PriorityBlockingQueue<TimeoutEvent<T>>( INITIAL_CAPACITY, new TimeoutEventComparator<T>() );
+
+    /** Source of the current time used to schedule events and to decide which ones have expired. */
+    private final TimeService timeService;
+
+    @Inject
+    public LocalTimeoutQueue( final TimeService timeService ) {
+        this.timeService = timeService;
+    }
+
+    /**
+     * Schedule the event to become available to take() once timeout has elapsed from the current time.
+     *
+     * @return the scheduled event, which can later be passed to remove()
+     */
+    @Override
+    public TimeoutEvent<T> queue( final T event, final long timeout ) {
+        final long scheduledTimeout = timeService.getCurrentTime() + timeout;
+        final TimeoutEvent<T> queuedEvent = new SimpleTimeoutEvent<T>( event, scheduledTimeout );
+
+        queue.add( queuedEvent );
+
+        return queuedEvent;
+    }
+
+    /**
+     * Take up to maxSize events whose timeout has been reached.  Each taken event is replaced in the queue by a
+     * new event scheduled at now + timeout, and the replacements are returned so they can be passed to remove().
+     */
+    @Override
+    public Collection<TimeoutEvent<T>> take( final int maxSize, final long timeout ) {
+        final long now = timeService.getCurrentTime();
+        final long newTimeout = now + timeout;
+
+        //never pre-allocate more than the queue currently holds, maxSize may be arbitrarily large
+        final List<TimeoutEvent<T>> results = new ArrayList<TimeoutEvent<T>>( Math.min( maxSize, queue.size() ) );
+
+        while ( results.size() < maxSize ) {
+            final TimeoutEvent<T> queuedEvent = queue.peek();
+
+            //nothing to do, or the earliest event has not yet reached its timeout
+            if ( queuedEvent == null || queuedEvent.getTimeout() > now ) {
+                break;
+            }
+
+            //remove exactly the event we inspected.  A blind poll() could discard a different element if the head
+            //changed since the peek, or the replacement itself if it sorts ahead of the expired event
+            if ( !queue.remove( queuedEvent ) ) {
+                //removed by someone else in the meantime, look at the new head
+                continue;
+            }
+
+            //re-schedule a new event to replace the one we just removed
+            final TimeoutEvent<T> newEvent = new SimpleTimeoutEvent<T>( queuedEvent.getEvent(), newTimeout );
+            queue.add( newEvent );
+            results.add( newEvent );
+        }
+
+        return results;
+    }
+
+    @Override
+    public boolean remove( final TimeoutEvent<T> event ) {
+        return queue.remove( event );
+    }
+
+    /** Orders events by ascending timeout so the next event to expire is at the head of the queue. */
+    private static class TimeoutEventComparator<T> implements Comparator<TimeoutEvent<T>> {
+        @Override
+        public int compare( final TimeoutEvent<T> o1, final TimeoutEvent<T> o2 ) {
+            return Long.compare( o1.getTimeout(), o2.getTimeout() );
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0133aefd/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/LocalTimeoutQueueTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/LocalTimeoutQueueTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/LocalTimeoutQueueTest.java
new file mode 100644
index 0000000..f3107d2
--- /dev/null
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/LocalTimeoutQueueTest.java
@@ -0,0 +1,164 @@
+package org.apache.usergrid.persistence.graph.consistency;
+
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.junit.Test;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Test timeout queue on the local system
+ */
+public class LocalTimeoutQueueTest {
+
+    @Test
+    public void queueReadRemove() {
+        //a single event is queued, taken once its timeout has elapsed, then removed
+        TimeService timeService = mock( TimeService.class );
+
+        final long time = 1000L;
+        final long timeout = 1000;
+
+        when( timeService.getCurrentTime() ).thenReturn( time );
+
+        TimeoutQueue<TestEvent> queue = new LocalTimeoutQueue<TestEvent>( timeService );
+
+        final TestEvent event = new TestEvent();
+
+        TimeoutEvent<TestEvent> timeoutEvent = queue.queue( event, timeout );
+
+        assertNotNull( timeoutEvent );
+
+        assertEquals( event, timeoutEvent.getEvent() );
+        assertEquals( time + timeout, timeoutEvent.getTimeout() );
+
+        Collection<TimeoutEvent<TestEvent>> results = queue.take( 100, timeout );
+
+        assertEquals( "Time not yet elapsed", 0, results.size() );
+
+        //now elapse the time
+        final long firstTime = time + timeout;
+
+        when( timeService.getCurrentTime() ).thenReturn( firstTime );
+
+        results = queue.take( 100, timeout );
+
+        assertEquals( "Time elapsed", 1, results.size() );
+
+        //validate we get a new timeout event since the old one was re-scheduled
+        Iterator<TimeoutEvent<TestEvent>> events = results.iterator();
+
+        TimeoutEvent<TestEvent> message = events.next();
+
+        assertEquals( event, message.getEvent() );
+
+        assertEquals( firstTime + timeout, message.getTimeout() );
+
+
+        //now remove it, the queue must report that it actually held the event
+        assertTrue( "Event removed", queue.remove( message ) );
+
+        //advance time again (a lot)
+        when( timeService.getCurrentTime() ).thenReturn( firstTime * 20 );
+
+        results = queue.take( 100, timeout );
+
+        assertEquals( "Queue now empty", 0, results.size() );
+    }
+
+
+    @Test
+    public void queueReadTimeout() {
+        //a full queue is drained in batches of takeSize once the timeout has elapsed
+        TimeService timeService = mock( TimeService.class );
+
+        final long time = 1000L;
+        final long timeout = 1000;
+
+        final int queueSize = 1000;
+        final int takeSize = 100;
+
+        when( timeService.getCurrentTime() ).thenReturn( time );
+
+        TimeoutQueue<TestEvent> queue = new LocalTimeoutQueue<TestEvent>( timeService );
+
+
+        Set<TestEvent> events = new HashSet<TestEvent>();
+
+        for ( int i = 0; i < queueSize; i++ ) {
+
+            final TestEvent event = new TestEvent();
+
+            TimeoutEvent<TestEvent> timeoutEvent = queue.queue( event, timeout );
+
+            events.add( event );
+
+            assertNotNull( timeoutEvent );
+
+            assertEquals( event, timeoutEvent.getEvent() );
+            assertEquals( time + timeout, timeoutEvent.getTimeout() );
+        }
+
+
+        Collection<TimeoutEvent<TestEvent>> results = queue.take( takeSize, timeout );
+
+        assertEquals( "Time not yet elapsed", 0, results.size() );
+
+        //now elapse the time
+        final long firstTime = time + timeout;
+
+        when( timeService.getCurrentTime() ).thenReturn( firstTime );
+
+        //queueSize is a multiple of takeSize, so every batch below must come back full
+        final int iterations = queueSize / takeSize;
+
+        for ( int i = 0; i < iterations; i++ ) {
+
+            results = queue.take( takeSize, timeout );
+
+            //an empty or short batch fails here instead of silently ending the loop
+            assertEquals( "Time elapsed", takeSize, results.size() );
+
+            //validate we get a new timeout event since the old one was re-scheduled
+            for ( TimeoutEvent<TestEvent> message : results ) {
+
+                //every queued event must be handed out exactly once
+                assertTrue( "Event taken only once", events.remove( message.getEvent() ) );
+
+                assertEquals( firstTime + timeout, message.getTimeout() );
+
+                //remove from our queue
+                assertTrue( "Event removed", queue.remove( message ) );
+            }
+        }
+
+
+        assertEquals( "All elements dequeued", 0, events.size() );
+
+        //everything was removed, so nothing may come back even long after the re-scheduled timeout
+        when( timeService.getCurrentTime() ).thenReturn( firstTime * 20 );
+
+        assertEquals( "Queue now empty", 0, queue.take( takeSize, timeout ).size() );
+    }
+
+
+    /**
+     * Event with no state of its own; two events are only equal when they are the same instance
+     */
+    public static class TestEvent {
+
+        @Override
+        public boolean equals( Object o ) {
+            return this == o;
+        }
+    }
+}