You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2016/01/19 03:05:51 UTC

[1/5] curator git commit: Deprecated PersistentEphemeralNode in favor of PersistentNode which is the same code but now accepts any createmode

Repository: curator
Updated Branches:
  refs/heads/CURATOR-3.0 d26c38dba -> c6a22ba50


Deprecated PersistentEphemeralNode in favor of PersistentNode which is the same code but now accepts any createmode


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/fefbba1c
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/fefbba1c
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/fefbba1c

Branch: refs/heads/CURATOR-3.0
Commit: fefbba1cc3bd5641983657440b40e25425165a6a
Parents: 45332f3
Author: randgalt <ra...@apache.org>
Authored: Tue Jan 12 11:45:38 2016 -0500
Committer: randgalt <ra...@apache.org>
Committed: Tue Jan 12 11:45:38 2016 -0500

----------------------------------------------------------------------
 .../recipes/nodes/PersistentEphemeralNode.java  | 310 +--------------
 .../framework/recipes/nodes/PersistentNode.java | 382 +++++++++++++++++++
 .../src/site/confluence/group-member.confluence |   2 +-
 .../persistent-ephemeral-node.confluence        |  20 +-
 .../nodes/TestPersistentEphemeralNode.java      |   6 +-
 .../TestPersistentEphemeralNodeListener.java    |   1 +
 .../recipes/nodes/TestPersistentNode.java       |  62 +++
 7 files changed, 468 insertions(+), 315 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/fefbba1c/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
index 684e0d9..5576dc2 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
@@ -19,30 +19,8 @@
 
 package org.apache.curator.framework.recipes.nodes;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
-import org.apache.curator.framework.api.BackgroundCallback;
-import org.apache.curator.framework.api.CreateModable;
-import org.apache.curator.framework.api.CuratorEvent;
-import org.apache.curator.framework.api.CuratorWatcher;
-import org.apache.curator.framework.state.ConnectionState;
-import org.apache.curator.framework.state.ConnectionStateListener;
-import org.apache.curator.utils.PathUtils;
 import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher.Event.EventType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * <p>
