You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ca...@apache.org on 2015/08/25 06:08:41 UTC

[21/50] curator git commit: Curator-224: Fixed the requeuing problem with DistributedIdQueue.

Curator-224: Fixed the requeuing problem with DistributedIdQueue.


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/83e1a855
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/83e1a855
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/83e1a855

Branch: refs/heads/CURATOR-167
Commit: 83e1a855a15e17a37ec02d441d6c75e7f08a2617
Parents: 20e92a5
Author: Zhihong Zhang <zh...@pixia.com>
Authored: Mon Jun 22 14:59:02 2015 -0400
Committer: Zhihong Zhang <zh...@pixia.com>
Committed: Mon Jun 22 14:59:02 2015 -0400

----------------------------------------------------------------------
 .../recipes/queue/DistributedQueue.java         |  2 +-
 .../recipes/queue/TestDistributedIdQueue.java   | 47 ++++++++++++++++++++
 2 files changed, 48 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/83e1a855/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedQueue.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedQueue.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedQueue.java
index 9dd2217..a183adf 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedQueue.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedQueue.java
@@ -756,7 +756,7 @@ public class DistributedQueue<T> implements QueueBase<T>
                 client.inTransaction()
                     .delete().forPath(itemPath)
                     .and()
-                    .create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(makeItemPath(), bytes)
+                    .create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(itemPath, bytes)
                     .and()
                     .commit();
             }

http://git-wip-us.apache.org/repos/asf/curator/blob/83e1a855/curator-recipes/src/test/java/org/apache/curator/framework/recipes/queue/TestDistributedIdQueue.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/queue/TestDistributedIdQueue.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/queue/TestDistributedIdQueue.java
index 30e552f..858086b 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/queue/TestDistributedIdQueue.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/queue/TestDistributedIdQueue.java
@@ -124,4 +124,51 @@ public class TestDistributedIdQueue extends BaseClassForTests
             CloseableUtils.closeQuietly(client);
         }
     }
+
+    @Test
+    public void testRequeuingWithLock() throws Exception
+    {   // checks that an item whose consumer failed can still be removed from the queue by its ID
+        DistributedIdQueue<TestQueueItem>  queue = null;
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+        client.start();
+        try
+        {
+            final CountDownLatch        consumingLatch = new CountDownLatch(1);  // released by the first consumeMessage() call
+
+            QueueConsumer<TestQueueItem> consumer = new QueueConsumer<TestQueueItem>()
+            {
+                @Override
+                public void consumeMessage(TestQueueItem message) throws Exception
+                {
+                    consumingLatch.countDown();
+                    // Fail on purpose: the thrown exception is what should make the queue requeue the item
+                    throw new Exception("Consumer failed");
+                }
+
+                @Override
+                public void stateChanged(CuratorFramework client, ConnectionState newState)
+                {
+                }
+            };
+
+            queue = QueueBuilder.builder(client, consumer, serializer, QUEUE_PATH).lockPath("/locks").buildIdQueue();  // ID queue built with a lock path
+            queue.start();
+
+            queue.put(new TestQueueItem("test"), "id");
+
+            Assert.assertTrue(consumingLatch.await(10, TimeUnit.SECONDS));  // wait (up to 10 seconds) until the consumer has been handed the item
+
+            // Sleep one more second, presumably to give the queue time to requeue the failed item
+
+            Thread.sleep(1000);
+
+            Assert.assertEquals(queue.remove("id"), 1);  // remove() with the ID passed to put() must return 1
+
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(queue);
+            CloseableUtils.closeQuietly(client);
+        }
+    }
 }