You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2016/01/19 03:05:51 UTC
[1/5] curator git commit: Deprecated PersistentEphemeralNode in favor
of PersistentNode which is the same code but now accepts any createmode
Repository: curator
Updated Branches:
refs/heads/CURATOR-3.0 d26c38dba -> c6a22ba50
Deprecated PersistentEphemeralNode in favor of PersistentNode which is the same code but now accepts any createmode
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/fefbba1c
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/fefbba1c
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/fefbba1c
Branch: refs/heads/CURATOR-3.0
Commit: fefbba1cc3bd5641983657440b40e25425165a6a
Parents: 45332f3
Author: randgalt <ra...@apache.org>
Authored: Tue Jan 12 11:45:38 2016 -0500
Committer: randgalt <ra...@apache.org>
Committed: Tue Jan 12 11:45:38 2016 -0500
----------------------------------------------------------------------
.../recipes/nodes/PersistentEphemeralNode.java | 310 +--------------
.../framework/recipes/nodes/PersistentNode.java | 382 +++++++++++++++++++
.../src/site/confluence/group-member.confluence | 2 +-
.../persistent-ephemeral-node.confluence | 20 +-
.../nodes/TestPersistentEphemeralNode.java | 6 +-
.../TestPersistentEphemeralNodeListener.java | 1 +
.../recipes/nodes/TestPersistentNode.java | 62 +++
7 files changed, 468 insertions(+), 315 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/fefbba1c/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
index 684e0d9..5576dc2 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
@@ -19,30 +19,8 @@
package org.apache.curator.framework.recipes.nodes;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
-import org.apache.curator.framework.api.BackgroundCallback;
-import org.apache.curator.framework.api.CreateModable;
-import org.apache.curator.framework.api.CuratorEvent;
-import org.apache.curator.framework.api.CuratorWatcher;
-import org.apache.curator.framework.state.ConnectionState;
-import org.apache.curator.framework.state.ConnectionStateListener;
-import org.apache.curator.utils.PathUtils;
import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher.Event.EventType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
/**
* <p>
@@ -52,85 +30,18 @@ import java.util.concurrent.atomic.AtomicReference;
* <p>
* Thanks to bbeck (https://github.com/bbeck) for the initial coding and design
* </p>
+ *
+ * @deprecated This has been replaced with the more general {@link PersistentNode}
*/
-public class PersistentEphemeralNode implements Closeable
+@Deprecated
+public class PersistentEphemeralNode extends PersistentNode
{
- private final AtomicReference<CountDownLatch> initialCreateLatch = new AtomicReference<CountDownLatch>(new CountDownLatch(1));
- private final Logger log = LoggerFactory.getLogger(getClass());
- private final CuratorFramework client;
- private final CreateModable<ACLBackgroundPathAndBytesable<String>> createMethod;
- private final AtomicReference<String> nodePath = new AtomicReference<String>(null);
- private final String basePath;
- private final Mode mode;
- private final AtomicReference<byte[]> data = new AtomicReference<byte[]>();
- private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
- private final AtomicBoolean authFailure = new AtomicBoolean(false);
- private final BackgroundCallback backgroundCallback;
- private final CuratorWatcher watcher = new CuratorWatcher()
- {
- @Override
- public void process(WatchedEvent event) throws Exception
- {
- if ( event.getType() == EventType.NodeDeleted )
- {
- createNode();
- }
- else if ( event.getType() == EventType.NodeDataChanged )
- {
- watchNode();
- }
- }
- };
- private final BackgroundCallback checkExistsCallback = new BackgroundCallback()
- {
- @Override
- public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
- {
- if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )
- {
- createNode();
- }
- }
- };
- private final BackgroundCallback setDataCallback = new BackgroundCallback()
- {
-
- @Override
- public void processResult(CuratorFramework client, CuratorEvent event)
- throws Exception
- {
- //If the result is ok then initialisation is complete (if we're still initialising)
- //Don't retry on other errors as the only recoverable cases will be connection loss
- //and the node not existing, both of which are already handled by other watches.
- if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
- {
- //Update is ok, mark initialisation as complete if required.
- initialisationComplete();
- }
- }
- };
- private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
- {
- @Override
- public void stateChanged(CuratorFramework client, ConnectionState newState)
- {
- if ( newState == ConnectionState.RECONNECTED )
- {
- createNode();
- }
- }
- };
-
- private enum State
- {
- LATENT,
- STARTED,
- CLOSED
- }
-
/**
* The mode for node creation
+ *
+ * @deprecated This has been replaced with the more general {@link PersistentNode}
*/
+ @Deprecated
public enum Mode
{
/**
@@ -216,212 +127,9 @@ public class PersistentEphemeralNode implements Closeable
* @param basePath the base path for the node
* @param initData data for the node
*/
+ @SuppressWarnings("deprecation")
public PersistentEphemeralNode(CuratorFramework client, Mode mode, String basePath, byte[] initData)
{
- this.client = Preconditions.checkNotNull(client, "client cannot be null");
- this.basePath = PathUtils.validatePath(basePath);
- this.mode = Preconditions.checkNotNull(mode, "mode cannot be null");
- final byte[] data = Preconditions.checkNotNull(initData, "data cannot be null");
-
- backgroundCallback = new BackgroundCallback()
- {
- @Override
- public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
- {
- String path = null;
- boolean nodeExists = false;
- if ( event.getResultCode() == KeeperException.Code.NODEEXISTS.intValue() )
- {
- path = event.getPath();
- nodeExists = true;
- }
- else if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
- {
- path = event.getName();
- }
- else if ( event.getResultCode() == KeeperException.Code.NOAUTH.intValue() )
- {
- log.warn("Client does not have authorisation to write ephemeral node at path {}", event.getPath());
- authFailure.set(true);
- return;
- }
- if ( path != null )
- {
- authFailure.set(false);
- nodePath.set(path);
- watchNode();
-
- if ( nodeExists )
- {
- client.setData().inBackground(setDataCallback).forPath(getActualPath(), getData());
- }
- else
- {
- initialisationComplete();
- }
- }
- else
- {
- createNode();
- }
- }
- };
-
- createMethod = mode.isProtected() ? client.create().creatingParentContainersIfNeeded().withProtection() : client.create().creatingParentContainersIfNeeded();
- this.data.set(Arrays.copyOf(data, data.length));
- }
-
- private void initialisationComplete()
- {
- CountDownLatch localLatch = initialCreateLatch.getAndSet(null);
- if ( localLatch != null )
- {
- localLatch.countDown();
- }
- }
-
- /**
- * You must call start() to initiate the persistent ephemeral node. An attempt to create the node
- * in the background will be started
- */
- public void start()
- {
- Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Already started");
-
- client.getConnectionStateListenable().addListener(connectionStateListener);
- createNode();
- }
-
- /**
- * Block until the either initial node creation initiated by {@link #start()} succeeds or
- * the timeout elapses.
- *
- * @param timeout the maximum time to wait
- * @param unit time unit
- * @return if the node was created before timeout
- * @throws InterruptedException if the thread is interrupted
- */
- public boolean waitForInitialCreate(long timeout, TimeUnit unit) throws InterruptedException
- {
- Preconditions.checkState(state.get() == State.STARTED, "Not started");
-
- CountDownLatch localLatch = initialCreateLatch.get();
- return (localLatch == null) || localLatch.await(timeout, unit);
- }
-
- @Override
- public void close() throws IOException
- {
- if ( !state.compareAndSet(State.STARTED, State.CLOSED) )
- {
- return;
- }
-
- client.getConnectionStateListenable().removeListener(connectionStateListener);
-
- try
- {
- deleteNode();
- }
- catch ( Exception e )
- {
- throw new IOException(e);
- }
- }
-
- /**
- * Returns the currently set path or null if the node does not exist
- *
- * @return node path or null
- */
- public String getActualPath()
- {
- return nodePath.get();
- }
-
- /**
- * Set data that ephemeral node should set in ZK also writes the data to the node
- *
- * @param data new data value
- * @throws Exception errors
- */
- public void setData(byte[] data) throws Exception
- {
- data = Preconditions.checkNotNull(data, "data cannot be null");
- this.data.set(Arrays.copyOf(data, data.length));
- if ( isActive() )
- {
- client.setData().inBackground().forPath(getActualPath(), getData());
- }
- }
-
- /**
- * Return the current value of our data
- *
- * @return our data
- */
- public byte[] getData()
- {
- return this.data.get();
- }
-
- private void deleteNode() throws Exception
- {
- String localNodePath = nodePath.getAndSet(null);
- if ( localNodePath != null )
- {
- try
- {
- client.delete().guaranteed().forPath(localNodePath);
- }
- catch ( KeeperException.NoNodeException ignore )
- {
- // ignore
- }
- }
- }
-
- private void createNode()
- {
- if ( !isActive() )
- {
- return;
- }
-
- try
- {
- String existingPath = nodePath.get();
- String createPath = (existingPath != null && !mode.isProtected()) ? existingPath : basePath;
- createMethod.withMode(mode.getCreateMode(existingPath != null)).inBackground(backgroundCallback).forPath(createPath, data.get());
- }
- catch ( Exception e )
- {
- throw new RuntimeException("Creating node. BasePath: " + basePath, e); // should never happen unless there's a programming error - so throw RuntimeException
- }
- }
-
- private void watchNode() throws Exception
- {
- if ( !isActive() )
- {
- return;
- }
-
- String localNodePath = nodePath.get();
- if ( localNodePath != null )
- {
- client.checkExists().usingWatcher(watcher).inBackground(checkExistsCallback).forPath(localNodePath);
- }
- }
-
- private boolean isActive()
- {
- return (state.get() == State.STARTED);
- }
-
- @VisibleForTesting
- boolean isAuthFailure()
- {
- return authFailure.get();
+ super(client, mode.getCreateMode(false), mode.isProtected(), basePath, initData);
}
}
http://git-wip-us.apache.org/repos/asf/curator/blob/fefbba1c/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
new file mode 100644
index 0000000..c66eb30
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
@@ -0,0 +1,382 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.nodes;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CreateBuilder;
+import org.apache.curator.framework.api.CreateModable;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorWatcher;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.utils.PathUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * <p>
+ * A persistent ephemeral node is an ephemeral node that attempts to stay present in
+ * ZooKeeper, even through connection and session interruptions.
+ * </p>
+ * <p>
+ * Thanks to bbeck (https://github.com/bbeck) for the initial coding and design
+ * </p>
+ */
+public class PersistentNode implements Closeable
+{
+ private final AtomicReference<CountDownLatch> initialCreateLatch = new AtomicReference<CountDownLatch>(new CountDownLatch(1));
+ private final Logger log = LoggerFactory.getLogger(getClass());
+ private final CuratorFramework client;
+ private final CreateModable<ACLBackgroundPathAndBytesable<String>> createMethod;
+ private final AtomicReference<String> nodePath = new AtomicReference<String>(null);
+ private final String basePath;
+ private final CreateMode mode;
+ private final AtomicReference<byte[]> data = new AtomicReference<byte[]>();
+ private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
+ private final AtomicBoolean authFailure = new AtomicBoolean(false);
+ private final BackgroundCallback backgroundCallback;
+ private final boolean useProtection;
+ private final CuratorWatcher watcher = new CuratorWatcher()
+ {
+ @Override
+ public void process(WatchedEvent event) throws Exception
+ {
+ if ( event.getType() == EventType.NodeDeleted )
+ {
+ createNode();
+ }
+ else if ( event.getType() == EventType.NodeDataChanged )
+ {
+ watchNode();
+ }
+ }
+ };
+ private final BackgroundCallback checkExistsCallback = new BackgroundCallback()
+ {
+ @Override
+ public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
+ {
+ if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )
+ {
+ createNode();
+ }
+ else
+ {
+ boolean isEphemeral = event.getStat().getEphemeralOwner() != 0;
+ if ( isEphemeral != mode.isEphemeral() )
+ {
+ log.warn("Existing node ephemeral state doesn't match requested state. Maybe the node was created outside of PersistentNode? " + basePath);
+ }
+ }
+ }
+ };
+ private final BackgroundCallback setDataCallback = new BackgroundCallback()
+ {
+
+ @Override
+ public void processResult(CuratorFramework client, CuratorEvent event)
+ throws Exception
+ {
+ //If the result is ok then initialisation is complete (if we're still initialising)
+ //Don't retry on other errors as the only recoverable cases will be connection loss
+ //and the node not existing, both of which are already handled by other watches.
+ if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
+ {
+ //Update is ok, mark initialisation as complete if required.
+ initialisationComplete();
+ }
+ }
+ };
+ private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
+ {
+ @Override
+ public void stateChanged(CuratorFramework client, ConnectionState newState)
+ {
+ if ( newState == ConnectionState.RECONNECTED )
+ {
+ createNode();
+ }
+ }
+ };
+
+ private enum State
+ {
+ LATENT,
+ STARTED,
+ CLOSED
+ }
+
+ /**
+ * @param client client instance
+ * @param mode creation mode
+ * @param useProtection if true, call {@link CreateBuilder#withProtection()}
+ * @param basePath the base path for the node
+ * @param initData data for the node
+ */
+ public PersistentNode(CuratorFramework client, final CreateMode mode, boolean useProtection, final String basePath, byte[] initData)
+ {
+ this.useProtection = useProtection;
+ this.client = Preconditions.checkNotNull(client, "client cannot be null");
+ this.basePath = PathUtils.validatePath(basePath);
+ this.mode = Preconditions.checkNotNull(mode, "mode cannot be null");
+ final byte[] data = Preconditions.checkNotNull(initData, "data cannot be null");
+
+ backgroundCallback = new BackgroundCallback()
+ {
+ @Override
+ public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
+ {
+ String path = null;
+ boolean nodeExists = false;
+ if ( event.getResultCode() == KeeperException.Code.NODEEXISTS.intValue() )
+ {
+ path = event.getPath();
+ nodeExists = true;
+ }
+ else if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
+ {
+ path = event.getName();
+ }
+ else if ( event.getResultCode() == KeeperException.Code.NOAUTH.intValue() )
+ {
+ log.warn("Client does not have authorisation to write ephemeral node at path {}", event.getPath());
+ authFailure.set(true);
+ return;
+ }
+ if ( path != null )
+ {
+ authFailure.set(false);
+ nodePath.set(path);
+ watchNode();
+
+ if ( nodeExists )
+ {
+ client.setData().inBackground(setDataCallback).forPath(getActualPath(), getData());
+ }
+ else
+ {
+ initialisationComplete();
+ }
+ }
+ else
+ {
+ createNode();
+ }
+ }
+ };
+
+ createMethod = useProtection ? client.create().creatingParentContainersIfNeeded().withProtection() : client.create().creatingParentContainersIfNeeded();
+ this.data.set(Arrays.copyOf(data, data.length));
+ }
+
+ private void initialisationComplete()
+ {
+ CountDownLatch localLatch = initialCreateLatch.getAndSet(null);
+ if ( localLatch != null )
+ {
+ localLatch.countDown();
+ }
+ }
+
+ /**
+ * You must call start() to initiate the persistent node. An attempt to create the node
+ * in the background will be started.
+ */
+ public void start()
+ {
+ Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Already started");
+
+ client.getConnectionStateListenable().addListener(connectionStateListener);
+ createNode();
+ }
+
+ /**
+ * Block until either the initial node creation initiated by {@link #start()} succeeds or
+ * the timeout elapses.
+ *
+ * @param timeout the maximum time to wait
+ * @param unit time unit
+ * @return true if the node was created before the timeout elapsed
+ * @throws InterruptedException if the thread is interrupted
+ */
+ public boolean waitForInitialCreate(long timeout, TimeUnit unit) throws InterruptedException
+ {
+ Preconditions.checkState(state.get() == State.STARTED, "Not started");
+
+ CountDownLatch localLatch = initialCreateLatch.get();
+ return (localLatch == null) || localLatch.await(timeout, unit);
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ if ( !state.compareAndSet(State.STARTED, State.CLOSED) )
+ {
+ return;
+ }
+
+ client.getConnectionStateListenable().removeListener(connectionStateListener);
+
+ try
+ {
+ deleteNode();
+ }
+ catch ( Exception e )
+ {
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * Returns the currently set path or null if the node does not exist
+ *
+ * @return node path or null
+ */
+ public String getActualPath()
+ {
+ return nodePath.get();
+ }
+
+ /**
+ * Set the data that the node should hold in ZK; also writes the data to the node if this instance is started
+ *
+ * @param data new data value
+ * @throws Exception errors
+ */
+ public void setData(byte[] data) throws Exception
+ {
+ data = Preconditions.checkNotNull(data, "data cannot be null");
+ this.data.set(Arrays.copyOf(data, data.length));
+ if ( isActive() )
+ {
+ client.setData().inBackground().forPath(getActualPath(), getData());
+ }
+ }
+
+ /**
+ * Return the current value of our data
+ *
+ * @return our data
+ */
+ public byte[] getData()
+ {
+ return this.data.get();
+ }
+
+ private void deleteNode() throws Exception
+ {
+ String localNodePath = nodePath.getAndSet(null);
+ if ( localNodePath != null )
+ {
+ try
+ {
+ client.delete().guaranteed().forPath(localNodePath);
+ }
+ catch ( KeeperException.NoNodeException ignore )
+ {
+ // ignore
+ }
+ }
+ }
+
+ private void createNode()
+ {
+ if ( !isActive() )
+ {
+ return;
+ }
+
+ try
+ {
+ String existingPath = nodePath.get();
+ String createPath = (existingPath != null && !useProtection) ? existingPath : basePath;
+ createMethod.withMode(getCreateMode(existingPath != null)).inBackground(backgroundCallback).forPath(createPath, data.get());
+ }
+ catch ( Exception e )
+ {
+ throw new RuntimeException("Creating node. BasePath: " + basePath, e); // should never happen unless there's a programming error - so throw RuntimeException
+ }
+ }
+
+ private CreateMode getCreateMode(boolean pathIsSet)
+ {
+ if ( pathIsSet )
+ {
+ switch ( mode )
+ {
+ default:
+ {
+ break;
+ }
+
+ case EPHEMERAL_SEQUENTIAL:
+ {
+ return CreateMode.EPHEMERAL; // protection case - node already set
+ }
+
+ case PERSISTENT_SEQUENTIAL:
+ {
+ return CreateMode.PERSISTENT; // protection case - node already set
+ }
+ }
+ }
+ return mode;
+ }
+
+ private void watchNode() throws Exception
+ {
+ if ( !isActive() )
+ {
+ return;
+ }
+
+ String localNodePath = nodePath.get();
+ if ( localNodePath != null )
+ {
+ client.checkExists().usingWatcher(watcher).inBackground(checkExistsCallback).forPath(localNodePath);
+ }
+ }
+
+ private boolean isActive()
+ {
+ return (state.get() == State.STARTED);
+ }
+
+ @VisibleForTesting
+ boolean isAuthFailure()
+ {
+ return authFailure.get();
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/fefbba1c/curator-recipes/src/site/confluence/group-member.confluence
----------------------------------------------------------------------
diff --git a/curator-recipes/src/site/confluence/group-member.confluence b/curator-recipes/src/site/confluence/group-member.confluence
index a370675..5c24c75 100644
--- a/curator-recipes/src/site/confluence/group-member.confluence
+++ b/curator-recipes/src/site/confluence/group-member.confluence
@@ -5,7 +5,7 @@ Group membership management. Adds this instance into a group and keeps a cache o
h2. Participating Classes
* GroupMember
-* PersistentEphemeralNode
+* PersistentNode
* PathChildrenCache
h2. Usage
http://git-wip-us.apache.org/repos/asf/curator/blob/fefbba1c/curator-recipes/src/site/confluence/persistent-ephemeral-node.confluence
----------------------------------------------------------------------
diff --git a/curator-recipes/src/site/confluence/persistent-ephemeral-node.confluence b/curator-recipes/src/site/confluence/persistent-ephemeral-node.confluence
index 0023f57..aeb9e10 100644
--- a/curator-recipes/src/site/confluence/persistent-ephemeral-node.confluence
+++ b/curator-recipes/src/site/confluence/persistent-ephemeral-node.confluence
@@ -1,32 +1,34 @@
h1. Persistent Ephemeral Node
h2. Description
-A persistent ephemeral node is an ephemeral node that attempts to stay present in ZooKeeper, even through connection and session interruptions.
+A persistent node is a node that attempts to stay present in ZooKeeper, even through connection and session interruptions.
h2. Participating Classes
-* PersistentEphemeralNode
+* PersistentNode
h2. Usage
-h3. Creating a PersistentEphemeralNode
+h3. Creating a PersistentNode
{code}
-public PersistentEphemeralNode(CuratorFramework client,
- PersistentEphemeralNode.Mode mode,
+public PersistentNode(CuratorFramework client,
+ CreateMode mode,
+ boolean useProtection,
String basePath,
byte[] data)
Parameters:
client - client instance
-mode - creation/protection mode
+mode - creation mode
+useProtection - if true, call CreateBuilder.withProtection()
basePath - the base path for the node
data - data for the node
{code}
h3. General Usage
-PersistentEphemeralNodes must be started:
+PersistentNodes must be started:
{code}
node.start();
{code}
-When you are through with the PersistentEphemeralNode instance, you should call close:
+When you are through with the PersistentNode instance, you should call close:
{code}
node.close();
{code}
@@ -34,4 +36,4 @@ node.close();
NOTE: this will delete the node
h2. Error Handling
-PersistentEphemeralNode instances internally handle all error states recreating the node as necessary.
+PersistentNode instances internally handle all error states recreating the node as necessary.
http://git-wip-us.apache.org/repos/asf/curator/blob/fefbba1c/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
index 84eaa52..f451feb 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
@@ -20,7 +20,6 @@ package org.apache.curator.framework.recipes.nodes;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
-
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
@@ -30,21 +29,19 @@ import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.test.KillSession;
-import org.apache.curator.test.TestingServer;
import org.apache.curator.test.Timing;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;
-
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
@@ -55,6 +52,7 @@ import java.util.concurrent.TimeUnit;
import static org.testng.Assert.*;
+@SuppressWarnings("deprecation")
public class TestPersistentEphemeralNode extends BaseClassForTests
{
private static final String DIR = "/test";
http://git-wip-us.apache.org/repos/asf/curator/blob/fefbba1c/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNodeListener.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNodeListener.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNodeListener.java
index ceff4c5..6771eec 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNodeListener.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNodeListener.java
@@ -34,6 +34,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+@SuppressWarnings("deprecation")
public class TestPersistentEphemeralNodeListener extends BaseClassForTests
{
@Test
http://git-wip-us.apache.org/repos/asf/curator/blob/fefbba1c/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentNode.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentNode.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentNode.java
new file mode 100644
index 0000000..c006dd7
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentNode.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.nodes;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.zookeeper.CreateMode;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.concurrent.TimeUnit;
+
+public class TestPersistentNode extends BaseClassForTests
+{
+ @Test
+ public void testBasic() throws Exception
+ {
+ final byte[] TEST_DATA = "hey".getBytes();
+
+ Timing timing = new Timing();
+ PersistentNode pen = null;
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ try
+ {
+ client.start();
+ pen = new PersistentNode(client, CreateMode.PERSISTENT, false, "/test", TEST_DATA);
+ pen.start();
+ Assert.assertTrue(pen.waitForInitialCreate(timing.milliseconds(), TimeUnit.MILLISECONDS));
+ client.close(); // cause session to end - force checks that node is persistent
+
+ client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ client.start();
+
+ byte[] bytes = client.getData().forPath("/test");
+ Assert.assertEquals(bytes, TEST_DATA);
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(pen);
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+}
[2/5] curator git commit: fixed doc
Posted by ra...@apache.org.
fixed doc
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/e657cf66
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/e657cf66
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/e657cf66
Branch: refs/heads/CURATOR-3.0
Commit: e657cf66bc54dec762f0355a56cf3b3504a960d9
Parents: fefbba1
Author: randgalt <ra...@apache.org>
Authored: Tue Jan 12 12:13:58 2016 -0500
Committer: randgalt <ra...@apache.org>
Committed: Tue Jan 12 12:13:58 2016 -0500
----------------------------------------------------------------------
.../org/apache/curator/framework/recipes/nodes/PersistentNode.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/e657cf66/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
index c66eb30..fbeaeff 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
@@ -47,7 +47,7 @@ import java.util.concurrent.atomic.AtomicReference;
/**
* <p>
- * A persistent ephemeral node is an ephemeral node that attempts to stay present in
+ * A persistent node is a node that attempts to stay present in
* ZooKeeper, even through connection and session interruptions.
* </p>
* <p>
[5/5] curator git commit: Merge branch 'master' into CURATOR-3.0
Posted by ra...@apache.org.
Merge branch 'master' into CURATOR-3.0
Conflicts:
curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/c6a22ba5
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/c6a22ba5
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/c6a22ba5
Branch: refs/heads/CURATOR-3.0
Commit: c6a22ba508f9227fe1c657f93e3cc77d8bc17e3e
Parents: d26c38d 649e0ba
Author: randgalt <ra...@apache.org>
Authored: Mon Jan 18 21:05:41 2016 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Jan 18 21:05:41 2016 -0500
----------------------------------------------------------------------
.../framework/imps/NamespaceWatcherMap.java | 14 +-
.../framework/imps/WatcherRemovalFacade.java | 2 +-
.../framework/imps/WatcherRemovalManager.java | 3 +-
.../recipes/nodes/PersistentEphemeralNode.java | 332 +-------------
.../framework/recipes/nodes/PersistentNode.java | 436 +++++++++++++++++++
.../src/site/confluence/group-member.confluence | 2 +-
.../persistent-ephemeral-node.confluence | 20 +-
.../nodes/TestPersistentEphemeralNode.java | 22 +-
.../TestPersistentEphemeralNodeListener.java | 1 +
.../recipes/nodes/TestPersistentNode.java | 63 +++
10 files changed, 555 insertions(+), 340 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/c6a22ba5/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcherMap.java
----------------------------------------------------------------------
diff --cc curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcherMap.java
index 00618e6,e5aecb2..c864f44
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcherMap.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcherMap.java
@@@ -19,6 -19,6 +19,7 @@@
package org.apache.curator.framework.imps;
import com.google.common.annotations.VisibleForTesting;
++import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.zookeeper.Watcher;
@@@ -28,10 -28,10 +29,10 @@@ import java.util.concurrent.ConcurrentM
class NamespaceWatcherMap implements Closeable
{
-- private final ConcurrentMap<Object, NamespaceWatcher> map = CacheBuilder.newBuilder()
++ private final Cache<Object, NamespaceWatcher> cache = CacheBuilder.newBuilder()
.weakValues()
-- .<Object, NamespaceWatcher>build()
-- .asMap();
++ .<Object, NamespaceWatcher>build();
++ private final ConcurrentMap<Object, NamespaceWatcher> map = cache.asMap();
private final CuratorFrameworkImpl client;
NamespaceWatcherMap(CuratorFrameworkImpl client)
@@@ -85,6 -74,6 +86,7 @@@
@VisibleForTesting
boolean isEmpty()
{
++ cache.cleanUp();
return map.isEmpty();
}
@@@ -103,4 -92,4 +105,10 @@@
NamespaceWatcher existingNamespaceWatcher = map.putIfAbsent(watcher, newNamespaceWatcher);
return (existingNamespaceWatcher != null) ? existingNamespaceWatcher : newNamespaceWatcher;
}
++
++ @Override
++ public String toString()
++ {
++ return map.toString();
++ }
}
http://git-wip-us.apache.org/repos/asf/curator/blob/c6a22ba5/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java
----------------------------------------------------------------------
diff --cc curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java
index 91530b4,0000000..30a992e
mode 100644,000000..100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java
@@@ -1,198 -1,0 +1,198 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.imps;
+
+import org.apache.curator.CuratorZookeeperClient;
+import org.apache.curator.RetryLoop;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.WatcherRemoveCuratorFramework;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorListener;
+import org.apache.curator.framework.api.UnhandledErrorListener;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.utils.DebugUtils;
+import org.apache.curator.utils.EnsurePath;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
+
+class WatcherRemovalFacade extends CuratorFrameworkImpl implements WatcherRemoveCuratorFramework
+{
+ private final CuratorFrameworkImpl client;
+ private final WatcherRemovalManager removalManager;
+
+ WatcherRemovalFacade(CuratorFrameworkImpl client)
+ {
+ super(client);
+ this.client = client;
+ removalManager = new WatcherRemovalManager(client, getNamespaceWatcherMap());
+ }
+
+ @Override
+ public WatcherRemoveCuratorFramework newWatcherRemoveCuratorFramework()
+ {
+ return client.newWatcherRemoveCuratorFramework();
+ }
+
+ WatcherRemovalManager getRemovalManager()
+ {
+ return removalManager;
+ }
+
+ @Override
+ public QuorumVerifier getCurrentConfig()
+ {
+ return client.getCurrentConfig();
+ }
+
+ @Override
+ public void removeWatchers()
+ {
+ removalManager.removeWatchers();
+
+ if ( Boolean.getBoolean(DebugUtils.PROPERTY_VALIDATE_NAMESPACE_WATCHER_MAP_EMPTY) )
+ {
+ if ( !getNamespaceWatcherMap().isEmpty() )
+ {
- throw new RuntimeException("NamespaceWatcherMap is not empty: " + client.getNamespaceWatcherMap());
++ throw new RuntimeException("NamespaceWatcherMap is not empty: " + getNamespaceWatcherMap());
+ }
+ }
+ }
+
+ @Override
+ WatcherRemovalManager getWatcherRemovalManager()
+ {
+ return removalManager;
+ }
+
+ @Override
+ public CuratorFramework nonNamespaceView()
+ {
+ return client.nonNamespaceView();
+ }
+
+ @Override
+ public CuratorFramework usingNamespace(String newNamespace)
+ {
+ return client.usingNamespace(newNamespace);
+ }
+
+ @Override
+ public String getNamespace()
+ {
+ return client.getNamespace();
+ }
+
+ @Override
+ public void start()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void close()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Listenable<ConnectionStateListener> getConnectionStateListenable()
+ {
+ return client.getConnectionStateListenable();
+ }
+
+ @Override
+ public Listenable<CuratorListener> getCuratorListenable()
+ {
+ return client.getCuratorListenable();
+ }
+
+ @Override
+ public Listenable<UnhandledErrorListener> getUnhandledErrorListenable()
+ {
+ return client.getUnhandledErrorListenable();
+ }
+
+ @Override
+ public void sync(String path, Object context)
+ {
+ client.sync(path, context);
+ }
+
+ @Override
+ public CuratorZookeeperClient getZookeeperClient()
+ {
+ return client.getZookeeperClient();
+ }
+
+ @Override
+ RetryLoop newRetryLoop()
+ {
+ return client.newRetryLoop();
+ }
+
+ @Override
+ ZooKeeper getZooKeeper() throws Exception
+ {
+ return client.getZooKeeper();
+ }
+
+ @Override
+ <DATA_TYPE> void processBackgroundOperation(OperationAndData<DATA_TYPE> operationAndData, CuratorEvent event)
+ {
+ client.processBackgroundOperation(operationAndData, event);
+ }
+
+ @Override
+ void logError(String reason, Throwable e)
+ {
+ client.logError(reason, e);
+ }
+
+ @Override
+ String unfixForNamespace(String path)
+ {
+ return client.unfixForNamespace(path);
+ }
+
+ @Override
+ String fixForNamespace(String path)
+ {
+ return client.fixForNamespace(path);
+ }
+
+ @Override
+ String fixForNamespace(String path, boolean isSequential)
+ {
+ return client.fixForNamespace(path, isSequential);
+ }
+
+ @Override
+ public EnsurePath newNamespaceAwareEnsurePath(String path)
+ {
+ return client.newNamespaceAwareEnsurePath(path);
+ }
+
+ @Override
+ FailedDeleteManager getFailedDeleteManager()
+ {
+ return client.getFailedDeleteManager();
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/c6a22ba5/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalManager.java
----------------------------------------------------------------------
diff --cc curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalManager.java
index 064964d,0000000..1e6fe94
mode 100644,000000..100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalManager.java
@@@ -1,140 -1,0 +1,141 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.imps;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.HashSet;
+import java.util.Set;
+
+public class WatcherRemovalManager
+{
+ private final Logger log = LoggerFactory.getLogger(getClass());
+ private final CuratorFrameworkImpl client;
+ private final NamespaceWatcherMap namespaceWatcherMap;
+ private final Set<WrappedWatcher> entries = Sets.newHashSet(); // guarded by sync
+
+ WatcherRemovalManager(CuratorFrameworkImpl client, NamespaceWatcherMap namespaceWatcherMap)
+ {
+ this.client = client;
+ this.namespaceWatcherMap = namespaceWatcherMap;
+ }
+
+ synchronized Watcher add(String path, Watcher watcher)
+ {
+ path = Preconditions.checkNotNull(path, "path cannot be null");
+ watcher = Preconditions.checkNotNull(watcher, "watcher cannot be null");
+
+ WrappedWatcher wrappedWatcher = new WrappedWatcher(watcher, path);
+ entries.add(wrappedWatcher);
+ return wrappedWatcher;
+ }
+
+ @VisibleForTesting
+ synchronized Set<? extends Watcher> getEntries()
+ {
+ return Sets.newHashSet(entries);
+ }
+
+ void removeWatchers()
+ {
+ HashSet<WrappedWatcher> localEntries;
+ synchronized(this)
+ {
+ localEntries = Sets.newHashSet(entries);
+ }
+ for ( WrappedWatcher entry : localEntries )
+ {
+ try
+ {
+ log.debug("Removing watcher for path: " + entry.path);
- namespaceWatcherMap.removeWatcher(entry.watcher);
+ RemoveWatchesBuilderImpl builder = new RemoveWatchesBuilderImpl(client);
++ namespaceWatcherMap.removeWatcher(entry.watcher);
+ builder.internalRemoval(entry, entry.path);
+ }
+ catch ( Exception e )
+ {
+ log.error("Could not remove watcher for path: " + entry.path);
+ }
+ }
+ }
+
+ private synchronized void internalRemove(WrappedWatcher entry)
+ {
++ namespaceWatcherMap.removeWatcher(entry.watcher);
+ entries.remove(entry);
+ }
+
+ private class WrappedWatcher implements Watcher
+ {
+ private final Watcher watcher;
+ private final String path;
+
+ WrappedWatcher(Watcher watcher, String path)
+ {
+ this.watcher = watcher;
+ this.path = path;
+ }
+
+ @Override
+ public void process(WatchedEvent event)
+ {
+ if ( event.getType() != Event.EventType.None )
+ {
+ internalRemove(this);
+ }
+ watcher.process(event);
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if ( this == o )
+ {
+ return true;
+ }
+ if ( o == null || getClass() != o.getClass() )
+ {
+ return false;
+ }
+
+ WrappedWatcher entry = (WrappedWatcher)o;
+
+ //noinspection SimplifiableIfStatement
+ if ( !watcher.equals(entry.watcher) )
+ {
+ return false;
+ }
+ return path.equals(entry.path);
+
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = watcher.hashCode();
+ result = 31 * result + path.hashCode();
+ return result;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/c6a22ba5/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
----------------------------------------------------------------------
diff --cc curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
index 0000000,0d7ab9d..93c88f7
mode 000000,100644..100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
@@@ -1,0 -1,385 +1,436 @@@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ package org.apache.curator.framework.recipes.nodes;
+
+ import com.google.common.annotations.VisibleForTesting;
+ import com.google.common.base.Preconditions;
+ import org.apache.curator.framework.CuratorFramework;
++import org.apache.curator.framework.WatcherRemoveCuratorFramework;
+ import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
+ import org.apache.curator.framework.api.BackgroundCallback;
+ import org.apache.curator.framework.api.CreateBuilder;
+ import org.apache.curator.framework.api.CreateModable;
+ import org.apache.curator.framework.api.CuratorEvent;
+ import org.apache.curator.framework.api.CuratorWatcher;
+ import org.apache.curator.framework.state.ConnectionState;
+ import org.apache.curator.framework.state.ConnectionStateListener;
+ import org.apache.curator.utils.PathUtils;
+ import org.apache.curator.utils.ThreadUtils;
+ import org.apache.zookeeper.CreateMode;
+ import org.apache.zookeeper.KeeperException;
+ import org.apache.zookeeper.WatchedEvent;
+ import org.apache.zookeeper.Watcher.Event.EventType;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ import java.io.Closeable;
+ import java.io.IOException;
+ import java.util.Arrays;
+ import java.util.concurrent.CountDownLatch;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicBoolean;
++import java.util.concurrent.atomic.AtomicLong;
+ import java.util.concurrent.atomic.AtomicReference;
+
+ /**
+ * <p>
+ * A persistent node is a node that attempts to stay present in
+ * ZooKeeper, even through connection and session interruptions.
+ * </p>
+ * <p>
+ * Thanks to bbeck (https://github.com/bbeck) for the initial coding and design
+ * </p>
+ */
+ public class PersistentNode implements Closeable
+ {
+ private final AtomicReference<CountDownLatch> initialCreateLatch = new AtomicReference<CountDownLatch>(new CountDownLatch(1));
+ private final Logger log = LoggerFactory.getLogger(getClass());
- private final CuratorFramework client;
++ private final WatcherRemoveCuratorFramework client;
+ private final CreateModable<ACLBackgroundPathAndBytesable<String>> createMethod;
+ private final AtomicReference<String> nodePath = new AtomicReference<String>(null);
+ private final String basePath;
+ private final CreateMode mode;
+ private final AtomicReference<byte[]> data = new AtomicReference<byte[]>();
+ private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
+ private final AtomicBoolean authFailure = new AtomicBoolean(false);
+ private final BackgroundCallback backgroundCallback;
+ private final boolean useProtection;
+ private final CuratorWatcher watcher = new CuratorWatcher()
+ {
+ @Override
+ public void process(WatchedEvent event) throws Exception
+ {
- if ( event.getType() == EventType.NodeDeleted )
++ if ( isActive() )
+ {
- createNode();
- }
- else if ( event.getType() == EventType.NodeDataChanged )
- {
- watchNode();
++ if ( event.getType() == EventType.NodeDeleted )
++ {
++ createNode();
++ }
++ else if ( event.getType() == EventType.NodeDataChanged )
++ {
++ watchNode();
++ }
+ }
+ }
+ };
++
+ private final BackgroundCallback checkExistsCallback = new BackgroundCallback()
+ {
+ @Override
- public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
++ public void processResult(CuratorFramework dummy, CuratorEvent event) throws Exception
+ {
- if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )
++ if ( isActive() )
+ {
- createNode();
++ if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )
++ {
++ createNode();
++ }
++ else
++ {
++ boolean isEphemeral = event.getStat().getEphemeralOwner() != 0;
++ if ( isEphemeral != mode.isEphemeral() )
++ {
++ log.warn("Existing node ephemeral state doesn't match requested state. Maybe the node was created outside of PersistentNode? " + basePath);
++ }
++ }
+ }
+ else
+ {
- boolean isEphemeral = event.getStat().getEphemeralOwner() != 0;
- if ( isEphemeral != mode.isEphemeral() )
- {
- log.warn("Existing node ephemeral state doesn't match requested state. Maybe the node was created outside of PersistentNode? " + basePath);
- }
++ client.removeWatchers();
+ }
+ }
+ };
+ private final BackgroundCallback setDataCallback = new BackgroundCallback()
+ {
+
+ @Override
- public void processResult(CuratorFramework client, CuratorEvent event)
++ public void processResult(CuratorFramework dummy, CuratorEvent event)
+ throws Exception
+ {
+ //If the result is ok then initialisation is complete (if we're still initialising)
+ //Don't retry on other errors as the only recoverable cases will be connection loss
+ //and the node not existing, both of which are already handled by other watches.
+ if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
+ {
+ //Update is ok, mark initialisation as complete if required.
+ initialisationComplete();
+ }
+ }
+ };
+ private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
+ {
+ @Override
- public void stateChanged(CuratorFramework client, ConnectionState newState)
++ public void stateChanged(CuratorFramework dummy, ConnectionState newState)
+ {
- if ( newState == ConnectionState.RECONNECTED )
++ if ( (newState == ConnectionState.RECONNECTED) && isActive() )
+ {
+ createNode();
+ }
+ }
+ };
+
++ @VisibleForTesting
++ volatile CountDownLatch debugCreateNodeLatch = null;
++
+ private enum State
+ {
+ LATENT,
+ STARTED,
+ CLOSED
+ }
+
+ /**
- * @param client client instance
++ * @param givenClient client instance
+ * @param mode creation mode
+ * @param useProtection if true, call {@link CreateBuilder#withProtection()}
+ * @param basePath the base path for the node
+ * @param initData data for the node
+ */
- public PersistentNode(CuratorFramework client, final CreateMode mode, boolean useProtection, final String basePath, byte[] initData)
++ public PersistentNode(CuratorFramework givenClient, final CreateMode mode, boolean useProtection, final String basePath, byte[] initData)
+ {
+ this.useProtection = useProtection;
- this.client = Preconditions.checkNotNull(client, "client cannot be null");
++ this.client = Preconditions.checkNotNull(givenClient, "client cannot be null").newWatcherRemoveCuratorFramework();
+ this.basePath = PathUtils.validatePath(basePath);
+ this.mode = Preconditions.checkNotNull(mode, "mode cannot be null");
+ final byte[] data = Preconditions.checkNotNull(initData, "data cannot be null");
+
+ backgroundCallback = new BackgroundCallback()
+ {
+ @Override
- public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
++ public void processResult(CuratorFramework dummy, CuratorEvent event) throws Exception
+ {
++ if ( !isActive() )
++ {
++ return;
++ }
++
+ String path = null;
+ boolean nodeExists = false;
+ if ( event.getResultCode() == KeeperException.Code.NODEEXISTS.intValue() )
+ {
+ path = event.getPath();
+ nodeExists = true;
+ }
+ else if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
+ {
+ path = event.getName();
+ }
+ else if ( event.getResultCode() == KeeperException.Code.NOAUTH.intValue() )
+ {
- log.warn("Client does not have authorisation to write node at path {}", event.getPath());
++ log.warn("Client does not have authorization to write node at path {}", event.getPath());
+ authFailure.set(true);
+ return;
+ }
+ if ( path != null )
+ {
+ authFailure.set(false);
+ nodePath.set(path);
+ watchNode();
+
+ if ( nodeExists )
+ {
+ client.setData().inBackground(setDataCallback).forPath(getActualPath(), getData());
+ }
+ else
+ {
+ initialisationComplete();
+ }
+ }
+ else
+ {
+ createNode();
+ }
+ }
+ };
+
+ createMethod = useProtection ? client.create().creatingParentContainersIfNeeded().withProtection() : client.create().creatingParentContainersIfNeeded();
+ this.data.set(Arrays.copyOf(data, data.length));
+ }
+
+ private void initialisationComplete()
+ {
+ CountDownLatch localLatch = initialCreateLatch.getAndSet(null);
+ if ( localLatch != null )
+ {
+ localLatch.countDown();
+ }
+ }
+
+ /**
+ * You must call start() to initiate the persistent node. An attempt to create the node
+ * in the background will be started
+ */
+ public void start()
+ {
+ Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Already started");
+
+ client.getConnectionStateListenable().addListener(connectionStateListener);
+ createNode();
+ }
+
+ /**
+ * Block until either the initial node creation initiated by {@link #start()} succeeds or
+ * the timeout elapses.
+ *
+ * @param timeout the maximum time to wait
+ * @param unit time unit
+ * @return true if the node was created before the timeout elapsed, false otherwise
+ * @throws InterruptedException if the thread is interrupted
+ */
+ public boolean waitForInitialCreate(long timeout, TimeUnit unit) throws InterruptedException
+ {
+ Preconditions.checkState(state.get() == State.STARTED, "Not started");
+
+ CountDownLatch localLatch = initialCreateLatch.get();
+ return (localLatch == null) || localLatch.await(timeout, unit);
+ }
+
++ @VisibleForTesting
++ final AtomicLong debugWaitMsForBackgroundBeforeClose = new AtomicLong(0);
++
+ @Override
+ public void close() throws IOException
+ {
++ if ( debugWaitMsForBackgroundBeforeClose.get() > 0 )
++ {
++ try
++ {
++ Thread.sleep(debugWaitMsForBackgroundBeforeClose.get());
++ }
++ catch ( InterruptedException e )
++ {
++ Thread.currentThread().interrupt();
++ }
++ }
++
+ if ( !state.compareAndSet(State.STARTED, State.CLOSED) )
+ {
+ return;
+ }
+
+ client.getConnectionStateListenable().removeListener(connectionStateListener);
+
+ try
+ {
+ deleteNode();
+ }
+ catch ( Exception e )
+ {
+ ThreadUtils.checkInterrupted(e);
+ throw new IOException(e);
+ }
++
++ client.removeWatchers();
+ }
+
+ /**
+ * Returns the currently set path or null if the node does not exist
+ *
+ * @return node path or null
+ */
+ public String getActualPath()
+ {
+ return nodePath.get();
+ }
+
+ /**
+ * Set the data that the node should hold in ZK; if this instance is active, the data is also written to the node
+ *
+ * @param data new data value
+ * @throws Exception errors
+ */
+ public void setData(byte[] data) throws Exception
+ {
+ data = Preconditions.checkNotNull(data, "data cannot be null");
+ this.data.set(Arrays.copyOf(data, data.length));
+ if ( isActive() )
+ {
+ client.setData().inBackground().forPath(getActualPath(), getData());
+ }
+ }
+
+ /**
+ * Return the current value of our data
+ *
+ * @return our data
+ */
+ public byte[] getData()
+ {
+ return this.data.get();
+ }
+
+ private void deleteNode() throws Exception
+ {
+ String localNodePath = nodePath.getAndSet(null);
+ if ( localNodePath != null )
+ {
+ try
+ {
+ client.delete().guaranteed().forPath(localNodePath);
+ }
+ catch ( KeeperException.NoNodeException ignore )
+ {
+ // ignore
+ }
+ }
+ }
+
+ private void createNode()
+ {
+ if ( !isActive() )
+ {
+ return;
+ }
+
++ if ( debugCreateNodeLatch != null )
++ {
++ try
++ {
++ debugCreateNodeLatch.await();
++ }
++ catch ( InterruptedException e )
++ {
++ Thread.currentThread().interrupt();
++ return;
++ }
++ }
++
+ try
+ {
+ String existingPath = nodePath.get();
+ String createPath = (existingPath != null && !useProtection) ? existingPath : basePath;
+ createMethod.withMode(getCreateMode(existingPath != null)).inBackground(backgroundCallback).forPath(createPath, data.get());
+ }
+ catch ( Exception e )
+ {
+ ThreadUtils.checkInterrupted(e);
+ throw new RuntimeException("Creating node. BasePath: " + basePath, e); // should never happen unless there's a programming error - so throw RuntimeException
+ }
+ }
+
+ private CreateMode getCreateMode(boolean pathIsSet)
+ {
+ if ( pathIsSet )
+ {
+ switch ( mode )
+ {
+ default:
+ {
+ break;
+ }
+
+ case EPHEMERAL_SEQUENTIAL:
+ {
+ return CreateMode.EPHEMERAL; // protection case - node already set
+ }
+
+ case PERSISTENT_SEQUENTIAL:
+ {
+ return CreateMode.PERSISTENT; // protection case - node already set
+ }
+ }
+ }
+ return mode;
+ }
+
+ private void watchNode() throws Exception
+ {
+ if ( !isActive() )
+ {
+ return;
+ }
+
+ String localNodePath = nodePath.get();
+ if ( localNodePath != null )
+ {
+ client.checkExists().usingWatcher(watcher).inBackground(checkExistsCallback).forPath(localNodePath);
+ }
+ }
+
+ private boolean isActive()
+ {
+ return (state.get() == State.STARTED);
+ }
+
+ @VisibleForTesting
+ boolean isAuthFailure()
+ {
+ return authFailure.get();
+ }
+ }
http://git-wip-us.apache.org/repos/asf/curator/blob/c6a22ba5/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
----------------------------------------------------------------------
diff --cc curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
index 2fb6c66,f451feb..15c5f2e
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
@@@ -37,12 -35,10 +37,12 @@@ import org.apache.curator.utils.ZKPaths
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
- import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.Watcher.Event.EventType;
+ import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;
@@@ -58,9 -52,9 +57,10 @@@ import java.util.concurrent.TimeUnit
import static org.testng.Assert.*;
+ @SuppressWarnings("deprecation")
public class TestPersistentEphemeralNode extends BaseClassForTests
{
+ private static final Logger log = LoggerFactory.getLogger(TestPersistentEphemeralNode.class);
private static final String DIR = "/test";
private static final String PATH = ZKPaths.makePath(DIR, "/foo");
@@@ -100,39 -88,37 +100,40 @@@
try
{
client.start();
- PersistentEphemeralNode node = new PersistentEphemeralNode(client, PersistentEphemeralNode.Mode.EPHEMERAL, "/abc/node", "hello".getBytes());
- node.start();
-
- final CountDownLatch connectedLatch = new CountDownLatch(1);
- final CountDownLatch reconnectedLatch = new CountDownLatch(1);
- ConnectionStateListener listener = new ConnectionStateListener()
+ try ( PersistentEphemeralNode node = new PersistentEphemeralNode(client, PersistentEphemeralNode.Mode.EPHEMERAL, "/abc/node", "hello".getBytes()) )
{
- @Override
- public void stateChanged(CuratorFramework client, ConnectionState newState)
++ node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
+ node.start();
+
+ final CountDownLatch connectedLatch = new CountDownLatch(1);
+ final CountDownLatch reconnectedLatch = new CountDownLatch(1);
+ ConnectionStateListener listener = new ConnectionStateListener()
{
- if ( newState == ConnectionState.CONNECTED )
- {
- connectedLatch.countDown();
- }
- if ( newState == ConnectionState.RECONNECTED )
+ @Override
+ public void stateChanged(CuratorFramework client, ConnectionState newState)
{
- reconnectedLatch.countDown();
+ if ( newState == ConnectionState.CONNECTED )
+ {
+ connectedLatch.countDown();
+ }
+ if ( newState == ConnectionState.RECONNECTED )
+ {
+ reconnectedLatch.countDown();
+ }
}
- }
- };
- client.getConnectionStateListenable().addListener(listener);
- timing.sleepABit();
- server.restart();
- Assert.assertTrue(timing.awaitLatch(connectedLatch));
- timing.sleepABit();
- Assert.assertTrue(node.waitForInitialCreate(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
- server.stop();
- timing.sleepABit();
- server.restart();
- timing.sleepABit();
- Assert.assertTrue(timing.awaitLatch(reconnectedLatch));
+ };
+ client.getConnectionStateListenable().addListener(listener);
+ timing.sleepABit();
+ server.restart();
+ Assert.assertTrue(timing.awaitLatch(connectedLatch));
+ timing.sleepABit();
+ Assert.assertTrue(node.waitForInitialCreate(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
+ server.stop();
+ timing.sleepABit();
+ server.restart();
+ timing.sleepABit();
+ Assert.assertTrue(timing.awaitLatch(reconnectedLatch));
+ }
}
finally
{
@@@ -150,7 -135,7 +151,8 @@@
try
{
client.start();
- PersistentEphemeralNode node = new PersistentEphemeralNode(client, PersistentEphemeralNode.Mode.EPHEMERAL, "/abc/node", "hello".getBytes());
+ node = new PersistentEphemeralNode(client, PersistentEphemeralNode.Mode.EPHEMERAL, "/abc/node", "hello".getBytes());
++ node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
node.start();
final CountDownLatch connectedLatch = new CountDownLatch(1);
@@@ -231,6 -215,6 +233,7 @@@
{
client.start();
node = new PersistentEphemeralNode(client, mode, PATH, "a".getBytes());
++ node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
node.start();
Assert.assertTrue(node.waitForInitialCreate(timing.forWaiting().seconds(), TimeUnit.SECONDS));
@@@ -270,6 -257,6 +273,7 @@@
CuratorFramework curator = newCurator();
PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, new byte[0]);
++ node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
node.start();
String path = null;
try
@@@ -292,6 -279,6 +296,7 @@@
CuratorFramework curator = newCurator();
PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, new byte[0]);
++ node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
node.start();
node.waitForInitialCreate(timing.forWaiting().seconds(), TimeUnit.SECONDS);
@@@ -310,9 -297,9 +315,10 @@@
CuratorFramework observer = newCurator();
PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, new byte[0]);
- node.start();
++ node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
try
{
+ node.start();
node.waitForInitialCreate(timing.forWaiting().seconds(), TimeUnit.SECONDS);
assertNodeExists(observer, node.getActualPath());
@@@ -340,9 -325,9 +346,10 @@@
CuratorFramework observer = newCurator();
PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, new byte[0]);
- node.start();
++ node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
try
{
+ node.start();
node.waitForInitialCreate(5, TimeUnit.SECONDS);
assertNodeExists(observer, node.getActualPath());
@@@ -374,9 -357,9 +381,10 @@@
CuratorFramework observer = newCurator();
PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, new byte[0]);
- node.start();
++ node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
try
{
+ node.start();
node.waitForInitialCreate(timing.forWaiting().seconds(), TimeUnit.SECONDS);
String path = node.getActualPath();
assertNodeExists(observer, path);
@@@ -421,6 -401,6 +429,7 @@@
observer.getData().usingWatcher(dataChangedTrigger).forPath(PATH);
PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, new byte[0]);
++ node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
node.start();
try
{
@@@ -454,9 -434,37 +463,10 @@@
CuratorFramework curator = newCurator();
PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, new byte[0]);
- node.start();
- try
- {
- node.waitForInitialCreate(timing.forWaiting().seconds(), TimeUnit.SECONDS);
- String originalNode = node.getActualPath();
- assertNodeExists(curator, originalNode);
-
- // Delete the original node...
- curator.delete().forPath(originalNode);
-
- // Since we're using an ephemeral node, and the original session hasn't been interrupted the name of the new
- // node that gets created is going to be exactly the same as the original.
- Trigger createdWatchTrigger = Trigger.created();
- Stat stat = curator.checkExists().usingWatcher(createdWatchTrigger).forPath(originalNode);
- assertTrue(stat != null || createdWatchTrigger.firedWithin(timing.forWaiting().seconds(), TimeUnit.SECONDS));
- }
- finally
- {
- node.close();
- }
- }
-
- @Test
- public void testRecreatesNodeWhenItGetsDeletedAfterSetData() throws Exception
- {
- CuratorFramework curator = newCurator();
-
- PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, new byte[0]);
- node.start();
++ node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
try
{
+ node.start();
node.waitForInitialCreate(timing.forWaiting().seconds(), TimeUnit.SECONDS);
String originalNode = node.getActualPath();
assertNodeExists(curator, originalNode);
@@@ -481,13 -498,14 +491,15 @@@
{
CuratorFramework curator = newCurator();
- PersistentEphemeralNode node1 = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL_SEQUENTIAL, PATH, new byte[0]);
- node1.start();
- try
+ try ( PersistentEphemeralNode node1 = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL_SEQUENTIAL, PATH, new byte[0]) )
{
++ node1.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
+ node1.start();
node1.waitForInitialCreate(timing.forWaiting().seconds(), TimeUnit.SECONDS);
String path1 = node1.getActualPath();
PersistentEphemeralNode node2 = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL_SEQUENTIAL, PATH, new byte[0]);
++ node2.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
node2.start();
try
{
@@@ -510,9 -532,9 +522,10 @@@
byte[] data = "Hello World".getBytes();
PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, data);
- node.start();
++ node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
try
{
+ node.start();
node.waitForInitialCreate(timing.forWaiting().seconds(), TimeUnit.SECONDS);
assertTrue(Arrays.equals(curator.getData().forPath(node.getActualPath()), data));
}
@@@ -536,9 -558,9 +549,10 @@@
byte[] data = "Hello World".getBytes();
PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, data);
- node.start();
++ node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
try
{
+ node.start();
node.waitForInitialCreate(timing.forWaiting().seconds(), TimeUnit.SECONDS);
assertTrue(Arrays.equals(curator.getData().forPath(node.getActualPath()), data));
}
@@@ -557,9 -579,9 +571,10 @@@
byte[] updatedData = "Updated".getBytes();
PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, initialData);
- node.start();
try
{
++ node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
+ node.start();
node.waitForInitialCreate(timing.forWaiting().seconds(), TimeUnit.SECONDS);
assertTrue(Arrays.equals(curator.getData().forPath(node.getActualPath()), initialData));
@@@ -603,9 -625,9 +618,10 @@@
byte[] updatedData = "Updated".getBytes();
PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, initialData);
- node.start();
try
{
++ node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
+ node.start();
node.waitForInitialCreate(timing.forWaiting().seconds(), TimeUnit.SECONDS);
assertTrue(Arrays.equals(curator.getData().forPath(node.getActualPath()), initialData));
@@@ -646,9 -668,9 +662,10 @@@
PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.PROTECTED_EPHEMERAL, PATH,
new byte[0]);
- node.start();
try
{
++ node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
+ node.start();
node.waitForInitialCreate(timing.forWaiting().seconds(), TimeUnit.SECONDS);
assertNodeExists(curator, node.getActualPath());
@@@ -693,6 -714,6 +710,7 @@@
node = new PersistentEphemeralNode(client, PersistentEphemeralNode.Mode.EPHEMERAL, PATH,
new byte[0]);
++ node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
node.start();
node.waitForInitialCreate(timing.seconds(), TimeUnit.SECONDS);
http://git-wip-us.apache.org/repos/asf/curator/blob/c6a22ba5/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentNode.java
----------------------------------------------------------------------
diff --cc curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentNode.java
index 0000000,c006dd7..20d6916
mode 000000,100644..100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentNode.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentNode.java
@@@ -1,0 -1,62 +1,63 @@@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ package org.apache.curator.framework.recipes.nodes;
+
+ import org.apache.curator.framework.CuratorFramework;
+ import org.apache.curator.framework.CuratorFrameworkFactory;
+ import org.apache.curator.retry.RetryOneTime;
+ import org.apache.curator.test.BaseClassForTests;
+ import org.apache.curator.test.Timing;
+ import org.apache.curator.utils.CloseableUtils;
+ import org.apache.zookeeper.CreateMode;
+ import org.testng.Assert;
+ import org.testng.annotations.Test;
+ import java.util.concurrent.TimeUnit;
+
+ public class TestPersistentNode extends BaseClassForTests
+ {
+ @Test
+ public void testBasic() throws Exception
+ {
+ final byte[] TEST_DATA = "hey".getBytes();
+
+ Timing timing = new Timing();
+ PersistentNode pen = null;
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ try
+ {
+ client.start();
+ pen = new PersistentNode(client, CreateMode.PERSISTENT, false, "/test", TEST_DATA);
++ pen.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
+ pen.start();
+ Assert.assertTrue(pen.waitForInitialCreate(timing.milliseconds(), TimeUnit.MILLISECONDS));
+ client.close(); // cause session to end - force checks that node is persistent
+
+ client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ client.start();
+
+ byte[] bytes = client.getData().forPath("/test");
+ Assert.assertEquals(bytes, TEST_DATA);
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(pen);
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+ }
[3/5] curator git commit: fixed docs/comments that refer to ephemeral
Posted by ra...@apache.org.
fixed docs/comments that refer to ephemeral
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/55ca08cb
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/55ca08cb
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/55ca08cb
Branch: refs/heads/CURATOR-3.0
Commit: 55ca08cb1aef3deada6ae6bb128c6fb2f0608777
Parents: e657cf6
Author: randgalt <ra...@apache.org>
Authored: Mon Jan 18 18:11:45 2016 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Jan 18 18:11:45 2016 -0500
----------------------------------------------------------------------
.../apache/curator/framework/recipes/nodes/PersistentNode.java | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/55ca08cb/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
index fbeaeff..cf0bf38 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
@@ -171,7 +171,7 @@ public class PersistentNode implements Closeable
}
else if ( event.getResultCode() == KeeperException.Code.NOAUTH.intValue() )
{
- log.warn("Client does not have authorisation to write ephemeral node at path {}", event.getPath());
+ log.warn("Client does not have authorisation to write node at path {}", event.getPath());
authFailure.set(true);
return;
}
@@ -211,7 +211,7 @@ public class PersistentNode implements Closeable
}
/**
- * You must call start() to initiate the persistent ephemeral node. An attempt to create the node
+ * You must call start() to initiate the persistent node. An attempt to create the node
* in the background will be started
*/
public void start()
@@ -270,7 +270,7 @@ public class PersistentNode implements Closeable
}
/**
- * Set data that ephemeral node should set in ZK also writes the data to the node
+ * Set data that node should set in ZK also writes the data to the node
*
* @param data new data value
* @throws Exception errors
[4/5] curator git commit: Merge branch 'master' into CURATOR-287
Posted by ra...@apache.org.
Merge branch 'master' into CURATOR-287
Conflicts:
curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/649e0ba2
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/649e0ba2
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/649e0ba2
Branch: refs/heads/CURATOR-3.0
Commit: 649e0ba24b4123829e1267c755318a288f6f9e0c
Parents: 55ca08c b08b543
Author: randgalt <ra...@apache.org>
Authored: Mon Jan 18 19:10:14 2016 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Jan 18 19:10:14 2016 -0500
----------------------------------------------------------------------
.../org/apache/curator/ConnectionState.java | 4 ++
.../apache/curator/CuratorZookeeperClient.java | 2 +
.../main/java/org/apache/curator/RetryLoop.java | 2 +
.../apache/curator/SessionFailRetryLoop.java | 2 +
.../exhibitor/ExhibitorEnsembleProvider.java | 2 +
.../org/apache/curator/utils/ThreadUtils.java | 23 +++++++
.../java/org/apache/curator/utils/ZKPaths.java | 2 +-
.../src/main/java/locking/LockingExample.java | 7 ++-
.../framework/api/VersionPathAndBytesable.java | 25 ++++++++
.../transaction/TransactionCreateBuilder.java | 5 +-
.../transaction/TransactionSetDataBuilder.java | 4 +-
.../curator/framework/imps/Backgrounding.java | 2 +
.../framework/imps/CreateBuilderImpl.java | 15 +++--
.../framework/imps/CuratorFrameworkImpl.java | 46 +++++++++-----
.../framework/imps/DeleteBuilderImpl.java | 2 +
.../framework/imps/FailedDeleteManager.java | 5 +-
.../FindAndDeleteProtectedNodeInBackground.java | 3 +
.../framework/imps/GetDataBuilderImpl.java | 2 +
.../curator/framework/imps/NamespaceImpl.java | 2 +
.../framework/imps/NamespaceWatcher.java | 2 +
.../framework/imps/OperationAndData.java | 11 +++-
.../framework/imps/SetDataBuilderImpl.java | 8 +--
.../framework/listen/ListenerContainer.java | 2 +
.../framework/state/ConnectionStateManager.java | 14 +++--
.../framework/imps/TestTransactions.java | 46 +++++++++++++-
.../recipes/AfterConnectionEstablished.java | 1 +
.../recipes/cache/DefaultTreeCacheSelector.java | 37 +++++++++++
.../framework/recipes/cache/NodeCache.java | 4 ++
.../recipes/cache/PathChildrenCache.java | 4 ++
.../framework/recipes/cache/TreeCache.java | 45 +++++++++----
.../recipes/cache/TreeCacheSelector.java | 66 ++++++++++++++++++++
.../framework/recipes/leader/LeaderLatch.java | 5 ++
.../recipes/leader/LeaderSelector.java | 10 ++-
.../framework/recipes/locks/ChildReaper.java | 1 +
.../recipes/locks/InterProcessMultiLock.java | 4 ++
.../recipes/locks/InterProcessSemaphore.java | 4 ++
.../recipes/locks/InterProcessSemaphoreV2.java | 2 +
.../framework/recipes/locks/LockInternals.java | 2 +
.../curator/framework/recipes/locks/Reaper.java | 1 +
.../framework/recipes/nodes/GroupMember.java | 3 +
.../framework/recipes/nodes/PersistentNode.java | 3 +
.../recipes/queue/DistributedQueue.java | 43 ++++++++-----
.../framework/recipes/queue/QueueSharder.java | 16 +++--
.../framework/recipes/shared/SharedValue.java | 2 +
...estResetConnectionWithBackgroundFailure.java | 36 +++++------
.../framework/recipes/cache/TestTreeCache.java | 57 +++++++++++++++--
.../framework/recipes/locks/TestLockACLs.java | 50 ++++++++++++---
.../curator/test/TestingZooKeeperMain.java | 31 ++++++++-
.../entity/JsonServiceInstanceMarshaller.java | 3 +
.../entity/JsonServiceInstancesMarshaller.java | 2 +
.../server/rest/DiscoveryResource.java | 6 ++
.../discovery/server/rest/InstanceCleanup.java | 2 +
.../discovery/details/ServiceDiscoveryImpl.java | 3 +
.../x/rpc/idl/discovery/DiscoveryService.java | 8 +++
.../idl/discovery/DiscoveryServiceLowLevel.java | 7 +++
.../idl/services/CuratorProjectionService.java | 25 ++++++++
56 files changed, 606 insertions(+), 115 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/649e0ba2/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
----------------------------------------------------------------------
diff --cc curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
index cf0bf38,0000000..0d7ab9d
mode 100644,000000..100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
@@@ -1,382 -1,0 +1,385 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.nodes;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CreateBuilder;
+import org.apache.curator.framework.api.CreateModable;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorWatcher;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.utils.PathUtils;
++import org.apache.curator.utils.ThreadUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * <p>
+ * A persistent node is a node that attempts to stay present in
+ * ZooKeeper, even through connection and session interruptions.
+ * </p>
+ * <p>
+ * Thanks to bbeck (https://github.com/bbeck) for the initial coding and design
+ * </p>
+ */
+public class PersistentNode implements Closeable
+{
+ private final AtomicReference<CountDownLatch> initialCreateLatch = new AtomicReference<CountDownLatch>(new CountDownLatch(1));
+ private final Logger log = LoggerFactory.getLogger(getClass());
+ private final CuratorFramework client;
+ private final CreateModable<ACLBackgroundPathAndBytesable<String>> createMethod;
+ private final AtomicReference<String> nodePath = new AtomicReference<String>(null);
+ private final String basePath;
+ private final CreateMode mode;
+ private final AtomicReference<byte[]> data = new AtomicReference<byte[]>();
+ private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
+ private final AtomicBoolean authFailure = new AtomicBoolean(false);
+ private final BackgroundCallback backgroundCallback;
+ private final boolean useProtection;
+ private final CuratorWatcher watcher = new CuratorWatcher()
+ {
+ @Override
+ public void process(WatchedEvent event) throws Exception
+ {
+ if ( event.getType() == EventType.NodeDeleted )
+ {
+ createNode();
+ }
+ else if ( event.getType() == EventType.NodeDataChanged )
+ {
+ watchNode();
+ }
+ }
+ };
+ private final BackgroundCallback checkExistsCallback = new BackgroundCallback()
+ {
+ @Override
+ public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
+ {
+ if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )
+ {
+ createNode();
+ }
+ else
+ {
+ boolean isEphemeral = event.getStat().getEphemeralOwner() != 0;
+ if ( isEphemeral != mode.isEphemeral() )
+ {
+ log.warn("Existing node ephemeral state doesn't match requested state. Maybe the node was created outside of PersistentNode? " + basePath);
+ }
+ }
+ }
+ };
+ private final BackgroundCallback setDataCallback = new BackgroundCallback()
+ {
+
+ @Override
+ public void processResult(CuratorFramework client, CuratorEvent event)
+ throws Exception
+ {
+ //If the result is ok then initialisation is complete (if we're still initialising)
+ //Don't retry on other errors as the only recoverable cases will be connection loss
+ //and the node not existing, both of which are already handled by other watches.
+ if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
+ {
+ //Update is ok, mark initialisation as complete if required.
+ initialisationComplete();
+ }
+ }
+ };
+ private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
+ {
+ @Override
+ public void stateChanged(CuratorFramework client, ConnectionState newState)
+ {
+ if ( newState == ConnectionState.RECONNECTED )
+ {
+ createNode();
+ }
+ }
+ };
+
+ private enum State
+ {
+ LATENT,
+ STARTED,
+ CLOSED
+ }
+
+ /**
+ * @param client client instance
+ * @param mode creation mode
+ * @param useProtection if true, call {@link CreateBuilder#withProtection()}
+ * @param basePath the base path for the node
+ * @param initData data for the node
+ */
+ public PersistentNode(CuratorFramework client, final CreateMode mode, boolean useProtection, final String basePath, byte[] initData)
+ {
+ this.useProtection = useProtection;
+ this.client = Preconditions.checkNotNull(client, "client cannot be null");
+ this.basePath = PathUtils.validatePath(basePath);
+ this.mode = Preconditions.checkNotNull(mode, "mode cannot be null");
+ final byte[] data = Preconditions.checkNotNull(initData, "data cannot be null");
+
+ backgroundCallback = new BackgroundCallback()
+ {
+ @Override
+ public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
+ {
+ String path = null;
+ boolean nodeExists = false;
+ if ( event.getResultCode() == KeeperException.Code.NODEEXISTS.intValue() )
+ {
+ path = event.getPath();
+ nodeExists = true;
+ }
+ else if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
+ {
+ path = event.getName();
+ }
+ else if ( event.getResultCode() == KeeperException.Code.NOAUTH.intValue() )
+ {
+ log.warn("Client does not have authorisation to write node at path {}", event.getPath());
+ authFailure.set(true);
+ return;
+ }
+ if ( path != null )
+ {
+ authFailure.set(false);
+ nodePath.set(path);
+ watchNode();
+
+ if ( nodeExists )
+ {
+ client.setData().inBackground(setDataCallback).forPath(getActualPath(), getData());
+ }
+ else
+ {
+ initialisationComplete();
+ }
+ }
+ else
+ {
+ createNode();
+ }
+ }
+ };
+
+ createMethod = useProtection ? client.create().creatingParentContainersIfNeeded().withProtection() : client.create().creatingParentContainersIfNeeded();
+ this.data.set(Arrays.copyOf(data, data.length));
+ }
+
+ private void initialisationComplete()
+ {
+ CountDownLatch localLatch = initialCreateLatch.getAndSet(null);
+ if ( localLatch != null )
+ {
+ localLatch.countDown();
+ }
+ }
+
+ /**
+ * You must call start() to initiate the persistent node. An attempt to create the node
+ * in the background will be started
+ */
+ public void start()
+ {
+ Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Already started");
+
+ client.getConnectionStateListenable().addListener(connectionStateListener);
+ createNode();
+ }
+
+ /**
+ * Block until either the initial node creation initiated by {@link #start()} succeeds or
+ * the timeout elapses.
+ *
+ * @param timeout the maximum time to wait
+ * @param unit time unit
+ * @return if the node was created before timeout
+ * @throws InterruptedException if the thread is interrupted
+ */
+ public boolean waitForInitialCreate(long timeout, TimeUnit unit) throws InterruptedException
+ {
+ Preconditions.checkState(state.get() == State.STARTED, "Not started");
+
+ CountDownLatch localLatch = initialCreateLatch.get();
+ return (localLatch == null) || localLatch.await(timeout, unit);
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ if ( !state.compareAndSet(State.STARTED, State.CLOSED) )
+ {
+ return;
+ }
+
+ client.getConnectionStateListenable().removeListener(connectionStateListener);
+
+ try
+ {
+ deleteNode();
+ }
+ catch ( Exception e )
+ {
++ ThreadUtils.checkInterrupted(e);
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * Returns the currently set path or null if the node does not exist
+ *
+ * @return node path or null
+ */
+ public String getActualPath()
+ {
+ return nodePath.get();
+ }
+
+ /**
+ * Set data that node should set in ZK also writes the data to the node
+ *
+ * @param data new data value
+ * @throws Exception errors
+ */
+ public void setData(byte[] data) throws Exception
+ {
+ data = Preconditions.checkNotNull(data, "data cannot be null");
+ this.data.set(Arrays.copyOf(data, data.length));
+ if ( isActive() )
+ {
+ client.setData().inBackground().forPath(getActualPath(), getData());
+ }
+ }
+
+ /**
+ * Return the current value of our data
+ *
+ * @return our data
+ */
+ public byte[] getData()
+ {
+ return this.data.get();
+ }
+
+ private void deleteNode() throws Exception
+ {
+ String localNodePath = nodePath.getAndSet(null);
+ if ( localNodePath != null )
+ {
+ try
+ {
+ client.delete().guaranteed().forPath(localNodePath);
+ }
+ catch ( KeeperException.NoNodeException ignore )
+ {
+ // ignore
+ }
+ }
+ }
+
+ private void createNode()
+ {
+ if ( !isActive() )
+ {
+ return;
+ }
+
+ try
+ {
+ String existingPath = nodePath.get();
+ String createPath = (existingPath != null && !useProtection) ? existingPath : basePath;
+ createMethod.withMode(getCreateMode(existingPath != null)).inBackground(backgroundCallback).forPath(createPath, data.get());
+ }
+ catch ( Exception e )
+ {
++ ThreadUtils.checkInterrupted(e);
+ throw new RuntimeException("Creating node. BasePath: " + basePath, e); // should never happen unless there's a programming error - so throw RuntimeException
+ }
+ }
+
+ private CreateMode getCreateMode(boolean pathIsSet)
+ {
+ if ( pathIsSet )
+ {
+ switch ( mode )
+ {
+ default:
+ {
+ break;
+ }
+
+ case EPHEMERAL_SEQUENTIAL:
+ {
+ return CreateMode.EPHEMERAL; // protection case - node already set
+ }
+
+ case PERSISTENT_SEQUENTIAL:
+ {
+ return CreateMode.PERSISTENT; // protection case - node already set
+ }
+ }
+ }
+ return mode;
+ }
+
+ private void watchNode() throws Exception
+ {
+ if ( !isActive() )
+ {
+ return;
+ }
+
+ String localNodePath = nodePath.get();
+ if ( localNodePath != null )
+ {
+ client.checkExists().usingWatcher(watcher).inBackground(checkExistsCallback).forPath(localNodePath);
+ }
+ }
+
+ private boolean isActive()
+ {
+ return (state.get() == State.STARTED);
+ }
+
+ @VisibleForTesting
+ boolean isAuthFailure()
+ {
+ return authFailure.get();
+ }
+}