@@ -52,85 +30,18 @@ import java.util.concurrent.atomic.AtomicReference;
  * <p>
  * Thanks to bbeck (https://github.com/bbeck) for the initial coding and design
  * </p>
+ *
+ * @deprecated This has been replaced with the more general {@link PersistentNode}
  */
-public class PersistentEphemeralNode implements Closeable
+@Deprecated
+public class PersistentEphemeralNode extends PersistentNode
 {
-    private final AtomicReference<CountDownLatch> initialCreateLatch = new AtomicReference<CountDownLatch>(new CountDownLatch(1));
-    private final Logger log = LoggerFactory.getLogger(getClass());
-    private final CuratorFramework client;
-    private final CreateModable<ACLBackgroundPathAndBytesable<String>> createMethod;
-    private final AtomicReference<String> nodePath = new AtomicReference<String>(null);
-    private final String basePath;
-    private final Mode mode;
-    private final AtomicReference<byte[]> data = new AtomicReference<byte[]>();
-    private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
-    private final AtomicBoolean authFailure = new AtomicBoolean(false);
-    private final BackgroundCallback backgroundCallback;
-    private final CuratorWatcher watcher = new CuratorWatcher()
-    {
-        @Override
-        public void process(WatchedEvent event) throws Exception
-        {
-            if ( event.getType() == EventType.NodeDeleted )
-            {
-                createNode();
-            }
-            else if ( event.getType() == EventType.NodeDataChanged )
-            {
-                watchNode();
-            }
-        }
-    };
-    private final BackgroundCallback checkExistsCallback = new BackgroundCallback()
-    {
-        @Override
-        public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
-        {
-            if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )
-            {
-                createNode();
-            }
-        }
-    };
-    private final BackgroundCallback setDataCallback = new BackgroundCallback()
-    {
-
-        @Override
-        public void processResult(CuratorFramework client, CuratorEvent event)
-            throws Exception
-        {
-            //If the result is ok then initialisation is complete (if we're still initialising)
-            //Don't retry on other errors as the only recoverable cases will be connection loss
-            //and the node not existing, both of which are already handled by other watches.
-            if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
-            {
-                //Update is ok, mark initialisation as complete if required.
-                initialisationComplete();
-            }
-        }
-    };
-    private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
-    {
-        @Override
-        public void stateChanged(CuratorFramework client, ConnectionState newState)
-        {
-            if ( newState == ConnectionState.RECONNECTED )
-            {
-                createNode();
-            }
-        }
-    };
-
-    private enum State
-    {
-        LATENT,
-        STARTED,
-        CLOSED
-    }
-
     /**
      * The mode for node creation
+     *
+     * @deprecated This has been replaced with the more general {@link PersistentNode}
      */
+    @Deprecated
     public enum Mode
     {
         /**
@@ -216,212 +127,9 @@ public class PersistentEphemeralNode implements Closeable
      * @param basePath the base path for the node
      * @param initData     data for the node
      */
+    @SuppressWarnings("deprecation")
     public PersistentEphemeralNode(CuratorFramework client, Mode mode, String basePath, byte[] initData)
     {
-        this.client = Preconditions.checkNotNull(client, "client cannot be null");
-        this.basePath = PathUtils.validatePath(basePath);
-        this.mode = Preconditions.checkNotNull(mode, "mode cannot be null");
-        final byte[] data = Preconditions.checkNotNull(initData, "data cannot be null");
-
-        backgroundCallback = new BackgroundCallback()
-        {
-            @Override
-            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
-            {
-                String path = null;
-                boolean nodeExists = false;
-                if ( event.getResultCode() == KeeperException.Code.NODEEXISTS.intValue() )
-                {
-                    path = event.getPath();
-                    nodeExists = true;
-                }
-                else if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
-                {
-                    path = event.getName();
-                }
-                else if ( event.getResultCode() == KeeperException.Code.NOAUTH.intValue() )
-                {
-                    log.warn("Client does not have authorisation to write ephemeral node at path {}", event.getPath());
-                    authFailure.set(true);
-                    return;
-                }
-                if ( path != null )
-                {
-                    authFailure.set(false);
-                    nodePath.set(path);
-                    watchNode();
-
-                    if ( nodeExists )
-                    {
-                        client.setData().inBackground(setDataCallback).forPath(getActualPath(), getData());
-                    }
-                    else
-                    {
-                        initialisationComplete();
-                    }
-                }
-                else
-                {
-                    createNode();
-                }
-            }
-        };
-
-        createMethod = mode.isProtected() ? client.create().creatingParentContainersIfNeeded().withProtection() : client.create().creatingParentContainersIfNeeded();
-        this.data.set(Arrays.copyOf(data, data.length));
-    }
-
-    private void initialisationComplete()
-    {
-        CountDownLatch localLatch = initialCreateLatch.getAndSet(null);
-        if ( localLatch != null )
-        {
-            localLatch.countDown();
-        }
-    }
-
-    /**
-     * You must call start() to initiate the persistent ephemeral node. An attempt to create the node
-     * in the background will be started
-     */
-    public void start()
-    {
-        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Already started");
-
-        client.getConnectionStateListenable().addListener(connectionStateListener);
-        createNode();
-    }
-
-    /**
-     * Block until the either initial node creation initiated by {@link #start()} succeeds or
-     * the timeout elapses.
-     *
-     * @param timeout the maximum time to wait
-     * @param unit    time unit
-     * @return if the node was created before timeout
-     * @throws InterruptedException if the thread is interrupted
-     */
-    public boolean waitForInitialCreate(long timeout, TimeUnit unit) throws InterruptedException
-    {
-        Preconditions.checkState(state.get() == State.STARTED, "Not started");
-
-        CountDownLatch localLatch = initialCreateLatch.get();
-        return (localLatch == null) || localLatch.await(timeout, unit);
-    }
-
-    @Override
-    public void close() throws IOException
-    {
-        if ( !state.compareAndSet(State.STARTED, State.CLOSED) )
-        {
-            return;
-        }
-
-        client.getConnectionStateListenable().removeListener(connectionStateListener);
-
-        try
-        {
-            deleteNode();
-        }
-        catch ( Exception e )
-        {
-            throw new IOException(e);
-        }
-    }
-
-    /**
-     * Returns the currently set path or null if the node does not exist
-     *
-     * @return node path or null
-     */
-    public String getActualPath()
-    {
-        return nodePath.get();
-    }
-
-    /**
-     * Set data that ephemeral node should set in ZK also writes the data to the node
-     *
-     * @param data new data value
-     * @throws Exception errors
-     */
-    public void setData(byte[] data) throws Exception
-    {
-        data = Preconditions.checkNotNull(data, "data cannot be null");
-        this.data.set(Arrays.copyOf(data, data.length));
-        if ( isActive() )
-        {
-            client.setData().inBackground().forPath(getActualPath(), getData());
-        }
-    }
-
-    /**
-     * Return the current value of our data
-     *
-     * @return our data
-     */
-    public byte[] getData()
-    {
-        return this.data.get();
-    }
-
-    private void deleteNode() throws Exception
-    {
-        String localNodePath = nodePath.getAndSet(null);
-        if ( localNodePath != null )
-        {
-            try
-            {
-                client.delete().guaranteed().forPath(localNodePath);
-            }
-            catch ( KeeperException.NoNodeException ignore )
-            {
-                // ignore
-            }
-        }
-    }
-
-    private void createNode()
-    {
-        if ( !isActive() )
-        {
-            return;
-        }
-
-        try
-        {
-            String existingPath = nodePath.get();
-            String createPath = (existingPath != null && !mode.isProtected()) ? existingPath : basePath;
-            createMethod.withMode(mode.getCreateMode(existingPath != null)).inBackground(backgroundCallback).forPath(createPath, data.get());
-        }
-        catch ( Exception e )
-        {
-            throw new RuntimeException("Creating node. BasePath: " + basePath, e);  // should never happen unless there's a programming error - so throw RuntimeException
-        }
-    }
-
-    private void watchNode() throws Exception
-    {
-        if ( !isActive() )
-        {
-            return;
-        }
-
-        String localNodePath = nodePath.get();
-        if ( localNodePath != null )
-        {
-            client.checkExists().usingWatcher(watcher).inBackground(checkExistsCallback).forPath(localNodePath);
-        }
-    }
-
-    private boolean isActive()
-    {
-        return (state.get() == State.STARTED);
-    }
-
-    @VisibleForTesting
-    boolean isAuthFailure()
-    {
-        return authFailure.get();
+        super(client, mode.getCreateMode(false), mode.isProtected(), basePath, initData);
     }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/fefbba1c/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
new file mode 100644
index 0000000..c66eb30
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
@@ -0,0 +1,382 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.nodes;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CreateBuilder;
+import org.apache.curator.framework.api.CreateModable;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorWatcher;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.utils.PathUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * <p>
+ * A persistent ephemeral node is an ephemeral node that attempts to stay present in
+ * ZooKeeper, even through connection and session interruptions.
+ * </p>
+ * <p>
+ * Thanks to bbeck (https://github.com/bbeck) for the initial coding and design
+ * </p>
+ */
+public class PersistentNode implements Closeable
+{
+    private final AtomicReference<CountDownLatch> initialCreateLatch = new AtomicReference<CountDownLatch>(new CountDownLatch(1));
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private final CuratorFramework client;
+    private final CreateModable<ACLBackgroundPathAndBytesable<String>> createMethod;
+    private final AtomicReference<String> nodePath = new AtomicReference<String>(null);
+    private final String basePath;
+    private final CreateMode mode;
+    private final AtomicReference<byte[]> data = new AtomicReference<byte[]>();
+    private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
+    private final AtomicBoolean authFailure = new AtomicBoolean(false);
+    private final BackgroundCallback backgroundCallback;
+    private final boolean useProtection;
+    private final CuratorWatcher watcher = new CuratorWatcher()
+    {
+        @Override
+        public void process(WatchedEvent event) throws Exception
+        {
+            if ( event.getType() == EventType.NodeDeleted )
+            {
+                createNode();
+            }
+            else if ( event.getType() == EventType.NodeDataChanged )
+            {
+                watchNode();
+            }
+        }
+    };
+    private final BackgroundCallback checkExistsCallback = new BackgroundCallback()
+    {
+        @Override
+        public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
+        {
+            if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )
+            {
+                createNode();
+            }
+            else
+            {
+                boolean isEphemeral = event.getStat().getEphemeralOwner() != 0;
+                if ( isEphemeral != mode.isEphemeral() )
+                {
+                    log.warn("Existing node ephemeral state doesn't match requested state. Maybe the node was created outside of PersistentNode? " + basePath);
+                }
+            }
+        }
+    };
+    private final BackgroundCallback setDataCallback = new BackgroundCallback()
+    {
+
+        @Override
+        public void processResult(CuratorFramework client, CuratorEvent event)
+            throws Exception
+        {
+            //If the result is ok then initialisation is complete (if we're still initialising)
+            //Don't retry on other errors as the only recoverable cases will be connection loss
+            //and the node not existing, both of which are already handled by other watches.
+            if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
+            {
+                //Update is ok, mark initialisation as complete if required.
+                initialisationComplete();
+            }
+        }
+    };
+    private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
+    {
+        @Override
+        public void stateChanged(CuratorFramework client, ConnectionState newState)
+        {
+            if ( newState == ConnectionState.RECONNECTED )
+            {
+                createNode();
+            }
+        }
+    };
+
+    private enum State
+    {
+        LATENT,
+        STARTED,
+        CLOSED
+    }
+
+    /**
+     * @param client        client instance
+     * @param mode          creation mode
+     * @param useProtection if true, call {@link CreateBuilder#withProtection()}
+     * @param basePath the base path for the node
+     * @param initData data for the node
+     */
+    public PersistentNode(CuratorFramework client, final CreateMode mode, boolean useProtection, final String basePath, byte[] initData)
+    {
+        this.useProtection = useProtection;
+        this.client = Preconditions.checkNotNull(client, "client cannot be null");
+        this.basePath = PathUtils.validatePath(basePath);
+        this.mode = Preconditions.checkNotNull(mode, "mode cannot be null");
+        final byte[] data = Preconditions.checkNotNull(initData, "data cannot be null");
+
+        backgroundCallback = new BackgroundCallback()
+        {
+            @Override
+            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
+            {
+                String path = null;
+                boolean nodeExists = false;
+                if ( event.getResultCode() == KeeperException.Code.NODEEXISTS.intValue() )
+                {
+                    path = event.getPath();
+                    nodeExists = true;
+                }
+                else if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
+                {
+                    path = event.getName();
+                }
+                else if ( event.getResultCode() == KeeperException.Code.NOAUTH.intValue() )
+                {
+                    log.warn("Client does not have authorisation to write ephemeral node at path {}", event.getPath());
+                    authFailure.set(true);
+                    return;
+                }
+                if ( path != null )
+                {
+                    authFailure.set(false);
+                    nodePath.set(path);
+                    watchNode();
+
+                    if ( nodeExists )
+                    {
+                        client.setData().inBackground(setDataCallback).forPath(getActualPath(), getData());
+                    }
+                    else
+                    {
+                        initialisationComplete();
+                    }
+                }
+                else
+                {
+                    createNode();
+                }
+            }
+        };
+
+        createMethod = useProtection ? client.create().creatingParentContainersIfNeeded().withProtection() : client.create().creatingParentContainersIfNeeded();
+        this.data.set(Arrays.copyOf(data, data.length));
+    }
+
+    private void initialisationComplete()
+    {
+        CountDownLatch localLatch = initialCreateLatch.getAndSet(null);
+        if ( localLatch != null )
+        {
+            localLatch.countDown();
+        }
+    }
+
+    /**
+     * You must call start() to initiate the persistent ephemeral node. An attempt to create the node
+     * in the background will be started
+     */
+    public void start()
+    {
+        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Already started");
+
+        client.getConnectionStateListenable().addListener(connectionStateListener);
+        createNode();
+    }
+
+    /**
+     * Block until the either initial node creation initiated by {@link #start()} succeeds or
+     * the timeout elapses.
+     *
+     * @param timeout the maximum time to wait
+     * @param unit    time unit
+     * @return if the node was created before timeout
+     * @throws InterruptedException if the thread is interrupted
+     */
+    public boolean waitForInitialCreate(long timeout, TimeUnit unit) throws InterruptedException
+    {
+        Preconditions.checkState(state.get() == State.STARTED, "Not started");
+
+        CountDownLatch localLatch = initialCreateLatch.get();
+        return (localLatch == null) || localLatch.await(timeout, unit);
+    }
+
+    @Override
+    public void close() throws IOException
+    {
+        if ( !state.compareAndSet(State.STARTED, State.CLOSED) )
+        {
+            return;
+        }
+
+        client.getConnectionStateListenable().removeListener(connectionStateListener);
+
+        try
+        {
+            deleteNode();
+        }
+        catch ( Exception e )
+        {
+            throw new IOException(e);
+        }
+    }
+
+    /**
+     * Returns the currently set path or null if the node does not exist
+     *
+     * @return node path or null
+     */
+    public String getActualPath()
+    {
+        return nodePath.get();
+    }
+
+    /**
+     * Set data that ephemeral node should set in ZK also writes the data to the node
+     *
+     * @param data new data value
+     * @throws Exception errors
+     */
+    public void setData(byte[] data) throws Exception
+    {
+        data = Preconditions.checkNotNull(data, "data cannot be null");
+        this.data.set(Arrays.copyOf(data, data.length));
+        if ( isActive() )
+        {
+            client.setData().inBackground().forPath(getActualPath(), getData());
+        }
+    }
+
+    /**
+     * Return the current value of our data
+     *
+     * @return our data
+     */
+    public byte[] getData()
+    {
+        return this.data.get();
+    }
+
+    private void deleteNode() throws Exception
+    {
+        String localNodePath = nodePath.getAndSet(null);
+        if ( localNodePath != null )
+        {
+            try
+            {
+                client.delete().guaranteed().forPath(localNodePath);
+            }
+            catch ( KeeperException.NoNodeException ignore )
+            {
+                // ignore
+            }
+        }
+    }
+
+    private void createNode()
+    {
+        if ( !isActive() )
+        {
+            return;
+        }
+
+        try
+        {
+            String existingPath = nodePath.get();
+            String createPath = (existingPath != null && !useProtection) ? existingPath : basePath;
+            createMethod.withMode(getCreateMode(existingPath != null)).inBackground(backgroundCallback).forPath(createPath, data.get());
+        }
+        catch ( Exception e )
+        {
+            throw new RuntimeException("Creating node. BasePath: " + basePath, e);  // should never happen unless there's a programming error - so throw RuntimeException
+        }
+    }
+
+    private CreateMode getCreateMode(boolean pathIsSet)
+    {
+        if ( pathIsSet )
+        {
+            switch ( mode )
+            {
+            default:
+            {
+                break;
+            }
+
+            case EPHEMERAL_SEQUENTIAL:
+            {
+                return CreateMode.EPHEMERAL;    // protection case - node already set
+            }
+
+            case PERSISTENT_SEQUENTIAL:
+            {
+                return CreateMode.PERSISTENT;    // protection case - node already set
+            }
+            }
+        }
+        return mode;
+    }
+
+    private void watchNode() throws Exception
+    {
+        if ( !isActive() )
+        {
+            return;
+        }
+
+        String localNodePath = nodePath.get();
+        if ( localNodePath != null )
+        {
+            client.checkExists().usingWatcher(watcher).inBackground(checkExistsCallback).forPath(localNodePath);
+        }
+    }
+
+    private boolean isActive()
+    {
+        return (state.get() == State.STARTED);
+    }
+
+    @VisibleForTesting
+    boolean isAuthFailure()
+    {
+        return authFailure.get();
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/fefbba1c/curator-recipes/src/site/confluence/group-member.confluence
----------------------------------------------------------------------
diff --git a/curator-recipes/src/site/confluence/group-member.confluence b/curator-recipes/src/site/confluence/group-member.confluence
index a370675..5c24c75 100644
--- a/curator-recipes/src/site/confluence/group-member.confluence
+++ b/curator-recipes/src/site/confluence/group-member.confluence
@@ -5,7 +5,7 @@ Group membership management. Adds this instance into a group and keeps a cache o
 
 h2. Participating Classes
 * GroupMember
-* PersistentEphemeralNode
+* PersistentNode
 * PathChildrenCache
 
 h2. Usage

http://git-wip-us.apache.org/repos/asf/curator/blob/fefbba1c/curator-recipes/src/site/confluence/persistent-ephemeral-node.confluence
----------------------------------------------------------------------
diff --git a/curator-recipes/src/site/confluence/persistent-ephemeral-node.confluence b/curator-recipes/src/site/confluence/persistent-ephemeral-node.confluence
index 0023f57..aeb9e10 100644
--- a/curator-recipes/src/site/confluence/persistent-ephemeral-node.confluence
+++ b/curator-recipes/src/site/confluence/persistent-ephemeral-node.confluence
@@ -1,32 +1,34 @@
 h1. Persistent Ephemeral Node
 
 h2. Description
-A persistent ephemeral node is an ephemeral node that attempts to stay present in ZooKeeper, even through connection and session interruptions.
+A persistent node is a node that attempts to stay present in ZooKeeper, even through connection and session interruptions.
 
 h2. Participating Classes
-* PersistentEphemeralNode
+* PersistentNode
 
 h2. Usage
-h3. Creating a PersistentEphemeralNode
+h3. Creating a PersistentNode
 {code}
-public PersistentEphemeralNode(CuratorFramework client,
-                               PersistentEphemeralNode.Mode mode,
+public PersistentNode(CuratorFramework client,
+                               CreateMode mode,
+                               boolean useProtection,
                                String basePath,
                                byte[] data)
 Parameters:
 client - client instance
-mode - creation/protection mode
+mode - creation mode
+useProtection - if true, call CreateBuilder.withProtection()
 basePath - the base path for the node
 data - data for the node
 {code}
 
 h3. General Usage
-PersistentEphemeralNodes must be started:
+PersistentNodes must be started:
 {code}
 node.start();
 {code}
 
-When you are through with the PersistentEphemeralNode instance, you should call close:
+When you are through with the PersistentNode instance, you should call close:
 {code}
 node.close();
 {code}
@@ -34,4 +36,4 @@ node.close();
 NOTE: this will delete the node
 
 h2. Error Handling
-PersistentEphemeralNode instances internally handle all error states recreating the node as necessary.
+PersistentNode instances internally handle all error states recreating the node as necessary.

http://git-wip-us.apache.org/repos/asf/curator/blob/fefbba1c/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
index 84eaa52..f451feb 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
@@ -20,7 +20,6 @@ package org.apache.curator.framework.recipes.nodes;
 
 import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
-
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.api.BackgroundCallback;
@@ -30,21 +29,19 @@ import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.KillSession;
-import org.apache.curator.test.TestingServer;
 import org.apache.curator.test.Timing;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.Test;
-
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
@@ -55,6 +52,7 @@ import java.util.concurrent.TimeUnit;
 
 import static org.testng.Assert.*;
 
+@SuppressWarnings("deprecation")
 public class TestPersistentEphemeralNode extends BaseClassForTests
 {
     private static final String DIR = "/test";

http://git-wip-us.apache.org/repos/asf/curator/blob/fefbba1c/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNodeListener.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNodeListener.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNodeListener.java
index ceff4c5..6771eec 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNodeListener.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNodeListener.java
@@ -34,6 +34,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
+@SuppressWarnings("deprecation")
 public class TestPersistentEphemeralNodeListener extends BaseClassForTests
 {
     @Test

http://git-wip-us.apache.org/repos/asf/curator/blob/fefbba1c/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentNode.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentNode.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentNode.java
new file mode 100644
index 0000000..c006dd7
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentNode.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.nodes;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.zookeeper.CreateMode;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.concurrent.TimeUnit;
+
+public class TestPersistentNode extends BaseClassForTests
+{
+    @Test
+    public void testBasic() throws Exception
+    {
+        final byte[] TEST_DATA = "hey".getBytes();
+
+        Timing timing = new Timing();
+        PersistentNode pen = null;
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+        try
+        {
+            client.start();
+            pen = new PersistentNode(client, CreateMode.PERSISTENT, false, "/test", TEST_DATA);
+            pen.start();
+            Assert.assertTrue(pen.waitForInitialCreate(timing.milliseconds(), TimeUnit.MILLISECONDS));
+            client.close(); // cause session to end - force checks that node is persistent
+
+            client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+            client.start();
+
+            byte[] bytes = client.getData().forPath("/test");
+            Assert.assertEquals(bytes, TEST_DATA);
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(pen);
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+}


[2/5] curator git commit: fixed doc

Posted by ra...@apache.org.
fixed doc


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/e657cf66
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/e657cf66
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/e657cf66

Branch: refs/heads/CURATOR-3.0
Commit: e657cf66bc54dec762f0355a56cf3b3504a960d9
Parents: fefbba1
Author: randgalt <ra...@apache.org>
Authored: Tue Jan 12 12:13:58 2016 -0500
Committer: randgalt <ra...@apache.org>
Committed: Tue Jan 12 12:13:58 2016 -0500

----------------------------------------------------------------------
 .../org/apache/curator/framework/recipes/nodes/PersistentNode.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/e657cf66/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
index c66eb30..fbeaeff 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
@@ -47,7 +47,7 @@ import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * <p>
- * A persistent ephemeral node is an ephemeral node that attempts to stay present in
+ * A persistent node is a node that attempts to stay present in
  * ZooKeeper, even through connection and session interruptions.
  * </p>
  * <p>


[5/5] curator git commit: Merge branch 'master' into CURATOR-3.0

Posted by ra...@apache.org.
Merge branch 'master' into CURATOR-3.0

Conflicts:
	curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/c6a22ba5
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/c6a22ba5
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/c6a22ba5

Branch: refs/heads/CURATOR-3.0
Commit: c6a22ba508f9227fe1c657f93e3cc77d8bc17e3e
Parents: d26c38d 649e0ba
Author: randgalt <ra...@apache.org>
Authored: Mon Jan 18 21:05:41 2016 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Jan 18 21:05:41 2016 -0500

----------------------------------------------------------------------
 .../framework/imps/NamespaceWatcherMap.java     |  14 +-
 .../framework/imps/WatcherRemovalFacade.java    |   2 +-
 .../framework/imps/WatcherRemovalManager.java   |   3 +-
 .../recipes/nodes/PersistentEphemeralNode.java  | 332 +-------------
 .../framework/recipes/nodes/PersistentNode.java | 436 +++++++++++++++++++
 .../src/site/confluence/group-member.confluence |   2 +-
 .../persistent-ephemeral-node.confluence        |  20 +-
 .../nodes/TestPersistentEphemeralNode.java      |  22 +-
 .../TestPersistentEphemeralNodeListener.java    |   1 +
 .../recipes/nodes/TestPersistentNode.java       |  63 +++
 10 files changed, 555 insertions(+), 340 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/c6a22ba5/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcherMap.java
----------------------------------------------------------------------
diff --cc curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcherMap.java
index 00618e6,e5aecb2..c864f44
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcherMap.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcherMap.java
@@@ -19,6 -19,6 +19,7 @@@
  package org.apache.curator.framework.imps;
  
  import com.google.common.annotations.VisibleForTesting;
++import com.google.common.cache.Cache;
  import com.google.common.cache.CacheBuilder;
  import org.apache.curator.framework.api.CuratorWatcher;
  import org.apache.zookeeper.Watcher;
@@@ -28,10 -28,10 +29,10 @@@ import java.util.concurrent.ConcurrentM
  
  class NamespaceWatcherMap implements Closeable
  {
--    private final ConcurrentMap<Object, NamespaceWatcher> map = CacheBuilder.newBuilder()
++    private final Cache<Object, NamespaceWatcher> cache = CacheBuilder.newBuilder()
          .weakValues()
--        .<Object, NamespaceWatcher>build()
--        .asMap();
++        .<Object, NamespaceWatcher>build();
++    private final ConcurrentMap<Object, NamespaceWatcher> map = cache.asMap();
      private final CuratorFrameworkImpl client;
  
      NamespaceWatcherMap(CuratorFrameworkImpl client)
@@@ -85,6 -74,6 +86,7 @@@
      @VisibleForTesting
      boolean isEmpty()
      {
++        cache.cleanUp();
          return map.isEmpty();
      }
  
@@@ -103,4 -92,4 +105,10 @@@
          NamespaceWatcher        existingNamespaceWatcher = map.putIfAbsent(watcher, newNamespaceWatcher);
          return (existingNamespaceWatcher != null) ? existingNamespaceWatcher : newNamespaceWatcher;
      }
++
++    @Override
++    public String toString()
++    {
++        return map.toString();
++    }
  }

http://git-wip-us.apache.org/repos/asf/curator/blob/c6a22ba5/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java
----------------------------------------------------------------------
diff --cc curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java
index 91530b4,0000000..30a992e
mode 100644,000000..100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java
@@@ -1,198 -1,0 +1,198 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +
 +package org.apache.curator.framework.imps;
 +
 +import org.apache.curator.CuratorZookeeperClient;
 +import org.apache.curator.RetryLoop;
 +import org.apache.curator.framework.CuratorFramework;
 +import org.apache.curator.framework.WatcherRemoveCuratorFramework;
 +import org.apache.curator.framework.api.CuratorEvent;
 +import org.apache.curator.framework.api.CuratorListener;
 +import org.apache.curator.framework.api.UnhandledErrorListener;
 +import org.apache.curator.framework.listen.Listenable;
 +import org.apache.curator.framework.state.ConnectionStateListener;
 +import org.apache.curator.utils.DebugUtils;
 +import org.apache.curator.utils.EnsurePath;
 +import org.apache.zookeeper.ZooKeeper;
 +import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
 +
 +class WatcherRemovalFacade extends CuratorFrameworkImpl implements WatcherRemoveCuratorFramework
 +{
 +    private final CuratorFrameworkImpl client;
 +    private final WatcherRemovalManager removalManager;
 +
 +    WatcherRemovalFacade(CuratorFrameworkImpl client)
 +    {
 +        super(client);
 +        this.client = client;
 +        removalManager = new WatcherRemovalManager(client, getNamespaceWatcherMap());
 +    }
 +
 +    @Override
 +    public WatcherRemoveCuratorFramework newWatcherRemoveCuratorFramework()
 +    {
 +        return client.newWatcherRemoveCuratorFramework();
 +    }
 +
 +    WatcherRemovalManager getRemovalManager()
 +    {
 +        return removalManager;
 +    }
 +
 +    @Override
 +    public QuorumVerifier getCurrentConfig()
 +    {
 +        return client.getCurrentConfig();
 +    }
 +
 +    @Override
 +    public void removeWatchers()
 +    {
 +        removalManager.removeWatchers();
 +
 +        if ( Boolean.getBoolean(DebugUtils.PROPERTY_VALIDATE_NAMESPACE_WATCHER_MAP_EMPTY) )
 +        {
 +            if ( !getNamespaceWatcherMap().isEmpty() )
 +            {
-                 throw new RuntimeException("NamespaceWatcherMap is not empty: " + client.getNamespaceWatcherMap());
++                throw new RuntimeException("NamespaceWatcherMap is not empty: " + getNamespaceWatcherMap());
 +            }
 +        }
 +    }
 +
 +    @Override
 +    WatcherRemovalManager getWatcherRemovalManager()
 +    {
 +        return removalManager;
 +    }
 +
 +    @Override
 +    public CuratorFramework nonNamespaceView()
 +    {
 +        return client.nonNamespaceView();
 +    }
 +
 +    @Override
 +    public CuratorFramework usingNamespace(String newNamespace)
 +    {
 +        return client.usingNamespace(newNamespace);
 +    }
 +
 +    @Override
 +    public String getNamespace()
 +    {
 +        return client.getNamespace();
 +    }
 +
 +    @Override
 +    public void start()
 +    {
 +        throw new UnsupportedOperationException();
 +    }
 +
 +    @Override
 +    public void close()
 +    {
 +        throw new UnsupportedOperationException();
 +    }
 +
 +    @Override
 +    public Listenable<ConnectionStateListener> getConnectionStateListenable()
 +    {
 +        return client.getConnectionStateListenable();
 +    }
 +
 +    @Override
 +    public Listenable<CuratorListener> getCuratorListenable()
 +    {
 +        return client.getCuratorListenable();
 +    }
 +
 +    @Override
 +    public Listenable<UnhandledErrorListener> getUnhandledErrorListenable()
 +    {
 +        return client.getUnhandledErrorListenable();
 +    }
 +
 +    @Override
 +    public void sync(String path, Object context)
 +    {
 +        client.sync(path, context);
 +    }
 +
 +    @Override
 +    public CuratorZookeeperClient getZookeeperClient()
 +    {
 +        return client.getZookeeperClient();
 +    }
 +
 +    @Override
 +    RetryLoop newRetryLoop()
 +    {
 +        return client.newRetryLoop();
 +    }
 +
 +    @Override
 +    ZooKeeper getZooKeeper() throws Exception
 +    {
 +        return client.getZooKeeper();
 +    }
 +
 +    @Override
 +    <DATA_TYPE> void processBackgroundOperation(OperationAndData<DATA_TYPE> operationAndData, CuratorEvent event)
 +    {
 +        client.processBackgroundOperation(operationAndData, event);
 +    }
 +
 +    @Override
 +    void logError(String reason, Throwable e)
 +    {
 +        client.logError(reason, e);
 +    }
 +
 +    @Override
 +    String unfixForNamespace(String path)
 +    {
 +        return client.unfixForNamespace(path);
 +    }
 +
 +    @Override
 +    String fixForNamespace(String path)
 +    {
 +        return client.fixForNamespace(path);
 +    }
 +    
 +    @Override
 +    String fixForNamespace(String path, boolean isSequential)
 +    {
 +    	return client.fixForNamespace(path, isSequential);
 +    }
 +
 +    @Override
 +    public EnsurePath newNamespaceAwareEnsurePath(String path)
 +    {
 +        return client.newNamespaceAwareEnsurePath(path);
 +    }
 +
 +    @Override
 +    FailedDeleteManager getFailedDeleteManager()
 +    {
 +        return client.getFailedDeleteManager();
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/curator/blob/c6a22ba5/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalManager.java
----------------------------------------------------------------------
diff --cc curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalManager.java
index 064964d,0000000..1e6fe94
mode 100644,000000..100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalManager.java
@@@ -1,140 -1,0 +1,141 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.curator.framework.imps;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.base.Preconditions;
 +import com.google.common.collect.Sets;
 +import org.apache.zookeeper.WatchedEvent;
 +import org.apache.zookeeper.Watcher;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +import java.util.HashSet;
 +import java.util.Set;
 +
 +public class WatcherRemovalManager
 +{
 +    private final Logger log = LoggerFactory.getLogger(getClass());
 +    private final CuratorFrameworkImpl client;
 +    private final NamespaceWatcherMap namespaceWatcherMap;
 +    private final Set<WrappedWatcher> entries = Sets.newHashSet();  // guarded by sync
 +
 +    WatcherRemovalManager(CuratorFrameworkImpl client, NamespaceWatcherMap namespaceWatcherMap)
 +    {
 +        this.client = client;
 +        this.namespaceWatcherMap = namespaceWatcherMap;
 +    }
 +
 +    synchronized Watcher add(String path, Watcher watcher)
 +    {
 +        path = Preconditions.checkNotNull(path, "path cannot be null");
 +        watcher = Preconditions.checkNotNull(watcher, "watcher cannot be null");
 +
 +        WrappedWatcher wrappedWatcher = new WrappedWatcher(watcher, path);
 +        entries.add(wrappedWatcher);
 +        return wrappedWatcher;
 +    }
 +
 +    @VisibleForTesting
 +    synchronized Set<? extends Watcher> getEntries()
 +    {
 +        return Sets.newHashSet(entries);
 +    }
 +
 +    void removeWatchers()
 +    {
 +        HashSet<WrappedWatcher> localEntries;
 +        synchronized(this)
 +        {
 +            localEntries = Sets.newHashSet(entries);
 +        }
 +        for ( WrappedWatcher entry : localEntries )
 +        {
 +            try
 +            {
 +                log.debug("Removing watcher for path: " + entry.path);
-                 namespaceWatcherMap.removeWatcher(entry.watcher);
 +                RemoveWatchesBuilderImpl builder = new RemoveWatchesBuilderImpl(client);
++                namespaceWatcherMap.removeWatcher(entry.watcher);
 +                builder.internalRemoval(entry, entry.path);
 +            }
 +            catch ( Exception e )
 +            {
 +                log.error("Could not remove watcher for path: " + entry.path);
 +            }
 +        }
 +    }
 +
 +    private synchronized void internalRemove(WrappedWatcher entry)
 +    {
++        namespaceWatcherMap.removeWatcher(entry.watcher);
 +        entries.remove(entry);
 +    }
 +
 +    private class WrappedWatcher implements Watcher
 +    {
 +        private final Watcher watcher;
 +        private final String path;
 +
 +        WrappedWatcher(Watcher watcher, String path)
 +        {
 +            this.watcher = watcher;
 +            this.path = path;
 +        }
 +
 +        @Override
 +        public void process(WatchedEvent event)
 +        {
 +            if ( event.getType() != Event.EventType.None )
 +            {
 +                internalRemove(this);
 +            }
 +            watcher.process(event);
 +        }
 +
 +        @Override
 +        public boolean equals(Object o)
 +        {
 +            if ( this == o )
 +            {
 +                return true;
 +            }
 +            if ( o == null || getClass() != o.getClass() )
 +            {
 +                return false;
 +            }
 +
 +            WrappedWatcher entry = (WrappedWatcher)o;
 +
 +            //noinspection SimplifiableIfStatement
 +            if ( !watcher.equals(entry.watcher) )
 +            {
 +                return false;
 +            }
 +            return path.equals(entry.path);
 +
 +        }
 +
 +        @Override
 +        public int hashCode()
 +        {
 +            int result = watcher.hashCode();
 +            result = 31 * result + path.hashCode();
 +            return result;
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/curator/blob/c6a22ba5/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
----------------------------------------------------------------------
diff --cc curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
index 0000000,0d7ab9d..93c88f7
mode 000000,100644..100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
@@@ -1,0 -1,385 +1,436 @@@
+ /**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ 
+ package org.apache.curator.framework.recipes.nodes;
+ 
+ import com.google.common.annotations.VisibleForTesting;
+ import com.google.common.base.Preconditions;
+ import org.apache.curator.framework.CuratorFramework;
++import org.apache.curator.framework.WatcherRemoveCuratorFramework;
+ import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
+ import org.apache.curator.framework.api.BackgroundCallback;
+ import org.apache.curator.framework.api.CreateBuilder;
+ import org.apache.curator.framework.api.CreateModable;
+ import org.apache.curator.framework.api.CuratorEvent;
+ import org.apache.curator.framework.api.CuratorWatcher;
+ import org.apache.curator.framework.state.ConnectionState;
+ import org.apache.curator.framework.state.ConnectionStateListener;
+ import org.apache.curator.utils.PathUtils;
+ import org.apache.curator.utils.ThreadUtils;
+ import org.apache.zookeeper.CreateMode;
+ import org.apache.zookeeper.KeeperException;
+ import org.apache.zookeeper.WatchedEvent;
+ import org.apache.zookeeper.Watcher.Event.EventType;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ import java.io.Closeable;
+ import java.io.IOException;
+ import java.util.Arrays;
+ import java.util.concurrent.CountDownLatch;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicBoolean;
++import java.util.concurrent.atomic.AtomicLong;
+ import java.util.concurrent.atomic.AtomicReference;
+ 
+ /**
+  * <p>
+  * A persistent node is a node that attempts to stay present in
+  * ZooKeeper, even through connection and session interruptions.
+  * </p>
+  * <p>
+  * Thanks to bbeck (https://github.com/bbeck) for the initial coding and design
+  * </p>
+  */
+ public class PersistentNode implements Closeable
+ {
+     private final AtomicReference<CountDownLatch> initialCreateLatch = new AtomicReference<CountDownLatch>(new CountDownLatch(1));
+     private final Logger log = LoggerFactory.getLogger(getClass());
 -    private final CuratorFramework client;
++    private final WatcherRemoveCuratorFramework client;
+     private final CreateModable<ACLBackgroundPathAndBytesable<String>> createMethod;
+     private final AtomicReference<String> nodePath = new AtomicReference<String>(null);
+     private final String basePath;
+     private final CreateMode mode;
+     private final AtomicReference<byte[]> data = new AtomicReference<byte[]>();
+     private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
+     private final AtomicBoolean authFailure = new AtomicBoolean(false);
+     private final BackgroundCallback backgroundCallback;
+     private final boolean useProtection;
+     private final CuratorWatcher watcher = new CuratorWatcher()
+     {
+         @Override
+         public void process(WatchedEvent event) throws Exception
+         {
 -            if ( event.getType() == EventType.NodeDeleted )
++            if ( isActive() )
+             {
 -                createNode();
 -            }
 -            else if ( event.getType() == EventType.NodeDataChanged )
 -            {
 -                watchNode();
++                if ( event.getType() == EventType.NodeDeleted )
++                {
++                    createNode();
++                }
++                else if ( event.getType() == EventType.NodeDataChanged )
++                {
++                    watchNode();
++                }
+             }
+         }
+     };
++
+     private final BackgroundCallback checkExistsCallback = new BackgroundCallback()
+     {
+         @Override
 -        public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
++        public void processResult(CuratorFramework dummy, CuratorEvent event) throws Exception
+         {
 -            if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )
++            if ( isActive() )
+             {
 -                createNode();
++                if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )
++                {
++                    createNode();
++                }
++                else
++                {
++                    boolean isEphemeral = event.getStat().getEphemeralOwner() != 0;
++                    if ( isEphemeral != mode.isEphemeral() )
++                    {
++                        log.warn("Existing node ephemeral state doesn't match requested state. Maybe the node was created outside of PersistentNode? " + basePath);
++                    }
++                }
+             }
+             else
+             {
 -                boolean isEphemeral = event.getStat().getEphemeralOwner() != 0;
 -                if ( isEphemeral != mode.isEphemeral() )
 -                {
 -                    log.warn("Existing node ephemeral state doesn't match requested state. Maybe the node was created outside of PersistentNode? " + basePath);
 -                }
++                client.removeWatchers();
+             }
+         }
+     };
+     private final BackgroundCallback setDataCallback = new BackgroundCallback()
+     {
+ 
+         @Override
 -        public void processResult(CuratorFramework client, CuratorEvent event)
++        public void processResult(CuratorFramework dummy, CuratorEvent event)
+             throws Exception
+         {
+             //If the result is ok then initialisation is complete (if we're still initialising)
+             //Don't retry on other errors as the only recoverable cases will be connection loss
+             //and the node not existing, both of which are already handled by other watches.
+             if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
+             {
+                 //Update is ok, mark initialisation as complete if required.
+                 initialisationComplete();
+             }
+         }
+     };
+     private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
+     {
+         @Override
 -        public void stateChanged(CuratorFramework client, ConnectionState newState)
++        public void stateChanged(CuratorFramework dummy, ConnectionState newState)
+         {
 -            if ( newState == ConnectionState.RECONNECTED )
++            if ( (newState == ConnectionState.RECONNECTED) && isActive() )
+             {
+                 createNode();
+             }
+         }
+     };
+ 
++    @VisibleForTesting
++    volatile CountDownLatch debugCreateNodeLatch = null;
++
+     private enum State
+     {
+         LATENT,
+         STARTED,
+         CLOSED
+     }
+ 
+     /**
 -     * @param client        client instance
++     * @param givenClient        client instance
+      * @param mode          creation mode
+      * @param useProtection if true, call {@link CreateBuilder#withProtection()}
+      * @param basePath the base path for the node
+      * @param initData data for the node
+      */
 -    public PersistentNode(CuratorFramework client, final CreateMode mode, boolean useProtection, final String basePath, byte[] initData)
++    public PersistentNode(CuratorFramework givenClient, final CreateMode mode, boolean useProtection, final String basePath, byte[] initData)
+     {
+         this.useProtection = useProtection;
 -        this.client = Preconditions.checkNotNull(client, "client cannot be null");
++        this.client = Preconditions.checkNotNull(givenClient, "client cannot be null").newWatcherRemoveCuratorFramework();
+         this.basePath = PathUtils.validatePath(basePath);
+         this.mode = Preconditions.checkNotNull(mode, "mode cannot be null");
+         final byte[] data = Preconditions.checkNotNull(initData, "data cannot be null");
+ 
+         backgroundCallback = new BackgroundCallback()
+         {
+             @Override
 -            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
++            public void processResult(CuratorFramework dummy, CuratorEvent event) throws Exception
+             {
++                if ( !isActive() )
++                {
++                    return;
++                }
++
+                 String path = null;
+                 boolean nodeExists = false;
+                 if ( event.getResultCode() == KeeperException.Code.NODEEXISTS.intValue() )
+                 {
+                     path = event.getPath();
+                     nodeExists = true;
+                 }
+                 else if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
+                 {
+                     path = event.getName();
+                 }
+                 else if ( event.getResultCode() == KeeperException.Code.NOAUTH.intValue() )
+                 {
 -                    log.warn("Client does not have authorisation to write node at path {}", event.getPath());
++                    log.warn("Client does not have authorization to write node at path {}", event.getPath());
+                     authFailure.set(true);
+                     return;
+                 }
+                 if ( path != null )
+                 {
+                     authFailure.set(false);
+                     nodePath.set(path);
+                     watchNode();
+ 
+                     if ( nodeExists )
+                     {
+                         client.setData().inBackground(setDataCallback).forPath(getActualPath(), getData());
+                     }
+                     else
+                     {
+                         initialisationComplete();
+                     }
+                 }
+                 else
+                 {
+                     createNode();
+                 }
+             }
+         };
+ 
+         createMethod = useProtection ? client.create().creatingParentContainersIfNeeded().withProtection() : client.create().creatingParentContainersIfNeeded();
+         this.data.set(Arrays.copyOf(data, data.length));
+     }
+ 
+     private void initialisationComplete()
+     {
+         CountDownLatch localLatch = initialCreateLatch.getAndSet(null);
+         if ( localLatch != null )
+         {
+             localLatch.countDown();
+         }
+     }
+ 
+     /**
+      * You must call start() to initiate the persistent node. An attempt to create the node
+      * in the background will be started
+      */
+     public void start()
+     {
+         Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Already started");
+ 
+         client.getConnectionStateListenable().addListener(connectionStateListener);
+         createNode();
+     }
+ 
+     /**
+      * Block until the either initial node creation initiated by {@link #start()} succeeds or
+      * the timeout elapses.
+      *
+      * @param timeout the maximum time to wait
+      * @param unit    time unit
+      * @return if the node was created before timeout
+      * @throws InterruptedException if the thread is interrupted
+      */
+     public boolean waitForInitialCreate(long timeout, TimeUnit unit) throws InterruptedException
+     {
+         Preconditions.checkState(state.get() == State.STARTED, "Not started");
+ 
+         CountDownLatch localLatch = initialCreateLatch.get();
+         return (localLatch == null) || localLatch.await(timeout, unit);
+     }
+ 
++    @VisibleForTesting
++    final AtomicLong debugWaitMsForBackgroundBeforeClose = new AtomicLong(0);
++
+     @Override
+     public void close() throws IOException
+     {
++        if ( debugWaitMsForBackgroundBeforeClose.get() > 0 )
++        {
++            try
++            {
++                Thread.sleep(debugWaitMsForBackgroundBeforeClose.get());
++            }
++            catch ( InterruptedException e )
++            {
++                Thread.currentThread().interrupt();
++            }
++        }
++
+         if ( !state.compareAndSet(State.STARTED, State.CLOSED) )
+         {
+             return;
+         }
+ 
+         client.getConnectionStateListenable().removeListener(connectionStateListener);
+ 
+         try
+         {
+             deleteNode();
+         }
+         catch ( Exception e )
+         {
+             ThreadUtils.checkInterrupted(e);
+             throw new IOException(e);
+         }
++
++        client.removeWatchers();
+     }
+ 
+     /**
+      * Returns the currently set path or null if the node does not exist
+      *
+      * @return node path or null
+      */
+     public String getActualPath()
+     {
+         return nodePath.get();
+     }
+ 
+     /**
+      * Set data that node should set in ZK also writes the data to the node
+      *
+      * @param data new data value
+      * @throws Exception errors
+      */
+     public void setData(byte[] data) throws Exception
+     {
+         data = Preconditions.checkNotNull(data, "data cannot be null");
+         this.data.set(Arrays.copyOf(data, data.length));
+         if ( isActive() )
+         {
+             client.setData().inBackground().forPath(getActualPath(), getData());
+         }
+     }
+ 
+     /**
+      * Return the current value of our data
+      *
+      * @return our data
+      */
+     public byte[] getData()
+     {
+         return this.data.get();
+     }
+ 
+     private void deleteNode() throws Exception
+     {
+         String localNodePath = nodePath.getAndSet(null);
+         if ( localNodePath != null )
+         {
+             try
+             {
+                 client.delete().guaranteed().forPath(localNodePath);
+             }
+             catch ( KeeperException.NoNodeException ignore )
+             {
+                 // ignore
+             }
+         }
+     }
+ 
+     private void createNode()
+     {
+         if ( !isActive() )
+         {
+             return;
+         }
+ 
++        if ( debugCreateNodeLatch != null )
++        {
++            try
++            {
++                debugCreateNodeLatch.await();
++            }
++            catch ( InterruptedException e )
++            {
++                Thread.currentThread().interrupt();
++                return;
++            }
++        }
++
+         try
+         {
+             String existingPath = nodePath.get();
+             String createPath = (existingPath != null && !useProtection) ? existingPath : basePath;
+             createMethod.withMode(getCreateMode(existingPath != null)).inBackground(backgroundCallback).forPath(createPath, data.get());
+         }
+         catch ( Exception e )
+         {
+             ThreadUtils.checkInterrupted(e);
+             throw new RuntimeException("Creating node. BasePath: " + basePath, e);  // should never happen unless there's a programming error - so throw RuntimeException
+         }
+     }
+ 
+     private CreateMode getCreateMode(boolean pathIsSet)
+     {
+         if ( pathIsSet )
+         {
+             switch ( mode )
+             {
+             default:
+             {
+                 break;
+             }
+ 
+             case EPHEMERAL_SEQUENTIAL:
+             {
+                 return CreateMode.EPHEMERAL;    // protection case - node already set
+             }
+ 
+             case PERSISTENT_SEQUENTIAL:
+             {
+                 return CreateMode.PERSISTENT;    // protection case - node already set
+             }
+             }
+         }
+         return mode;
+     }
+ 
+     private void watchNode() throws Exception
+     {
+         if ( !isActive() )
+         {
+             return;
+         }
+ 
+         String localNodePath = nodePath.get();
+         if ( localNodePath != null )
+         {
+             client.checkExists().usingWatcher(watcher).inBackground(checkExistsCallback).forPath(localNodePath);
+         }
+     }
+ 
+     private boolean isActive()
+     {
+         return (state.get() == State.STARTED);
+     }
+ 
+     @VisibleForTesting
+     boolean isAuthFailure()
+     {
+         return authFailure.get();
+     }
+ }

http://git-wip-us.apache.org/repos/asf/curator/blob/c6a22ba5/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
----------------------------------------------------------------------
diff --cc curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
index 2fb6c66,f451feb..15c5f2e
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
@@@ -37,12 -35,10 +37,12 @@@ import org.apache.curator.utils.ZKPaths
  import org.apache.zookeeper.CreateMode;
  import org.apache.zookeeper.WatchedEvent;
  import org.apache.zookeeper.Watcher;
- import org.apache.zookeeper.ZooDefs;
  import org.apache.zookeeper.Watcher.Event.EventType;
+ import org.apache.zookeeper.ZooDefs;
  import org.apache.zookeeper.data.ACL;
  import org.apache.zookeeper.data.Stat;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
  import org.testng.Assert;
  import org.testng.annotations.AfterMethod;
  import org.testng.annotations.Test;
@@@ -58,9 -52,9 +57,10 @@@ import java.util.concurrent.TimeUnit
  
  import static org.testng.Assert.*;
  
+ @SuppressWarnings("deprecation")
  public class TestPersistentEphemeralNode extends BaseClassForTests
  {
 +    private static final Logger log = LoggerFactory.getLogger(TestPersistentEphemeralNode.class);
      private static final String DIR = "/test";
      private static final String PATH = ZKPaths.makePath(DIR, "/foo");
  
@@@ -100,39 -88,37 +100,40 @@@
          try
          {
              client.start();
 -            PersistentEphemeralNode node = new PersistentEphemeralNode(client, PersistentEphemeralNode.Mode.EPHEMERAL, "/abc/node", "hello".getBytes());
 -            node.start();
 -
 -            final CountDownLatch connectedLatch = new CountDownLatch(1);
 -            final CountDownLatch reconnectedLatch = new CountDownLatch(1);
 -            ConnectionStateListener listener = new ConnectionStateListener()
 +            try ( PersistentEphemeralNode node = new PersistentEphemeralNode(client, PersistentEphemeralNode.Mode.EPHEMERAL, "/abc/node", "hello".getBytes()) )
              {
 -                @Override
 -                public void stateChanged(CuratorFramework client, ConnectionState newState)
++                node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
 +                node.start();
 +
 +                final CountDownLatch connectedLatch = new CountDownLatch(1);
 +                final CountDownLatch reconnectedLatch = new CountDownLatch(1);
 +                ConnectionStateListener listener = new ConnectionStateListener()
                  {
 -                    if ( newState == ConnectionState.CONNECTED )
 -                    {
 -                        connectedLatch.countDown();
 -                    }
 -                    if ( newState == ConnectionState.RECONNECTED )
 +                    @Override
 +                    public void stateChanged(CuratorFramework client, ConnectionState newState)
                      {
 -                        reconnectedLatch.countDown();
 +                        if ( newState == ConnectionState.CONNECTED )
 +                        {
 +                            connectedLatch.countDown();
 +                        }
 +                        if ( newState == ConnectionState.RECONNECTED )
 +                        {
 +                            reconnectedLatch.countDown();
 +                        }
                      }
 -                }
 -            };
 -            client.getConnectionStateListenable().addListener(listener);
 -            timing.sleepABit();
 -            server.restart();
 -            Assert.assertTrue(timing.awaitLatch(connectedLatch));
 -            timing.sleepABit();
 -            Assert.assertTrue(node.waitForInitialCreate(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
 -            server.stop();
 -            timing.sleepABit();
 -            server.restart();
 -            timing.sleepABit();
 -            Assert.assertTrue(timing.awaitLatch(reconnectedLatch));
 +                };
 +                client.getConnectionStateListenable().addListener(listener);
 +                timing.sleepABit();
 +                server.restart();
 +                Assert.assertTrue(timing.awaitLatch(connectedLatch));
 +                timing.sleepABit();
 +                Assert.assertTrue(node.waitForInitialCreate(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
 +                server.stop();
 +                timing.sleepABit();
 +                server.restart();
 +                timing.sleepABit();
 +                Assert.assertTrue(timing.awaitLatch(reconnectedLatch));
 +            }
          }
          finally
          {
@@@ -150,7 -135,7 +151,8 @@@
          try
          {
              client.start();
 -            PersistentEphemeralNode node = new PersistentEphemeralNode(client, PersistentEphemeralNode.Mode.EPHEMERAL, "/abc/node", "hello".getBytes());
 +            node = new PersistentEphemeralNode(client, PersistentEphemeralNode.Mode.EPHEMERAL, "/abc/node", "hello".getBytes());
++            node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
              node.start();
  
              final CountDownLatch connectedLatch = new CountDownLatch(1);
@@@ -231,6 -215,6 +233,7 @@@
          {
              client.start();
              node = new PersistentEphemeralNode(client, mode, PATH, "a".getBytes());
++            node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
              node.start();
              Assert.assertTrue(node.waitForInitialCreate(timing.forWaiting().seconds(), TimeUnit.SECONDS));
  
@@@ -270,6 -257,6 +273,7 @@@
          CuratorFramework curator = newCurator();
  
          PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, new byte[0]);
++        node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
          node.start();
          String path = null;
          try
@@@ -292,6 -279,6 +296,7 @@@
          CuratorFramework curator = newCurator();
  
          PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, new byte[0]);
++        node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
          node.start();
          node.waitForInitialCreate(timing.forWaiting().seconds(), TimeUnit.SECONDS);
  
@@@ -310,9 -297,9 +315,10 @@@
          CuratorFramework observer = newCurator();
  
          PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, new byte[0]);
 -        node.start();
++        node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
          try
          {
 +            node.start();
              node.waitForInitialCreate(timing.forWaiting().seconds(), TimeUnit.SECONDS);
              assertNodeExists(observer, node.getActualPath());
  
@@@ -340,9 -325,9 +346,10 @@@
          CuratorFramework observer = newCurator();
  
          PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, new byte[0]);
 -        node.start();
++        node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
          try
          {
 +            node.start();
              node.waitForInitialCreate(5, TimeUnit.SECONDS);
              assertNodeExists(observer, node.getActualPath());
  
@@@ -374,9 -357,9 +381,10 @@@
          CuratorFramework observer = newCurator();
  
          PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, new byte[0]);
 -        node.start();
++        node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
          try
          {
 +            node.start();
              node.waitForInitialCreate(timing.forWaiting().seconds(), TimeUnit.SECONDS);
              String path = node.getActualPath();
              assertNodeExists(observer, path);
@@@ -421,6 -401,6 +429,7 @@@
          observer.getData().usingWatcher(dataChangedTrigger).forPath(PATH);
  
          PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, new byte[0]);
++        node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
          node.start();
          try
          {
@@@ -454,9 -434,37 +463,10 @@@
          CuratorFramework curator = newCurator();
  
          PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, new byte[0]);
 -        node.start();
 -        try
 -        {
 -            node.waitForInitialCreate(timing.forWaiting().seconds(), TimeUnit.SECONDS);
 -            String originalNode = node.getActualPath();
 -            assertNodeExists(curator, originalNode);
 -
 -            // Delete the original node...
 -            curator.delete().forPath(originalNode);
 -
 -            // Since we're using an ephemeral node, and the original session hasn't been interrupted the name of the new
 -            // node that gets created is going to be exactly the same as the original.
 -            Trigger createdWatchTrigger = Trigger.created();
 -            Stat stat = curator.checkExists().usingWatcher(createdWatchTrigger).forPath(originalNode);
 -            assertTrue(stat != null || createdWatchTrigger.firedWithin(timing.forWaiting().seconds(), TimeUnit.SECONDS));
 -        }
 -        finally
 -        {
 -            node.close();
 -        }
 -    }
 -
 -    @Test
 -    public void testRecreatesNodeWhenItGetsDeletedAfterSetData() throws Exception
 -    {
 -        CuratorFramework curator = newCurator();
 -
 -        PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, new byte[0]);
 -        node.start();
++        node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
          try
          {
 +            node.start();
              node.waitForInitialCreate(timing.forWaiting().seconds(), TimeUnit.SECONDS);
              String originalNode = node.getActualPath();
              assertNodeExists(curator, originalNode);
@@@ -481,13 -498,14 +491,15 @@@
      {
          CuratorFramework curator = newCurator();
  
 -        PersistentEphemeralNode node1 = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL_SEQUENTIAL, PATH, new byte[0]);
 -        node1.start();
 -        try
 +        try ( PersistentEphemeralNode node1 = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL_SEQUENTIAL, PATH, new byte[0]) )
          {
++            node1.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
 +            node1.start();
              node1.waitForInitialCreate(timing.forWaiting().seconds(), TimeUnit.SECONDS);
              String path1 = node1.getActualPath();
  
              PersistentEphemeralNode node2 = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL_SEQUENTIAL, PATH, new byte[0]);
++            node2.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
              node2.start();
              try
              {
@@@ -510,9 -532,9 +522,10 @@@
          byte[] data = "Hello World".getBytes();
  
          PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, data);
 -        node.start();
++        node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
          try
          {
 +            node.start();
              node.waitForInitialCreate(timing.forWaiting().seconds(), TimeUnit.SECONDS);
              assertTrue(Arrays.equals(curator.getData().forPath(node.getActualPath()), data));
          }
@@@ -536,9 -558,9 +549,10 @@@
          byte[] data = "Hello World".getBytes();
               
          PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, data);
 -        node.start();
++        node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
          try
          {
 +            node.start();
              node.waitForInitialCreate(timing.forWaiting().seconds(), TimeUnit.SECONDS);
              assertTrue(Arrays.equals(curator.getData().forPath(node.getActualPath()), data));
          }
@@@ -557,9 -579,9 +571,10 @@@
          byte[] updatedData = "Updated".getBytes();
               
          PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, initialData);
 -        node.start();
          try
          {
++            node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
 +            node.start();
              node.waitForInitialCreate(timing.forWaiting().seconds(), TimeUnit.SECONDS);
              assertTrue(Arrays.equals(curator.getData().forPath(node.getActualPath()), initialData));
              
@@@ -603,9 -625,9 +618,10 @@@
          byte[] updatedData = "Updated".getBytes();
  
          PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, initialData);
 -        node.start();
          try
          {
++            node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
 +            node.start();
              node.waitForInitialCreate(timing.forWaiting().seconds(), TimeUnit.SECONDS);
              assertTrue(Arrays.equals(curator.getData().forPath(node.getActualPath()), initialData));
  
@@@ -646,9 -668,9 +662,10 @@@
  
          PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.PROTECTED_EPHEMERAL, PATH,
                                                                     new byte[0]);
 -        node.start();
          try
          {
++            node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
 +            node.start();
              node.waitForInitialCreate(timing.forWaiting().seconds(), TimeUnit.SECONDS);
              assertNodeExists(curator, node.getActualPath());
  
@@@ -693,6 -714,6 +710,7 @@@
          
          	node = new PersistentEphemeralNode(client, PersistentEphemeralNode.Mode.EPHEMERAL, PATH,
                                                                     new byte[0]);
++            node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
          	node.start();
          
              node.waitForInitialCreate(timing.seconds(), TimeUnit.SECONDS);

http://git-wip-us.apache.org/repos/asf/curator/blob/c6a22ba5/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentNode.java
----------------------------------------------------------------------
diff --cc curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentNode.java
index 0000000,c006dd7..20d6916
mode 000000,100644..100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentNode.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentNode.java
@@@ -1,0 -1,62 +1,63 @@@
+ /**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ package org.apache.curator.framework.recipes.nodes;
+ 
+ import org.apache.curator.framework.CuratorFramework;
+ import org.apache.curator.framework.CuratorFrameworkFactory;
+ import org.apache.curator.retry.RetryOneTime;
+ import org.apache.curator.test.BaseClassForTests;
+ import org.apache.curator.test.Timing;
+ import org.apache.curator.utils.CloseableUtils;
+ import org.apache.zookeeper.CreateMode;
+ import org.testng.Assert;
+ import org.testng.annotations.Test;
+ import java.util.concurrent.TimeUnit;
+ 
+ public class TestPersistentNode extends BaseClassForTests
+ {
+     @Test
+     public void testBasic() throws Exception
+     {
+         final byte[] TEST_DATA = "hey".getBytes();
+ 
+         Timing timing = new Timing();
+         PersistentNode pen = null;
+         CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+         try
+         {
+             client.start();
+             pen = new PersistentNode(client, CreateMode.PERSISTENT, false, "/test", TEST_DATA);
++            pen.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
+             pen.start();
+             Assert.assertTrue(pen.waitForInitialCreate(timing.milliseconds(), TimeUnit.MILLISECONDS));
+             client.close(); // cause session to end - force checks that node is persistent
+ 
+             client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+             client.start();
+ 
+             byte[] bytes = client.getData().forPath("/test");
+             Assert.assertEquals(bytes, TEST_DATA);
+         }
+         finally
+         {
+             CloseableUtils.closeQuietly(pen);
+             CloseableUtils.closeQuietly(client);
+         }
+     }
+ }


[3/5] curator git commit: fixed docs/comments that refer to ephemeral

Posted by ra...@apache.org.
fixed docs/comments that refer to ephemeral


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/55ca08cb
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/55ca08cb
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/55ca08cb

Branch: refs/heads/CURATOR-3.0
Commit: 55ca08cb1aef3deada6ae6bb128c6fb2f0608777
Parents: e657cf6
Author: randgalt <ra...@apache.org>
Authored: Mon Jan 18 18:11:45 2016 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Jan 18 18:11:45 2016 -0500

----------------------------------------------------------------------
 .../apache/curator/framework/recipes/nodes/PersistentNode.java | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/55ca08cb/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
index fbeaeff..cf0bf38 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
@@ -171,7 +171,7 @@ public class PersistentNode implements Closeable
                 }
                 else if ( event.getResultCode() == KeeperException.Code.NOAUTH.intValue() )
                 {
-                    log.warn("Client does not have authorisation to write ephemeral node at path {}", event.getPath());
+                    log.warn("Client does not have authorisation to write node at path {}", event.getPath());
                     authFailure.set(true);
                     return;
                 }
@@ -211,7 +211,7 @@ public class PersistentNode implements Closeable
     }
 
     /**
-     * You must call start() to initiate the persistent ephemeral node. An attempt to create the node
+     * You must call start() to initiate the persistent node. An attempt to create the node
      * in the background will be started
      */
     public void start()
@@ -270,7 +270,7 @@ public class PersistentNode implements Closeable
     }
 
     /**
-     * Set data that ephemeral node should set in ZK also writes the data to the node
+     * Set data that node should set in ZK also writes the data to the node
      *
      * @param data new data value
      * @throws Exception errors


[4/5] curator git commit: Merge branch 'master' into CURATOR-287

Posted by ra...@apache.org.
Merge branch 'master' into CURATOR-287

Conflicts:
	curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/649e0ba2
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/649e0ba2
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/649e0ba2

Branch: refs/heads/CURATOR-3.0
Commit: 649e0ba24b4123829e1267c755318a288f6f9e0c
Parents: 55ca08c b08b543
Author: randgalt <ra...@apache.org>
Authored: Mon Jan 18 19:10:14 2016 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Jan 18 19:10:14 2016 -0500

----------------------------------------------------------------------
 .../org/apache/curator/ConnectionState.java     |  4 ++
 .../apache/curator/CuratorZookeeperClient.java  |  2 +
 .../main/java/org/apache/curator/RetryLoop.java |  2 +
 .../apache/curator/SessionFailRetryLoop.java    |  2 +
 .../exhibitor/ExhibitorEnsembleProvider.java    |  2 +
 .../org/apache/curator/utils/ThreadUtils.java   | 23 +++++++
 .../java/org/apache/curator/utils/ZKPaths.java  |  2 +-
 .../src/main/java/locking/LockingExample.java   |  7 ++-
 .../framework/api/VersionPathAndBytesable.java  | 25 ++++++++
 .../transaction/TransactionCreateBuilder.java   |  5 +-
 .../transaction/TransactionSetDataBuilder.java  |  4 +-
 .../curator/framework/imps/Backgrounding.java   |  2 +
 .../framework/imps/CreateBuilderImpl.java       | 15 +++--
 .../framework/imps/CuratorFrameworkImpl.java    | 46 +++++++++-----
 .../framework/imps/DeleteBuilderImpl.java       |  2 +
 .../framework/imps/FailedDeleteManager.java     |  5 +-
 .../FindAndDeleteProtectedNodeInBackground.java |  3 +
 .../framework/imps/GetDataBuilderImpl.java      |  2 +
 .../curator/framework/imps/NamespaceImpl.java   |  2 +
 .../framework/imps/NamespaceWatcher.java        |  2 +
 .../framework/imps/OperationAndData.java        | 11 +++-
 .../framework/imps/SetDataBuilderImpl.java      |  8 +--
 .../framework/listen/ListenerContainer.java     |  2 +
 .../framework/state/ConnectionStateManager.java | 14 +++--
 .../framework/imps/TestTransactions.java        | 46 +++++++++++++-
 .../recipes/AfterConnectionEstablished.java     |  1 +
 .../recipes/cache/DefaultTreeCacheSelector.java | 37 +++++++++++
 .../framework/recipes/cache/NodeCache.java      |  4 ++
 .../recipes/cache/PathChildrenCache.java        |  4 ++
 .../framework/recipes/cache/TreeCache.java      | 45 +++++++++----
 .../recipes/cache/TreeCacheSelector.java        | 66 ++++++++++++++++++++
 .../framework/recipes/leader/LeaderLatch.java   |  5 ++
 .../recipes/leader/LeaderSelector.java          | 10 ++-
 .../framework/recipes/locks/ChildReaper.java    |  1 +
 .../recipes/locks/InterProcessMultiLock.java    |  4 ++
 .../recipes/locks/InterProcessSemaphore.java    |  4 ++
 .../recipes/locks/InterProcessSemaphoreV2.java  |  2 +
 .../framework/recipes/locks/LockInternals.java  |  2 +
 .../curator/framework/recipes/locks/Reaper.java |  1 +
 .../framework/recipes/nodes/GroupMember.java    |  3 +
 .../framework/recipes/nodes/PersistentNode.java |  3 +
 .../recipes/queue/DistributedQueue.java         | 43 ++++++++-----
 .../framework/recipes/queue/QueueSharder.java   | 16 +++--
 .../framework/recipes/shared/SharedValue.java   |  2 +
 ...estResetConnectionWithBackgroundFailure.java | 36 +++++------
 .../framework/recipes/cache/TestTreeCache.java  | 57 +++++++++++++++--
 .../framework/recipes/locks/TestLockACLs.java   | 50 ++++++++++++---
 .../curator/test/TestingZooKeeperMain.java      | 31 ++++++++-
 .../entity/JsonServiceInstanceMarshaller.java   |  3 +
 .../entity/JsonServiceInstancesMarshaller.java  |  2 +
 .../server/rest/DiscoveryResource.java          |  6 ++
 .../discovery/server/rest/InstanceCleanup.java  |  2 +
 .../discovery/details/ServiceDiscoveryImpl.java |  3 +
 .../x/rpc/idl/discovery/DiscoveryService.java   |  8 +++
 .../idl/discovery/DiscoveryServiceLowLevel.java |  7 +++
 .../idl/services/CuratorProjectionService.java  | 25 ++++++++
 56 files changed, 606 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/649e0ba2/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
----------------------------------------------------------------------
diff --cc curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
index cf0bf38,0000000..0d7ab9d
mode 100644,000000..100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
@@@ -1,382 -1,0 +1,385 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +
 +package org.apache.curator.framework.recipes.nodes;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.base.Preconditions;
 +import org.apache.curator.framework.CuratorFramework;
 +import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
 +import org.apache.curator.framework.api.BackgroundCallback;
 +import org.apache.curator.framework.api.CreateBuilder;
 +import org.apache.curator.framework.api.CreateModable;
 +import org.apache.curator.framework.api.CuratorEvent;
 +import org.apache.curator.framework.api.CuratorWatcher;
 +import org.apache.curator.framework.state.ConnectionState;
 +import org.apache.curator.framework.state.ConnectionStateListener;
 +import org.apache.curator.utils.PathUtils;
++import org.apache.curator.utils.ThreadUtils;
 +import org.apache.zookeeper.CreateMode;
 +import org.apache.zookeeper.KeeperException;
 +import org.apache.zookeeper.WatchedEvent;
 +import org.apache.zookeeper.Watcher.Event.EventType;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +import java.io.Closeable;
 +import java.io.IOException;
 +import java.util.Arrays;
 +import java.util.concurrent.CountDownLatch;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +import java.util.concurrent.atomic.AtomicReference;
 +
 +/**
 + * <p>
 + * A persistent node is a node that attempts to stay present in
 + * ZooKeeper, even through connection and session interruptions.
 + * </p>
 + * <p>
 + * Thanks to bbeck (https://github.com/bbeck) for the initial coding and design
 + * </p>
 + */
 +public class PersistentNode implements Closeable
 +{
 +    private final AtomicReference<CountDownLatch> initialCreateLatch = new AtomicReference<CountDownLatch>(new CountDownLatch(1));
 +    private final Logger log = LoggerFactory.getLogger(getClass());
 +    private final CuratorFramework client;
 +    private final CreateModable<ACLBackgroundPathAndBytesable<String>> createMethod;
 +    private final AtomicReference<String> nodePath = new AtomicReference<String>(null);
 +    private final String basePath;
 +    private final CreateMode mode;
 +    private final AtomicReference<byte[]> data = new AtomicReference<byte[]>();
 +    private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
 +    private final AtomicBoolean authFailure = new AtomicBoolean(false);
 +    private final BackgroundCallback backgroundCallback;
 +    private final boolean useProtection;
 +    private final CuratorWatcher watcher = new CuratorWatcher()
 +    {
 +        @Override
 +        public void process(WatchedEvent event) throws Exception
 +        {
 +            if ( event.getType() == EventType.NodeDeleted )
 +            {
 +                createNode();
 +            }
 +            else if ( event.getType() == EventType.NodeDataChanged )
 +            {
 +                watchNode();
 +            }
 +        }
 +    };
 +    private final BackgroundCallback checkExistsCallback = new BackgroundCallback()
 +    {
 +        @Override
 +        public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
 +        {
 +            if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )
 +            {
 +                createNode();
 +            }
 +            else
 +            {
 +                boolean isEphemeral = event.getStat().getEphemeralOwner() != 0;
 +                if ( isEphemeral != mode.isEphemeral() )
 +                {
 +                    log.warn("Existing node ephemeral state doesn't match requested state. Maybe the node was created outside of PersistentNode? " + basePath);
 +                }
 +            }
 +        }
 +    };
 +    private final BackgroundCallback setDataCallback = new BackgroundCallback()
 +    {
 +
 +        @Override
 +        public void processResult(CuratorFramework client, CuratorEvent event)
 +            throws Exception
 +        {
 +            //If the result is ok then initialisation is complete (if we're still initialising)
 +            //Don't retry on other errors as the only recoverable cases will be connection loss
 +            //and the node not existing, both of which are already handled by other watches.
 +            if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
 +            {
 +                //Update is ok, mark initialisation as complete if required.
 +                initialisationComplete();
 +            }
 +        }
 +    };
 +    private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
 +    {
 +        @Override
 +        public void stateChanged(CuratorFramework client, ConnectionState newState)
 +        {
 +            if ( newState == ConnectionState.RECONNECTED )
 +            {
 +                createNode();
 +            }
 +        }
 +    };
 +
 +    private enum State
 +    {
 +        LATENT,
 +        STARTED,
 +        CLOSED
 +    }
 +
 +    /**
 +     * @param client        client instance
 +     * @param mode          creation mode
 +     * @param useProtection if true, call {@link CreateBuilder#withProtection()}
 +     * @param basePath the base path for the node
 +     * @param initData data for the node
 +     */
 +    public PersistentNode(CuratorFramework client, final CreateMode mode, boolean useProtection, final String basePath, byte[] initData)
 +    {
 +        this.useProtection = useProtection;
 +        this.client = Preconditions.checkNotNull(client, "client cannot be null");
 +        this.basePath = PathUtils.validatePath(basePath);
 +        this.mode = Preconditions.checkNotNull(mode, "mode cannot be null");
 +        final byte[] data = Preconditions.checkNotNull(initData, "data cannot be null");
 +
 +        backgroundCallback = new BackgroundCallback()
 +        {
 +            @Override
 +            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
 +            {
 +                String path = null;
 +                boolean nodeExists = false;
 +                if ( event.getResultCode() == KeeperException.Code.NODEEXISTS.intValue() )
 +                {
 +                    path = event.getPath();
 +                    nodeExists = true;
 +                }
 +                else if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
 +                {
 +                    path = event.getName();
 +                }
 +                else if ( event.getResultCode() == KeeperException.Code.NOAUTH.intValue() )
 +                {
 +                    log.warn("Client does not have authorisation to write node at path {}", event.getPath());
 +                    authFailure.set(true);
 +                    return;
 +                }
 +                if ( path != null )
 +                {
 +                    authFailure.set(false);
 +                    nodePath.set(path);
 +                    watchNode();
 +
 +                    if ( nodeExists )
 +                    {
 +                        client.setData().inBackground(setDataCallback).forPath(getActualPath(), getData());
 +                    }
 +                    else
 +                    {
 +                        initialisationComplete();
 +                    }
 +                }
 +                else
 +                {
 +                    createNode();
 +                }
 +            }
 +        };
 +
 +        createMethod = useProtection ? client.create().creatingParentContainersIfNeeded().withProtection() : client.create().creatingParentContainersIfNeeded();
 +        this.data.set(Arrays.copyOf(data, data.length));
 +    }
 +
 +    private void initialisationComplete()
 +    {
 +        CountDownLatch localLatch = initialCreateLatch.getAndSet(null);
 +        if ( localLatch != null )
 +        {
 +            localLatch.countDown();
 +        }
 +    }
 +
 +    /**
 +     * You must call start() to initiate the persistent node. An attempt to create the node
 +     * in the background will be started
 +     */
 +    public void start()
 +    {
 +        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Already started");
 +
 +        client.getConnectionStateListenable().addListener(connectionStateListener);
 +        createNode();
 +    }
 +
 +    /**
 +     * Block until the either initial node creation initiated by {@link #start()} succeeds or
 +     * the timeout elapses.
 +     *
 +     * @param timeout the maximum time to wait
 +     * @param unit    time unit
 +     * @return if the node was created before timeout
 +     * @throws InterruptedException if the thread is interrupted
 +     */
 +    public boolean waitForInitialCreate(long timeout, TimeUnit unit) throws InterruptedException
 +    {
 +        Preconditions.checkState(state.get() == State.STARTED, "Not started");
 +
 +        CountDownLatch localLatch = initialCreateLatch.get();
 +        return (localLatch == null) || localLatch.await(timeout, unit);
 +    }
 +
 +    @Override
 +    public void close() throws IOException
 +    {
 +        if ( !state.compareAndSet(State.STARTED, State.CLOSED) )
 +        {
 +            return;
 +        }
 +
 +        client.getConnectionStateListenable().removeListener(connectionStateListener);
 +
 +        try
 +        {
 +            deleteNode();
 +        }
 +        catch ( Exception e )
 +        {
++            ThreadUtils.checkInterrupted(e);
 +            throw new IOException(e);
 +        }
 +    }
 +
 +    /**
 +     * Returns the currently set path or null if the node does not exist
 +     *
 +     * @return node path or null
 +     */
 +    public String getActualPath()
 +    {
 +        return nodePath.get();
 +    }
 +
 +    /**
 +     * Set data that node should set in ZK also writes the data to the node
 +     *
 +     * @param data new data value
 +     * @throws Exception errors
 +     */
 +    public void setData(byte[] data) throws Exception
 +    {
 +        data = Preconditions.checkNotNull(data, "data cannot be null");
 +        this.data.set(Arrays.copyOf(data, data.length));
 +        if ( isActive() )
 +        {
 +            client.setData().inBackground().forPath(getActualPath(), getData());
 +        }
 +    }
 +
 +    /**
 +     * Return the current value of our data
 +     *
 +     * @return our data
 +     */
 +    public byte[] getData()
 +    {
 +        return this.data.get();
 +    }
 +
 +    private void deleteNode() throws Exception
 +    {
 +        String localNodePath = nodePath.getAndSet(null);
 +        if ( localNodePath != null )
 +        {
 +            try
 +            {
 +                client.delete().guaranteed().forPath(localNodePath);
 +            }
 +            catch ( KeeperException.NoNodeException ignore )
 +            {
 +                // ignore
 +            }
 +        }
 +    }
 +
 +    private void createNode()
 +    {
 +        if ( !isActive() )
 +        {
 +            return;
 +        }
 +
 +        try
 +        {
 +            String existingPath = nodePath.get();
 +            String createPath = (existingPath != null && !useProtection) ? existingPath : basePath;
 +            createMethod.withMode(getCreateMode(existingPath != null)).inBackground(backgroundCallback).forPath(createPath, data.get());
 +        }
 +        catch ( Exception e )
 +        {
++            ThreadUtils.checkInterrupted(e);
 +            throw new RuntimeException("Creating node. BasePath: " + basePath, e);  // should never happen unless there's a programming error - so throw RuntimeException
 +        }
 +    }
 +
 +    private CreateMode getCreateMode(boolean pathIsSet)
 +    {
 +        if ( pathIsSet )
 +        {
 +            switch ( mode )
 +            {
 +            default:
 +            {
 +                break;
 +            }
 +
 +            case EPHEMERAL_SEQUENTIAL:
 +            {
 +                return CreateMode.EPHEMERAL;    // protection case - node already set
 +            }
 +
 +            case PERSISTENT_SEQUENTIAL:
 +            {
 +                return CreateMode.PERSISTENT;    // protection case - node already set
 +            }
 +            }
 +        }
 +        return mode;
 +    }
 +
 +    private void watchNode() throws Exception
 +    {
 +        if ( !isActive() )
 +        {
 +            return;
 +        }
 +
 +        String localNodePath = nodePath.get();
 +        if ( localNodePath != null )
 +        {
 +            client.checkExists().usingWatcher(watcher).inBackground(checkExistsCallback).forPath(localNodePath);
 +        }
 +    }
 +
 +    private boolean isActive()
 +    {
 +        return (state.get() == State.STARTED);
 +    }
 +
 +    @VisibleForTesting
 +    boolean isAuthFailure()
 +    {
 +        return authFailure.get();
 +    }
 +}