You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2013/06/10 17:50:47 UTC

[2/5] git commit: PersistentEphemeralNode recipe

PersistentEphemeralNode recipe


Project: http://git-wip-us.apache.org/repos/asf/incubator-curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-curator/commit/f074683b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-curator/tree/f074683b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-curator/diff/f074683b

Branch: refs/heads/CURATOR-25
Commit: f074683bc44b08dfa201e81977a9e736644bfae8
Parents: aec1cfd
Author: randgalt <ra...@apache.org>
Authored: Mon Jun 10 08:42:25 2013 -0700
Committer: randgalt <ra...@apache.org>
Committed: Mon Jun 10 08:42:25 2013 -0700

----------------------------------------------------------------------
 .../recipes/nodes/PersistentEphemeralNode.java  | 349 ++++++++++++++++++
 .../nodes/TestPersistentEphemeralNode.java      | 361 +++++++++++++++++++
 2 files changed, 710 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/f074683b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
new file mode 100644
index 0000000..d86af92
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
@@ -0,0 +1,349 @@
+package org.apache.curator.framework.recipes.nodes;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CreateModable;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.utils.EnsurePath;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.Closeable;
+import java.util.Arrays;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * <p>
+ *     A persistent ephemeral node is an ephemeral node that attempts to stay present in
+ *     ZooKeeper, even through connection and session interruptions.
+ * </p>
+ *
+ * <p>
+ *     Thanks to bbeck (https://github.com/bbeck) for the initial coding and design
+ * </p>
+ */
+public class PersistentEphemeralNode implements Closeable
+{
+    @VisibleForTesting
+    volatile CountDownLatch         initialCreateLatch = new CountDownLatch(1);
+
+    private final Logger                    log = LoggerFactory.getLogger(getClass());
+    private final CuratorFramework          client;
+    private final EnsurePath                ensurePath;
+    private final CreateModable<ACLBackgroundPathAndBytesable<String>>  createMethod;
+    private final AtomicReference<String>   nodePath = new AtomicReference<String>(null);
+    private final String                    basePath;
+    private final Mode                      mode;
+    private final byte[]                    data;
+    private final AtomicReference<State>    state = new AtomicReference<State>(State.LATENT);
+    private final AtomicBoolean             isSuspended = new AtomicBoolean(false);
+    private final BackgroundCallback        backgroundCallback;
+    private final Watcher                   watcher = new Watcher()
+    {
+        @Override
+        public void process(WatchedEvent event)
+        {
+            if ( Objects.equal(nodePath.get(), event.getPath()) )
+            {
+                createNode();
+            }
+        }
+    };
+    private final ConnectionStateListener   listener = new ConnectionStateListener()
+    {
+        @Override
+        public void stateChanged(CuratorFramework client, ConnectionState newState)
+        {
+            isSuspended.set((newState != ConnectionState.RECONNECTED) && (newState != ConnectionState.CONNECTED));
+            if ( newState == ConnectionState.RECONNECTED )
+            {
+                createNode();
+            }
+        }
+    };
+    private final BackgroundCallback        checkExistsCallback = new BackgroundCallback()
+    {
+        @Override
+        public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
+        {
+            if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )
+            {
+                createNode();
+            }
+        }
+    };
+
+    private enum State
+    {
+        LATENT,
+        STARTED,
+        CLOSED
+    }
+
+    /**
+     * The mode for node creation
+     */
+    public enum Mode
+    {
+        /**
+         * Same as {@link CreateMode#EPHEMERAL}
+         */
+        EPHEMERAL()
+        {
+            @Override
+            protected CreateMode getCreateMode(boolean pathIsSet)
+            {
+                return CreateMode.EPHEMERAL;
+            }
+
+            @Override
+            protected boolean isProtected()
+            {
+                return false;
+            }
+        },
+
+        /**
+         * Same as {@link CreateMode#EPHEMERAL_SEQUENTIAL}; once a path is set it is re-created with {@link CreateMode#EPHEMERAL}
+         */
+        EPHEMERAL_SEQUENTIAL()
+        {
+            @Override
+            protected CreateMode getCreateMode(boolean pathIsSet)
+            {
+                return pathIsSet ? CreateMode.EPHEMERAL : CreateMode.EPHEMERAL_SEQUENTIAL;
+            }
+
+            @Override
+            protected boolean isProtected()
+            {
+                return false;
+            }
+        },
+
+        /**
+         * Same as {@link CreateMode#EPHEMERAL} with protection
+         */
+        PROTECTED_EPHEMERAL()
+        {
+            @Override
+            protected CreateMode getCreateMode(boolean pathIsSet)
+            {
+                return CreateMode.EPHEMERAL;
+            }
+
+            @Override
+            protected boolean isProtected()
+            {
+                return true;
+            }
+        },
+
+        /**
+         * Same as {@link CreateMode#EPHEMERAL_SEQUENTIAL} with protection; once a path is set it is re-created with {@link CreateMode#EPHEMERAL}
+         */
+        PROTECTED_EPHEMERAL_SEQUENTIAL()
+        {
+            @Override
+            protected CreateMode getCreateMode(boolean pathIsSet)
+            {
+                return pathIsSet ? CreateMode.EPHEMERAL : CreateMode.EPHEMERAL_SEQUENTIAL;
+            }
+
+            @Override
+            protected boolean isProtected()
+            {
+                return true;
+            }
+        }
+
+        ;
+
+        protected abstract CreateMode getCreateMode(boolean pathIsSet);
+
+        protected abstract boolean isProtected();
+    }
+
+    /**
+     * @param client client instance (must not be null)
+     * @param mode creation/protection mode (must not be null)
+     * @param basePath the base path for the node (must not be null)
+     * @param data data for the node (must not be null; a private copy is stored)
+     */
+    public PersistentEphemeralNode(CuratorFramework client, Mode mode, String basePath, byte[] data)
+    {
+        this.client = Preconditions.checkNotNull(client, "client cannot be null");
+        this.basePath = Preconditions.checkNotNull(basePath, "basePath cannot be null");
+        this.mode = Preconditions.checkNotNull(mode, "mode cannot be null");
+        data = Preconditions.checkNotNull(data, "data cannot be null");     // validates the parameter only; the field is assigned at the end
+
+        String parentDir = ZKPaths.getPathAndNode(basePath).getPath();
+        ensurePath = client.newNamespaceAwareEnsurePath(parentDir);
+
+        backgroundCallback = new BackgroundCallback()
+        {
+            @Override
+            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
+            {
+                String      path = null;
+                if ( event.getResultCode() == KeeperException.Code.NODEEXISTS.intValue() )
+                {
+                    path = event.getPath();     // node already present: keep the path carried by the event
+                }
+                else if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
+                {
+                    path = event.getName();     // created: use the name reported by the event
+                }
+                if ( path != null )
+                {
+                    nodePath.set(path);
+                    watchNode();
+
+                    CountDownLatch localLatch = initialCreateLatch;
+                    initialCreateLatch = null;  // the latch is only needed for the first completed create
+                    if ( localLatch != null )
+                    {
+                        localLatch.countDown();
+                    }
+                }
+                else
+                {
+                    createNode();               // any other result code: try again
+                }
+            }
+        };
+
+        createMethod = mode.isProtected() ? client.create().withProtection() : client.create();
+        this.data = Arrays.copyOf(data, data.length);
+    }
+
+    /**
+     * You must call start() to initiate the persistent ephemeral node. An attempt to create the node
+     * in the background will be started
+     */
+    public void start()
+    {
+        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Already started");
+
+        client.getConnectionStateListenable().addListener(listener);
+        createNode();
+    }
+
+    /**
+     * Block until either the initial node creation initiated by {@link #start()} succeeds or
+     * the timeout elapses. Returns true immediately if the initial creation already completed.
+     *
+     * @param timeout the maximum time to wait
+     * @param unit time unit
+     * @return if the node was created before timeout
+     * @throws InterruptedException if the thread is interrupted
+     */
+    public boolean waitForInitialCreate(long timeout, TimeUnit unit) throws InterruptedException
+    {
+        Preconditions.checkState(state.get() == State.STARTED, "Not started");
+        CountDownLatch  localLatch = initialCreateLatch;    // set to null by the create callback once the node exists
+        return (localLatch == null) || localLatch.await(timeout, unit);
+    }
+
+    @Override
+    public void close()
+    {
+        if ( !state.compareAndSet(State.STARTED, State.CLOSED) )
+        {
+            return;
+        }
+
+        client.getConnectionStateListenable().removeListener(listener);
+
+        deleteNode();
+    }
+
+    /**
+     * Returns the currently recorded path, or null if none has been recorded yet or it was cleared by close()
+     *
+     * @return node path or null
+     */
+    public String getActualPath()
+    {
+        return nodePath.get();
+    }
+
+    private void deleteNode()
+    {
+        String          localNodePath = nodePath.getAndSet(null);   // atomically take and clear the recorded path
+        if ( localNodePath != null )
+        {
+            try
+            {
+                client.delete().guaranteed().forPath(localNodePath);
+            }
+            catch ( KeeperException.NoNodeException ignore )
+            {
+                // the node is already gone, so there is nothing left to delete
+            }
+            catch ( Exception e )
+            {
+                log.error("Deleting node: " + localNodePath, e);    // best effort: logged, not propagated
+            }
+        }
+    }
+
+    private void createNode()
+    {
+        // nothing is attempted unless this instance is started and the connection is not suspended
+        if ( isActive() )
+        {
+            try
+            {
+                // once a path is known it is reused, so the node comes back under the same name
+                String      knownPath = nodePath.get();
+                boolean     pathIsSet = (knownPath != null);
+                ensurePath.ensure(client.getZookeeperClient());
+                createMethod.withMode(mode.getCreateMode(pathIsSet)).inBackground(backgroundCallback).forPath(pathIsSet ? knownPath : basePath, data);
+            }
+            catch ( Exception e )
+            {
+                log.error("Creating node. BasePath: " + basePath, e);
+            }
+        }
+    }
+
+    private void watchNode()
+    {
+        // the path is only consulted while active; a missing path means there is nothing to watch
+        String          watchedPath = isActive() ? nodePath.get() : null;
+        if ( watchedPath == null )
+        {
+            return;
+        }
+
+        try
+        {
+            // the watcher reacts to later events; the callback handles the node being absent right now
+            client.checkExists().usingWatcher(watcher).inBackground(checkExistsCallback).forPath(watchedPath);
+        }
+        catch ( Exception e )
+        {
+            // best effort: the failure is logged and not propagated
+            log.error("Watching node: " + watchedPath, e);
+        }
+    }
+
+    private boolean isActive()
+    {
+        return !((state.get() != State.STARTED) || isSuspended.get());   // i.e. started and not suspended
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/f074683b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
new file mode 100644
index 0000000..033ec1f
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
@@ -0,0 +1,361 @@
+package org.apache.curator.framework.recipes.nodes;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.BaseClassForTests;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.KillSession;
+import org.apache.curator.test.Timing;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.Test;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.testng.Assert.*;
+
+public class TestPersistentEphemeralNode extends BaseClassForTests
+{
+    private static final String DIR = "/test";
+    private static final String PATH = ZKPaths.makePath(DIR, "/foo");
+
+    private final Collection<CuratorFramework> curatorInstances = Lists.newArrayList();
+    private final Collection<PersistentEphemeralNode> createdNodes = Lists.newArrayList();
+
+    @AfterMethod
+    public void teardown() throws Exception
+    {
+        for ( PersistentEphemeralNode node : createdNodes )
+        {
+            node.close();
+        }
+
+        for ( CuratorFramework curator : curatorInstances )
+        {
+            curator.close();
+        }
+
+        super.teardown();
+    }
+
+    @Test(expectedExceptions = NullPointerException.class)
+    public void testNullCurator() throws Exception
+    {
+        new PersistentEphemeralNode(null, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, new byte[0]);
+    }
+
+    @Test(expectedExceptions = NullPointerException.class)
+    public void testNullPath() throws Exception
+    {
+        CuratorFramework curator = newCurator();
+        new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, null, new byte[0]);
+    }
+
+    @Test(expectedExceptions = NullPointerException.class)
+    public void testNullData() throws Exception
+    {
+        CuratorFramework curator = newCurator();
+        new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, null);
+    }
+
+    @Test(expectedExceptions = NullPointerException.class)
+    public void testNullMode() throws Exception
+    {
+        CuratorFramework curator = newCurator();
+        new PersistentEphemeralNode(curator, null, PATH, new byte[0]);
+    }
+
+    @Test
+    public void testDeletesNodeWhenClosed() throws Exception
+    {
+        CuratorFramework curator = newCurator();
+
+        PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, new byte[0]);
+        node.start();
+        String path = null;
+        try
+        {
+            node.waitForInitialCreate(5, TimeUnit.SECONDS);
+            path = node.getActualPath();
+            assertNodeExists(curator, path);
+        }
+        finally
+        {
+            node.close();  // After closing the path is set to null...
+        }
+
+        assertNodeDoesNotExist(curator, path);
+    }
+
+    @Test
+    public void testClosingMultipleTimes() throws Exception
+    {
+        CuratorFramework curator = newCurator();
+
+        PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, new byte[0]);
+        node.start();
+        assertTrue(node.waitForInitialCreate(5, TimeUnit.SECONDS));     // fail here on a timeout, not later on a null path
+
+        String path = node.getActualPath();
+        node.close();
+        assertNodeDoesNotExist(curator, path);
+
+        node.close();   // a second close must leave things unchanged
+        assertNodeDoesNotExist(curator, path);
+    }
+
+    @Test
+    public void testDeletesNodeWhenSessionDisconnects() throws Exception
+    {
+        CuratorFramework curator = newCurator();
+        CuratorFramework observer = newCurator();
+
+        PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, new byte[0]);
+        node.start();
+        try
+        {
+            node.waitForInitialCreate(5, TimeUnit.SECONDS);
+            assertNodeExists(observer, node.getActualPath());
+
+            // Register a watch that will fire when the node is deleted...
+            Trigger deletedTrigger = Trigger.deleted();
+            observer.checkExists().usingWatcher(deletedTrigger).forPath(node.getActualPath());
+
+            killSession(curator);
+
+            // Make sure the node got deleted
+            assertTrue(deletedTrigger.firedWithin(10, TimeUnit.SECONDS));
+        }
+        finally
+        {
+            node.close();
+        }
+    }
+
+    @Test
+    public void testRecreatesNodeWhenSessionReconnects() throws Exception
+    {
+        CuratorFramework curator = newCurator();
+        CuratorFramework observer = newCurator();
+
+        PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, new byte[0]);
+        node.start();
+        try
+        {
+            node.waitForInitialCreate(5, TimeUnit.SECONDS);
+            assertNodeExists(observer, node.getActualPath());
+
+            Trigger deletedTrigger = Trigger.deleted();
+            observer.checkExists().usingWatcher(deletedTrigger).forPath(node.getActualPath());
+
+            killSession(curator);
+
+            // Make sure the node got deleted...
+            assertTrue(deletedTrigger.firedWithin(10, TimeUnit.SECONDS));
+
+            // Check for it to be recreated...
+            Trigger createdTrigger = Trigger.created();
+            Stat stat = observer.checkExists().usingWatcher(createdTrigger).forPath(node.getActualPath());
+            assertTrue(stat != null || createdTrigger.firedWithin(10, TimeUnit.SECONDS));
+        }
+        finally
+        {
+            node.close();
+        }
+    }
+
+    @Test
+    public void testRecreatesNodeWhenSessionReconnectsMultipleTimes() throws Exception
+    {
+        CuratorFramework curator = newCurator();
+        CuratorFramework observer = newCurator();
+
+        PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, new byte[0]);
+        node.start();
+        try
+        {
+            node.waitForInitialCreate(5, TimeUnit.SECONDS);
+            String path = node.getActualPath();
+            assertNodeExists(observer, path);
+
+            // We should be able to disconnect multiple times and each time the node should be recreated.
+            for ( int i = 0; i < 5; i++ )
+            {
+                Trigger deletionTrigger = Trigger.deleted();
+                observer.checkExists().usingWatcher(deletionTrigger).forPath(path);
+
+                // Kill the session, thus cleaning up the node...
+                killSession(curator);
+
+                // Make sure the node ended up getting deleted...
+                assertTrue(deletionTrigger.firedWithin(10, TimeUnit.SECONDS));
+
+                // Now put a watch in the background looking to see if it gets created...
+                Trigger creationTrigger = Trigger.created();
+                Stat stat = observer.checkExists().usingWatcher(creationTrigger).forPath(path);
+                assertTrue(stat != null || creationTrigger.firedWithin(10, TimeUnit.SECONDS));
+            }
+        }
+        finally
+        {
+            node.close();
+        }
+    }
+
+    @Test
+    public void testRecreatesNodeWhenItGetsDeleted() throws Exception
+    {
+        CuratorFramework curator = newCurator();
+
+        PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, new byte[0]);
+        node.start();
+        try
+        {
+            node.waitForInitialCreate(5, TimeUnit.SECONDS);
+            String originalNode = node.getActualPath();
+            assertNodeExists(curator, originalNode);
+
+            // Delete the original node...
+            curator.delete().forPath(originalNode);
+
+            // Since we're using an ephemeral node, and the original session hasn't been interrupted the name of the new
+            // node that gets created is going to be exactly the same as the original.
+            Trigger createdWatchTrigger = Trigger.created();
+            Stat stat = curator.checkExists().usingWatcher(createdWatchTrigger).forPath(originalNode);
+            assertTrue(stat != null || createdWatchTrigger.firedWithin(10, TimeUnit.SECONDS));
+        }
+        finally
+        {
+            node.close();
+        }
+    }
+
+    @Test
+    public void testNodesCreateUniquePaths() throws Exception
+    {
+        CuratorFramework curator = newCurator();
+
+        PersistentEphemeralNode node1 = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL_SEQUENTIAL, PATH, new byte[0]);
+        node1.start();
+        try
+        {
+            assertTrue(node1.waitForInitialCreate(5, TimeUnit.SECONDS));    // a timeout would leave path1 null
+            String path1 = node1.getActualPath();
+
+            PersistentEphemeralNode node2 = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL_SEQUENTIAL, PATH, new byte[0]);
+            node2.start();
+            try
+            {
+                assertTrue(node2.waitForInitialCreate(5, TimeUnit.SECONDS));    // a null path2 would let the check below pass vacuously
+                String path2 = node2.getActualPath();
+
+                assertFalse(path1.equals(path2));
+            }
+            finally
+            {
+                node2.close();
+            }
+        }
+        finally
+        {
+            node1.close();
+        }
+    }
+
+    @Test
+    public void testData() throws Exception
+    {
+        CuratorFramework curator = newCurator();
+        byte[] data = "Hello World".getBytes();
+
+        PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, data);
+        node.start();
+        try
+        {
+            node.waitForInitialCreate(5, TimeUnit.SECONDS);
+            assertTrue(Arrays.equals(curator.getData().forPath(node.getActualPath()), data));
+        }
+        finally
+        {
+            node.close();
+        }
+    }
+
+    private void assertNodeExists(CuratorFramework curator, String path) throws Exception
+    {
+        assertNotNull(path);
+        assertTrue(curator.checkExists().forPath(path) != null);
+    }
+
+    private void assertNodeDoesNotExist(CuratorFramework curator, String path) throws Exception
+    {
+        assertTrue(curator.checkExists().forPath(path) == null);
+    }
+
+    private CuratorFramework newCurator() throws IOException
+    {
+        Timing timing = new Timing();
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+        client.start();
+
+        curatorInstances.add(client);
+        return client;
+    }
+
+    public void killSession(CuratorFramework curator) throws Exception
+    {
+        KillSession.kill(curator.getZookeeperClient().getZooKeeper(), curator.getZookeeperClient().getCurrentConnectionString());
+    }
+
+    private static final class Trigger implements Watcher
+    {
+        private final Event.EventType type;
+        private final CountDownLatch latch;
+
+        public Trigger(Event.EventType type)
+        {
+            assertNotNull(type);
+
+            this.type = type;
+            this.latch = new CountDownLatch(1);
+        }
+
+        @Override
+        public void process(WatchedEvent event)
+        {
+            if ( type == event.getType() )
+            {
+                latch.countDown();
+            }
+        }
+
+        public boolean firedWithin(long duration, TimeUnit unit)
+        {
+            try
+            {
+                return latch.await(duration, unit);
+            } catch ( InterruptedException e )
+            {
+                throw Throwables.propagate(e);
+            }
+        }
+
+        private static Trigger created()
+        {
+            return new Trigger(Event.EventType.NodeCreated);
+        }
+
+        private static Trigger deleted()
+        {
+            return new Trigger(Event.EventType.NodeDeleted);
+        }
+    }
+}
\ No newline at end of file