You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2016/01/19 03:05:55 UTC
[5/5] curator git commit: Merge branch 'master' into CURATOR-3.0
Merge branch 'master' into CURATOR-3.0
Conflicts:
curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/c6a22ba5
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/c6a22ba5
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/c6a22ba5
Branch: refs/heads/CURATOR-3.0
Commit: c6a22ba508f9227fe1c657f93e3cc77d8bc17e3e
Parents: d26c38d 649e0ba
Author: randgalt <ra...@apache.org>
Authored: Mon Jan 18 21:05:41 2016 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Jan 18 21:05:41 2016 -0500
----------------------------------------------------------------------
.../framework/imps/NamespaceWatcherMap.java | 14 +-
.../framework/imps/WatcherRemovalFacade.java | 2 +-
.../framework/imps/WatcherRemovalManager.java | 3 +-
.../recipes/nodes/PersistentEphemeralNode.java | 332 +-------------
.../framework/recipes/nodes/PersistentNode.java | 436 +++++++++++++++++++
.../src/site/confluence/group-member.confluence | 2 +-
.../persistent-ephemeral-node.confluence | 20 +-
.../nodes/TestPersistentEphemeralNode.java | 22 +-
.../TestPersistentEphemeralNodeListener.java | 1 +
.../recipes/nodes/TestPersistentNode.java | 63 +++
10 files changed, 555 insertions(+), 340 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/c6a22ba5/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcherMap.java
----------------------------------------------------------------------
diff --cc curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcherMap.java
index 00618e6,e5aecb2..c864f44
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcherMap.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcherMap.java
@@@ -19,6 -19,6 +19,7 @@@
package org.apache.curator.framework.imps;
import com.google.common.annotations.VisibleForTesting;
++import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.zookeeper.Watcher;
@@@ -28,10 -28,10 +29,10 @@@ import java.util.concurrent.ConcurrentM
class NamespaceWatcherMap implements Closeable
{
-- private final ConcurrentMap<Object, NamespaceWatcher> map = CacheBuilder.newBuilder()
++ private final Cache<Object, NamespaceWatcher> cache = CacheBuilder.newBuilder()
.weakValues()
-- .<Object, NamespaceWatcher>build()
-- .asMap();
++ .<Object, NamespaceWatcher>build();
++ private final ConcurrentMap<Object, NamespaceWatcher> map = cache.asMap();
private final CuratorFrameworkImpl client;
NamespaceWatcherMap(CuratorFrameworkImpl client)
@@@ -85,6 -74,6 +86,7 @@@
@VisibleForTesting
boolean isEmpty()
{
++ cache.cleanUp();
return map.isEmpty();
}
@@@ -103,4 -92,4 +105,10 @@@
NamespaceWatcher existingNamespaceWatcher = map.putIfAbsent(watcher, newNamespaceWatcher);
return (existingNamespaceWatcher != null) ? existingNamespaceWatcher : newNamespaceWatcher;
}
++
++ @Override
++ public String toString()
++ {
++ return map.toString();
++ }
}
http://git-wip-us.apache.org/repos/asf/curator/blob/c6a22ba5/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java
----------------------------------------------------------------------
diff --cc curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java
index 91530b4,0000000..30a992e
mode 100644,000000..100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java
@@@ -1,198 -1,0 +1,198 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.imps;
+
+import org.apache.curator.CuratorZookeeperClient;
+import org.apache.curator.RetryLoop;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.WatcherRemoveCuratorFramework;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorListener;
+import org.apache.curator.framework.api.UnhandledErrorListener;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.utils.DebugUtils;
+import org.apache.curator.utils.EnsurePath;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
+
+class WatcherRemovalFacade extends CuratorFrameworkImpl implements WatcherRemoveCuratorFramework
+{
+ private final CuratorFrameworkImpl client;
+ private final WatcherRemovalManager removalManager;
+
+ WatcherRemovalFacade(CuratorFrameworkImpl client)
+ {
+ super(client);
+ this.client = client;
+ removalManager = new WatcherRemovalManager(client, getNamespaceWatcherMap());
+ }
+
+ @Override
+ public WatcherRemoveCuratorFramework newWatcherRemoveCuratorFramework()
+ {
+ return client.newWatcherRemoveCuratorFramework();
+ }
+
+ WatcherRemovalManager getRemovalManager()
+ {
+ return removalManager;
+ }
+
+ @Override
+ public QuorumVerifier getCurrentConfig()
+ {
+ return client.getCurrentConfig();
+ }
+
+ @Override
+ public void removeWatchers()
+ {
+ removalManager.removeWatchers();
+
+ if ( Boolean.getBoolean(DebugUtils.PROPERTY_VALIDATE_NAMESPACE_WATCHER_MAP_EMPTY) )
+ {
+ if ( !getNamespaceWatcherMap().isEmpty() )
+ {
- throw new RuntimeException("NamespaceWatcherMap is not empty: " + client.getNamespaceWatcherMap());
++ throw new RuntimeException("NamespaceWatcherMap is not empty: " + getNamespaceWatcherMap());
+ }
+ }
+ }
+
+ @Override
+ WatcherRemovalManager getWatcherRemovalManager()
+ {
+ return removalManager;
+ }
+
+ @Override
+ public CuratorFramework nonNamespaceView()
+ {
+ return client.nonNamespaceView();
+ }
+
+ @Override
+ public CuratorFramework usingNamespace(String newNamespace)
+ {
+ return client.usingNamespace(newNamespace);
+ }
+
+ @Override
+ public String getNamespace()
+ {
+ return client.getNamespace();
+ }
+
+ @Override
+ public void start()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void close()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Listenable<ConnectionStateListener> getConnectionStateListenable()
+ {
+ return client.getConnectionStateListenable();
+ }
+
+ @Override
+ public Listenable<CuratorListener> getCuratorListenable()
+ {
+ return client.getCuratorListenable();
+ }
+
+ @Override
+ public Listenable<UnhandledErrorListener> getUnhandledErrorListenable()
+ {
+ return client.getUnhandledErrorListenable();
+ }
+
+ @Override
+ public void sync(String path, Object context)
+ {
+ client.sync(path, context);
+ }
+
+ @Override
+ public CuratorZookeeperClient getZookeeperClient()
+ {
+ return client.getZookeeperClient();
+ }
+
+ @Override
+ RetryLoop newRetryLoop()
+ {
+ return client.newRetryLoop();
+ }
+
+ @Override
+ ZooKeeper getZooKeeper() throws Exception
+ {
+ return client.getZooKeeper();
+ }
+
+ @Override
+ <DATA_TYPE> void processBackgroundOperation(OperationAndData<DATA_TYPE> operationAndData, CuratorEvent event)
+ {
+ client.processBackgroundOperation(operationAndData, event);
+ }
+
+ @Override
+ void logError(String reason, Throwable e)
+ {
+ client.logError(reason, e);
+ }
+
+ @Override
+ String unfixForNamespace(String path)
+ {
+ return client.unfixForNamespace(path);
+ }
+
+ @Override
+ String fixForNamespace(String path)
+ {
+ return client.fixForNamespace(path);
+ }
+
+ @Override
+ String fixForNamespace(String path, boolean isSequential)
+ {
+ return client.fixForNamespace(path, isSequential);
+ }
+
+ @Override
+ public EnsurePath newNamespaceAwareEnsurePath(String path)
+ {
+ return client.newNamespaceAwareEnsurePath(path);
+ }
+
+ @Override
+ FailedDeleteManager getFailedDeleteManager()
+ {
+ return client.getFailedDeleteManager();
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/c6a22ba5/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalManager.java
----------------------------------------------------------------------
diff --cc curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalManager.java
index 064964d,0000000..1e6fe94
mode 100644,000000..100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalManager.java
@@@ -1,140 -1,0 +1,141 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.imps;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.HashSet;
+import java.util.Set;
+
+public class WatcherRemovalManager
+{
+ private final Logger log = LoggerFactory.getLogger(getClass());
+ private final CuratorFrameworkImpl client;
+ private final NamespaceWatcherMap namespaceWatcherMap;
+ private final Set<WrappedWatcher> entries = Sets.newHashSet(); // guarded by sync
+
+ WatcherRemovalManager(CuratorFrameworkImpl client, NamespaceWatcherMap namespaceWatcherMap)
+ {
+ this.client = client;
+ this.namespaceWatcherMap = namespaceWatcherMap;
+ }
+
+ synchronized Watcher add(String path, Watcher watcher)
+ {
+ path = Preconditions.checkNotNull(path, "path cannot be null");
+ watcher = Preconditions.checkNotNull(watcher, "watcher cannot be null");
+
+ WrappedWatcher wrappedWatcher = new WrappedWatcher(watcher, path);
+ entries.add(wrappedWatcher);
+ return wrappedWatcher;
+ }
+
+ @VisibleForTesting
+ synchronized Set<? extends Watcher> getEntries()
+ {
+ return Sets.newHashSet(entries);
+ }
+
+    /**
+     * Removes every watcher currently tracked by this manager. A snapshot of the
+     * tracked entries is taken while holding this manager's monitor; the removals
+     * themselves are issued outside of it. A failure to remove one watcher is logged
+     * together with its cause and does not stop the remaining entries from being processed.
+     */
+    void removeWatchers()
+    {
+        HashSet<WrappedWatcher> localEntries;
+        synchronized(this)
+        {
+            localEntries = Sets.newHashSet(entries);
+        }
+        for ( WrappedWatcher entry : localEntries )
+        {
+            try
+            {
+                log.debug("Removing watcher for path: " + entry.path);
-                namespaceWatcherMap.removeWatcher(entry.watcher);
+                RemoveWatchesBuilderImpl builder = new RemoveWatchesBuilderImpl(client);
++                namespaceWatcherMap.removeWatcher(entry.watcher);
+                builder.internalRemoval(entry, entry.path);
+            }
+            catch ( Exception e )
+            {
+                // hand the exception to the logger so the cause and stack trace are not lost
+                log.error("Could not remove watcher for path: " + entry.path, e);
+            }
+        }
+    }
+
+ private synchronized void internalRemove(WrappedWatcher entry)
+ {
++ namespaceWatcherMap.removeWatcher(entry.watcher);
+ entries.remove(entry);
+ }
+
+ private class WrappedWatcher implements Watcher
+ {
+ private final Watcher watcher;
+ private final String path;
+
+ WrappedWatcher(Watcher watcher, String path)
+ {
+ this.watcher = watcher;
+ this.path = path;
+ }
+
+ @Override
+ public void process(WatchedEvent event)
+ {
+ if ( event.getType() != Event.EventType.None )
+ {
+ internalRemove(this);
+ }
+ watcher.process(event);
+ }
+
+        /**
+         * Two wrapped watchers are equal when they are instances of the same class and
+         * wrap equal watchers registered for equal paths.
+         */
+        @Override
+        public boolean equals(Object o)
+        {
+            if ( this == o )
+            {
+                return true;
+            }
+            if ( (o == null) || (getClass() != o.getClass()) )
+            {
+                return false;
+            }
+
+            WrappedWatcher that = (WrappedWatcher)o;
+            return watcher.equals(that.watcher) && path.equals(that.path);
+        }
+
+ @Override
+ public int hashCode()
+ {
+ int result = watcher.hashCode();
+ result = 31 * result + path.hashCode();
+ return result;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/c6a22ba5/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
----------------------------------------------------------------------
diff --cc curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
index 0000000,0d7ab9d..93c88f7
mode 000000,100644..100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
@@@ -1,0 -1,385 +1,436 @@@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ package org.apache.curator.framework.recipes.nodes;
+
+ import com.google.common.annotations.VisibleForTesting;
+ import com.google.common.base.Preconditions;
+ import org.apache.curator.framework.CuratorFramework;
++import org.apache.curator.framework.WatcherRemoveCuratorFramework;
+ import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
+ import org.apache.curator.framework.api.BackgroundCallback;
+ import org.apache.curator.framework.api.CreateBuilder;
+ import org.apache.curator.framework.api.CreateModable;
+ import org.apache.curator.framework.api.CuratorEvent;
+ import org.apache.curator.framework.api.CuratorWatcher;
+ import org.apache.curator.framework.state.ConnectionState;
+ import org.apache.curator.framework.state.ConnectionStateListener;
+ import org.apache.curator.utils.PathUtils;
+ import org.apache.curator.utils.ThreadUtils;
+ import org.apache.zookeeper.CreateMode;
+ import org.apache.zookeeper.KeeperException;
+ import org.apache.zookeeper.WatchedEvent;
+ import org.apache.zookeeper.Watcher.Event.EventType;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ import java.io.Closeable;
+ import java.io.IOException;
+ import java.util.Arrays;
+ import java.util.concurrent.CountDownLatch;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicBoolean;
++import java.util.concurrent.atomic.AtomicLong;
+ import java.util.concurrent.atomic.AtomicReference;
+
+ /**
+ * <p>
+ * A persistent node is a node that attempts to stay present in
+ * ZooKeeper, even through connection and session interruptions.
+ * </p>
+ * <p>
+ * Thanks to bbeck (https://github.com/bbeck) for the initial coding and design
+ * </p>
+ */
+ public class PersistentNode implements Closeable
+ {
+ private final AtomicReference<CountDownLatch> initialCreateLatch = new AtomicReference<CountDownLatch>(new CountDownLatch(1));
+ private final Logger log = LoggerFactory.getLogger(getClass());
- private final CuratorFramework client;
++ private final WatcherRemoveCuratorFramework client;
+ private final CreateModable<ACLBackgroundPathAndBytesable<String>> createMethod;
+ private final AtomicReference<String> nodePath = new AtomicReference<String>(null);
+ private final String basePath;
+ private final CreateMode mode;
+ private final AtomicReference<byte[]> data = new AtomicReference<byte[]>();
+ private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
+ private final AtomicBoolean authFailure = new AtomicBoolean(false);
+ private final BackgroundCallback backgroundCallback;
+ private final boolean useProtection;
+ private final CuratorWatcher watcher = new CuratorWatcher()
+ {
+ @Override
+ public void process(WatchedEvent event) throws Exception
+ {
- if ( event.getType() == EventType.NodeDeleted )
++ if ( isActive() )
+ {
- createNode();
- }
- else if ( event.getType() == EventType.NodeDataChanged )
- {
- watchNode();
++ if ( event.getType() == EventType.NodeDeleted )
++ {
++ createNode();
++ }
++ else if ( event.getType() == EventType.NodeDataChanged )
++ {
++ watchNode();
++ }
+ }
+ }
+ };
++
+ private final BackgroundCallback checkExistsCallback = new BackgroundCallback()
+ {
+ @Override
- public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
++ public void processResult(CuratorFramework dummy, CuratorEvent event) throws Exception
+ {
- if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )
++ if ( isActive() )
+ {
- createNode();
++ if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )
++ {
++ createNode();
++ }
++ else
++ {
++ boolean isEphemeral = event.getStat().getEphemeralOwner() != 0;
++ if ( isEphemeral != mode.isEphemeral() )
++ {
++ log.warn("Existing node ephemeral state doesn't match requested state. Maybe the node was created outside of PersistentNode? " + basePath);
++ }
++ }
+ }
+ else
+ {
- boolean isEphemeral = event.getStat().getEphemeralOwner() != 0;
- if ( isEphemeral != mode.isEphemeral() )
- {
- log.warn("Existing node ephemeral state doesn't match requested state. Maybe the node was created outside of PersistentNode? " + basePath);
- }
++ client.removeWatchers();
+ }
+ }
+ };
+ private final BackgroundCallback setDataCallback = new BackgroundCallback()
+ {
+
+ @Override
- public void processResult(CuratorFramework client, CuratorEvent event)
++ public void processResult(CuratorFramework dummy, CuratorEvent event)
+ throws Exception
+ {
+ //If the result is ok then initialisation is complete (if we're still initialising)
+ //Don't retry on other errors as the only recoverable cases will be connection loss
+ //and the node not existing, both of which are already handled by other watches.
+ if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
+ {
+ //Update is ok, mark initialisation as complete if required.
+ initialisationComplete();
+ }
+ }
+ };
+ private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
+ {
+ @Override
- public void stateChanged(CuratorFramework client, ConnectionState newState)
++ public void stateChanged(CuratorFramework dummy, ConnectionState newState)
+ {
- if ( newState == ConnectionState.RECONNECTED )
++ if ( (newState == ConnectionState.RECONNECTED) && isActive() )
+ {
+ createNode();
+ }
+ }
+ };
+
++ @VisibleForTesting
++ volatile CountDownLatch debugCreateNodeLatch = null;
++
+ private enum State
+ {
+ LATENT,
+ STARTED,
+ CLOSED
+ }
+
+ /**
- * @param client client instance
++ * @param givenClient client instance
+ * @param mode creation mode
+ * @param useProtection if true, call {@link CreateBuilder#withProtection()}
+ * @param basePath the base path for the node
+ * @param initData data for the node
+ */
- public PersistentNode(CuratorFramework client, final CreateMode mode, boolean useProtection, final String basePath, byte[] initData)
++ public PersistentNode(CuratorFramework givenClient, final CreateMode mode, boolean useProtection, final String basePath, byte[] initData)
+ {
+ this.useProtection = useProtection;
- this.client = Preconditions.checkNotNull(client, "client cannot be null");
++ this.client = Preconditions.checkNotNull(givenClient, "client cannot be null").newWatcherRemoveCuratorFramework();
+ this.basePath = PathUtils.validatePath(basePath);
+ this.mode = Preconditions.checkNotNull(mode, "mode cannot be null");
+ final byte[] data = Preconditions.checkNotNull(initData, "data cannot be null");
+
+ backgroundCallback = new BackgroundCallback()
+ {
+ @Override
- public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
++ public void processResult(CuratorFramework dummy, CuratorEvent event) throws Exception
+ {
++ if ( !isActive() )
++ {
++ return;
++ }
++
+ String path = null;
+ boolean nodeExists = false;
+ if ( event.getResultCode() == KeeperException.Code.NODEEXISTS.intValue() )
+ {
+ path = event.getPath();
+ nodeExists = true;
+ }
+ else if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
+ {
+ path = event.getName();
+ }
+ else if ( event.getResultCode() == KeeperException.Code.NOAUTH.intValue() )
+ {
- log.warn("Client does not have authorisation to write node at path {}", event.getPath());
++ log.warn("Client does not have authorization to write node at path {}", event.getPath());
+ authFailure.set(true);
+ return;
+ }
+ if ( path != null )
+ {
+ authFailure.set(false);
+ nodePath.set(path);
+ watchNode();
+
+ if ( nodeExists )
+ {
+ client.setData().inBackground(setDataCallback).forPath(getActualPath(), getData());
+ }
+ else
+ {
+ initialisationComplete();
+ }
+ }
+ else
+ {
+ createNode();
+ }
+ }
+ };
+
+ createMethod = useProtection ? client.create().creatingParentContainersIfNeeded().withProtection() : client.create().creatingParentContainersIfNeeded();
+ this.data.set(Arrays.copyOf(data, data.length));
+ }
+
+ private void initialisationComplete()
+ {
+ CountDownLatch localLatch = initialCreateLatch.getAndSet(null);
+ if ( localLatch != null )
+ {
+ localLatch.countDown();
+ }
+ }
+
+ /**
+ * You must call start() to initiate the persistent node. An attempt to create the node
+ * in the background will be started
+ */
+ public void start()
+ {
+ Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Already started");
+
+ client.getConnectionStateListenable().addListener(connectionStateListener);
+ createNode();
+ }
+
+ /**
+ * Block until either the initial node creation initiated by {@link #start()} succeeds or
+ * the timeout elapses.
+ *
+ * @param timeout the maximum time to wait
+ * @param unit time unit
+ * @return if the node was created before timeout
+ * @throws InterruptedException if the thread is interrupted
+ */
+ public boolean waitForInitialCreate(long timeout, TimeUnit unit) throws InterruptedException
+ {
+ Preconditions.checkState(state.get() == State.STARTED, "Not started");
+
+ CountDownLatch localLatch = initialCreateLatch.get();
+ return (localLatch == null) || localLatch.await(timeout, unit);
+ }
+
++ @VisibleForTesting
++ final AtomicLong debugWaitMsForBackgroundBeforeClose = new AtomicLong(0);
++
+    /**
+     * Stops maintaining the node. After an optional test-only delay, the state is moved
+     * from STARTED to CLOSED; if that transition fails (never started, or already closed)
+     * the call returns without doing anything else. Otherwise the connection state
+     * listener is removed, the recorded node (if any) is deleted and the watchers
+     * registered through this instance's client are removed.
+     *
+     * @throws IOException if deleting the node fails; the original exception is the cause
+     */
+    @Override
+    public void close() throws IOException
+    {
++        if ( debugWaitMsForBackgroundBeforeClose.get() > 0 )
++        {
++            try
++            {
++                Thread.sleep(debugWaitMsForBackgroundBeforeClose.get());
++            }
++            catch ( InterruptedException e )
++            {
++                Thread.currentThread().interrupt();
++            }
++        }
++
+        if ( !state.compareAndSet(State.STARTED, State.CLOSED) )
+        {
+            return;
+        }
+
+        client.getConnectionStateListenable().removeListener(connectionStateListener);
+
+        try
+        {
+            deleteNode();
+        }
+        catch ( Exception e )
+        {
+            ThreadUtils.checkInterrupted(e);
+            throw new IOException(e);
+        }
++        finally
++        {
++            // Always release the watchers, even when the delete failed: the state is already
++            // CLOSED here, so a later close() call returns early and could never remove them.
++            client.removeWatchers();
++        }
+    }
+
+ /**
+ * Returns the currently set path or null if the node does not exist
+ *
+ * @return node path or null
+ */
+ public String getActualPath()
+ {
+ return nodePath.get();
+ }
+
+ /**
+ * Set the data that the node should hold in ZK; the data is also written to the node
+ *
+ * @param data new data value
+ * @throws Exception errors
+ */
+ public void setData(byte[] data) throws Exception
+ {
+ data = Preconditions.checkNotNull(data, "data cannot be null");
+ this.data.set(Arrays.copyOf(data, data.length));
+ if ( isActive() )
+ {
+ client.setData().inBackground().forPath(getActualPath(), getData());
+ }
+ }
+
+ /**
+ * Return the current value of our data
+ *
+ * @return our data
+ */
+ public byte[] getData()
+ {
+ return this.data.get();
+ }
+
+ private void deleteNode() throws Exception
+ {
+ String localNodePath = nodePath.getAndSet(null);
+ if ( localNodePath != null )
+ {
+ try
+ {
+ client.delete().guaranteed().forPath(localNodePath);
+ }
+ catch ( KeeperException.NoNodeException ignore )
+ {
+ // ignore
+ }
+ }
+ }
+
+ private void createNode()
+ {
+ if ( !isActive() )
+ {
+ return;
+ }
+
++ if ( debugCreateNodeLatch != null )
++ {
++ try
++ {
++ debugCreateNodeLatch.await();
++ }
++ catch ( InterruptedException e )
++ {
++ Thread.currentThread().interrupt();
++ return;
++ }
++ }
++
+ try
+ {
+ String existingPath = nodePath.get();
+ String createPath = (existingPath != null && !useProtection) ? existingPath : basePath;
+ createMethod.withMode(getCreateMode(existingPath != null)).inBackground(backgroundCallback).forPath(createPath, data.get());
+ }
+ catch ( Exception e )
+ {
+ ThreadUtils.checkInterrupted(e);
+ throw new RuntimeException("Creating node. BasePath: " + basePath, e); // should never happen unless there's a programming error - so throw RuntimeException
+ }
+ }
+
+    /**
+     * Chooses the create mode for the next create call. When no path has been recorded
+     * yet the configured mode is used unchanged. Once a path is set, the sequential
+     * modes are replaced by their non-sequential counterparts; every other mode is
+     * returned as configured.
+     *
+     * @param pathIsSet true if a node path has already been recorded
+     * @return the mode to create the node with
+     */
+    private CreateMode getCreateMode(boolean pathIsSet)
+    {
+        if ( !pathIsSet )
+        {
+            return mode;
+        }
+
+        if ( mode == CreateMode.EPHEMERAL_SEQUENTIAL )
+        {
+            return CreateMode.EPHEMERAL;    // protection case - node already set
+        }
+        if ( mode == CreateMode.PERSISTENT_SEQUENTIAL )
+        {
+            return CreateMode.PERSISTENT;   // protection case - node already set
+        }
+        return mode;
+    }
+
+ private void watchNode() throws Exception
+ {
+ if ( !isActive() )
+ {
+ return;
+ }
+
+ String localNodePath = nodePath.get();
+ if ( localNodePath != null )
+ {
+ client.checkExists().usingWatcher(watcher).inBackground(checkExistsCallback).forPath(localNodePath);
+ }
+ }
+
+ private boolean isActive()
+ {
+ return (state.get() == State.STARTED);
+ }
+
+ @VisibleForTesting
+ boolean isAuthFailure()
+ {
+ return authFailure.get();
+ }
+ }
http://git-wip-us.apache.org/repos/asf/curator/blob/c6a22ba5/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
----------------------------------------------------------------------
diff --cc curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
index 2fb6c66,f451feb..15c5f2e
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
@@@ -37,12 -35,10 +37,12 @@@ import org.apache.curator.utils.ZKPaths
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
- import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.Watcher.Event.EventType;
+ import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;
@@@ -58,9 -52,9 +57,10 @@@ import java.util.concurrent.TimeUnit
import static org.testng.Assert.*;
+ @SuppressWarnings("deprecation")
public class TestPersistentEphemeralNode extends BaseClassForTests
{
+ private static final Logger log = LoggerFactory.getLogger(TestPersistentEphemeralNode.class);
private static final String DIR = "/test";
private static final String PATH = ZKPaths.makePath(DIR, "/foo");
@@@ -100,39 -88,37 +100,40 @@@
try
{
client.start();
- PersistentEphemeralNode node = new PersistentEphemeralNode(client, PersistentEphemeralNode.Mode.EPHEMERAL, "/abc/node", "hello".getBytes());
- node.start();
-
- final CountDownLatch connectedLatch = new CountDownLatch(1);
- final CountDownLatch reconnectedLatch = new CountDownLatch(1);
- ConnectionStateListener listener = new ConnectionStateListener()
+ try ( PersistentEphemeralNode node = new PersistentEphemeralNode(client, PersistentEphemeralNode.Mode.EPHEMERAL, "/abc/node", "hello".getBytes()) )
{
- @Override
- public void stateChanged(CuratorFramework client, ConnectionState newState)
++ node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
+ node.start();
+
+ final CountDownLatch connectedLatch = new CountDownLatch(1);
+ final CountDownLatch reconnectedLatch = new CountDownLatch(1);
+ ConnectionStateListener listener = new ConnectionStateListener()
{
- if ( newState == ConnectionState.CONNECTED )
- {
- connectedLatch.countDown();
- }
- if ( newState == ConnectionState.RECONNECTED )
+ @Override
+ public void stateChanged(CuratorFramework client, ConnectionState newState)
{
- reconnectedLatch.countDown();
+ if ( newState == ConnectionState.CONNECTED )
+ {
+ connectedLatch.countDown();
+ }
+ if ( newState == ConnectionState.RECONNECTED )
+ {
+ reconnectedLatch.countDown();
+ }
}
- }
- };
- client.getConnectionStateListenable().addListener(listener);
- timing.sleepABit();
- server.restart();
- Assert.assertTrue(timing.awaitLatch(connectedLatch));
- timing.sleepABit();
- Assert.assertTrue(node.waitForInitialCreate(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
- server.stop();
- timing.sleepABit();
- server.restart();
- timing.sleepABit();
- Assert.assertTrue(timing.awaitLatch(reconnectedLatch));
+ };
+ client.getConnectionStateListenable().addListener(listener);
+ timing.sleepABit();
+ server.restart();
+ Assert.assertTrue(timing.awaitLatch(connectedLatch));
+ timing.sleepABit();
+ Assert.assertTrue(node.waitForInitialCreate(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
+ server.stop();
+ timing.sleepABit();
+ server.restart();
+ timing.sleepABit();
+ Assert.assertTrue(timing.awaitLatch(reconnectedLatch));
+ }
}
finally
{
@@@ -150,7 -135,7 +151,8 @@@
try
{
client.start();
- PersistentEphemeralNode node = new PersistentEphemeralNode(client, PersistentEphemeralNode.Mode.EPHEMERAL, "/abc/node", "hello".getBytes());
+ node = new PersistentEphemeralNode(client, PersistentEphemeralNode.Mode.EPHEMERAL, "/abc/node", "hello".getBytes());
++ node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
node.start();
final CountDownLatch connectedLatch = new CountDownLatch(1);
@@@ -231,6 -215,6 +233,7 @@@
{
client.start();
node = new PersistentEphemeralNode(client, mode, PATH, "a".getBytes());
++ node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
node.start();
Assert.assertTrue(node.waitForInitialCreate(timing.forWaiting().seconds(), TimeUnit.SECONDS));
@@@ -270,6 -257,6 +273,7 @@@
CuratorFramework curator = newCurator();
PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, new byte[0]);
++ node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
node.start();
String path = null;
try
@@@ -292,6 -279,6 +296,7 @@@
CuratorFramework curator = newCurator();
PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, new byte[0]);
++ node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
node.start();
node.waitForInitialCreate(timing.forWaiting().seconds(), TimeUnit.SECONDS);
@@@ -310,9 -297,9 +315,10 @@@
CuratorFramework observer = newCurator();
PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, new byte[0]);
- node.start();
++ node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
try
{
+ node.start();
node.waitForInitialCreate(timing.forWaiting().seconds(), TimeUnit.SECONDS);
assertNodeExists(observer, node.getActualPath());
@@@ -340,9 -325,9 +346,10 @@@
CuratorFramework observer = newCurator();
PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, new byte[0]);
- node.start();
++ node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
try
{
+ node.start();
node.waitForInitialCreate(5, TimeUnit.SECONDS);
assertNodeExists(observer, node.getActualPath());
@@@ -374,9 -357,9 +381,10 @@@
CuratorFramework observer = newCurator();
PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, new byte[0]);
- node.start();
++ node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
try
{
+ node.start();
node.waitForInitialCreate(timing.forWaiting().seconds(), TimeUnit.SECONDS);
String path = node.getActualPath();
assertNodeExists(observer, path);
@@@ -421,6 -401,6 +429,7 @@@
observer.getData().usingWatcher(dataChangedTrigger).forPath(PATH);
PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, new byte[0]);
++ node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
node.start();
try
{
@@@ -454,9 -434,37 +463,10 @@@
CuratorFramework curator = newCurator();
PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, new byte[0]);
- node.start();
- try
- {
- node.waitForInitialCreate(timing.forWaiting().seconds(), TimeUnit.SECONDS);
- String originalNode = node.getActualPath();
- assertNodeExists(curator, originalNode);
-
- // Delete the original node...
- curator.delete().forPath(originalNode);
-
- // Since we're using an ephemeral node, and the original session hasn't been interrupted the name of the new
- // node that gets created is going to be exactly the same as the original.
- Trigger createdWatchTrigger = Trigger.created();
- Stat stat = curator.checkExists().usingWatcher(createdWatchTrigger).forPath(originalNode);
- assertTrue(stat != null || createdWatchTrigger.firedWithin(timing.forWaiting().seconds(), TimeUnit.SECONDS));
- }
- finally
- {
- node.close();
- }
- }
-
- @Test
- public void testRecreatesNodeWhenItGetsDeletedAfterSetData() throws Exception
- {
- CuratorFramework curator = newCurator();
-
- PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, new byte[0]);
- node.start();
++ node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
try
{
+ node.start();
node.waitForInitialCreate(timing.forWaiting().seconds(), TimeUnit.SECONDS);
String originalNode = node.getActualPath();
assertNodeExists(curator, originalNode);
@@@ -481,13 -498,14 +491,15 @@@
{
CuratorFramework curator = newCurator();
- PersistentEphemeralNode node1 = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL_SEQUENTIAL, PATH, new byte[0]);
- node1.start();
- try
+ try ( PersistentEphemeralNode node1 = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL_SEQUENTIAL, PATH, new byte[0]) )
{
++ node1.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
+ node1.start();
node1.waitForInitialCreate(timing.forWaiting().seconds(), TimeUnit.SECONDS);
String path1 = node1.getActualPath();
PersistentEphemeralNode node2 = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL_SEQUENTIAL, PATH, new byte[0]);
++ node2.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
node2.start();
try
{
@@@ -510,9 -532,9 +522,10 @@@
byte[] data = "Hello World".getBytes();
PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, data);
- node.start();
++ node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
try
{
+ node.start();
node.waitForInitialCreate(timing.forWaiting().seconds(), TimeUnit.SECONDS);
assertTrue(Arrays.equals(curator.getData().forPath(node.getActualPath()), data));
}
@@@ -536,9 -558,9 +549,10 @@@
byte[] data = "Hello World".getBytes();
PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, data);
- node.start();
++ node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
try
{
+ node.start();
node.waitForInitialCreate(timing.forWaiting().seconds(), TimeUnit.SECONDS);
assertTrue(Arrays.equals(curator.getData().forPath(node.getActualPath()), data));
}
@@@ -557,9 -579,9 +571,10 @@@
byte[] updatedData = "Updated".getBytes();
PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, initialData);
- node.start();
try
{
++ node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
+ node.start();
node.waitForInitialCreate(timing.forWaiting().seconds(), TimeUnit.SECONDS);
assertTrue(Arrays.equals(curator.getData().forPath(node.getActualPath()), initialData));
@@@ -603,9 -625,9 +618,10 @@@
byte[] updatedData = "Updated".getBytes();
PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, initialData);
- node.start();
try
{
++ node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
+ node.start();
node.waitForInitialCreate(timing.forWaiting().seconds(), TimeUnit.SECONDS);
assertTrue(Arrays.equals(curator.getData().forPath(node.getActualPath()), initialData));
@@@ -646,9 -668,9 +662,10 @@@
PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.PROTECTED_EPHEMERAL, PATH,
new byte[0]);
- node.start();
try
{
++ node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
+ node.start();
node.waitForInitialCreate(timing.forWaiting().seconds(), TimeUnit.SECONDS);
assertNodeExists(curator, node.getActualPath());
@@@ -693,6 -714,6 +710,7 @@@
node = new PersistentEphemeralNode(client, PersistentEphemeralNode.Mode.EPHEMERAL, PATH,
new byte[0]);
++ node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
node.start();
node.waitForInitialCreate(timing.seconds(), TimeUnit.SECONDS);
http://git-wip-us.apache.org/repos/asf/curator/blob/c6a22ba5/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentNode.java
----------------------------------------------------------------------
diff --cc curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentNode.java
index 0000000,c006dd7..20d6916
mode 000000,100644..100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentNode.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentNode.java
@@@ -1,0 -1,62 +1,63 @@@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ package org.apache.curator.framework.recipes.nodes;
+
+ import org.apache.curator.framework.CuratorFramework;
+ import org.apache.curator.framework.CuratorFrameworkFactory;
+ import org.apache.curator.retry.RetryOneTime;
+ import org.apache.curator.test.BaseClassForTests;
+ import org.apache.curator.test.Timing;
+ import org.apache.curator.utils.CloseableUtils;
+ import org.apache.zookeeper.CreateMode;
+ import org.testng.Assert;
+ import org.testng.annotations.Test;
+ import java.util.concurrent.TimeUnit;
+
+ public class TestPersistentNode extends BaseClassForTests
+ {
+ @Test
+ public void testBasic() throws Exception
+ {
+ final byte[] TEST_DATA = "hey".getBytes();
+
+ Timing timing = new Timing();
+ PersistentNode pen = null;
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ try
+ {
+ client.start();
+ pen = new PersistentNode(client, CreateMode.PERSISTENT, false, "/test", TEST_DATA);
++ pen.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
+ pen.start();
+ Assert.assertTrue(pen.waitForInitialCreate(timing.milliseconds(), TimeUnit.MILLISECONDS));
+ client.close(); // cause session to end - force checks that node is persistent
+
+ client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ client.start();
+
+ byte[] bytes = client.getData().forPath("/test");
+ Assert.assertEquals(bytes, TEST_DATA);
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(pen);
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+ }