You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2016/05/20 20:39:49 UTC

curator git commit: Parent container deletions could cause a hang as the watcher would never fire. If parent containers get deleted now, reset and recreate them

Repository: curator
Updated Branches:
  refs/heads/CURATOR-308 [created] 9261ee622


Parent container deletions could cause a hang because the watcher would never fire. Now, if the parent containers get deleted, reset and recreate them.


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/9261ee62
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/9261ee62
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/9261ee62

Branch: refs/heads/CURATOR-308
Commit: 9261ee622e2e2dd7b16969561cb597712f078acd
Parents: 48e2c15
Author: randgalt <ra...@apache.org>
Authored: Fri May 20 15:39:36 2016 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri May 20 15:39:36 2016 -0500

----------------------------------------------------------------------
 .../curator/framework/EnsureContainers.java     |  5 ++
 .../recipes/queue/SimpleDistributedQueue.java   | 14 ++-
 .../queue/TestSimpleDistributedQueue.java       | 89 ++++++++++++++++++++
 3 files changed, 106 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/9261ee62/curator-framework/src/main/java/org/apache/curator/framework/EnsureContainers.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/EnsureContainers.java b/curator-framework/src/main/java/org/apache/curator/framework/EnsureContainers.java
index 697df62..b002b90 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/EnsureContainers.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/EnsureContainers.java
@@ -54,6 +54,11 @@ public class EnsureContainers
         }
     }
 
+    public void reset()
+    {
+        ensureNeeded.set(true); // re-arm the flag so the compareAndSet(true, false) guard in internalEnsure() succeeds again
+    }
+
     private synchronized void internalEnsure() throws Exception
     {
         if ( ensureNeeded.compareAndSet(true, false) )

http://git-wip-us.apache.org/repos/asf/curator/blob/9261ee62/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/SimpleDistributedQueue.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/SimpleDistributedQueue.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/SimpleDistributedQueue.java
index 35afb53..c80ad36 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/SimpleDistributedQueue.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/SimpleDistributedQueue.java
@@ -200,7 +200,17 @@ public class SimpleDistributedQueue
                     latch.countDown();
                 }
             };
-            byte[]      bytes = internalElement(true, watcher);
+            byte[]      bytes = new byte[0];
+            try
+            {
+                bytes = internalElement(true, watcher);
+            }
+            catch ( NoSuchElementException dummy )
+            {
+                log.debug("Parent containers appear to have lapsed - recreate and retry");
+                ensureContainers.reset();
+                continue;
+            }
             if ( bytes != null )
             {
                 return bytes;
@@ -234,7 +244,7 @@ public class SimpleDistributedQueue
         }
         catch ( KeeperException.NoNodeException dummy )
         {
-            return null;
+            throw new NoSuchElementException();
         }
         Collections.sort(nodes);
 

http://git-wip-us.apache.org/repos/asf/curator/blob/9261ee62/curator-recipes/src/test/java/org/apache/curator/framework/recipes/queue/TestSimpleDistributedQueue.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/queue/TestSimpleDistributedQueue.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/queue/TestSimpleDistributedQueue.java
index dab1674..ec4c3d1 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/queue/TestSimpleDistributedQueue.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/queue/TestSimpleDistributedQueue.java
@@ -19,6 +19,8 @@
 package org.apache.curator.framework.recipes.queue;
 
 import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.TestingServer;
+import org.apache.curator.test.Timing;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -26,6 +28,8 @@ import org.apache.curator.retry.RetryOneTime;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 import java.util.NoSuchElementException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import static org.testng.Assert.assertEquals;
@@ -33,6 +37,91 @@ import static org.testng.Assert.assertTrue;
 
 public class TestSimpleDistributedQueue extends BaseClassForTests
 {
+    private static abstract class QueueUser implements Runnable // test helper: calls processItem() ITEM_COUNT times, sleeping after each call
+    {
+        private static final String QUEUE_PATH = "/queue";
+        private static final int ITEM_COUNT = 10;
+
+        protected final SimpleDistributedQueue queue; // protected so the anonymous subclasses can use it from processItem()
+        private final int sleepMillis; // pause after each processItem() call, in milliseconds
+
+        public QueueUser(CuratorFramework curator, int sleepMillis)
+        {
+            this.queue = new SimpleDistributedQueue(curator, QUEUE_PATH); // every QueueUser is bound to the same QUEUE_PATH
+            this.sleepMillis = sleepMillis;
+        }
+
+        @Override
+        public void run()
+        {
+            try
+            {
+                for ( int i = 0; i < ITEM_COUNT; i++ )
+                {
+                    processItem(i);
+                    Thread.sleep(sleepMillis);
+                }
+            }
+            catch ( Exception e )
+            {
+                throw new RuntimeException(e); // Runnable.run() cannot declare checked exceptions, so rethrow unchecked
+            }
+        }
+
+        protected abstract void processItem(int itemNumber) throws Exception; // one unit of work; itemNumber runs from 0 to ITEM_COUNT - 1
+    }
+
+    @Test
+    public void testHangFromContainerLoss() throws Exception
+    {
+        // for CURATOR-308
+
+        server.close(); // replace the running server so the new one is created after the property below is set
+        System.setProperty("znode.container.checkIntervalMs", "100"); // NOTE(review): presumably shortens the server's container-node check interval - confirm against ZooKeeper docs
+        server = new TestingServer();
+
+        Timing timing = new Timing().multiple(.1);
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+        try
+        {
+            client.start();
+
+            ExecutorService executor = Executors.newFixedThreadPool(2); // one thread per QueueUser submitted below
+            executor.execute(new QueueUser(client, timing.milliseconds()) // producer: offers one single-byte item per iteration
+            {
+                @Override
+                protected void processItem(int itemNumber) throws Exception
+                {
+                    System.out.println("Offering item");
+                    queue.offer(new byte[]{(byte)itemNumber});
+                }
+            });
+
+            executor.execute(new QueueUser(client, timing.multiple(.5).milliseconds()) // consumer: calls take() once per iteration and rejects a null result
+            {
+                @Override
+                protected void processItem(int itemNumber) throws Exception
+                {
+                    System.out.println("Taking item " + itemNumber);
+                    byte[] item = queue.take();
+                    if ( item == null )
+                    {
+                        throw new IllegalStateException("Null result for item " + itemNumber);
+                    }
+                    System.out.println("Got item " + item[0]);
+                }
+            });
+
+            executor.shutdown(); // accept no new tasks; the two submitted tasks keep running
+            Assert.assertTrue(executor.awaitTermination((QueueUser.ITEM_COUNT * 2) * timing.milliseconds(), TimeUnit.MILLISECONDS)); // awaitTermination returns false on timeout, failing the test if either task is still running
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+            System.clearProperty("znode.container.checkIntervalMs"); // undo the JVM-wide system property set above
+        }
+    }
+
     @Test
     public void testPollWithTimeout() throws Exception
     {