You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ca...@apache.org on 2014/07/08 01:26:54 UTC

[1/3] git commit: CURATOR-116 - Modified sorting of children to be deterministic.

Repository: curator
Updated Branches:
  refs/heads/CURATOR-116 [created] d062906bc


CURATOR-116 - Modified sorting of children to be deterministic.

Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/1103f476
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/1103f476
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/1103f476

Branch: refs/heads/CURATOR-116
Commit: 1103f476d84a53784377a96f8661bd9219e7412d
Parents: 6e98562
Author: Cameron McKenzie <ca...@unico.com.au>
Authored: Wed Jun 25 11:53:00 2014 +1000
Committer: Cameron McKenzie <ca...@unico.com.au>
Committed: Wed Jun 25 11:53:00 2014 +1000

----------------------------------------------------------------------
 .../recipes/queue/DistributedDelayQueue.java    | 12 +++-
 .../queue/TestDistributedDelayQueue.java        | 67 +++++++++++++++++++-
 2 files changed, 76 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/1103f476/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedDelayQueue.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedDelayQueue.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedDelayQueue.java
index b84471f..bd90e71 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedDelayQueue.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedDelayQueue.java
@@ -77,14 +77,22 @@ public class DistributedDelayQueue<T> implements Closeable, QueueBase<T>
             finalFlushMs
         )
         {
+            @Override
             protected long getDelay(String itemNode)
             {
+                return getDelay(itemNode, System.currentTimeMillis());
+            }
+            
+            protected long getDelay(String itemNode, long sortTime)
+            {               
                 long epoch = getEpoch(itemNode);
-                return epoch - System.currentTimeMillis();
+                return epoch - sortTime;
             }
 
+            @Override
             protected void sortChildren(List<String> children)
             {
+                final long sortTime = System.currentTimeMillis();
                 Collections.sort
                 (
                     children,
@@ -93,7 +101,7 @@ public class DistributedDelayQueue<T> implements Closeable, QueueBase<T>
                         @Override
                         public int compare(String o1, String o2)
                         {
-                            long        diff = getDelay(o1) - getDelay(o2);
+                            long        diff = getDelay(o1, sortTime) - getDelay(o2, sortTime);
                             return (diff < 0) ? -1 : ((diff > 0) ? 1 : 0);
                         }
                     }

http://git-wip-us.apache.org/repos/asf/curator/blob/1103f476/curator-recipes/src/test/java/org/apache/curator/framework/recipes/queue/TestDistributedDelayQueue.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/queue/TestDistributedDelayQueue.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/queue/TestDistributedDelayQueue.java
index d6b592f..3759c34 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/queue/TestDistributedDelayQueue.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/queue/TestDistributedDelayQueue.java
@@ -28,6 +28,12 @@ import org.apache.curator.test.Timing;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
 
@@ -107,7 +113,7 @@ public class TestDistributedDelayQueue extends BaseClassForTests
             queue.start();
 
             Random random = new Random();
-            for ( int i = 0; i < 10; ++i )
+            for ( int i = 0; i < QTY; ++i )
             {
                 long    delay = System.currentTimeMillis() + random.nextInt(100);
                 queue.put(delay, delay);
@@ -128,6 +134,65 @@ public class TestDistributedDelayQueue extends BaseClassForTests
             CloseableUtils.closeQuietly(client);
         }
     }
+    
+    @Test
+    public void testSorting() throws Exception
+    {
+        //Need to use a fairly large number to ensure that sorting can take some time.
+        final int QTY = 1000;
+
+        Timing                          timing = new Timing();
+        DistributedDelayQueue<Long>     queue = null;
+        CuratorFramework                client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+        client.start();
+        try
+        {
+            BlockingQueueConsumer<Long> consumer = new BlockingQueueConsumer<Long>(Mockito.mock(ConnectionStateListener.class));
+            queue = QueueBuilder.builder(client, consumer, new LongSerializer(), "/test2").putInBackground(false).buildDelayQueue();
+            queue.start();
+            
+            Map<Long, Long> data = new HashMap<Long, Long>();
+            
+            //Make the earliest a second into the future, so we can ensure that everything's
+            //been added prior to the consumption starting. Otherwise it's possible to start
+            //processing entries before they've all been added so the ordering will be 
+            //incorrect.
+            long delay = System.currentTimeMillis() + 5000;            
+            for ( long i = 0; i < QTY; ++i )
+            {
+                data.put(delay, i);
+                
+                //We want to make the elements close together but not exactly the same MS.
+                delay += 1;
+            }
+                       
+            //Randomly sort the list            
+            List<Long> keys = new ArrayList<Long>(data.keySet());
+            Collections.shuffle(keys);
+
+            //Put the messages onto the queue in random order, but with the appropriate
+            //delay and value
+            for ( Long key : keys )
+            {                
+                queue.put(data.get(key), key);
+            }           
+
+            long lastValue = -1;
+            for ( int i = 0; i < QTY; ++i )
+            {
+                Long value = consumer.take(6, TimeUnit.SECONDS);
+                Assert.assertNotNull(value);
+                Assert.assertEquals(value, new Long(lastValue + 1));
+                lastValue = value;
+            }
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(queue);
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+    
 
     private static class LongSerializer implements QueueSerializer<Long>
     {


[3/3] git commit: Merge branch 'CURATOR-116' of https://github.com/cammckenzie/curator into CURATOR-116

Posted by ca...@apache.org.
Merge branch 'CURATOR-116' of https://github.com/cammckenzie/curator into CURATOR-116


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/d062906b
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/d062906b
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/d062906b

Branch: refs/heads/CURATOR-116
Commit: d062906bc28bfc66b16ec987e65ba6e558870b24
Parents: 2c0c523 fdc5600
Author: Cam McKenzie <ca...@apache.org>
Authored: Tue Jul 8 08:43:03 2014 +1000
Committer: Cam McKenzie <ca...@apache.org>
Committed: Tue Jul 8 08:43:03 2014 +1000

----------------------------------------------------------------------
 .../recipes/queue/DistributedDelayQueue.java    | 12 +++-
 .../queue/TestDistributedDelayQueue.java        | 67 +++++++++++++++++++-
 2 files changed, 76 insertions(+), 3 deletions(-)
----------------------------------------------------------------------



[2/3] git commit: CURATOR-116 - Made the getDelay() method private, as it does not need to be protected.

Posted by ca...@apache.org.
CURATOR-116 - Made the getDelay() method private, as it does not need to
be protected.

Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/fdc56009
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/fdc56009
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/fdc56009

Branch: refs/heads/CURATOR-116
Commit: fdc5600953c29e0cdcfe16396b27d1b447e13380
Parents: 1103f47
Author: Cameron McKenzie <ca...@unico.com.au>
Authored: Thu Jun 26 08:51:58 2014 +1000
Committer: Cameron McKenzie <ca...@unico.com.au>
Committed: Thu Jun 26 08:51:58 2014 +1000

----------------------------------------------------------------------
 .../curator/framework/recipes/queue/DistributedDelayQueue.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/fdc56009/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedDelayQueue.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedDelayQueue.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedDelayQueue.java
index bd90e71..ff39f42 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedDelayQueue.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedDelayQueue.java
@@ -83,7 +83,7 @@ public class DistributedDelayQueue<T> implements Closeable, QueueBase<T>
                 return getDelay(itemNode, System.currentTimeMillis());
             }
             
-            protected long getDelay(String itemNode, long sortTime)
+            private long getDelay(String itemNode, long sortTime)
             {               
                 long epoch = getEpoch(itemNode);
                 return epoch - sortTime;