You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2016/01/19 03:05:55 UTC

[5/5] curator git commit: Merge branch 'master' into CURATOR-3.0

Merge branch 'master' into CURATOR-3.0

Conflicts:
	curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/c6a22ba5
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/c6a22ba5
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/c6a22ba5

Branch: refs/heads/CURATOR-3.0
Commit: c6a22ba508f9227fe1c657f93e3cc77d8bc17e3e
Parents: d26c38d 649e0ba
Author: randgalt <ra...@apache.org>
Authored: Mon Jan 18 21:05:41 2016 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Jan 18 21:05:41 2016 -0500

----------------------------------------------------------------------
 .../framework/imps/NamespaceWatcherMap.java     |  14 +-
 .../framework/imps/WatcherRemovalFacade.java    |   2 +-
 .../framework/imps/WatcherRemovalManager.java   |   3 +-
 .../recipes/nodes/PersistentEphemeralNode.java  | 332 +-------------
 .../framework/recipes/nodes/PersistentNode.java | 436 +++++++++++++++++++
 .../src/site/confluence/group-member.confluence |   2 +-
 .../persistent-ephemeral-node.confluence        |  20 +-
 .../nodes/TestPersistentEphemeralNode.java      |  22 +-
 .../TestPersistentEphemeralNodeListener.java    |   1 +
 .../recipes/nodes/TestPersistentNode.java       |  63 +++
 10 files changed, 555 insertions(+), 340 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/c6a22ba5/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcherMap.java
----------------------------------------------------------------------
diff --cc curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcherMap.java
index 00618e6,e5aecb2..c864f44
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcherMap.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcherMap.java
@@@ -19,6 -19,6 +19,7 @@@
  package org.apache.curator.framework.imps;
  
  import com.google.common.annotations.VisibleForTesting;
++import com.google.common.cache.Cache;
  import com.google.common.cache.CacheBuilder;
  import org.apache.curator.framework.api.CuratorWatcher;
  import org.apache.zookeeper.Watcher;
@@@ -28,10 -28,10 +29,10 @@@ import java.util.concurrent.ConcurrentM
  
  class NamespaceWatcherMap implements Closeable
  {
--    private final ConcurrentMap<Object, NamespaceWatcher> map = CacheBuilder.newBuilder()
++    private final Cache<Object, NamespaceWatcher> cache = CacheBuilder.newBuilder()
          .weakValues()
--        .<Object, NamespaceWatcher>build()
--        .asMap();
++        .<Object, NamespaceWatcher>build();
++    private final ConcurrentMap<Object, NamespaceWatcher> map = cache.asMap();
      private final CuratorFrameworkImpl client;
  
      NamespaceWatcherMap(CuratorFrameworkImpl client)
@@@ -85,6 -74,6 +86,7 @@@
      @VisibleForTesting
      boolean isEmpty()
      {
++        cache.cleanUp();
          return map.isEmpty();
      }
  
@@@ -103,4 -92,4 +105,10 @@@
          NamespaceWatcher        existingNamespaceWatcher = map.putIfAbsent(watcher, newNamespaceWatcher);
          return (existingNamespaceWatcher != null) ? existingNamespaceWatcher : newNamespaceWatcher;
      }
++
++    @Override
++    public String toString()
++    {
++        return map.toString();
++    }
  }

