You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ca...@apache.org on 2014/07/08 01:26:54 UTC
[1/3] git commit: CURATOR-116 - Modified sorting of children to be
deterministic.
Repository: curator
Updated Branches:
refs/heads/CURATOR-116 [created] d062906bc
CURATOR-116 - Modified sorting of children to be deterministic.
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/1103f476
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/1103f476
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/1103f476
Branch: refs/heads/CURATOR-116
Commit: 1103f476d84a53784377a96f8661bd9219e7412d
Parents: 6e98562
Author: Cameron McKenzie <ca...@unico.com.au>
Authored: Wed Jun 25 11:53:00 2014 +1000
Committer: Cameron McKenzie <ca...@unico.com.au>
Committed: Wed Jun 25 11:53:00 2014 +1000
----------------------------------------------------------------------
.../recipes/queue/DistributedDelayQueue.java | 12 +++-
.../queue/TestDistributedDelayQueue.java | 67 +++++++++++++++++++-
2 files changed, 76 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/1103f476/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedDelayQueue.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedDelayQueue.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedDelayQueue.java
index b84471f..bd90e71 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedDelayQueue.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedDelayQueue.java
@@ -77,14 +77,22 @@ public class DistributedDelayQueue<T> implements Closeable, QueueBase<T>
finalFlushMs
)
{
+ @Override
protected long getDelay(String itemNode)
{
+ return getDelay(itemNode, System.currentTimeMillis());
+ }
+
+ protected long getDelay(String itemNode, long sortTime)
+ {
long epoch = getEpoch(itemNode);
- return epoch - System.currentTimeMillis();
+ return epoch - sortTime;
}
+ @Override
protected void sortChildren(List<String> children)
{
+ final long sortTime = System.currentTimeMillis();
Collections.sort
(
children,
@@ -93,7 +101,7 @@ public class DistributedDelayQueue<T> implements Closeable, QueueBase<T>
@Override
public int compare(String o1, String o2)
{
- long diff = getDelay(o1) - getDelay(o2);
+ long diff = getDelay(o1, sortTime) - getDelay(o2, sortTime);
return (diff < 0) ? -1 : ((diff > 0) ? 1 : 0);
}
}
http://git-wip-us.apache.org/repos/asf/curator/blob/1103f476/curator-recipes/src/test/java/org/apache/curator/framework/recipes/queue/TestDistributedDelayQueue.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/queue/TestDistributedDelayQueue.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/queue/TestDistributedDelayQueue.java
index d6b592f..3759c34 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/queue/TestDistributedDelayQueue.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/queue/TestDistributedDelayQueue.java
@@ -28,6 +28,12 @@ import org.apache.curator.test.Timing;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;
@@ -107,7 +113,7 @@ public class TestDistributedDelayQueue extends BaseClassForTests
queue.start();
Random random = new Random();
- for ( int i = 0; i < 10; ++i )
+ for ( int i = 0; i < QTY; ++i )
{
long delay = System.currentTimeMillis() + random.nextInt(100);
queue.put(delay, delay);
@@ -128,6 +134,65 @@ public class TestDistributedDelayQueue extends BaseClassForTests
CloseableUtils.closeQuietly(client);
}
}
+
+ @Test
+ public void testSorting() throws Exception
+ {
+ //Need to use a fairly large number to ensure that sorting can take some time.
+ final int QTY = 1000;
+
+ Timing timing = new Timing();
+ DistributedDelayQueue<Long> queue = null;
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ client.start();
+ try
+ {
+ BlockingQueueConsumer<Long> consumer = new BlockingQueueConsumer<Long>(Mockito.mock(ConnectionStateListener.class));
+ queue = QueueBuilder.builder(client, consumer, new LongSerializer(), "/test2").putInBackground(false).buildDelayQueue();
+ queue.start();
+
+ Map<Long, Long> data = new HashMap<Long, Long>();
+
+ //Make the earliest a second into the future, so we can ensure that everything's
+ //been added prior to the consumption starting. Otherwise it's possible to start
+ //processing entries before they've all been added so the ordering will be
+ //incorrect.
+ long delay = System.currentTimeMillis() + 5000;
+ for ( long i = 0; i < QTY; ++i )
+ {
+ data.put(delay, i);
+
+ //We want to make the elements close together but not exactly the same MS.
+ delay += 1;
+ }
+
+ //Randomly sort the list
+ List<Long> keys = new ArrayList<Long>(data.keySet());
+ Collections.shuffle(keys);
+
+ //Put the messages onto the queue in random order, but with the appropriate
+ //delay and value
+ for ( Long key : keys )
+ {
+ queue.put(data.get(key), key);
+ }
+
+ long lastValue = -1;
+ for ( int i = 0; i < QTY; ++i )
+ {
+ Long value = consumer.take(6, TimeUnit.SECONDS);
+ Assert.assertNotNull(value);
+ Assert.assertEquals(value, new Long(lastValue + 1));
+ lastValue = value;
+ }
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(queue);
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
private static class LongSerializer implements QueueSerializer<Long>
{
[3/3] git commit: Merge branch 'CURATOR-116' of
https://github.com/cammckenzie/curator into CURATOR-116
Posted by ca...@apache.org.
Merge branch 'CURATOR-116' of https://github.com/cammckenzie/curator into CURATOR-116
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/d062906b
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/d062906b
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/d062906b
Branch: refs/heads/CURATOR-116
Commit: d062906bc28bfc66b16ec987e65ba6e558870b24
Parents: 2c0c523 fdc5600
Author: Cam McKenzie <ca...@apache.org>
Authored: Tue Jul 8 08:43:03 2014 +1000
Committer: Cam McKenzie <ca...@apache.org>
Committed: Tue Jul 8 08:43:03 2014 +1000
----------------------------------------------------------------------
.../recipes/queue/DistributedDelayQueue.java | 12 +++-
.../queue/TestDistributedDelayQueue.java | 67 +++++++++++++++++++-
2 files changed, 76 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
[2/3] git commit: CURATOR-116 - Made the getDelay() method private,
as it does not need to be protected.
Posted by ca...@apache.org.
CURATOR-116 - Made the getDelay() method private, as it does not need to
be protected.
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/fdc56009
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/fdc56009
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/fdc56009
Branch: refs/heads/CURATOR-116
Commit: fdc5600953c29e0cdcfe16396b27d1b447e13380
Parents: 1103f47
Author: Cameron McKenzie <ca...@unico.com.au>
Authored: Thu Jun 26 08:51:58 2014 +1000
Committer: Cameron McKenzie <ca...@unico.com.au>
Committed: Thu Jun 26 08:51:58 2014 +1000
----------------------------------------------------------------------
.../curator/framework/recipes/queue/DistributedDelayQueue.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/fdc56009/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedDelayQueue.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedDelayQueue.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedDelayQueue.java
index bd90e71..ff39f42 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedDelayQueue.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedDelayQueue.java
@@ -83,7 +83,7 @@ public class DistributedDelayQueue<T> implements Closeable, QueueBase<T>
return getDelay(itemNode, System.currentTimeMillis());
}
- protected long getDelay(String itemNode, long sortTime)
+ private long getDelay(String itemNode, long sortTime)
{
long epoch = getEpoch(itemNode);
return epoch - sortTime;