You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2013/06/10 17:50:47 UTC
[2/5] git commit: PersistentEphemeralNode recipe
PersistentEphemeralNode recipe
Project: http://git-wip-us.apache.org/repos/asf/incubator-curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-curator/commit/f074683b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-curator/tree/f074683b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-curator/diff/f074683b
Branch: refs/heads/CURATOR-25
Commit: f074683bc44b08dfa201e81977a9e736644bfae8
Parents: aec1cfd
Author: randgalt <ra...@apache.org>
Authored: Mon Jun 10 08:42:25 2013 -0700
Committer: randgalt <ra...@apache.org>
Committed: Mon Jun 10 08:42:25 2013 -0700
----------------------------------------------------------------------
.../recipes/nodes/PersistentEphemeralNode.java | 349 ++++++++++++++++++
.../nodes/TestPersistentEphemeralNode.java | 361 +++++++++++++++++++
2 files changed, 710 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/f074683b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
new file mode 100644
index 0000000..d86af92
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
@@ -0,0 +1,349 @@
+package org.apache.curator.framework.recipes.nodes;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CreateModable;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.utils.EnsurePath;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.Closeable;
+import java.util.Arrays;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * <p>
+ * A persistent ephemeral node is an ephemeral node that attempts to stay present in
+ * ZooKeeper, even through connection and session interruptions.
+ * </p>
+ *
+ * <p>
+ * Thanks to bbeck (https://github.com/bbeck) for the initial coding and design
+ * </p>
+ */
+public class PersistentEphemeralNode implements Closeable
+{
+ // Latch released by the background create callback the first time a node path is recorded.
+ // The callback also sets this field to null afterwards, hence volatile and non-final.
+ @VisibleForTesting
+ volatile CountDownLatch initialCreateLatch = new CountDownLatch(1);
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+ private final CuratorFramework client;
+ private final EnsurePath ensurePath;
+ private final CreateModable<ACLBackgroundPathAndBytesable<String>> createMethod;
+ // path of the node as last reported by the create callback; null until then and after deleteNode()
+ private final AtomicReference<String> nodePath = new AtomicReference<String>(null);
+ private final String basePath;
+ private final Mode mode;
+ private final byte[] data;
+ private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
+ // true while the last reported connection state is neither CONNECTED nor RECONNECTED
+ private final AtomicBoolean isSuspended = new AtomicBoolean(false);
+ private final BackgroundCallback backgroundCallback;
+ // Watcher registered by watchNode(): any event whose path equals the recorded node path
+ // triggers another createNode() attempt.
+ private final Watcher watcher = new Watcher()
+ {
+ @Override
+ public void process(WatchedEvent event)
+ {
+ if ( Objects.equal(nodePath.get(), event.getPath()) )
+ {
+ createNode();
+ }
+ }
+ };
+ // Connection listener added in start() and removed in close(): tracks the suspended flag
+ // and re-attempts creation when the connection is reported as RECONNECTED.
+ private final ConnectionStateListener listener = new ConnectionStateListener()
+ {
+ @Override
+ public void stateChanged(CuratorFramework client, ConnectionState newState)
+ {
+ isSuspended.set((newState != ConnectionState.RECONNECTED) && (newState != ConnectionState.CONNECTED));
+ if ( newState == ConnectionState.RECONNECTED )
+ {
+ createNode();
+ }
+ }
+ };
+ // Callback for the background exists-check issued by watchNode(): if the check reports
+ // NONODE, the node is created again.
+ private final BackgroundCallback checkExistsCallback = new BackgroundCallback()
+ {
+ @Override
+ public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
+ {
+ if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )
+ {
+ createNode();
+ }
+ }
+ };
+
+ // Lifecycle of a PersistentEphemeralNode instance.
+ private enum State
+ {
+ // initial value of the state field; start() has not been called yet
+ LATENT,
+ // set by start(); createNode() and watchNode() only act while in this state
+ STARTED,
+ // set by close(); the transition is only made from STARTED
+ CLOSED
+ }
+
+ /**
+ * The mode for node creation
+ */
+ public enum Mode
+ {
+ /**
+ * Same as {@link CreateMode#EPHEMERAL}
+ */
+ EPHEMERAL()
+ {
+ @Override
+ protected CreateMode getCreateMode(boolean pathIsSet)
+ {
+ return CreateMode.EPHEMERAL;
+ }
+
+ @Override
+ protected boolean isProtected()
+ {
+ return false;
+ }
+ },
+
+ /**
+ * Same as {@link CreateMode#EPHEMERAL_SEQUENTIAL}
+ */
+ EPHEMERAL_SEQUENTIAL()
+ {
+ @Override
+ protected CreateMode getCreateMode(boolean pathIsSet)
+ {
+ return pathIsSet ? CreateMode.EPHEMERAL : CreateMode.EPHEMERAL_SEQUENTIAL;
+ }
+
+ @Override
+ protected boolean isProtected()
+ {
+ return false;
+ }
+ },
+
+ /**
+ * Same as {@link CreateMode#EPHEMERAL} with protection
+ */
+ PROTECTED_EPHEMERAL()
+ {
+ @Override
+ protected CreateMode getCreateMode(boolean pathIsSet)
+ {
+ return CreateMode.EPHEMERAL;
+ }
+
+ @Override
+ protected boolean isProtected()
+ {
+ return true;
+ }
+ },
+
+ /**
+ * Same as {@link CreateMode#EPHEMERAL_SEQUENTIAL} with protection
+ */
+ PROTECTED_EPHEMERAL_SEQUENTIAL()
+ {
+ @Override
+ protected CreateMode getCreateMode(boolean pathIsSet)
+ {
+ return pathIsSet ? CreateMode.EPHEMERAL : CreateMode.EPHEMERAL_SEQUENTIAL;
+ }
+
+ @Override
+ protected boolean isProtected()
+ {
+ return true;
+ }
+ }
+
+ ;
+
+ protected abstract CreateMode getCreateMode(boolean pathIsSet);
+
+ protected abstract boolean isProtected();
+ }
+
+ /**
+ * Builds the recipe without contacting ZooKeeper; nothing is created until {@link #start()}
+ * is called. All arguments are null-checked and the data array is copied.
+ *
+ * @param client client instance
+ * @param mode creation/protection mode
+ * @param basePath the base path for the node
+ * @param data data for the node
+ */
+ public PersistentEphemeralNode(CuratorFramework client, Mode mode, String basePath, byte[] data)
+ {
+ this.client = Preconditions.checkNotNull(client, "client cannot be null");
+ this.basePath = Preconditions.checkNotNull(basePath, "basePath cannot be null");
+ this.mode = Preconditions.checkNotNull(mode, "mode cannot be null");
+ data = Preconditions.checkNotNull(data, "data cannot be null");
+
+ // the parent of basePath is what createNode() ensures before each creation attempt
+ String parentDir = ZKPaths.getPathAndNode(basePath).getPath();
+ ensurePath = client.newNamespaceAwareEnsurePath(parentDir);
+
+ // Callback for the background create issued by createNode()
+ backgroundCallback = new BackgroundCallback()
+ {
+ @Override
+ public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
+ {
+ String path = null;
+ if ( event.getResultCode() == KeeperException.Code.NODEEXISTS.intValue() )
+ {
+ // node is already there: adopt the path that was requested
+ path = event.getPath();
+ }
+ else if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
+ {
+ // node was created: adopt the name reported by the event
+ path = event.getName();
+ }
+ if ( path != null )
+ {
+ nodePath.set(path);
+ watchNode();
+
+ // release waiters of the initial creation; the field is nulled so this happens once
+ CountDownLatch localLatch = initialCreateLatch;
+ initialCreateLatch = null;
+ if ( localLatch != null )
+ {
+ localLatch.countDown();
+ }
+ }
+ else
+ {
+ // any other result code: try again
+ createNode();
+ }
+ }
+ };
+
+ createMethod = mode.isProtected() ? client.create().withProtection() : client.create();
+ // defensive copy so later changes to the caller's array do not affect this instance
+ this.data = Arrays.copyOf(data, data.length);
+ }
+
+ /**
+ * You must call start() to initiate the persistent ephemeral node. An attempt to create the node
+ * in the background will be started
+ */
+ public void start()
+ {
+ Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Already started");
+
+ client.getConnectionStateListenable().addListener(listener);
+ createNode();
+ }
+
+ /**
+  * Block until either the initial node creation initiated by {@link #start()} succeeds or
+  * the timeout elapses. Returns true immediately if the initial creation already completed.
+  *
+  * @param timeout the maximum time to wait
+  * @param unit time unit
+  * @return if the node was created before timeout
+  * @throws InterruptedException if the thread is interrupted
+  */
+ public boolean waitForInitialCreate(long timeout, TimeUnit unit) throws InterruptedException
+ {
+     Preconditions.checkState(state.get() == State.STARTED, "Not started");
+     CountDownLatch localLatch = initialCreateLatch; // nulled by the create callback once released
+     return (localLatch == null) || localLatch.await(timeout, unit);
+ }
+
+ /**
+  * Stops maintaining the node: unregisters the connection state listener and deletes the
+  * node. Calls on an instance that is not currently started are no-ops.
+  */
+ @Override
+ public void close()
+ {
+     // only the caller that performs the STARTED -> CLOSED transition runs the cleanup
+     if ( state.compareAndSet(State.STARTED, State.CLOSED) )
+     {
+         client.getConnectionStateListenable().removeListener(listener);
+
+         deleteNode();
+     }
+ }
+
+ /**
+  * Returns the path currently recorded for the node. The value is null while no creation
+  * has been confirmed yet and after the node has been deleted by {@link #close()}.
+  *
+  * @return node path or null
+  */
+ public String getActualPath()
+ {
+     final String currentPath = nodePath.get();
+     return currentPath;
+ }
+
+ private void deleteNode()
+ {
+ String localNodePath = nodePath.getAndSet(null);
+ if ( localNodePath != null )
+ {
+ try
+ {
+ client.delete().guaranteed().forPath(localNodePath);
+ }
+ catch ( KeeperException.NoNodeException ignore )
+ {
+ // ignore
+ }
+ catch ( Exception e )
+ {
+ log.error("Deleting node: " + localNodePath, e);
+ }
+ }
+ }
+
+ private void createNode()
+ {
+ if ( !isActive() )
+ {
+ return;
+ }
+
+ try
+ {
+ String existingPath = nodePath.get();
+ String createPath = (existingPath != null) ? existingPath : basePath;
+ ensurePath.ensure(client.getZookeeperClient());
+ createMethod.withMode(mode.getCreateMode(existingPath != null)).inBackground(backgroundCallback).forPath(createPath, data);
+ }
+ catch ( Exception e )
+ {
+ log.error("Creating node. BasePath: " + basePath, e);
+ }
+ }
+
+ private void watchNode()
+ {
+ if ( !isActive() )
+ {
+ return;
+ }
+
+ String localNodePath = nodePath.get();
+ if ( localNodePath != null )
+ {
+ try
+ {
+ client.checkExists().usingWatcher(watcher).inBackground(checkExistsCallback).forPath(localNodePath);
+ }
+ catch ( Exception e )
+ {
+ log.error("Watching node: " + localNodePath, e);
+ }
+ }
+ }
+
+ /**
+  * @return true if the instance is started and the connection is not flagged as suspended
+  */
+ private boolean isActive()
+ {
+     if ( state.get() != State.STARTED )
+     {
+         return false;
+     }
+     return !isSuspended.get();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/f074683b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
new file mode 100644
index 0000000..033ec1f
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
@@ -0,0 +1,361 @@
+package org.apache.curator.framework.recipes.nodes;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.BaseClassForTests;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.KillSession;
+import org.apache.curator.test.Timing;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.Test;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.testng.Assert.*;
+
+public class TestPersistentEphemeralNode extends BaseClassForTests
+{
+ private static final String DIR = "/test";
+ private static final String PATH = ZKPaths.makePath(DIR, "/foo");
+
+ private final Collection<CuratorFramework> curatorInstances = Lists.newArrayList();
+ private final Collection<PersistentEphemeralNode> createdNodes = Lists.newArrayList();
+
+ /**
+  * Closes every node and client tracked for the test that just ran, then delegates to the
+  * base class teardown. The base teardown runs even if a close fails. The tracking
+  * collections are emptied afterwards because TestNG runs all test methods on one instance
+  * of this class, so stale entries would otherwise pile up and be closed again after each
+  * later test.
+  *
+  * @throws Exception if closing a client or the base teardown fails
+  */
+ @AfterMethod
+ public void teardown() throws Exception
+ {
+     try
+     {
+         for ( PersistentEphemeralNode node : createdNodes )
+         {
+             node.close();
+         }
+
+         for ( CuratorFramework curator : curatorInstances )
+         {
+             curator.close();
+         }
+     }
+     finally
+     {
+         createdNodes.clear();
+         curatorInstances.clear();
+
+         super.teardown();
+     }
+ }
+
+ // A null client must be rejected by the constructor.
+ @Test(expectedExceptions = NullPointerException.class)
+ public void testNullCurator() throws Exception
+ {
+     final byte[] emptyData = new byte[0];
+     new PersistentEphemeralNode(null, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, emptyData);
+ }
+
+ // A null base path must be rejected by the constructor.
+ @Test(expectedExceptions = NullPointerException.class)
+ public void testNullPath() throws Exception
+ {
+     new PersistentEphemeralNode(newCurator(), PersistentEphemeralNode.Mode.EPHEMERAL, null, new byte[0]);
+ }
+
+ // Null node data must be rejected by the constructor.
+ @Test(expectedExceptions = NullPointerException.class)
+ public void testNullData() throws Exception
+ {
+     new PersistentEphemeralNode(newCurator(), PersistentEphemeralNode.Mode.EPHEMERAL, PATH, null);
+ }
+
+ // A null mode must be rejected by the constructor.
+ @Test(expectedExceptions = NullPointerException.class)
+ public void testNullMode() throws Exception
+ {
+     new PersistentEphemeralNode(newCurator(), null, PATH, new byte[0]);
+ }
+
+ // Closing the recipe must remove the node it created.
+ @Test
+ public void testDeletesNodeWhenClosed() throws Exception
+ {
+     CuratorFramework client = newCurator();
+
+     PersistentEphemeralNode ephemeral = new PersistentEphemeralNode(client, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, new byte[0]);
+     ephemeral.start();
+
+     // remember the path before closing, because close() resets it to null
+     String createdPath = null;
+     try
+     {
+         ephemeral.waitForInitialCreate(5, TimeUnit.SECONDS);
+         createdPath = ephemeral.getActualPath();
+         assertNodeExists(client, createdPath);
+     }
+     finally
+     {
+         ephemeral.close();
+     }
+
+     assertNodeDoesNotExist(client, createdPath);
+ }
+
+ // close() must be safe to call repeatedly; the node stays deleted.
+ @Test
+ public void testClosingMultipleTimes() throws Exception
+ {
+     CuratorFramework client = newCurator();
+
+     PersistentEphemeralNode ephemeral = new PersistentEphemeralNode(client, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, new byte[0]);
+     ephemeral.start();
+     ephemeral.waitForInitialCreate(5, TimeUnit.SECONDS);
+
+     String createdPath = ephemeral.getActualPath();
+
+     // first close removes the node
+     ephemeral.close();
+     assertNodeDoesNotExist(client, createdPath);
+
+     // second close is a no-op
+     ephemeral.close();
+     assertNodeDoesNotExist(client, createdPath);
+ }
+
+ // Killing the owning session must make the node disappear, as seen by a second client.
+ @Test
+ public void testDeletesNodeWhenSessionDisconnects() throws Exception
+ {
+     CuratorFramework owner = newCurator();
+     CuratorFramework watcherClient = newCurator();
+
+     PersistentEphemeralNode ephemeral = new PersistentEphemeralNode(owner, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, new byte[0]);
+     ephemeral.start();
+     try
+     {
+         ephemeral.waitForInitialCreate(5, TimeUnit.SECONDS);
+         assertNodeExists(watcherClient, ephemeral.getActualPath());
+
+         // arm a watch that fires on deletion of the node
+         Trigger nodeDeleted = Trigger.deleted();
+         watcherClient.checkExists().usingWatcher(nodeDeleted).forPath(ephemeral.getActualPath());
+
+         killSession(owner);
+
+         // the deletion must be observed within the time limit
+         boolean sawDeletion = nodeDeleted.firedWithin(10, TimeUnit.SECONDS);
+         assertTrue(sawDeletion);
+     }
+     finally
+     {
+         ephemeral.close();
+     }
+ }
+
+ // After the owning session is killed, the node must come back once the recipe reconnects.
+ @Test
+ public void testRecreatesNodeWhenSessionReconnects() throws Exception
+ {
+     CuratorFramework owner = newCurator();
+     CuratorFramework watcherClient = newCurator();
+
+     PersistentEphemeralNode ephemeral = new PersistentEphemeralNode(owner, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, new byte[0]);
+     ephemeral.start();
+     try
+     {
+         ephemeral.waitForInitialCreate(5, TimeUnit.SECONDS);
+         assertNodeExists(watcherClient, ephemeral.getActualPath());
+
+         Trigger nodeDeleted = Trigger.deleted();
+         watcherClient.checkExists().usingWatcher(nodeDeleted).forPath(ephemeral.getActualPath());
+
+         killSession(owner);
+
+         // first the node has to go away...
+         boolean sawDeletion = nodeDeleted.firedWithin(10, TimeUnit.SECONDS);
+         assertTrue(sawDeletion);
+
+         // ...then it either already exists again or a creation event arrives in time
+         Trigger nodeCreated = Trigger.created();
+         Stat existing = watcherClient.checkExists().usingWatcher(nodeCreated).forPath(ephemeral.getActualPath());
+         boolean recreated = (existing != null) || nodeCreated.firedWithin(10, TimeUnit.SECONDS);
+         assertTrue(recreated);
+     }
+     finally
+     {
+         ephemeral.close();
+     }
+ }
+
+ // The delete/recreate cycle must work for several session kills in a row.
+ @Test
+ public void testRecreatesNodeWhenSessionReconnectsMultipleTimes() throws Exception
+ {
+     CuratorFramework owner = newCurator();
+     CuratorFramework watcherClient = newCurator();
+
+     PersistentEphemeralNode ephemeral = new PersistentEphemeralNode(owner, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, new byte[0]);
+     ephemeral.start();
+     try
+     {
+         ephemeral.waitForInitialCreate(5, TimeUnit.SECONDS);
+         String nodePath = ephemeral.getActualPath();
+         assertNodeExists(watcherClient, nodePath);
+
+         // each round: kill the session, see the node vanish, see it come back
+         for ( int round = 0; round < 5; ++round )
+         {
+             Trigger nodeDeleted = Trigger.deleted();
+             watcherClient.checkExists().usingWatcher(nodeDeleted).forPath(nodePath);
+
+             // killing the session makes the server clean up the ephemeral node
+             killSession(owner);
+
+             boolean sawDeletion = nodeDeleted.firedWithin(10, TimeUnit.SECONDS);
+             assertTrue(sawDeletion);
+
+             // the node is either already back or its creation is observed in time
+             Trigger nodeCreated = Trigger.created();
+             Stat existing = watcherClient.checkExists().usingWatcher(nodeCreated).forPath(nodePath);
+             boolean recreated = (existing != null) || nodeCreated.firedWithin(10, TimeUnit.SECONDS);
+             assertTrue(recreated);
+         }
+     }
+     finally
+     {
+         ephemeral.close();
+     }
+ }
+
+ // Deleting the node behind the recipe's back must cause it to be created again.
+ @Test
+ public void testRecreatesNodeWhenItGetsDeleted() throws Exception
+ {
+     CuratorFramework client = newCurator();
+
+     PersistentEphemeralNode ephemeral = new PersistentEphemeralNode(client, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, new byte[0]);
+     ephemeral.start();
+     try
+     {
+         ephemeral.waitForInitialCreate(5, TimeUnit.SECONDS);
+         String firstPath = ephemeral.getActualPath();
+         assertNodeExists(client, firstPath);
+
+         // remove the node directly
+         client.delete().forPath(firstPath);
+
+         // The mode is plain ephemeral and the session is untouched, so the replacement node
+         // is expected under exactly the same name as the original one.
+         Trigger nodeCreated = Trigger.created();
+         Stat existing = client.checkExists().usingWatcher(nodeCreated).forPath(firstPath);
+         boolean recreated = (existing != null) || nodeCreated.firedWithin(10, TimeUnit.SECONDS);
+         assertTrue(recreated);
+     }
+     finally
+     {
+         ephemeral.close();
+     }
+ }
+
+ // Two sequential nodes on the same base path must end up with different actual paths.
+ @Test
+ public void testNodesCreateUniquePaths() throws Exception
+ {
+     CuratorFramework client = newCurator();
+
+     PersistentEphemeralNode first = new PersistentEphemeralNode(client, PersistentEphemeralNode.Mode.EPHEMERAL_SEQUENTIAL, PATH, new byte[0]);
+     first.start();
+     try
+     {
+         first.waitForInitialCreate(5, TimeUnit.SECONDS);
+         String firstPath = first.getActualPath();
+
+         PersistentEphemeralNode second = new PersistentEphemeralNode(client, PersistentEphemeralNode.Mode.EPHEMERAL_SEQUENTIAL, PATH, new byte[0]);
+         second.start();
+         try
+         {
+             second.waitForInitialCreate(5, TimeUnit.SECONDS);
+             String secondPath = second.getActualPath();
+
+             boolean samePath = firstPath.equals(secondPath);
+             assertFalse(samePath);
+         }
+         finally
+         {
+             second.close();
+         }
+     }
+     finally
+     {
+         first.close();
+     }
+ }
+
+ // The bytes handed to the recipe must be the bytes stored in the created node.
+ @Test
+ public void testData() throws Exception
+ {
+     CuratorFramework client = newCurator();
+     byte[] payload = "Hello World".getBytes();
+
+     PersistentEphemeralNode ephemeral = new PersistentEphemeralNode(client, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, payload);
+     ephemeral.start();
+     try
+     {
+         ephemeral.waitForInitialCreate(5, TimeUnit.SECONDS);
+         byte[] stored = client.getData().forPath(ephemeral.getActualPath());
+         assertTrue(Arrays.equals(stored, payload));
+     }
+     finally
+     {
+         ephemeral.close();
+     }
+ }
+
+ // Fails unless the path is non-null and an exists-check through the given client finds it.
+ private void assertNodeExists(CuratorFramework curator, String path) throws Exception
+ {
+     assertNotNull(path);
+
+     Stat found = curator.checkExists().forPath(path);
+     assertTrue(found != null);
+ }
+
+ // Fails if an exists-check through the given client finds a node at the path.
+ private void assertNodeDoesNotExist(CuratorFramework curator, String path) throws Exception
+ {
+     Stat found = curator.checkExists().forPath(path);
+     assertTrue(found == null);
+ }
+
+ // Creates and starts a client against the test server and records it for teardown.
+ private CuratorFramework newCurator() throws IOException
+ {
+     Timing timing = new Timing();
+     String connectString = server.getConnectString();
+     CuratorFramework started = CuratorFrameworkFactory.newClient(connectString, timing.session(), timing.connection(), new RetryOneTime(1));
+     started.start();
+
+     // tracked so that teardown() can close it
+     curatorInstances.add(started);
+     return started;
+ }
+
+ /**
+ * Kills the ZooKeeper session of the given client by handing its current ZooKeeper handle
+ * and connection string to {@link KillSession#kill}.
+ *
+ * @param curator the client whose session is to be killed
+ * @throws Exception if obtaining the handle or killing the session fails
+ */
+ public void killSession(CuratorFramework curator) throws Exception
+ {
+ KillSession.kill(curator.getZookeeperClient().getZooKeeper(), curator.getZookeeperClient().getCurrentConnectionString());
+ }
+
+ /**
+  * A one-shot watcher for tests: it is released the first time an event of the expected
+  * type is delivered, and lets the test wait for that with a timeout.
+  */
+ private static final class Trigger implements Watcher
+ {
+     private final Event.EventType type;
+     private final CountDownLatch latch;
+
+     /**
+      * @param type the event type that releases this trigger; must not be null
+      */
+     public Trigger(Event.EventType type)
+     {
+         assertNotNull(type);
+
+         this.type = type;
+         this.latch = new CountDownLatch(1);
+     }
+
+     @Override
+     public void process(WatchedEvent event)
+     {
+         // events of any other type are ignored
+         if ( type == event.getType() )
+         {
+             latch.countDown();
+         }
+     }
+
+     /**
+      * Waits for the expected event.
+      *
+      * @param duration maximum time to wait
+      * @param unit unit of the duration
+      * @return true if the event arrived within the given time, false on timeout
+      */
+     public boolean firedWithin(long duration, TimeUnit unit)
+     {
+         try
+         {
+             return latch.await(duration, unit);
+         }
+         catch ( InterruptedException e )
+         {
+             // restore the interrupt status so callers further up can still observe it
+             Thread.currentThread().interrupt();
+             throw Throwables.propagate(e);
+         }
+     }
+
+     /**
+      * @return a trigger released by a NodeCreated event
+      */
+     private static Trigger created()
+     {
+         return new Trigger(Event.EventType.NodeCreated);
+     }
+
+     /**
+      * @return a trigger released by a NodeDeleted event
+      */
+     private static Trigger deleted()
+     {
+         return new Trigger(Event.EventType.NodeDeleted);
+     }
+ }
+}
\ No newline at end of file