http://git-wip-us.apache.org/repos/asf/curator/blob/c6a22ba5/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java
----------------------------------------------------------------------
diff --cc curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java
index 91530b4,0000000..30a992e
mode 100644,000000..100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java
@@@ -1,198 -1,0 +1,198 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +
 +package org.apache.curator.framework.imps;
 +
 +import org.apache.curator.CuratorZookeeperClient;
 +import org.apache.curator.RetryLoop;
 +import org.apache.curator.framework.CuratorFramework;
 +import org.apache.curator.framework.WatcherRemoveCuratorFramework;
 +import org.apache.curator.framework.api.CuratorEvent;
 +import org.apache.curator.framework.api.CuratorListener;
 +import org.apache.curator.framework.api.UnhandledErrorListener;
 +import org.apache.curator.framework.listen.Listenable;
 +import org.apache.curator.framework.state.ConnectionStateListener;
 +import org.apache.curator.utils.DebugUtils;
 +import org.apache.curator.utils.EnsurePath;
 +import org.apache.zookeeper.ZooKeeper;
 +import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
 +
 +class WatcherRemovalFacade extends CuratorFrameworkImpl implements WatcherRemoveCuratorFramework
 +{
 +    private final CuratorFrameworkImpl client;
 +    private final WatcherRemovalManager removalManager;
 +
 +    WatcherRemovalFacade(CuratorFrameworkImpl client)
 +    {
 +        super(client);
 +        this.client = client;
 +        removalManager = new WatcherRemovalManager(client, getNamespaceWatcherMap());
 +    }
 +
 +    @Override
 +    public WatcherRemoveCuratorFramework newWatcherRemoveCuratorFramework()
 +    {
 +        return client.newWatcherRemoveCuratorFramework();
 +    }
 +
 +    WatcherRemovalManager getRemovalManager()
 +    {
 +        return removalManager;
 +    }
 +
 +    @Override
 +    public QuorumVerifier getCurrentConfig()
 +    {
 +        return client.getCurrentConfig();
 +    }
 +
 +    @Override
 +    public void removeWatchers()
 +    {
 +        removalManager.removeWatchers();
 +
 +        if ( Boolean.getBoolean(DebugUtils.PROPERTY_VALIDATE_NAMESPACE_WATCHER_MAP_EMPTY) )
 +        {
 +            if ( !getNamespaceWatcherMap().isEmpty() )
 +            {
-                 throw new RuntimeException("NamespaceWatcherMap is not empty: " + client.getNamespaceWatcherMap());
++                throw new RuntimeException("NamespaceWatcherMap is not empty: " + getNamespaceWatcherMap());
 +            }
 +        }
 +    }
 +
 +    @Override
 +    WatcherRemovalManager getWatcherRemovalManager()
 +    {
 +        return removalManager;
 +    }
 +
 +    @Override
 +    public CuratorFramework nonNamespaceView()
 +    {
 +        return client.nonNamespaceView();
 +    }
 +
 +    @Override
 +    public CuratorFramework usingNamespace(String newNamespace)
 +    {
 +        return client.usingNamespace(newNamespace);
 +    }
 +
 +    @Override
 +    public String getNamespace()
 +    {
 +        return client.getNamespace();
 +    }
 +
 +    @Override
 +    public void start()
 +    {
 +        throw new UnsupportedOperationException();
 +    }
 +
 +    @Override
 +    public void close()
 +    {
 +        throw new UnsupportedOperationException();
 +    }
 +
 +    @Override
 +    public Listenable<ConnectionStateListener> getConnectionStateListenable()
 +    {
 +        return client.getConnectionStateListenable();
 +    }
 +
 +    @Override
 +    public Listenable<CuratorListener> getCuratorListenable()
 +    {
 +        return client.getCuratorListenable();
 +    }
 +
 +    @Override
 +    public Listenable<UnhandledErrorListener> getUnhandledErrorListenable()
 +    {
 +        return client.getUnhandledErrorListenable();
 +    }
 +
 +    @Override
 +    public void sync(String path, Object context)
 +    {
 +        client.sync(path, context);
 +    }
 +
 +    @Override
 +    public CuratorZookeeperClient getZookeeperClient()
 +    {
 +        return client.getZookeeperClient();
 +    }
 +
 +    @Override
 +    RetryLoop newRetryLoop()
 +    {
 +        return client.newRetryLoop();
 +    }
 +
 +    @Override
 +    ZooKeeper getZooKeeper() throws Exception
 +    {
 +        return client.getZooKeeper();
 +    }
 +
 +    @Override
 +    <DATA_TYPE> void processBackgroundOperation(OperationAndData<DATA_TYPE> operationAndData, CuratorEvent event)
 +    {
 +        client.processBackgroundOperation(operationAndData, event);
 +    }
 +
 +    @Override
 +    void logError(String reason, Throwable e)
 +    {
 +        client.logError(reason, e);
 +    }
 +
 +    @Override
 +    String unfixForNamespace(String path)
 +    {
 +        return client.unfixForNamespace(path);
 +    }
 +
 +    @Override
 +    String fixForNamespace(String path)
 +    {
 +        return client.fixForNamespace(path);
 +    }
 +    
 +    @Override
 +    String fixForNamespace(String path, boolean isSequential)
 +    {
 +    	return client.fixForNamespace(path, isSequential);
 +    }
 +
 +    @Override
 +    public EnsurePath newNamespaceAwareEnsurePath(String path)
 +    {
 +        return client.newNamespaceAwareEnsurePath(path);
 +    }
 +
 +    @Override
 +    FailedDeleteManager getFailedDeleteManager()
 +    {
 +        return client.getFailedDeleteManager();
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/curator/blob/c6a22ba5/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalManager.java
----------------------------------------------------------------------
diff --cc curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalManager.java
index 064964d,0000000..1e6fe94
mode 100644,000000..100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalManager.java
@@@ -1,140 -1,0 +1,141 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.curator.framework.imps;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.base.Preconditions;
 +import com.google.common.collect.Sets;
 +import org.apache.zookeeper.WatchedEvent;
 +import org.apache.zookeeper.Watcher;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +import java.util.HashSet;
 +import java.util.Set;
 +
 +public class WatcherRemovalManager
 +{
 +    private final Logger log = LoggerFactory.getLogger(getClass());
 +    private final CuratorFrameworkImpl client;
 +    private final NamespaceWatcherMap namespaceWatcherMap;
 +    private final Set<WrappedWatcher> entries = Sets.newHashSet();  // guarded by sync
 +
 +    WatcherRemovalManager(CuratorFrameworkImpl client, NamespaceWatcherMap namespaceWatcherMap)
 +    {
 +        this.client = client;
 +        this.namespaceWatcherMap = namespaceWatcherMap;
 +    }
 +
 +    synchronized Watcher add(String path, Watcher watcher)
 +    {
 +        path = Preconditions.checkNotNull(path, "path cannot be null");
 +        watcher = Preconditions.checkNotNull(watcher, "watcher cannot be null");
 +
 +        WrappedWatcher wrappedWatcher = new WrappedWatcher(watcher, path);
 +        entries.add(wrappedWatcher);
 +        return wrappedWatcher;
 +    }
 +
 +    @VisibleForTesting
 +    synchronized Set<? extends Watcher> getEntries()
 +    {
 +        return Sets.newHashSet(entries);
 +    }
 +
 +    void removeWatchers()
 +    {
 +        HashSet<WrappedWatcher> localEntries;
 +        synchronized(this)
 +        {
 +            localEntries = Sets.newHashSet(entries);
 +        }
 +        for ( WrappedWatcher entry : localEntries )
 +        {
 +            try
 +            {
 +                log.debug("Removing watcher for path: " + entry.path);
-                 namespaceWatcherMap.removeWatcher(entry.watcher);
 +                RemoveWatchesBuilderImpl builder = new RemoveWatchesBuilderImpl(client);
++                namespaceWatcherMap.removeWatcher(entry.watcher);
 +                builder.internalRemoval(entry, entry.path);
 +            }
 +            catch ( Exception e )
 +            {
 +                log.error("Could not remove watcher for path: " + entry.path);
 +            }
 +        }
 +    }
 +
 +    private synchronized void internalRemove(WrappedWatcher entry)
 +    {
++        namespaceWatcherMap.removeWatcher(entry.watcher);
 +        entries.remove(entry);
 +    }
 +
 +    private class WrappedWatcher implements Watcher
 +    {
 +        private final Watcher watcher;
 +        private final String path;
 +
 +        WrappedWatcher(Watcher watcher, String path)
 +        {
 +            this.watcher = watcher;
 +            this.path = path;
 +        }
 +
 +        @Override
 +        public void process(WatchedEvent event)
 +        {
 +            if ( event.getType() != Event.EventType.None )
 +            {
 +                internalRemove(this);
 +            }
 +            watcher.process(event);
 +        }
 +
 +        @Override
 +        public boolean equals(Object o)
 +        {
 +            if ( this == o )
 +            {
 +                return true;
 +            }
 +            if ( o == null || getClass() != o.getClass() )
 +            {
 +                return false;
 +            }
 +
 +            WrappedWatcher entry = (WrappedWatcher)o;
 +
 +            //noinspection SimplifiableIfStatement
 +            if ( !watcher.equals(entry.watcher) )
 +            {
 +                return false;
 +            }
 +            return path.equals(entry.path);
 +
 +        }
 +
 +        @Override
 +        public int hashCode()
 +        {
 +            int result = watcher.hashCode();
 +            result = 31 * result + path.hashCode();
 +            return result;
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/curator/blob/c6a22ba5/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
----------------------------------------------------------------------
diff --cc curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
index 0000000,0d7ab9d..93c88f7
mode 000000,100644..100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
@@@ -1,0 -1,385 +1,436 @@@
+ /**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ 
+ package org.apache.curator.framework.recipes.nodes;
+ 
+ import com.google.common.annotations.VisibleForTesting;
+ import com.google.common.base.Preconditions;
+ import org.apache.curator.framework.CuratorFramework;
++import org.apache.curator.framework.WatcherRemoveCuratorFramework;
+ import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
+ import org.apache.curator.framework.api.BackgroundCallback;
+ import org.apache.curator.framework.api.CreateBuilder;
+ import org.apache.curator.framework.api.CreateModable;
+ import org.apache.curator.framework.api.CuratorEvent;
+ import org.apache.curator.framework.api.CuratorWatcher;
+ import org.apache.curator.framework.state.ConnectionState;
+ import org.apache.curator.framework.state.ConnectionStateListener;
+ import org.apache.curator.utils.PathUtils;
+ import org.apache.curator.utils.ThreadUtils;
+ import org.apache.zookeeper.CreateMode;
+ import org.apache.zookeeper.KeeperException;
+ import org.apache.zookeeper.WatchedEvent;
+ import org.apache.zookeeper.Watcher.Event.EventType;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ import java.io.Closeable;
+ import java.io.IOException;
+ import java.util.Arrays;
+ import java.util.concurrent.CountDownLatch;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicBoolean;
++import java.util.concurrent.atomic.AtomicLong;
+ import java.util.concurrent.atomic.AtomicReference;
+ 
+ /**
+  * <p>
+  * A persistent node is a node that attempts to stay present in
+  * ZooKeeper, even through connection and session interruptions.
+  * </p>
+  * <p>
+  * Thanks to bbeck (https://github.com/bbeck) for the initial coding and design
+  * </p>
+  */
+ public class PersistentNode implements Closeable
+ {
+     private final AtomicReference<CountDownLatch> initialCreateLatch = new AtomicReference<CountDownLatch>(new CountDownLatch(1));
+     private final Logger log = LoggerFactory.getLogger(getClass());
 -    private final CuratorFramework client;
++    private final WatcherRemoveCuratorFramework client;
+     private final CreateModable<ACLBackgroundPathAndBytesable<String>> createMethod;
+     private final AtomicReference<String> nodePath = new AtomicReference<String>(null);
+     private final String basePath;
+     private final CreateMode mode;
+     private final AtomicReference<byte[]> data = new AtomicReference<byte[]>();
+     private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
+     private final AtomicBoolean authFailure = new AtomicBoolean(false);
+     private final BackgroundCallback backgroundCallback;
+     private final boolean useProtection;
+     private final CuratorWatcher watcher = new CuratorWatcher()
+     {
+         @Override
+         public void process(WatchedEvent event) throws Exception
+         {
 -            if ( event.getType() == EventType.NodeDeleted )
++            if ( isActive() )
+             {
 -                createNode();
 -            }
 -            else if ( event.getType() == EventType.NodeDataChanged )
 -            {
 -                watchNode();
++                if ( event.getType() == EventType.NodeDeleted )
++                {
++                    createNode();
++                }
++                else if ( event.getType() == EventType.NodeDataChanged )
++                {
++                    watchNode();
++                }
+             }
+         }
+     };
++
+     private final BackgroundCallback checkExistsCallback = new BackgroundCallback()
+     {
+         @Override
 -        public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
++        public void processResult(CuratorFramework dummy, CuratorEvent event) throws Exception
+         {
 -            if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )
++            if ( isActive() )
+             {
 -                createNode();
++                if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )
++                {
++                    createNode();
++                }
++                else
++                {
++                    boolean isEphemeral = event.getStat().getEphemeralOwner() != 0;
++                    if ( isEphemeral != mode.isEphemeral() )
++                    {
++                        log.warn("Existing node ephemeral state doesn't match requested state. Maybe the node was created outside of PersistentNode? " + basePath);
++                    }
++                }
+             }
+             else
+             {
 -                boolean isEphemeral = event.getStat().getEphemeralOwner() != 0;
 -                if ( isEphemeral != mode.isEphemeral() )
 -                {
 -                    log.warn("Existing node ephemeral state doesn't match requested state. Maybe the node was created outside of PersistentNode? " + basePath);
 -                }
++                client.removeWatchers();
+             }
+         }
+     };
+     private final BackgroundCallback setDataCallback = new BackgroundCallback()
+     {
+ 
+         @Override
 -        public void processResult(CuratorFramework client, CuratorEvent event)
++        public void processResult(CuratorFramework dummy, CuratorEvent event)
+             throws Exception
+         {
+             //If the result is ok then initialisation is complete (if we're still initialising)
+             //Don't retry on other errors as the only recoverable cases will be connection loss
+             //and the node not existing, both of which are already handled by other watches.
+             if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
+             {
+                 //Update is ok, mark initialisation as complete if required.
+                 initialisationComplete();
+             }
+         }
+     };
+     private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
+     {
+         @Override
 -        public void stateChanged(CuratorFramework client, ConnectionState newState)
++        public void stateChanged(CuratorFramework dummy, ConnectionState newState)
+         {
 -            if ( newState == ConnectionState.RECONNECTED )
++            if ( (newState == ConnectionState.RECONNECTED) && isActive() )
+             {
+                 createNode();
+             }
+         }
+     };
+ 
++    @VisibleForTesting
++    volatile CountDownLatch debugCreateNodeLatch = null;
++
+     private enum State
+     {
+         LATENT,
+         STARTED,
+         CLOSED
+     }
+ 
+     /**
 -     * @param client        client instance
++     * @param givenClient        client instance
+      * @param mode          creation mode
+      * @param useProtection if true, call {@link CreateBuilder#withProtection()}
+      * @param basePath the base path for the node
+      * @param initData data for the node
+      */
 -    public PersistentNode(CuratorFramework client, final CreateMode mode, boolean useProtection, final String basePath, byte[] initData)
++    public PersistentNode(CuratorFramework givenClient, final CreateMode mode, boolean useProtection, final String basePath, byte[] initData)
+     {
+         this.useProtection = useProtection;
 -        this.client = Preconditions.checkNotNull(client, "client cannot be null");
++        this.client = Preconditions.checkNotNull(givenClient, "client cannot be null").newWatcherRemoveCuratorFramework();
+         this.basePath = PathUtils.validatePath(basePath);
+         this.mode = Preconditions.checkNotNull(mode, "mode cannot be null");
+         final byte[] data = Preconditions.checkNotNull(initData, "data cannot be null");
+ 
+         backgroundCallback = new BackgroundCallback()
+         {
+             @Override
 -            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
++            public void processResult(CuratorFramework dummy, CuratorEvent event) throws Exception
+             {
++                if ( !isActive() )
++                {
++                    return;
++                }
++
+                 String path = null;
+                 boolean nodeExists = false;
+                 if ( event.getResultCode() == KeeperException.Code.NODEEXISTS.intValue() )
+                 {
+                     path = event.getPath();
+                     nodeExists = true;
+                 }
+                 else if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
+                 {
+                     path = event.getName();
+                 }
+                 else if ( event.getResultCode() == KeeperException.Code.NOAUTH.intValue() )
+                 {
 -                    log.warn("Client does not have authorisation to write node at path {}", event.getPath());
++                    log.warn("Client does not have authorization to write node at path {}", event.getPath());
+                     authFailure.set(true);
+                     return;
+                 }
+                 if ( path != null )
+                 {
+                     authFailure.set(false);
+                     nodePath.set(path);
+                     watchNode();
+ 
+                     if ( nodeExists )
+                     {
+                         client.setData().inBackground(setDataCallback).forPath(getActualPath(), getData());
+                     }
+                     else
+                     {
+                         initialisationComplete();
+                     }
+                 }
+                 else
+                 {
+                     createNode();
+                 }
+             }
+         };
+ 
+         createMethod = useProtection ? client.create().creatingParentContainersIfNeeded().withProtection() : client.create().creatingParentContainersIfNeeded();
+         this.data.set(Arrays.copyOf(data, data.length));
+     }
+ 
+     private void initialisationComplete()
+     {
+         CountDownLatch localLatch = initialCreateLatch.getAndSet(null);
+         if ( localLatch != null )
+         {
+             localLatch.countDown();
+         }
+     }
+ 
+     /**
+      * You must call start() to initiate the persistent node. An attempt to create the node
+      * in the background will be started
+      */
+     public void start()
+     {
+         Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Already started");
+ 
+         client.getConnectionStateListenable().addListener(connectionStateListener);
+         createNode();
+     }
+ 
+     /**
+      * Block until either the initial node creation initiated by {@link #start()} succeeds or
+      * the timeout elapses.
+      *
+      * @param timeout the maximum time to wait
+      * @param unit    time unit
+      * @return true if the node was created before the timeout elapsed, false otherwise
+      * @throws InterruptedException if the thread is interrupted
+      */
+     public boolean waitForInitialCreate(long timeout, TimeUnit unit) throws InterruptedException
+     {
+         Preconditions.checkState(state.get() == State.STARTED, "Not started");
+ 
+         CountDownLatch localLatch = initialCreateLatch.get();
+         return (localLatch == null) || localLatch.await(timeout, unit);
+     }
+ 
++    @VisibleForTesting
++    final AtomicLong debugWaitMsForBackgroundBeforeClose = new AtomicLong(0);
++
+     @Override
+     public void close() throws IOException
+     {
++        if ( debugWaitMsForBackgroundBeforeClose.get() > 0 )
++        {
++            try
++            {
++                Thread.sleep(debugWaitMsForBackgroundBeforeClose.get());
++            }
++            catch ( InterruptedException e )
++            {
++                Thread.currentThread().interrupt();
++            }
++        }
++
+         if ( !state.compareAndSet(State.STARTED, State.CLOSED) )
+         {
+             return;
+         }
+ 
+         client.getConnectionStateListenable().removeListener(connectionStateListener);
+ 
+         try
+         {
+             deleteNode();
+         }
+         catch ( Exception e )
+         {
+             ThreadUtils.checkInterrupted(e);
+             throw new IOException(e);
+         }
++
++        client.removeWatchers();
+     }
+ 
+     /**
+      * Returns the currently set path or null if the node does not exist
+      *
+      * @return node path or null
+      */
+     public String getActualPath()
+     {
+         return nodePath.get();
+     }
+ 
+     /**
+      * Set the data that the node should hold in ZK. Also writes the data to the node.
+      *
+      * @param data new data value
+      * @throws Exception errors
+      */
+     public void setData(byte[] data) throws Exception
+     {
+         data = Preconditions.checkNotNull(data, "data cannot be null");
+         this.data.set(Arrays.copyOf(data, data.length));
+         if ( isActive() )
+         {
+             client.setData().inBackground().forPath(getActualPath(), getData());
+         }
+     }
+ 
+     /**
+      * Return the current value of our data
+      *
+      * @return our data
+      */
+     public byte[] getData()
+     {
+         return this.data.get();
+     }
+ 
+     private void deleteNode() throws Exception
+     {
+         String localNodePath = nodePath.getAndSet(null);
+         if ( localNodePath != null )
+         {
+             try
+             {
+                 client.delete().guaranteed().forPath(localNodePath);
+             }
+             catch ( KeeperException.NoNodeException ignore )
+             {
+                 // ignore
+             }
+         }
+     }
+ 
+     private void createNode()
+     {
+         if ( !isActive() )
+         {
+             return;
+         }
+ 
++        if ( debugCreateNodeLatch != null )
++        {
++            try
++            {
++                debugCreateNodeLatch.await();
++            }
++            catch ( InterruptedException e )
++            {
++                Thread.currentThread().interrupt();
++                return;
++            }
++        }
++
+         try
+         {
+             String existingPath = nodePath.get();
+             String createPath = (existingPath != null && !useProtection) ? existingPath : basePath;
+             createMethod.withMode(getCreateMode(existingPath != null)).inBackground(backgroundCallback).forPath(createPath, data.get());
+         }
+         catch ( Exception e )
+         {
+             ThreadUtils.checkInterrupted(e);
+             throw new RuntimeException("Creating node. BasePath: " + basePath, e);  // should never happen unless there's a programming error - so throw RuntimeException
+         }
+     }
+ 
+     private CreateMode getCreateMode(boolean pathIsSet)
+     {
+         if ( pathIsSet )
+         {
+             switch ( mode )
+             {
+             default:
+             {
+                 break;
+             }
+ 
+             case EPHEMERAL_SEQUENTIAL:
+             {
+                 return CreateMode.EPHEMERAL;    // protection case - node already set
+             }
+ 
+             case PERSISTENT_SEQUENTIAL:
+             {
+                 return CreateMode.PERSISTENT;    // protection case - node already set
+             }
+             }
+         }
+         return mode;
+     }
+ 
+     private void watchNode() throws Exception
+     {
+         if ( !isActive() )
+         {
+             return;
+         }
+ 
+         String localNodePath = nodePath.get();
+         if ( localNodePath != null )
+         {
+             client.checkExists().usingWatcher(watcher).inBackground(checkExistsCallback).forPath(localNodePath);
+         }
+     }
+ 
+     private boolean isActive()
+     {
+         return (state.get() == State.STARTED);
+     }
+ 
+     @VisibleForTesting
+     boolean isAuthFailure()
+     {
+         return authFailure.get();
+     }
+ }

http://git-wip-us.apache.org/repos/asf/curator/blob/c6a22ba5/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
----------------------------------------------------------------------
diff --cc curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
index 2fb6c66,f451feb..15c5f2e
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
@@@ -37,12 -35,10 +37,12 @@@ import org.apache.curator.utils.ZKPaths
  import org.apache.zookeeper.CreateMode;
  import org.apache.zookeeper.WatchedEvent;
  import org.apache.zookeeper.Watcher;
- import org.apache.zookeeper.ZooDefs;
  import org.apache.zookeeper.Watcher.Event.EventType;
+ import org.apache.zookeeper.ZooDefs;
  import org.apache.zookeeper.data.ACL;
  import org.apache.zookeeper.data.Stat;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
  import org.testng.Assert;
  import org.testng.annotations.AfterMethod;
  import org.testng.annotations.Test;
@@@ -58,9 -52,9 +57,10 @@@ import java.util.concurrent.TimeUnit
  
  import static org.testng.Assert.*;
  
+ @SuppressWarnings("deprecation")
  public class TestPersistentEphemeralNode extends BaseClassForTests
  {
 +    private static final Logger log = LoggerFactory.getLogger(TestPersistentEphemeralNode.class);
      private static final String DIR = "/test";
      private static final String PATH = ZKPaths.makePath(DIR, "/foo");
  
@@@ -100,39 -88,37 +100,40 @@@
          try
          {
              client.start();
 -            PersistentEphemeralNode node = new PersistentEphemeralNode(client, PersistentEphemeralNode.Mode.EPHEMERAL, "/abc/node", "hello".getBytes());
 -            node.start();
 -
 -            final CountDownLatch connectedLatch = new CountDownLatch(1);
 -            final CountDownLatch reconnectedLatch = new CountDownLatch(1);
 -            ConnectionStateListener listener = new ConnectionStateListener()
 +            try ( PersistentEphemeralNode node = new PersistentEphemeralNode(client, PersistentEphemeralNode.Mode.EPHEMERAL, "/abc/node", "hello".getBytes()) )
              {
 -                @Override
 -                public void stateChanged(CuratorFramework client, ConnectionState newState)
++                node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
 +                node.start();
 +
 +                final CountDownLatch connectedLatch = new CountDownLatch(1);
 +                final CountDownLatch reconnectedLatch = new CountDownLatch(1);
 +                ConnectionStateListener listener = new ConnectionStateListener()
                  {
 -                    if ( newState == ConnectionState.CONNECTED )
 -                    {
 -                        connectedLatch.countDown();
 -                    }
 -                    if ( newState == ConnectionState.RECONNECTED )
 +                    @Override
 +                    public void stateChanged(CuratorFramework client, ConnectionState newState)
                      {
 -                        reconnectedLatch.countDown();
 +                        if ( newState == ConnectionState.CONNECTED )
 +                        {
 +                            connectedLatch.countDown();
 +                        }
 +                        if ( newState == ConnectionState.RECONNECTED )
 +                        {
 +                            reconnectedLatch.countDown();
 +                        }
                      }
 -                }
 -            };
 -            client.getConnectionStateListenable().addListener(listener);
 -            timing.sleepABit();
 -            server.restart();
 -            Assert.assertTrue(timing.awaitLatch(connectedLatch));
 -            timing.sleepABit();
 -            Assert.assertTrue(node.waitForInitialCreate(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
 -            server.stop();
 -            timing.sleepABit();
 -            server.restart();
 -            timing.sleepABit();
 -            Assert.assertTrue(timing.awaitLatch(reconnectedLatch));
 +                };
 +                client.getConnectionStateListenable().addListener(listener);
 +                timing.sleepABit();
 +                server.restart();
 +                Assert.assertTrue(timing.awaitLatch(connectedLatch));
 +                timing.sleepABit();
 +                Assert.assertTrue(node.waitForInitialCreate(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
 +                server.stop();
 +                timing.sleepABit();
 +                server.restart();
 +                timing.sleepABit();
 +                Assert.assertTrue(timing.awaitLatch(reconnectedLatch));
 +            }
          }
          finally
          {
@@@ -150,7 -135,7 +151,8 @@@
          try
          {
              client.start();
 -            PersistentEphemeralNode node = new PersistentEphemeralNode(client, PersistentEphemeralNode.Mode.EPHEMERAL, "/abc/node", "hello".getBytes());
 +            node = new PersistentEphemeralNode(client, PersistentEphemeralNode.Mode.EPHEMERAL, "/abc/node", "hello".getBytes());
++            node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
              node.start();
  
              final CountDownLatch connectedLatch = new CountDownLatch(1);
@@@ -231,6 -215,6 +233,7 @@@
          {
              client.start();
              node = new PersistentEphemeralNode(client, mode, PATH, "a".getBytes());
++            node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
              node.start();
              Assert.assertTrue(node.waitForInitialCreate(timing.forWaiting().seconds(), TimeUnit.SECONDS));
  
@@@ -270,6 -257,6 +273,7 @@@
          CuratorFramework curator = newCurator();
  
          PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, new byte[0]);
++        node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
          node.start();
          String path = null;
          try
@@@ -292,6 -279,6 +296,7 @@@
          CuratorFramework curator = newCurator();
  
          PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, new byte[0]);
++        node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
          node.start();
          node.waitForInitialCreate(timing.forWaiting().seconds(), TimeUnit.SECONDS);
  
@@@ -310,9 -297,9 +315,10 @@@
          CuratorFramework observer = newCurator();
  
          PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, new byte[0]);
 -        node.start();
++        node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
          try
          {
 +            node.start();
              node.waitForInitialCreate(timing.forWaiting().seconds(), TimeUnit.SECONDS);
              assertNodeExists(observer, node.getActualPath());
  
@@@ -340,9 -325,9 +346,10 @@@
          CuratorFramework observer = newCurator();
  
          PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, new byte[0]);
 -        node.start();
++        node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
          try
          {
 +            node.start();
              node.waitForInitialCreate(5, TimeUnit.SECONDS);
              assertNodeExists(observer, node.getActualPath());
  
@@@ -374,9 -357,9 +381,10 @@@
          CuratorFramework observer = newCurator();
  
          PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, new byte[0]);
 -        node.start();
++        node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
          try
          {
 +            node.start();
              node.waitForInitialCreate(timing.forWaiting().seconds(), TimeUnit.SECONDS);
              String path = node.getActualPath();
              assertNodeExists(observer, path);
@@@ -421,6 -401,6 +429,7 @@@
          observer.getData().usingWatcher(dataChangedTrigger).forPath(PATH);
  
          PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, new byte[0]);
++        node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
          node.start();
          try
          {
@@@ -454,9 -434,37 +463,10 @@@
          CuratorFramework curator = newCurator();
  
          PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, new byte[0]);
 -        node.start();
 -        try
 -        {
 -            node.waitForInitialCreate(timing.forWaiting().seconds(), TimeUnit.SECONDS);
 -            String originalNode = node.getActualPath();
 -            assertNodeExists(curator, originalNode);
 -
 -            // Delete the original node...
 -            curator.delete().forPath(originalNode);
 -
 -            // Since we're using an ephemeral node, and the original session hasn't been interrupted the name of the new
 -            // node that gets created is going to be exactly the same as the original.
 -            Trigger createdWatchTrigger = Trigger.created();
 -            Stat stat = curator.checkExists().usingWatcher(createdWatchTrigger).forPath(originalNode);
 -            assertTrue(stat != null || createdWatchTrigger.firedWithin(timing.forWaiting().seconds(), TimeUnit.SECONDS));
 -        }
 -        finally
 -        {
 -            node.close();
 -        }
 -    }
 -
 -    @Test
 -    public void testRecreatesNodeWhenItGetsDeletedAfterSetData() throws Exception
 -    {
 -        CuratorFramework curator = newCurator();
 -
 -        PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, new byte[0]);
 -        node.start();
++        node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
          try
          {
 +            node.start();
              node.waitForInitialCreate(timing.forWaiting().seconds(), TimeUnit.SECONDS);
              String originalNode = node.getActualPath();
              assertNodeExists(curator, originalNode);
@@@ -481,13 -498,14 +491,15 @@@
      {
          CuratorFramework curator = newCurator();
  
 -        PersistentEphemeralNode node1 = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL_SEQUENTIAL, PATH, new byte[0]);
 -        node1.start();
 -        try
 +        try ( PersistentEphemeralNode node1 = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL_SEQUENTIAL, PATH, new byte[0]) )
          {
++            node1.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
 +            node1.start();
              node1.waitForInitialCreate(timing.forWaiting().seconds(), TimeUnit.SECONDS);
              String path1 = node1.getActualPath();
  
              PersistentEphemeralNode node2 = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL_SEQUENTIAL, PATH, new byte[0]);
++            node2.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
              node2.start();
              try
              {
@@@ -510,9 -532,9 +522,10 @@@
          byte[] data = "Hello World".getBytes();
  
          PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, data);
 -        node.start();
++        node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
          try
          {
 +            node.start();
              node.waitForInitialCreate(timing.forWaiting().seconds(), TimeUnit.SECONDS);
              assertTrue(Arrays.equals(curator.getData().forPath(node.getActualPath()), data));
          }
@@@ -536,9 -558,9 +549,10 @@@
          byte[] data = "Hello World".getBytes();
               
          PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, data);
 -        node.start();
++        node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
          try
          {
 +            node.start();
              node.waitForInitialCreate(timing.forWaiting().seconds(), TimeUnit.SECONDS);
              assertTrue(Arrays.equals(curator.getData().forPath(node.getActualPath()), data));
          }
@@@ -557,9 -579,9 +571,10 @@@
          byte[] updatedData = "Updated".getBytes();
               
          PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, initialData);
 -        node.start();
          try
          {
++            node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
 +            node.start();
              node.waitForInitialCreate(timing.forWaiting().seconds(), TimeUnit.SECONDS);
              assertTrue(Arrays.equals(curator.getData().forPath(node.getActualPath()), initialData));
              
@@@ -603,9 -625,9 +618,10 @@@
          byte[] updatedData = "Updated".getBytes();
  
          PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, initialData);
 -        node.start();
          try
          {
++            node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
 +            node.start();
              node.waitForInitialCreate(timing.forWaiting().seconds(), TimeUnit.SECONDS);
              assertTrue(Arrays.equals(curator.getData().forPath(node.getActualPath()), initialData));
  
@@@ -646,9 -668,9 +662,10 @@@
  
          PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.PROTECTED_EPHEMERAL, PATH,
                                                                     new byte[0]);
 -        node.start();
          try
          {
++            node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
 +            node.start();
              node.waitForInitialCreate(timing.forWaiting().seconds(), TimeUnit.SECONDS);
              assertNodeExists(curator, node.getActualPath());
  
@@@ -693,6 -714,6 +710,7 @@@
          
          	node = new PersistentEphemeralNode(client, PersistentEphemeralNode.Mode.EPHEMERAL, PATH,
                                                                     new byte[0]);
++            node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
          	node.start();
          
              node.waitForInitialCreate(timing.seconds(), TimeUnit.SECONDS);

http://git-wip-us.apache.org/repos/asf/curator/blob/c6a22ba5/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentNode.java
----------------------------------------------------------------------
diff --cc curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentNode.java
index 0000000,c006dd7..20d6916
mode 000000,100644..100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentNode.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentNode.java
@@@ -1,0 -1,62 +1,63 @@@
+ /**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ package org.apache.curator.framework.recipes.nodes;
+ 
+ import org.apache.curator.framework.CuratorFramework;
+ import org.apache.curator.framework.CuratorFrameworkFactory;
+ import org.apache.curator.retry.RetryOneTime;
+ import org.apache.curator.test.BaseClassForTests;
+ import org.apache.curator.test.Timing;
+ import org.apache.curator.utils.CloseableUtils;
+ import org.apache.zookeeper.CreateMode;
+ import org.testng.Assert;
+ import org.testng.annotations.Test;
+ import java.util.concurrent.TimeUnit;
+ 
+ public class TestPersistentNode extends BaseClassForTests    // exercises PersistentNode with CreateMode.PERSISTENT
+ {
+     @Test
+     public void testBasic() throws Exception    // creates a node via PersistentNode, closes the creating client, then reads the node back through a second client
+     {
+         final byte[] TEST_DATA = "hey".getBytes();    // payload written at creation and expected on read-back
+ 
+         Timing timing = new Timing();
+         PersistentNode pen = null;    // declared outside the try so the finally block can close it
+         CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+         try
+         {
+             client.start();
+             pen = new PersistentNode(client, CreateMode.PERSISTENT, false, "/test", TEST_DATA);    // NOTE(review): meaning of the boolean argument is not visible here - confirm against the PersistentNode constructor
++            pen.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());    // debug hook on the node; presumably a wait applied before close - confirm in PersistentNode
+             pen.start();
+             Assert.assertTrue(pen.waitForInitialCreate(timing.milliseconds(), TimeUnit.MILLISECONDS));    // fail unless the initial create is reported within the timeout
+             client.close(); // cause session to end - force checks that node is persistent
+ 
+             client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));    // second, independent client replaces the closed one
+             client.start();
+ 
+             byte[] bytes = client.getData().forPath("/test");    // read the node through the new client
+             Assert.assertEquals(bytes, TEST_DATA);    // stored data must equal what was supplied at creation
+         }
+         finally
+         {
+             CloseableUtils.closeQuietly(pen);    // pen was built on the first client, which is already closed on the success path
+             CloseableUtils.closeQuietly(client);    // closes whichever client the variable currently refers to
+         }
+     }
+ }