You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/03/25 16:21:01 UTC
[02/43] git commit: Finished local queue for proof of concept
Finished local queue for proof of concept
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/0133aefd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/0133aefd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/0133aefd
Branch: refs/heads/two-dot-o
Commit: 0133aefd335f1d0bfa10087f834d4ed18b9a23bd
Parents: 3689a3c
Author: Todd Nine <tn...@apigee.com>
Authored: Mon Feb 24 17:57:28 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon Feb 24 17:57:28 2014 -0700
----------------------------------------------------------------------
.../graph/consistency/LocalTimeoutQueue.java | 98 +++++++++++
.../consistency/LocalTimeoutQueueTest.java | 164 +++++++++++++++++++
2 files changed, 262 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0133aefd/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/LocalTimeoutQueue.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/LocalTimeoutQueue.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/LocalTimeoutQueue.java
new file mode 100644
index 0000000..03b7863
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/LocalTimeoutQueue.java
@@ -0,0 +1,98 @@
+package org.apache.usergrid.persistence.graph.consistency;
+
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.PriorityBlockingQueue;
+
+import com.google.inject.Inject;
+
+
+/**
+ * Simple implementation of our timeout queue using an in memory {@link PriorityBlockingQueue} ordered by
+ * ascending timeout, so the event that times out first is always at the head.
+ *
+ * This SHOULD NOT be used in a production environment.  This is for development/testing runtimes only.
+ */
+public class LocalTimeoutQueue<T> implements TimeoutQueue<T> {
+
+    /**
+     * Initial capacity of the backing queue.  The queue itself is unbounded and grows on demand.
+     */
+    private static final int INITIAL_CAPACITY = 1000;
+
+    /**
+     * For in memory queueing
+     */
+    private final PriorityBlockingQueue<TimeoutEvent<T>> queue =
+            new PriorityBlockingQueue<TimeoutEvent<T>>( INITIAL_CAPACITY, new TimeoutEventComparator<T>() );
+
+    /**
+     * Source of the current time used to compute and evaluate timeouts
+     */
+    private final TimeService timeService;
+
+
+    /**
+     * @param timeService supplies the current time for scheduling and expiry checks
+     */
+    @Inject
+    public LocalTimeoutQueue( final TimeService timeService ) {
+        this.timeService = timeService;
+    }
+
+
+    /**
+     * Queue the event so that it times out at the current time plus the given timeout.
+     *
+     * @param event the event to queue
+     * @param timeout delay added to the current time, in the same units as {@code TimeService.getCurrentTime()}
+     *
+     * @return the scheduled event, carrying the absolute timeout
+     */
+    @Override
+    public TimeoutEvent<T> queue( final T event, final long timeout ) {
+        final long scheduledTimeout = timeService.getCurrentTime() + timeout;
+        final TimeoutEvent<T> queuedEvent = new SimpleTimeoutEvent<T>( event, scheduledTimeout );
+
+        queue.add( queuedEvent );
+
+        return queuedEvent;
+    }
+
+
+    /**
+     * Take up to maxSize events whose timeout is at or before the current time.  Every event taken is replaced
+     * in the queue by a new event that times out at the current time plus the given timeout, and those
+     * replacement events are what is returned.
+     *
+     * @param maxSize the maximum number of events to return
+     * @param timeout delay added to the current time to compute the timeout of the re-scheduled events
+     *
+     * @return the re-scheduled events, possibly empty, never null
+     *
+     * @throws IllegalArgumentException if maxSize is negative
+     */
+    @Override
+    public Collection<TimeoutEvent<T>> take( final int maxSize, final long timeout ) {
+
+        final long now = timeService.getCurrentTime();
+        final long newTimeout = now + timeout;
+
+        //never pre-allocate more slots than there are events queued
+        final List<TimeoutEvent<T>> results =
+                new ArrayList<TimeoutEvent<T>>( Math.min( maxSize, queue.size() ) );
+
+        while ( results.size() < maxSize ) {
+
+            final TimeoutEvent<T> queuedEvent = queue.peek();
+
+            //nothing to do, or the head (and therefore everything behind it) has not yet reached timeout
+            if ( queuedEvent == null || queuedEvent.getTimeout() > now ) {
+                break;
+            }
+
+            //Remove exactly the event we inspected.  Adding the replacement first and then polling is not safe:
+            //with a timeout <= 0 the replacement can sort at or before the expired head, and with concurrent
+            //callers the head may have changed, so poll() could discard the wrong element.
+            if ( !queue.remove( queuedEvent ) ) {
+                //someone else removed it between peek and remove, look at the new head
+                continue;
+            }
+
+            results.add( new SimpleTimeoutEvent<T>( queuedEvent.getEvent(), newTimeout ) );
+        }
+
+        //re-schedule only after the scan, so an event that is re-scheduled into the past (timeout <= 0)
+        //is not taken a second time by this same call
+        queue.addAll( results );
+
+        return results;
+    }
+
+
+    /**
+     * Remove the event from the queue.
+     *
+     * @param event the event to remove, as returned by {@link #queue} or {@link #take}
+     *
+     * @return true if the queue contained the event and it was removed
+     */
+    @Override
+    public boolean remove( final TimeoutEvent<T> event ) {
+        return queue.remove( event );
+    }
+
+
+    /**
+     * Orders events by ascending timeout
+     */
+    private static class TimeoutEventComparator<T> implements Comparator<TimeoutEvent<T>> {
+
+
+        @Override
+        public int compare( final TimeoutEvent<T> o1, final TimeoutEvent<T> o2 ) {
+            return Long.compare( o1.getTimeout(), o2.getTimeout() );
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0133aefd/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/LocalTimeoutQueueTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/LocalTimeoutQueueTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/LocalTimeoutQueueTest.java
new file mode 100644
index 0000000..f3107d2
--- /dev/null
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/LocalTimeoutQueueTest.java
@@ -0,0 +1,164 @@
+package org.apache.usergrid.persistence.graph.consistency;
+
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.junit.Test;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Test timeout queue on the local system.  Time is controlled through a mocked TimeService so no test
+ * depends on the wall clock.
+ */
+public class LocalTimeoutQueueTest {
+
+    /**
+     * Queue a single event, verify it is not returned before its timeout, is returned (re-scheduled) once the
+     * timeout is reached, and is never returned again after it has been removed.
+     */
+    @Test
+    public void queueReadRemove() {
+
+        TimeService timeService = mock( TimeService.class );
+
+        final long time = 1000L;
+        final long timeout = 1000;
+
+        when( timeService.getCurrentTime() ).thenReturn( time );
+
+        TimeoutQueue<TestEvent> queue = new LocalTimeoutQueue<TestEvent>( timeService );
+
+        final TestEvent event = new TestEvent();
+
+        TimeoutEvent<TestEvent> timeoutEvent = queue.queue( event, timeout );
+
+        assertNotNull( timeoutEvent );
+
+        assertEquals( event, timeoutEvent.getEvent() );
+        assertEquals( time + timeout, timeoutEvent.getTimeout() );
+
+        Collection<TimeoutEvent<TestEvent>> results = queue.take( 100, timeout );
+
+        assertEquals( "Time not yet elapsed", 0, results.size() );
+
+        //now elapse the time
+        final long firstTime = time + timeout;
+
+        when( timeService.getCurrentTime() ).thenReturn( firstTime );
+
+        results = queue.take( 100, timeout );
+
+        assertEquals( "Time elapsed", 1, results.size() );
+
+        //validate we get a new timeout event since the old one was re-scheduled
+        Iterator<TimeoutEvent<TestEvent>> events = results.iterator();
+
+        TimeoutEvent<TestEvent> message = events.next();
+
+        assertEquals( event, message.getEvent() );
+
+        assertEquals( firstTime + timeout, message.getTimeout() );
+
+
+        //now remove it, the re-scheduled event must still be present in the queue
+        final boolean removed = queue.remove( message );
+
+        assertTrue( removed );
+
+        //advance time again (a lot)
+        when( timeService.getCurrentTime() ).thenReturn( firstTime * 20 );
+
+        results = queue.take( 100, timeout );
+
+        assertEquals( "Queue now empty", 0, results.size() );
+    }
+
+
+    /**
+     * Queue many events with the same timeout, then take them in fixed size batches once the timeout is
+     * reached.  Every event must be returned exactly once, re-scheduled, and be removable from the queue.
+     */
+    @Test
+    public void queueReadTimeout() {
+
+        TimeService timeService = mock( TimeService.class );
+
+        final long time = 1000L;
+        final long timeout = 1000;
+
+        final int queueSize = 1000;
+
+        when( timeService.getCurrentTime() ).thenReturn( time );
+
+        TimeoutQueue<TestEvent> queue = new LocalTimeoutQueue<TestEvent>( timeService );
+
+
+        //tracks the events we have queued but not yet seen come back from take
+        Set<TestEvent> events = new HashSet<TestEvent>();
+
+        for ( int i = 0; i < queueSize; i++ ) {
+
+            final TestEvent event = new TestEvent();
+
+            TimeoutEvent<TestEvent> timeoutEvent = queue.queue( event, timeout );
+
+            events.add( event );
+
+            assertNotNull( timeoutEvent );
+
+            assertEquals( event, timeoutEvent.getEvent() );
+            assertEquals( time + timeout, timeoutEvent.getTimeout() );
+        }
+
+
+        Collection<TimeoutEvent<TestEvent>> results = queue.take( 100, timeout );
+
+        assertEquals( "Time not yet elapsed", 0, results.size() );
+
+        //now elapse the time
+        final long firstTime = time + timeout;
+
+        when( timeService.getCurrentTime() ).thenReturn( firstTime );
+
+
+        final int takeSize = 100;
+
+        final int iterations = queueSize / takeSize;
+
+        for ( int i = 0; i < iterations; i++ ) {
+
+            results = queue.take( takeSize, timeout );
+
+            if ( results.size() == 0 ) {
+                break;
+            }
+
+            assertEquals( "Time elapsed", takeSize, results.size() );
+
+            //validate we get a new timeout event since the old one was re-scheduled
+            for ( TimeoutEvent<TestEvent> message : results ) {
+
+                //each event may only come back once
+                assertTrue( events.remove( message.getEvent() ) );
+
+                assertEquals( firstTime + timeout, message.getTimeout() );
+
+                //remove from our queue
+                boolean removed = queue.remove( message );
+
+                assertTrue( removed );
+            }
+        }
+
+
+        assertEquals( "All elements dequeued", 0, events.size() );
+    }
+
+
+    /**
+     * Minimal event payload that uses identity semantics for both equals and hashCode, so every instance is
+     * distinct when stored in a hash based collection.
+     */
+    public static class TestEvent {
+
+        @Override
+        public boolean equals( Object o ) {
+            return this == o;
+        }
+
+
+        //kept consistent with the identity based equals above
+        @Override
+        public int hashCode() {
+            return System.identityHashCode( this );
+        }
+    }
+}