You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2013/12/26 17:00:59 UTC
[2/3] Initial pass at integrating
DescendantHandlingMode.ALL_DESCENDANTS patch
http://git-wip-us.apache.org/repos/asf/curator/blob/03bc3bee/CURATOR-33.patch
----------------------------------------------------------------------
diff --git a/CURATOR-33.patch b/CURATOR-33.patch
new file mode 100644
index 0000000..dbcb9d4
--- /dev/null
+++ b/CURATOR-33.patch
@@ -0,0 +1,2664 @@
+diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
+index d66f7f3..03b169e 100644
+--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
++++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
+@@ -1,32 +1,28 @@
+ /**
+- * Licensed to the Apache Software Foundation (ASF) under one
+- * or more contributor license agreements. See the NOTICE file
+- * distributed with this work for additional information
+- * regarding copyright ownership. The ASF licenses this file
+- * to you under the Apache License, Version 2.0 (the
+- * "License"); you may not use this file except in compliance
+- * with the License. You may obtain a copy of the License at
+- *
+- * http://www.apache.org/licenses/LICENSE-2.0
+- *
+- * Unless required by applicable law or agreed to in writing,
+- * software distributed under the License is distributed on an
+- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+- * KIND, either express or implied. See the License for the
+- * specific language governing permissions and limitations
+- * under the License.
++ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
++ * agreements. See the NOTICE file distributed with this work for additional information regarding
++ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance with the License. You may obtain a
++ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
++ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
++ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
++ * for the specific language governing permissions and limitations under the License.
+ */
+
+ package org.apache.curator.framework.recipes.cache;
+
+-import com.google.common.annotations.VisibleForTesting;
+-import com.google.common.base.Function;
+-import com.google.common.base.Preconditions;
+-import com.google.common.base.Predicate;
+-import com.google.common.collect.ImmutableList;
+-import com.google.common.collect.Lists;
+-import com.google.common.collect.Maps;
+-import com.google.common.collect.Sets;
++import java.io.Closeable;
++import java.io.IOException;
++import java.util.List;
++import java.util.Map;
++import java.util.Set;
++import java.util.concurrent.ConcurrentMap;
++import java.util.concurrent.Exchanger;
++import java.util.concurrent.ExecutorService;
++import java.util.concurrent.Executors;
++import java.util.concurrent.ThreadFactory;
++import java.util.concurrent.atomic.AtomicReference;
++
+ import org.apache.curator.framework.CuratorFramework;
+ import org.apache.curator.framework.api.BackgroundCallback;
+ import org.apache.curator.framework.api.CuratorEvent;
+@@ -43,79 +39,71 @@ import org.apache.zookeeper.Watcher;
+ import org.apache.zookeeper.data.Stat;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+-import java.io.Closeable;
+-import java.io.IOException;
+-import java.util.List;
+-import java.util.Map;
+-import java.util.Set;
+-import java.util.concurrent.ConcurrentMap;
+-import java.util.concurrent.Exchanger;
+-import java.util.concurrent.ExecutorService;
+-import java.util.concurrent.Executors;
+-import java.util.concurrent.ThreadFactory;
+-import java.util.concurrent.atomic.AtomicReference;
++
++import com.google.common.annotations.VisibleForTesting;
++import com.google.common.base.Function;
++import com.google.common.base.Preconditions;
++import com.google.common.base.Predicate;
++import com.google.common.collect.ImmutableList;
++import com.google.common.collect.Lists;
++import com.google.common.collect.Maps;
++import com.google.common.collect.Sets;
+
+ /**
+- * <p>A utility that attempts to keep all data from all children of a ZK path locally cached. This class
+- * will watch the ZK path, respond to update/create/delete events, pull down the data, etc. You can
+- * register a listener that will get notified when changes occur.</p>
++ * <p>
++ * A utility that attempts to keep all data from all children of a ZK path locally cached. This
++ * class will watch the ZK path, respond to update/create/delete events, pull down the data, etc.
++ * You can register a listener that will get notified when changes occur.
++ * </p>
+ * <p/>
+- * <p><b>IMPORTANT</b> - it's not possible to stay transactionally in sync. Users of this class must
+- * be prepared for false-positives and false-negatives. Additionally, always use the version number
+- * when updating data to avoid overwriting another process' change.</p>
++ * <p>
++ * <b>IMPORTANT</b> - it's not possible to stay transactionally in sync. Users of this class must be
++ * prepared for false-positives and false-negatives. Additionally, always use the version number
++ * when updating data to avoid overwriting another process' change.
++ * </p>
+ */
+ @SuppressWarnings("NullableProblems")
+-public class PathChildrenCache implements Closeable
+-{
++public class PathChildrenCache implements Closeable {
++
+ private final Logger log = LoggerFactory.getLogger(getClass());
+ private final CuratorFramework client;
+ private final String path;
+ private final CloseableExecutorService executorService;
+ private final boolean cacheData;
++ private final DescendantHandlingMode descendantHandlingMode;
+ private final boolean dataIsCompressed;
+ private final EnsurePath ensurePath;
+ private final ListenerContainer<PathChildrenCacheListener> listeners = new ListenerContainer<PathChildrenCacheListener>();
+ private final ConcurrentMap<String, ChildData> currentData = Maps.newConcurrentMap();
+ private final AtomicReference<Map<String, ChildData>> initialSet = new AtomicReference<Map<String, ChildData>>();
+- private final Set<Operation> operationsQuantizer = Sets.newSetFromMap(Maps.<Operation, Boolean>newConcurrentMap());
++ private final Set<Operation> operationsQuantizer = Sets.newSetFromMap(Maps.<Operation, Boolean> newConcurrentMap());
+ private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
+
+- private enum State
+- {
+- LATENT,
+- STARTED,
+- CLOSED
++ private enum State {
++ LATENT, STARTED, CLOSED
+ }
+
+ private static final ChildData NULL_CHILD_DATA = new ChildData(null, null, null);
+
+- private final Watcher childrenWatcher = new Watcher()
+- {
++ private final Watcher childrenWatcher = new Watcher() {
++
+ @Override
+- public void process(WatchedEvent event)
+- {
++ public void process(WatchedEvent event) {
+ offerOperation(new RefreshOperation(PathChildrenCache.this, RefreshMode.STANDARD));
+ }
+ };
+
+- private final Watcher dataWatcher = new Watcher()
+- {
++ private final Watcher dataWatcher = new Watcher() {
++
+ @Override
+- public void process(WatchedEvent event)
+- {
+- try
+- {
+- if ( event.getType() == Event.EventType.NodeDeleted )
+- {
++ public void process(WatchedEvent event) {
++ try {
++ if (event.getType() == Event.EventType.NodeDeleted) {
+ remove(event.getPath());
+- }
+- else if ( event.getType() == Event.EventType.NodeDataChanged )
+- {
++ } else if (event.getType() == Event.EventType.NodeDataChanged) {
+ offerOperation(new GetDataOperation(PathChildrenCache.this, event.getPath()));
+ }
+- }
+- catch ( Exception e )
+- {
++ } catch (Exception e) {
+ handleException(e);
+ }
+ }
+@@ -124,98 +112,208 @@ public class PathChildrenCache implements Closeable
+ @VisibleForTesting
+ volatile Exchanger<Object> rebuildTestExchanger;
+
+- private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
+- {
++ private final ConnectionStateListener connectionStateListener = new ConnectionStateListener() {
++
+ @Override
+- public void stateChanged(CuratorFramework client, ConnectionState newState)
+- {
++ public void stateChanged(CuratorFramework client, ConnectionState newState) {
+ handleStateChange(newState);
+ }
+ };
+ private static final ThreadFactory defaultThreadFactory = ThreadUtils.newThreadFactory("PathChildrenCache");
+
+ /**
+- * @param client the client
+- * @param path path to watch
+- * @param mode caching mode
++ * Method of processing children of the root node. Whether all children under the root node
++ * should be considered or whether only the first level should be considered.
++ */
++ public enum DescendantHandlingMode {
++ /**
++ * Only children of the root node will be considered by the cache. This is default behaviour
++ */
++ DIRECT_DESCENDANTS_ONLY,
++
++ /**
++ * The root nodes children, and its children etc. will all be considered by the cache.
++ */
++ ALL_DESCENDANTS
++ }
++
++ /**
++ * @param client
++ * the client
++ * @param path
++ * path to watch
++ * @param mode
++ * caching mode
+ * @deprecated use {@link #PathChildrenCache(CuratorFramework, String, boolean)} instead
+ */
++ @Deprecated
+ @SuppressWarnings("deprecation")
+- public PathChildrenCache(CuratorFramework client, String path, PathChildrenCacheMode mode)
+- {
+- this(client, path, mode != PathChildrenCacheMode.CACHE_PATHS_ONLY, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(defaultThreadFactory), true));
++ public PathChildrenCache(CuratorFramework client, String path, PathChildrenCacheMode mode) {
++ this(client, path, mode != PathChildrenCacheMode.CACHE_PATHS_ONLY, false,
++ new CloseableExecutorService(
++ Executors.newSingleThreadExecutor(defaultThreadFactory), true));
+ }
+
+ /**
+- * @param client the client
+- * @param path path to watch
+- * @param mode caching mode
+- * @param threadFactory factory to use when creating internal threads
+- * @deprecated use {@link #PathChildrenCache(CuratorFramework, String, boolean, ThreadFactory)} instead
++ * @param client
++ * the client
++ * @param path
++ * path to watch
++ * @param mode
++ * caching mode
++ * @param threadFactory
++ * factory to use when creating internal threads
++ * @deprecated use {@link #PathChildrenCache(CuratorFramework, String, boolean, ThreadFactory)}
++ * instead
+ */
++ @Deprecated
+ @SuppressWarnings("deprecation")
+- public PathChildrenCache(CuratorFramework client, String path, PathChildrenCacheMode mode, ThreadFactory threadFactory)
+- {
+- this(client, path, mode != PathChildrenCacheMode.CACHE_PATHS_ONLY, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory), true));
++ public PathChildrenCache(CuratorFramework client,
++ String path,
++ PathChildrenCacheMode mode,
++ ThreadFactory threadFactory) {
++ this(client, path, mode != PathChildrenCacheMode.CACHE_PATHS_ONLY, false,
++ new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory), true));
+ }
+
+ /**
+- * @param client the client
+- * @param path path to watch
+- * @param cacheData if true, node contents are cached in addition to the stat
++ * @param client
++ * the client
++ * @param path
++ * path to watch
++ * @param cacheData
++ * if true, node contents are cached in addition to the stat
+ */
+- public PathChildrenCache(CuratorFramework client, String path, boolean cacheData)
+- {
+- this(client, path, cacheData, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(defaultThreadFactory), true));
++ public PathChildrenCache(CuratorFramework client, String path, boolean cacheData) {
++ this(client, path, cacheData, false, new CloseableExecutorService(
++ Executors.newSingleThreadExecutor(defaultThreadFactory), true));
+ }
+
+ /**
+- * @param client the client
+- * @param path path to watch
+- * @param cacheData if true, node contents are cached in addition to the stat
+- * @param threadFactory factory to use when creating internal threads
++ * @param client
++ * the client
++ * @param path
++ * path to watch
++ * @param cacheData
++ * if true, node contents are cached in addition to the stat
++ * @param descendantHandlingMode
++ * Mode defining if only descendants of the root node will be considered or whether
++ * the entire tree will be.
+ */
+- public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, ThreadFactory threadFactory)
+- {
+- this(client, path, cacheData, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory), true));
++ public PathChildrenCache(CuratorFramework client,
++ String path,
++ boolean cacheData,
++ DescendantHandlingMode descendantHandlingMode) {
++ this(client, path, cacheData, descendantHandlingMode, false, new CloseableExecutorService(
++ Executors.newSingleThreadExecutor(defaultThreadFactory), true));
+ }
+
+ /**
+- * @param client the client
+- * @param path path to watch
+- * @param cacheData if true, node contents are cached in addition to the stat
+- * @param dataIsCompressed if true, data in the path is compressed
+- * @param threadFactory factory to use when creating internal threads
++ * @param client
++ * the client
++ * @param path
++ * path to watch
++ * @param cacheData
++ * if true, node contents are cached in addition to the stat
++ * @param threadFactory
++ * factory to use when creating internal threads
+ */
+- public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, ThreadFactory threadFactory)
+- {
+- this(client, path, cacheData, dataIsCompressed, new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory), true));
++ public PathChildrenCache(CuratorFramework client,
++ String path,
++ boolean cacheData,
++ ThreadFactory threadFactory) {
++ this(client, path, cacheData, false, new CloseableExecutorService(
++ Executors.newSingleThreadExecutor(threadFactory), true));
+ }
+
+ /**
+- * @param client the client
+- * @param path path to watch
+- * @param cacheData if true, node contents are cached in addition to the stat
+- * @param dataIsCompressed if true, data in the path is compressed
+- * @param executorService ExecutorService to use for the PathChildrenCache's background thread
++ * @param client
++ * the client
++ * @param path
++ * path to watch
++ * @param cacheData
++ * if true, node contents are cached in addition to the stat
++ * @param dataIsCompressed
++ * if true, data in the path is compressed
++ * @param threadFactory
++ * factory to use when creating internal threads
+ */
+- public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final ExecutorService executorService)
+- {
+- this(client, path, cacheData, dataIsCompressed, new CloseableExecutorService(executorService));
++ public PathChildrenCache(CuratorFramework client,
++ String path,
++ boolean cacheData,
++ boolean dataIsCompressed,
++ ThreadFactory threadFactory) {
++ this(client, path, cacheData, dataIsCompressed, new CloseableExecutorService(
++ Executors.newSingleThreadExecutor(threadFactory), true));
+ }
+
+ /**
+- * @param client the client
+- * @param path path to watch
+- * @param cacheData if true, node contents are cached in addition to the stat
+- * @param dataIsCompressed if true, data in the path is compressed
+- * @param executorService Closeable ExecutorService to use for the PathChildrenCache's background thread
++ * @param client
++ * the client
++ * @param path
++ * path to watch
++ * @param cacheData
++ * if true, node contents are cached in addition to the stat
++ * @param dataIsCompressed
++ * if true, data in the path is compressed
++ * @param executorService
++ * ExecutorService to use for the PathChildrenCache's background thread
+ */
+- public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final CloseableExecutorService executorService)
+- {
++ public PathChildrenCache(CuratorFramework client,
++ String path,
++ boolean cacheData,
++ boolean dataIsCompressed,
++ final ExecutorService executorService) {
++ this(client, path, cacheData, dataIsCompressed, new CloseableExecutorService(
++ executorService));
++ }
++
++ /**
++ * @param client
++ * the client
++ * @param path
++ * path to watch
++ * @param cacheData
++ * if true, node contents are cached in addition to the stat
++ * @param dataIsCompressed
++ * if true, data in the path is compressed
++ * @param executorService
++ * Closeable ExecutorService to use for the PathChildrenCache's background thread
++ */
++ public PathChildrenCache(CuratorFramework client,
++ String path,
++ boolean cacheData,
++ boolean dataIsCompressed,
++ final CloseableExecutorService executorService) {
++ this(client, path, cacheData, DescendantHandlingMode.DIRECT_DESCENDANTS_ONLY,
++ dataIsCompressed, executorService);
++ }
++
++ /**
++ * @param client
++ * the client
++ * @param path
++ * path to watch
++ * @param cacheData
++ * if true, node contents are cached in addition to the stat
++ * @param descendantHandlingMode
++ * Mode defining if only descendants of the root node will be considered or whether
++ * the entire tree will be.
++ * @param dataIsCompressed
++ * if true, data in the path is compressed
++ * @param executorService
++ * Closeable ExecutorService to use for the PathChildrenCache's background thread
++ */
++ public PathChildrenCache(CuratorFramework client,
++ String path,
++ boolean cacheData,
++ DescendantHandlingMode descendantHandlingMode,
++ boolean dataIsCompressed,
++ final CloseableExecutorService executorService) {
+ this.client = client;
+ this.path = path;
+ this.cacheData = cacheData;
++ this.descendantHandlingMode = descendantHandlingMode;
+ this.dataIsCompressed = dataIsCompressed;
+ this.executorService = executorService;
+ ensurePath = client.newNamespaceAwareEnsurePath(path);
+@@ -223,42 +321,43 @@ public class PathChildrenCache implements Closeable
+
+ /**
+ * Start the cache. The cache is not started automatically. You must call this method.
+- *
+- * @throws Exception errors
++ *
++ * @throws Exception
++ * errors
+ */
+- public void start() throws Exception
+- {
++ public void start() throws Exception {
+ start(StartMode.NORMAL);
+ }
+
+ /**
+ * Same as {@link #start()} but gives the option of doing an initial build
+- *
+- * @param buildInitial if true, {@link #rebuild()} will be called before this method
+- * returns in order to get an initial view of the node; otherwise,
+- * the cache will be initialized asynchronously
+- * @throws Exception errors
++ *
++ * @param buildInitial
++ * if true, {@link #rebuild()} will be called before this method returns in order to
++ * get an initial view of the node; otherwise, the cache will be initialized
++ * asynchronously
++ * @throws Exception
++ * errors
+ * @deprecated use {@link #start(StartMode)}
+ */
+- public void start(boolean buildInitial) throws Exception
+- {
++ @Deprecated
++ public void start(boolean buildInitial) throws Exception {
+ start(buildInitial ? StartMode.BUILD_INITIAL_CACHE : StartMode.NORMAL);
+ }
+
+ /**
+ * Method of priming cache on {@link PathChildrenCache#start(StartMode)}
+ */
+- public enum StartMode
+- {
++ public enum StartMode {
+ /**
+- * cache will _not_ be primed. i.e. it will start empty and you will receive
+- * events for all nodes added, etc.
++ * cache will _not_ be primed. i.e. it will start empty and you will receive events for all
++ * nodes added, etc.
+ */
+ NORMAL,
+
+ /**
+- * {@link PathChildrenCache#rebuild()} will be called before this method returns in
+- * order to get an initial view of the node.
++ * {@link PathChildrenCache#rebuild()} will be called before this method returns in order to
++ * get an initial view of the node.
+ */
+ BUILD_INITIAL_CACHE,
+
+@@ -271,34 +370,32 @@ public class PathChildrenCache implements Closeable
+
+ /**
+ * Start the cache. The cache is not started automatically. You must call this method.
+- *
+- * @param mode Method for priming the cache
+- * @throws Exception errors
++ *
++ * @param mode
++ * Method for priming the cache
++ * @throws Exception
++ * errors
+ */
+- public void start(StartMode mode) throws Exception
+- {
+- Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "already started");
++ public void start(StartMode mode) throws Exception {
++ Preconditions.checkState(
++ state.compareAndSet(State.LATENT, State.STARTED), "already started");
+ mode = Preconditions.checkNotNull(mode, "mode cannot be null");
+
+ client.getConnectionStateListenable().addListener(connectionStateListener);
+
+- switch ( mode )
+- {
+- case NORMAL:
+- {
++ switch (mode) {
++ case NORMAL: {
+ offerOperation(new RefreshOperation(this, RefreshMode.STANDARD));
+ break;
+ }
+
+- case BUILD_INITIAL_CACHE:
+- {
++ case BUILD_INITIAL_CACHE: {
+ rebuild();
+ break;
+ }
+
+- case POST_INITIALIZED_EVENT:
+- {
+- initialSet.set(Maps.<String, ChildData>newConcurrentMap());
++ case POST_INITIALIZED_EVENT: {
++ initialSet.set(Maps.<String, ChildData> newConcurrentMap());
+ offerOperation(new RefreshOperation(this, RefreshMode.POST_INITIALIZED));
+ break;
+ }
+@@ -306,13 +403,13 @@ public class PathChildrenCache implements Closeable
+ }
+
+ /**
+- * NOTE: this is a BLOCKING method. Completely rebuild the internal cache by querying
+- * for all needed data WITHOUT generating any events to send to listeners.
+- *
+- * @throws Exception errors
++ * NOTE: this is a BLOCKING method. Completely rebuild the internal cache by querying for all
++ * needed data WITHOUT generating any events to send to listeners.
++ *
++ * @throws Exception
++ * errors
+ */
+- public void rebuild() throws Exception
+- {
++ public void rebuild() throws Exception {
+ Preconditions.checkState(!executorService.isShutdown(), "cache has been closed");
+
+ ensurePath.ensure(client.getZookeeperClient());
+@@ -320,13 +417,11 @@ public class PathChildrenCache implements Closeable
+ clear();
+
+ List<String> children = client.getChildren().forPath(path);
+- for ( String child : children )
+- {
++ for (String child : children) {
+ String fullPath = ZKPaths.makePath(path, child);
+ internalRebuildNode(fullPath);
+
+- if ( rebuildTestExchanger != null )
+- {
++ if (rebuildTestExchanger != null) {
+ rebuildTestExchanger.exchange(new Object());
+ }
+ }
+@@ -335,16 +430,34 @@ public class PathChildrenCache implements Closeable
+ offerOperation(new RefreshOperation(this, RefreshMode.FORCE_GET_DATA_AND_STAT));
+ }
+
++ private void rebuild(String root) throws Exception {
++ Preconditions.checkState(!executorService.isShutdown(), "cache has been closed");
++
++ ensurePath.ensure(client.getZookeeperClient());
++
++ List<String> children = client.getChildren().forPath(path);
++ if (children != null && !children.isEmpty()) {
++ for (String child : children) {
++ rebuild(child);
++ }
++
++ }
++
++ }
++
+ /**
+ * NOTE: this is a BLOCKING method. Rebuild the internal cache for the given node by querying
+ * for all needed data WITHOUT generating any events to send to listeners.
+- *
+- * @param fullPath full path of the node to rebuild
+- * @throws Exception errors
++ *
++ * @param fullPath
++ * full path of the node to rebuild
++ * @throws Exception
++ * errors
+ */
+- public void rebuildNode(String fullPath) throws Exception
+- {
+- Preconditions.checkArgument(ZKPaths.getPathAndNode(fullPath).getPath().equals(path), "Node is not part of this cache: " + fullPath);
++ public void rebuildNode(String fullPath) throws Exception {
++ Preconditions.checkArgument(
++ ZKPaths.getPathAndNode(fullPath).getPath().equals(path),
++ "Node is not part of this cache: " + fullPath);
+ Preconditions.checkState(!executorService.isShutdown(), "cache has been closed");
+
+ ensurePath.ensure(client.getZookeeperClient());
+@@ -357,14 +470,13 @@ public class PathChildrenCache implements Closeable
+
+ /**
+ * Close/end the cache
+- *
+- * @throws IOException errors
++ *
++ * @throws IOException
++ * errors
+ */
+ @Override
+- public void close() throws IOException
+- {
+- if ( state.compareAndSet(State.STARTED, State.CLOSED) )
+- {
++ public void close() throws IOException {
++ if (state.compareAndSet(State.STARTED, State.CLOSED)) {
+ client.getConnectionStateListenable().removeListener(connectionStateListener);
+ executorService.close();
+ }
+@@ -372,64 +484,61 @@ public class PathChildrenCache implements Closeable
+
+ /**
+ * Return the cache listenable
+- *
++ *
+ * @return listenable
+ */
+- public ListenerContainer<PathChildrenCacheListener> getListenable()
+- {
++ public ListenerContainer<PathChildrenCacheListener> getListenable() {
+ return listeners;
+ }
+
+ /**
+- * Return the current data. There are no guarantees of accuracy. This is
+- * merely the most recent view of the data. The data is returned in sorted order.
+- *
++ * Return the current data. There are no guarantees of accuracy. This is merely the most recent
++ * view of the data. The data is returned in sorted order.
++ *
+ * @return list of children and data
+ */
+- public List<ChildData> getCurrentData()
+- {
+- return ImmutableList.copyOf(Sets.<ChildData>newTreeSet(currentData.values()));
++ public List<ChildData> getCurrentData() {
++ return ImmutableList.copyOf(Sets.<ChildData> newTreeSet(currentData.values()));
+ }
+
+ /**
+ * Return the current data for the given path. There are no guarantees of accuracy. This is
+- * merely the most recent view of the data. If there is no child with that path, <code>null</code>
+- * is returned.
+- *
+- * @param fullPath full path to the node to check
++ * merely the most recent view of the data. If there is no child with that path,
++ * <code>null</code> is returned.
++ *
++ * @param fullPath
++ * full path to the node to check
+ * @return data or null
+ */
+- public ChildData getCurrentData(String fullPath)
+- {
++ public ChildData getCurrentData(String fullPath) {
+ return currentData.get(fullPath);
+ }
+
+ /**
+- * As a memory optimization, you can clear the cached data bytes for a node. Subsequent
+- * calls to {@link ChildData#getData()} for this node will return <code>null</code>.
+- *
+- * @param fullPath the path of the node to clear
++ * As a memory optimization, you can clear the cached data bytes for a node. Subsequent calls to
++ * {@link ChildData#getData()} for this node will return <code>null</code>.
++ *
++ * @param fullPath
++ * the path of the node to clear
+ */
+- public void clearDataBytes(String fullPath)
+- {
++ public void clearDataBytes(String fullPath) {
+ clearDataBytes(fullPath, -1);
+ }
+
+ /**
+- * As a memory optimization, you can clear the cached data bytes for a node. Subsequent
+- * calls to {@link ChildData#getData()} for this node will return <code>null</code>.
+- *
+- * @param fullPath the path of the node to clear
+- * @param ifVersion if non-negative, only clear the data if the data's version matches this version
++ * As a memory optimization, you can clear the cached data bytes for a node. Subsequent calls to
++ * {@link ChildData#getData()} for this node will return <code>null</code>.
++ *
++ * @param fullPath
++ * the path of the node to clear
++ * @param ifVersion
++ * if non-negative, only clear the data if the data's version matches this version
+ * @return true if the data was cleared
+ */
+- public boolean clearDataBytes(String fullPath, int ifVersion)
+- {
++ public boolean clearDataBytes(String fullPath, int ifVersion) {
+ ChildData data = currentData.get(fullPath);
+- if ( data != null )
+- {
+- if ( (ifVersion < 0) || (ifVersion == data.getStat().getVersion()) )
+- {
++ if (data != null) {
++ if ((ifVersion < 0) || (ifVersion == data.getStat().getVersion())) {
+ data.clearData();
+ return true;
+ }
+@@ -439,275 +548,254 @@ public class PathChildrenCache implements Closeable
+
+ /**
+ * Clear out current data and begin a new query on the path
+- *
+- * @throws Exception errors
++ *
++ * @throws Exception
++ * errors
+ */
+- public void clearAndRefresh() throws Exception
+- {
++ public void clearAndRefresh() throws Exception {
+ currentData.clear();
+ offerOperation(new RefreshOperation(this, RefreshMode.STANDARD));
+ }
+
+ /**
+- * Clears the current data without beginning a new query and without generating any events
+- * for listeners.
++ * Clears the current data without beginning a new query and without generating any events for
++ * listeners.
+ */
+- public void clear()
+- {
++ public void clear() {
+ currentData.clear();
+ }
+
+- enum RefreshMode
+- {
+- STANDARD,
+- FORCE_GET_DATA_AND_STAT,
+- POST_INITIALIZED
++ enum RefreshMode {
++ STANDARD, FORCE_GET_DATA_AND_STAT, POST_INITIALIZED
+ }
+
+- void refresh(final RefreshMode mode) throws Exception
+- {
++ void refresh(final RefreshMode mode, final String nodePath) throws Exception {
+ ensurePath.ensure(client.getZookeeperClient());
+
+- final BackgroundCallback callback = new BackgroundCallback()
+- {
++ final BackgroundCallback callback = new BackgroundCallback() {
++
+ @Override
+- public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
+- {
+- processChildren(event.getChildren(), mode);
++ public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
++ processChildren(event.getPath(), event.getChildren(), mode);
+ }
+ };
+
+- client.getChildren().usingWatcher(childrenWatcher).inBackground(callback).forPath(path);
++ client.getChildren().usingWatcher(childrenWatcher).inBackground(callback).forPath(nodePath);
+ }
+
+- void callListeners(final PathChildrenCacheEvent event)
+- {
+- listeners.forEach
+- (
+- new Function<PathChildrenCacheListener, Void>()
+- {
+- @Override
+- public Void apply(PathChildrenCacheListener listener)
+- {
+- try
+- {
+- listener.childEvent(client, event);
+- }
+- catch ( Exception e )
+- {
+- handleException(e);
+- }
+- return null;
+- }
++ void refresh(final RefreshMode mode) throws Exception {
++ refresh(mode, path);
++ }
++
++ void callListeners(final PathChildrenCacheEvent event) {
++ listeners.forEach(new Function<PathChildrenCacheListener, Void>() {
++
++ @Override
++ public Void apply(PathChildrenCacheListener listener) {
++ try {
++ listener.childEvent(client, event);
++ } catch (Exception e) {
++ handleException(e);
+ }
+- );
++ return null;
++ }
++ });
+ }
+
+- void getDataAndStat(final String fullPath) throws Exception
+- {
+- BackgroundCallback existsCallback = new BackgroundCallback()
+- {
++ void getDataAndStat(final String fullPath) throws Exception {
++ BackgroundCallback existsCallback = new BackgroundCallback() {
++
+ @Override
+- public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
+- {
++ public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
+ applyNewData(fullPath, event.getResultCode(), event.getStat(), null);
+ }
+ };
+
+- BackgroundCallback getDataCallback = new BackgroundCallback()
+- {
++ BackgroundCallback getDataCallback = new BackgroundCallback() {
++
+ @Override
+- public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
+- {
++ public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
+ applyNewData(fullPath, event.getResultCode(), event.getStat(), event.getData());
+ }
+ };
+
+- if ( cacheData )
+- {
+- if ( dataIsCompressed )
+- {
+- client.getData().decompressed().usingWatcher(dataWatcher).inBackground(getDataCallback).forPath(fullPath);
+- }
+- else
+- {
+- client.getData().usingWatcher(dataWatcher).inBackground(getDataCallback).forPath(fullPath);
++ if (cacheData) {
++ if (dataIsCompressed) {
++ client.getData().decompressed().usingWatcher(dataWatcher).inBackground(
++ getDataCallback).forPath(fullPath);
++ } else {
++ client.getData().usingWatcher(dataWatcher).inBackground(getDataCallback).forPath(
++ fullPath);
+ }
+- }
+- else
+- {
+- client.checkExists().usingWatcher(dataWatcher).inBackground(existsCallback).forPath(fullPath);
++ } else {
++ client.checkExists().usingWatcher(dataWatcher).inBackground(existsCallback).forPath(
++ fullPath);
+ }
+ }
+
+ /**
+ * Default behavior is just to log the exception
+- *
+- * @param e the exception
++ *
++ * @param e
++ * the exception
+ */
+- protected void handleException(Throwable e)
+- {
++ protected void handleException(Throwable e) {
+ log.error("", e);
+ }
+
+ @VisibleForTesting
+- protected void remove(String fullPath)
+- {
++ protected void remove(String fullPath) {
+ ChildData data = currentData.remove(fullPath);
+- if ( data != null )
+- {
+- offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CHILD_REMOVED, data)));
++ if (data != null) {
++ offerOperation(new EventOperation(this, new PathChildrenCacheEvent(
++ PathChildrenCacheEvent.Type.CHILD_REMOVED, data)));
+ }
+
+ Map<String, ChildData> localInitialSet = initialSet.get();
+- if ( localInitialSet != null )
+- {
++ if (localInitialSet != null) {
+ localInitialSet.remove(fullPath);
+ maybeOfferInitializedEvent(localInitialSet);
+ }
+ }
+
+- private void internalRebuildNode(String fullPath) throws Exception
+- {
+- if ( cacheData )
+- {
+- try
+- {
++ private void internalRebuildNode(String fullPath) throws Exception {
++ if (cacheData) {
++ try {
+ Stat stat = new Stat();
+- byte[] bytes = dataIsCompressed ? client.getData().decompressed().storingStatIn(stat).forPath(fullPath) : client.getData().storingStatIn(stat).forPath(fullPath);
++ byte[] bytes = dataIsCompressed ? client.getData().decompressed().storingStatIn(
++ stat).forPath(fullPath) : client.getData().storingStatIn(stat).forPath(
++ fullPath);
+ currentData.put(fullPath, new ChildData(fullPath, stat, bytes));
+- }
+- catch ( KeeperException.NoNodeException ignore )
+- {
++ } catch (KeeperException.NoNodeException ignore) {
+ // node no longer exists - remove it
+ currentData.remove(fullPath);
+ }
+- }
+- else
+- {
++ } else {
+ Stat stat = client.checkExists().forPath(fullPath);
+- if ( stat != null )
+- {
++ if (stat != null) {
+ currentData.put(fullPath, new ChildData(fullPath, stat, null));
+- }
+- else
+- {
++ } else {
+ // node no longer exists - remove it
+ currentData.remove(fullPath);
+ }
+ }
+ }
+
+- private void handleStateChange(ConnectionState newState)
+- {
+- switch ( newState )
+- {
+- case SUSPENDED:
+- {
+- offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CONNECTION_SUSPENDED, null)));
+- break;
+- }
+-
+- case LOST:
+- {
+- offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CONNECTION_LOST, null)));
+- break;
+- }
++ private void handleStateChange(ConnectionState newState) {
++ switch (newState) {
++ case SUSPENDED: {
++ offerOperation(new EventOperation(this, new PathChildrenCacheEvent(
++ PathChildrenCacheEvent.Type.CONNECTION_SUSPENDED, null)));
++ break;
++ }
+
+- case RECONNECTED:
+- {
+- try
+- {
+- offerOperation(new RefreshOperation(this, RefreshMode.FORCE_GET_DATA_AND_STAT));
+- offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CONNECTION_RECONNECTED, null)));
++ case LOST: {
++ offerOperation(new EventOperation(this, new PathChildrenCacheEvent(
++ PathChildrenCacheEvent.Type.CONNECTION_LOST, null)));
++ break;
+ }
+- catch ( Exception e )
+- {
+- handleException(e);
++
++ case RECONNECTED: {
++ try {
++ offerOperation(new RefreshOperation(this, RefreshMode.FORCE_GET_DATA_AND_STAT));
++ offerOperation(new EventOperation(this, new PathChildrenCacheEvent(
++ PathChildrenCacheEvent.Type.CONNECTION_RECONNECTED, null)));
++ } catch (Exception e) {
++ handleException(e);
++ }
++ break;
+ }
+- break;
+- }
+ }
+ }
+
+- private void processChildren(List<String> children, RefreshMode mode) throws Exception
+- {
+- List<String> fullPaths = Lists.newArrayList(Lists.transform
+- (
+- children,
+- new Function<String, String>()
+- {
++ private void processChildren(final String root, List<String> children, RefreshMode mode)
++ throws Exception {
++ List<String> fullPaths = Lists.newArrayList(Lists.transform(
++ children, new Function<String, String>() {
++
+ @Override
+- public String apply(String child)
+- {
+- return ZKPaths.makePath(path, child);
++ public String apply(String child) {
++ return ZKPaths.makePath(root, child);
+ }
+- }
+- ));
++ }));
+ Set<String> removedNodes = Sets.newHashSet(currentData.keySet());
+ removedNodes.removeAll(fullPaths);
++ removedNodes.remove(root);
+
+- for ( String fullPath : removedNodes )
+- {
+- remove(fullPath);
++ Set<String> nodesToKeep = Sets.newHashSet();
++ for (String removedNode : removedNodes) {
++ // Don't remove the current node being processed, or any of its parent nodes
++ if (removedNode.length() <= root.length() && root.startsWith(removedNode)) {
++ nodesToKeep.add(removedNode);
++ }
+ }
+
+- for ( String name : children )
+- {
+- String fullPath = ZKPaths.makePath(path, name);
++ for (String fullPath : removedNodes) {
++ if (!nodesToKeep.contains(fullPath)) {
++ remove(fullPath);
++ }
++ }
+
+- if ( (mode == RefreshMode.FORCE_GET_DATA_AND_STAT) || !currentData.containsKey(fullPath) )
+- {
++ for (String name : children) {
++ String fullPath = ZKPaths.makePath(root, name);
++
++ boolean exists = currentData.containsKey(fullPath);
++
++ if ((mode == RefreshMode.FORCE_GET_DATA_AND_STAT) || !exists) {
+ getDataAndStat(fullPath);
+ }
+
+ updateInitialSet(name, NULL_CHILD_DATA);
++
++ if (!exists && descendantHandlingMode == DescendantHandlingMode.ALL_DESCENDANTS) {
++ if (!removedNodes.contains(fullPath)) {
++ refresh(mode, fullPath);
++ }
++ }
+ }
+ maybeOfferInitializedEvent(initialSet.get());
+ }
+
+- private void applyNewData(String fullPath, int resultCode, Stat stat, byte[] bytes)
+- {
+- if ( resultCode == KeeperException.Code.OK.intValue() ) // otherwise - node must have dropped or something - we should be getting another event
++ private void applyNewData(String fullPath, int resultCode, Stat stat, byte[] bytes) {
++ if (resultCode == KeeperException.Code.OK.intValue()) // otherwise - node must have dropped
++ // or something - we should be getting
++ // another event
+ {
+ ChildData data = new ChildData(fullPath, stat, bytes);
+ ChildData previousData = currentData.put(fullPath, data);
+- if ( previousData == null ) // i.e. new
+- {
+- offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CHILD_ADDED, data)));
+- }
+- else if ( previousData.getStat().getVersion() != stat.getVersion() )
++ if (previousData == null) // i.e. new
+ {
+- offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CHILD_UPDATED, data)));
++ offerOperation(new EventOperation(this, new PathChildrenCacheEvent(
++ PathChildrenCacheEvent.Type.CHILD_ADDED, data)));
++ } else if (previousData.getStat().getVersion() != stat.getVersion()) {
++ offerOperation(new EventOperation(this, new PathChildrenCacheEvent(
++ PathChildrenCacheEvent.Type.CHILD_UPDATED, data)));
+ }
+ updateInitialSet(ZKPaths.getNodeFromPath(fullPath), data);
+ }
+ }
+
+- private void updateInitialSet(String name, ChildData data)
+- {
++ private void updateInitialSet(String name, ChildData data) {
+ Map<String, ChildData> localInitialSet = initialSet.get();
+- if ( localInitialSet != null )
+- {
++ if (localInitialSet != null) {
+ localInitialSet.put(name, data);
+ maybeOfferInitializedEvent(localInitialSet);
+ }
+ }
+
+- private void maybeOfferInitializedEvent(Map<String, ChildData> localInitialSet)
+- {
+- if ( !hasUninitialized(localInitialSet) )
+- {
++ private void maybeOfferInitializedEvent(Map<String, ChildData> localInitialSet) {
++ if (!hasUninitialized(localInitialSet)) {
+ // all initial children have been processed - send initialized message
+
+- if ( initialSet.getAndSet(null) != null ) // avoid edge case - don't send more than 1 INITIALIZED event
++ if (initialSet.getAndSet(null) != null) // avoid edge case - don't send more than 1
++ // INITIALIZED event
+ {
+ final List<ChildData> children = ImmutableList.copyOf(localInitialSet.values());
+- PathChildrenCacheEvent event = new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.INITIALIZED, null)
+- {
++ PathChildrenCacheEvent event = new PathChildrenCacheEvent(
++ PathChildrenCacheEvent.Type.INITIALIZED, null) {
++
+ @Override
+- public List<ChildData> getInitialData()
+- {
++ public List<ChildData> getInitialData() {
+ return children;
+ }
+ };
+@@ -716,71 +804,56 @@ public class PathChildrenCache implements Closeable
+ }
+ }
+
+- private boolean hasUninitialized(Map<String, ChildData> localInitialSet)
+- {
+- if ( localInitialSet == null )
+- {
++ private boolean hasUninitialized(Map<String, ChildData> localInitialSet) {
++ if (localInitialSet == null) {
+ return false;
+ }
+
+- Map<String, ChildData> uninitializedChildren = Maps.filterValues
+- (
+- localInitialSet,
+- new Predicate<ChildData>()
+- {
++ Map<String, ChildData> uninitializedChildren = Maps.filterValues(
++ localInitialSet, new Predicate<ChildData>() {
++
+ @Override
+- public boolean apply(ChildData input)
+- {
+- return (input == NULL_CHILD_DATA); // check against ref intentional
++ public boolean apply(ChildData input) {
++ return (input == NULL_CHILD_DATA); // check against ref intentional
+ }
+- }
+- );
++ });
+ return (uninitializedChildren.size() != 0);
+ }
+
+- private void offerOperation(final Operation operation)
+- {
+- if ( operationsQuantizer.add(operation) )
+- {
+- submitToExecutor
+- (
+- new Runnable()
+- {
+- @Override
+- public void run()
+- {
+- try
+- {
+- operationsQuantizer.remove(operation);
+- operation.invoke();
+- }
+- catch ( Exception e )
+- {
+- handleException(e);
+- }
++ private void offerOperation(final Operation operation) {
++ if (operationsQuantizer.add(operation)) {
++ submitToExecutor(new Runnable() {
++
++ @Override
++ public void run() {
++ try {
++ operationsQuantizer.remove(operation);
++ operation.invoke();
++ } catch (Exception e) {
++ handleException(e);
+ }
+ }
+- );
++ });
+ }
+ }
+
+ /**
+ * Submits a runnable to the executor.
+ * <p/>
+- * This method is synchronized because it has to check state about whether this instance is still open. Without this check
+- * there is a race condition with the dataWatchers that get set. Even after this object is closed() it can still be
+- * called by those watchers, because the close() method cannot actually disable the watcher.
++ * This method is synchronized because it has to check state about whether this instance is
++ * still open. Without this check there is a race condition with the dataWatchers that get set.
++ * Even after this object is closed() it can still be called by those watchers, because the
++ * close() method cannot actually disable the watcher.
+ * <p/>
+- * The synchronization overhead should be minimal if non-existant as this is generally only called from the
+- * ZK client thread and will only contend if close() is called in parallel with an update, and that's the exact state
+- * we want to protect from.
+- *
+- * @param command The runnable to run
++ * The synchronization overhead should be minimal if non-existant as this is generally only
++ * called from the ZK client thread and will only contend if close() is called in parallel with
++ * an update, and that's the exact state we want to protect from.
++ *
++ * @param command
++ * The runnable to run
+ */
+- private synchronized void submitToExecutor(final Runnable command)
+- {
+- if ( state.get() == State.STARTED )
+- {
++ private synchronized void submitToExecutor(final Runnable command) {
++ if (state.get() == State.STARTED) {
+ executorService.submit(command);
+ }
+ }
+diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
+index 4b117fb..e524153 100644
+--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
++++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
+@@ -1,29 +1,37 @@
+ /**
+- * Licensed to the Apache Software Foundation (ASF) under one
+- * or more contributor license agreements. See the NOTICE file
+- * distributed with this work for additional information
+- * regarding copyright ownership. The ASF licenses this file
+- * to you under the Apache License, Version 2.0 (the
+- * "License"); you may not use this file except in compliance
+- * with the License. You may obtain a copy of the License at
+- *
+- * http://www.apache.org/licenses/LICENSE-2.0
+- *
+- * Unless required by applicable law or agreed to in writing,
+- * software distributed under the License is distributed on an
+- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+- * KIND, either express or implied. See the License for the
+- * specific language governing permissions and limitations
+- * under the License.
++ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
++ * agreements. See the NOTICE file distributed with this work for additional information regarding
++ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance with the License. You may obtain a
++ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
++ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
++ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
++ * for the specific language governing permissions and limitations under the License.
+ */
+ package org.apache.curator.framework.recipes.cache;
+
+-import com.google.common.collect.Lists;
+-import com.google.common.io.Closeables;
++import java.util.Collection;
++import java.util.List;
++import java.util.concurrent.BlockingQueue;
++import java.util.concurrent.Callable;
++import java.util.concurrent.CountDownLatch;
++import java.util.concurrent.Exchanger;
++import java.util.concurrent.ExecutionException;
++import java.util.concurrent.ExecutorService;
++import java.util.concurrent.Executors;
++import java.util.concurrent.Future;
++import java.util.concurrent.LinkedBlockingQueue;
++import java.util.concurrent.Semaphore;
++import java.util.concurrent.TimeUnit;
++import java.util.concurrent.TimeoutException;
++import java.util.concurrent.atomic.AtomicInteger;
++import java.util.concurrent.atomic.AtomicReference;
++
+ import org.apache.curator.framework.CuratorFramework;
+ import org.apache.curator.framework.CuratorFrameworkFactory;
+ import org.apache.curator.framework.api.UnhandledErrorListener;
+ import org.apache.curator.framework.recipes.BaseClassForTests;
++import org.apache.curator.framework.recipes.cache.PathChildrenCache.DescendantHandlingMode;
+ import org.apache.curator.retry.RetryOneTime;
+ import org.apache.curator.test.KillSession;
+ import org.apache.curator.test.Timing;
+@@ -31,57 +39,48 @@ import org.apache.zookeeper.CreateMode;
+ import org.apache.zookeeper.KeeperException;
+ import org.testng.Assert;
+ import org.testng.annotations.Test;
+-import java.util.Collection;
+-import java.util.List;
+-import java.util.concurrent.*;
+-import java.util.concurrent.atomic.AtomicInteger;
+-import java.util.concurrent.atomic.AtomicReference;
+
+-public class TestPathChildrenCache extends BaseClassForTests
+-{
++import com.google.common.collect.Lists;
++import com.google.common.io.Closeables;
++
++public class TestPathChildrenCache extends BaseClassForTests {
++
+ @Test
+- public void testPostInitializedForEmpty() throws Exception
+- {
++ public void testPostInitializedForEmpty() throws Exception {
+ Timing timing = new Timing();
+ PathChildrenCache cache = null;
+- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+- try
+- {
++ CuratorFramework client = CuratorFrameworkFactory.newClient(
++ server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(
++ 1));
++ try {
+ client.start();
+
+ final CountDownLatch latch = new CountDownLatch(1);
+ cache = new PathChildrenCache(client, "/test", true);
+- cache.getListenable().addListener
+- (
+- new PathChildrenCacheListener()
+- {
+- @Override
+- public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
+- {
+- if ( event.getType() == PathChildrenCacheEvent.Type.INITIALIZED )
+- {
+- latch.countDown();
+- }
++ cache.getListenable().addListener(new PathChildrenCacheListener() {
++
++ @Override
++ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
++ throws Exception {
++ if (event.getType() == PathChildrenCacheEvent.Type.INITIALIZED) {
++ latch.countDown();
+ }
+ }
+- );
++ });
+ cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
+ Assert.assertTrue(timing.awaitLatch(latch));
+- }
+- finally
+- {
++ } finally {
+ Closeables.closeQuietly(cache);
+ Closeables.closeQuietly(client);
+ }
+ }
+
+ @Test
+- public void testAsyncInitialPopulation() throws Exception
+- {
++ public void testAsyncInitialPopulation() throws Exception {
+ PathChildrenCache cache = null;
+- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+- try
+- {
++ CuratorFramework client = CuratorFrameworkFactory.newClient(
++ server.getConnectString(), new RetryOneTime(1));
++ try {
+ client.start();
+
+ client.create().forPath("/test");
+@@ -89,17 +88,14 @@ public class TestPathChildrenCache extends BaseClassForTests
+
+ final BlockingQueue<PathChildrenCacheEvent> events = new LinkedBlockingQueue<PathChildrenCacheEvent>();
+ cache = new PathChildrenCache(client, "/test", true);
+- cache.getListenable().addListener
+- (
+- new PathChildrenCacheListener()
+- {
+- @Override
+- public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
+- {
+- events.offer(event);
+- }
+- }
+- );
++ cache.getListenable().addListener(new PathChildrenCacheListener() {
++
++ @Override
++ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
++ throws Exception {
++ events.offer(event);
++ }
++ });
+ cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
+
+ PathChildrenCacheEvent event = events.poll(10, TimeUnit.SECONDS);
+@@ -108,22 +104,20 @@ public class TestPathChildrenCache extends BaseClassForTests
+ event = events.poll(10, TimeUnit.SECONDS);
+ Assert.assertEquals(event.getType(), PathChildrenCacheEvent.Type.INITIALIZED);
+ Assert.assertEquals(event.getInitialData().size(), 1);
+- }
+- finally
+- {
++ } finally {
+ Closeables.closeQuietly(cache);
+ Closeables.closeQuietly(client);
+ }
+ }
+
+ @Test
+- public void testChildrenInitialized() throws Exception
+- {
++ public void testChildrenInitialized() throws Exception {
+ Timing timing = new Timing();
+ PathChildrenCache cache = null;
+- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+- try
+- {
++ CuratorFramework client = CuratorFrameworkFactory.newClient(
++ server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(
++ 1));
++ try {
+ client.start();
+ client.create().forPath("/test");
+
+@@ -131,24 +125,18 @@ public class TestPathChildrenCache extends BaseClassForTests
+
+ final CountDownLatch addedLatch = new CountDownLatch(3);
+ final CountDownLatch initLatch = new CountDownLatch(1);
+- cache.getListenable().addListener
+- (
+- new PathChildrenCacheListener()
+- {
+- @Override
+- public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
+- {
+- if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED )
+- {
+- addedLatch.countDown();
+- }
+- else if ( event.getType() == PathChildrenCacheEvent.Type.INITIALIZED )
+- {
+- initLatch.countDown();
+- }
+- }
++ cache.getListenable().addListener(new PathChildrenCacheListener() {
++
++ @Override
++ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
++ throws Exception {
++ if (event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) {
++ addedLatch.countDown();
++ } else if (event.getType() == PathChildrenCacheEvent.Type.INITIALIZED) {
++ initLatch.countDown();
+ }
+- );
++ }
++ });
+
+ client.create().forPath("/test/1", "1".getBytes());
+ client.create().forPath("/test/2", "2".getBytes());
+@@ -162,45 +150,37 @@ public class TestPathChildrenCache extends BaseClassForTests
+ Assert.assertEquals(cache.getCurrentData().get(0).getData(), "1".getBytes());
+ Assert.assertEquals(cache.getCurrentData().get(1).getData(), "2".getBytes());
+ Assert.assertEquals(cache.getCurrentData().get(2).getData(), "3".getBytes());
+- }
+- finally
+- {
++ } finally {
+ Closeables.closeQuietly(cache);
+ Closeables.closeQuietly(client);
+ }
+ }
+
+ @Test
+- public void testUpdateWhenNotCachingData() throws Exception
+- {
++ public void testUpdateWhenNotCachingData() throws Exception {
+ Timing timing = new Timing();
+
+- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
++ CuratorFramework client = CuratorFrameworkFactory.newClient(
++ server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(
++ 1));
+ client.start();
+- try
+- {
++ try {
+ final CountDownLatch updatedLatch = new CountDownLatch(1);
+ final CountDownLatch addedLatch = new CountDownLatch(1);
+ client.create().creatingParentsIfNeeded().forPath("/test");
+ PathChildrenCache cache = new PathChildrenCache(client, "/test", false);
+- cache.getListenable().addListener
+- (
+- new PathChildrenCacheListener()
+- {
+- @Override
+- public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
+- {
+- if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_UPDATED )
+- {
+- updatedLatch.countDown();
+- }
+- else if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED )
+- {
+- addedLatch.countDown();
+- }
+- }
++ cache.getListenable().addListener(new PathChildrenCacheListener() {
++
++ @Override
++ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
++ throws Exception {
++ if (event.getType() == PathChildrenCacheEvent.Type.CHILD_UPDATED) {
++ updatedLatch.countDown();
++ } else if (event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) {
++ addedLatch.countDown();
+ }
+- );
++ }
++ });
+ cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
+
+ client.create().forPath("/test/foo", "first".getBytes());
+@@ -208,94 +188,72 @@ public class TestPathChildrenCache extends BaseClassForTests
+
+ client.setData().forPath("/test/foo", "something new".getBytes());
+ Assert.assertTrue(timing.awaitLatch(updatedLatch));
+- }
+- finally
+- {
++ } finally {
+ Closeables.closeQuietly(client);
+ }
+ }
+
+ @Test
+- public void testEnsurePath() throws Exception
+- {
++ public void testEnsurePath() throws Exception {
+ Timing timing = new Timing();
+
+- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
++ CuratorFramework client = CuratorFrameworkFactory.newClient(
++ server.getConnectString(), new RetryOneTime(1));
+ client.start();
+- try
+- {
++ try {
+ PathChildrenCache cache = new PathChildrenCache(client, "/one/two/three", false);
+ cache.start();
+ timing.sleepABit();
+
+- try
+- {
++ try {
+ client.create().forPath("/one/two/three/four");
+- }
+- catch ( KeeperException.NoNodeException e )
+- {
++ } catch (KeeperException.NoNodeException e) {
+ Assert.fail("Path should exist", e);
+ }
+- }
+- finally
+- {
++ } finally {
+ Closeables.closeQuietly(client);
+ }
+ }
+
+ @Test
+- public void testDeleteThenCreate() throws Exception
+- {
+- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
++ public void testDeleteThenCreate() throws Exception {
++ CuratorFramework client = CuratorFrameworkFactory.newClient(
++ server.getConnectString(), new RetryOneTime(1));
+ client.start();
+- try
+- {
++ try {
+ client.create().forPath("/test");
+ client.create().forPath("/test/foo", "one".getBytes());
+
+ final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
+- client.getUnhandledErrorListenable().addListener
+- (
+- new UnhandledErrorListener()
+- {
+- @Override
+- public void unhandledError(String message, Throwable e)
+- {
+- error.set(e);
+- }
+- }
+- );
++ client.getUnhandledErrorListenable().addListener(new UnhandledErrorListener() {
++
++ @Override
++ public void unhandledError(String message, Throwable e) {
++ error.set(e);
++ }
++ });
+
+ final CountDownLatch removedLatch = new CountDownLatch(1);
+ final CountDownLatch postRemovedLatch = new CountDownLatch(1);
+ final CountDownLatch dataLatch = new CountDownLatch(1);
+ PathChildrenCache cache = new PathChildrenCache(client, "/test", true);
+- cache.getListenable().addListener
+- (
+- new PathChildrenCacheListener()
+- {
+- @Override
+- public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
+- {
+- if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED )
+- {
+- removedLatch.countDown();
+- Assert.assertTrue(postRemovedLatch.await(10, TimeUnit.SECONDS));
+- }
+- else
+- {
+- try
+- {
+- Assert.assertEquals(event.getData().getData(), "two".getBytes());
+- }
+- finally
+- {
+- dataLatch.countDown();
+- }
+- }
++ cache.getListenable().addListener(new PathChildrenCacheListener() {
++
++ @Override
++ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
++ throws Exception {
++ if (event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED) {
++ removedLatch.countDown();
++ Assert.assertTrue(postRemovedLatch.await(10, TimeUnit.SECONDS));
++ } else {
++ try {
++ Assert.assertEquals(event.getData().getData(), "two".getBytes());
++ } finally {
++ dataLatch.countDown();
+ }
+ }
+- );
++ }
++ });
+ cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
+
+ client.delete().forPath("/test/foo");
+@@ -305,26 +263,22 @@ public class TestPathChildrenCache extends BaseClassForTests
+ Assert.assertTrue(dataLatch.await(10, TimeUnit.SECONDS));
+
+ Throwable t = error.get();
+- if ( t != null )
+- {
++ if (t != null) {
+ Assert.fail("Assert", t);
+ }
+
+ cache.close();
+- }
+- finally
+- {
++ } finally {
+ client.close();
+ }
+ }
+
+ @Test
+- public void testRebuildAgainstOtherProcesses() throws Exception
+- {
+- final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
++ public void testRebuildAgainstOtherProcesses() throws Exception {
++ final CuratorFramework client = CuratorFrameworkFactory.newClient(
++ server.getConnectString(), new RetryOneTime(1));
+ client.start();
+- try
+- {
++ try {
+ client.create().forPath("/test");
+ client.create().forPath("/test/foo");
+ client.create().forPath("/test/bar");
+@@ -332,69 +286,56 @@ public class TestPathChildrenCache extends BaseClassForTests
+
+ final CountDownLatch addedLatch = new CountDownLatch(2);
+ final PathChildrenCache cache = new PathChildrenCache(client, "/test", true);
+- cache.getListenable().addListener
+- (
+- new PathChildrenCacheListener()
+- {
+- @Override
+- public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
+- {
+- if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED )
+- {
+- if ( event.getData().getPath().equals("/test/test") )
+- {
+- addedLatch.countDown();
+- }
+- }
+- else if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_UPDATED )
+- {
+- if ( event.getData().getPath().equals("/test/snafu") )
+- {
+- addedLatch.countDown();
+- }
+- }
++ cache.getListenable().addListener(new PathChildrenCacheListener() {
++
++ @Override
++ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
++ throws Exception {
++ if (event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) {
++ if (event.getData().getPath().equals("/test/test")) {
++ addedLatch.countDown();
++ }
++ } else if (event.getType() == PathChildrenCacheEvent.Type.CHILD_UPDATED) {
++ if (event.getData().getPath().equals("/test/snafu")) {
++ addedLatch.countDown();
+ }
+ }
+- );
++ }
++ });
+ cache.rebuildTestExchanger = new Exchanger<Object>();
+ ExecutorService service = Executors.newSingleThreadExecutor();
+ final AtomicReference<String> deletedPath = new AtomicReference<String>();
+- Future<Object> future = service.submit
+- (
+- new Callable<Object>()
+- {
+- @Override
+- public Object call() throws Exception
+- {
+- cache.rebuildTestExchanger.exchange(new Object());
+-
+- // simulate another process adding a node while we're rebuilding
+- client.create().forPath("/test/test");
+-
+- List<ChildData> currentData = cache.getCurrentData();
+- Assert.assertTrue(currentData.size() > 0);
+-
+- // simulate another process removing a node while we're rebuilding
+- client.delete().forPath(currentData.get(0).getPath());
+- deletedPath.set(currentData.get(0).getPath());
+-
+- cache.rebuildTestExchanger.exchange(new Object());
+-
+- ChildData childData = null;
+- while ( childData == null )
+- {
+- childData = cache.getCurrentData("/test/snafu");
+- Thread.sleep(1000);
+- }
+- Assert.assertEquals(childData.getData(), "original".getBytes());
+- client.setData().forPath("/test/snafu", "grilled".getBytes());
+-
+- cache.rebuildTestExchanger.exchange(new Object());
+-
+- return null;
+- }
++ Future<Object> future = service.submit(new Callable<Object>() {
++
++ @Override
++ public Object call() throws Exception {
++ cache.rebuildTestExchanger.exchange(new Object());
++
++ // simulate another process adding a node while we're rebuilding
++ client.create().forPath("/test/test");
++
++ List<ChildData> currentData = cache.getCurrentData();
++ Assert.assertTrue(currentData.size() > 0);
++
++ // simulate another process removing a node while we're rebuilding
++ client.delete().forPath(currentData.get(0).getPath());
++ deletedPath.set(currentData.get(0).getPath());
++
++ cache.rebuildTestExchanger.exchange(new Object());
++
++ ChildData childData = null;
++ while (childData == null) {
++ childData = cache.getCurrentData("/test/snafu");
++ Thread.sleep(1000);
+ }
+- );
++ Assert.assertEquals(childData.getData(), "original".getBytes());
++ client.setData().forPath("/test/snafu", "grilled".getBytes());
++
++ cache.rebuildTestExchanger.exchange(new Object());
++
++ return null;
++ }
++ });
+ cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
+ future.get();
+
+@@ -404,21 +345,18 @@ public class TestPathChildrenCache extends BaseClassForTests
+ Assert.assertEquals(cache.getCurrentData("/test/snafu").getData(), "grilled".getBytes());
+
+ cache.close();
+- }
+- finally
+- {
++ } finally {
+ client.close();
+ }
+ }
+
+ // see https://github.com/Netflix/curator/issues/27 - was caused by not comparing old->new data
+ @Test
+- public void testIssue27() throws Exception
+- {
+- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
++ public void testIssue27() throws Exception {
++ CuratorFramework client = CuratorFrameworkFactory.newClient(
++ server.getConnectString(), new RetryOneTime(1));
+ client.start();
+- try
+- {
++ try {
+ client.create().forPath("/base");
+ client.create().forPath("/base/a");
+ client.create().forPath("/base/b");
+@@ -429,18 +367,15 @@ public class TestPathChildrenCache extends BaseClassForTests
+ final List<PathChildrenCacheEvent.Type> events = Lists.newArrayList();
+ final Semaphore semaphore = new Semaphore(0);
+ PathChildrenCache cache = new PathChildrenCache(client, "/base", true);
+- cache.getListenable().addListener
+- (
+- new PathChildrenCacheListener()
+- {
+- @Override
+- public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
+- {
+- events.add(event.getType());
+- semaphore.release();
+- }
+- }
+- );
++ cache.getListenable().addListener(new PathChildrenCacheListener() {
++
++ @Override
++ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
++ throws Exception {
++ events.add(event.getType());
++ semaphore.release();
++ }
++ });
+ cache.start();
+
+ Assert.assertTrue(semaphore.tryAcquire(3, 10, TimeUnit.SECONDS));
+@@ -451,30 +386,25 @@ public class TestPathChildrenCache extends BaseClassForTests
+ client.create().forPath("/base/a");
+ Assert.assertTrue(semaphore.tryAcquire(1, 10, TimeUnit.SECONDS));
+
+- List<PathChildrenCacheEvent.Type> expected = Lists.newArrayList
+- (
++ List<PathChildrenCacheEvent.Type> expected = Lists.newArrayList(
+ PathChildrenCacheEvent.Type.CHILD_ADDED,
+ PathChildrenCacheEvent.Type.CHILD_ADDED,
+ PathChildrenCacheEvent.Type.CHILD_ADDED,
+ PathChildrenCacheEvent.Type.CHILD_REMOVED,
+- PathChildrenCacheEvent.Type.CHILD_ADDED
+- );
++ PathChildrenCacheEvent.Type.CHILD_ADDED);
+ Assert.assertEquals(expected, events);
+- }
+- finally
+- {
++ } finally {
+ client.close();
+ }
+ }
+
+ // test Issue 27 using new rebuild() method
+ @Test
+- public void testIssue27Alt() throws Exception
+- {
+- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
++ public void testIssue27Alt() throws Exception {
++ CuratorFramework client = CuratorFrameworkFactory.newClient(
++ server.getConnectString(), new RetryOneTime(1));
+ client.start();
+- try
+- {
++ try {
+ client.create().forPath("/base");
+ client.create().forPath("/base/a");
+ client.create().forPath("/base/b");
+@@ -485,18 +415,15 @@ public class TestPathChildrenCache extends BaseClassForTests
+ final List<PathChildrenCacheEvent.Type> events = Lists.newArrayList();
+ final Semaphore semaphore = new Semaphore(0);
+ PathChildrenCache cache = new PathChildrenCache(client, "/base", true);
+- cache.getListenable().addListener
+- (
+- new PathChildrenCacheListener()
+- {
+- @Override
+- public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
+- {
+- events.add(event.getType());
+- semaphore.release();
+- }
+- }
+- );
++ cache.getListenable().addListener(new PathChildrenCacheListener() {
++
++ @Override
++ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
++ throws Exception {
++ events.add(event.getType());
++ semaphore.release();
++ }
++ });
+ cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
+
+ client.delete().forPath("/base/a");
+@@ -505,27 +432,23 @@ public class TestPathChildrenCache extends BaseClassForTests
+ client.create().forPath("/base/a");
+ Assert.assertTrue(semaphore.tryAcquire(1, 10, TimeUnit.SECONDS));
+
+- List<PathChildrenCacheEvent.Type> expected = Lists.newArrayList
+- (
++ List<PathChildrenCacheEvent.Type> expected = Lists.newArrayList(
+ PathChildrenCacheEvent.Type.CHILD_REMOVED,
+- PathChildrenCacheEvent.Type.CHILD_ADDED
+- );
++ PathChildrenCacheEvent.Type.CHILD_ADDED);
+ Assert.assertEquals(expected, events);
+- }
+- finally
+- {
++ } finally {
+ client.close();
+ }
+ }
+
+ @Test
+- public void testKilledSession() throws Exception
+- {
++ public void testKilledSession() throws Exception {
+ Timing timing = new Timing();
+ CuratorFramework client = null;
+- try
+- {
+- client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
++ try {
++ client = CuratorFrameworkFactory.newClient(
++ server.getConnectString(), timing.session(), timing.connection(),
++ new RetryOneTime(1));
+ client.start();
+ client.create().forPath("/test");
+
+@@ -536,32 +459,22 @@ public class TestPathChildrenCache extends BaseClassForTests
+ final CountDownLatch lostLatch = new CountDownLatch(1);
+ final CountDownLatch reconnectedLatch = new CountDownLatch(1);
+ final CountDownLatch removedLatch = new CountDownLatch(1);
+- cache.getListenable().addListener
+- (
+- new PathChildrenCacheListener()
+- {
+- @Override
+- public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
+- {
+- if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED )
+- {
+- childAddedLatch.countDown();
+- }
+- else if ( event.getType() == PathChildrenCacheEvent.Type.CONNECTION_LOST )
+- {
+- lostLatch.countDown();
+- }
+- else if ( event.getType() == PathChildrenCacheEvent.Type.CONNECTION_RECONNECTED )
+- {
+- reconnectedLatch.countDown();
+- }
+- else if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED )
+- {
+- removedLatch.countDown();
+- }
+- }
++ cache.getListenable().addListener(new PathChildrenCacheListener() {
++
++ @Override
++ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
++ throws Exception {
++ if (event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) {
++ childAddedLatch.countDown();
++ } else if (event.getType() == PathChildrenCacheEvent.Type.CONNECTION_LOST) {
++ lostLatch.countDown();
++ } else if (event.getType() == PathChildrenCacheEvent.Type.CONNECTION_RECONNECTED) {
++ reconnectedLatch.countDown();
++ } else if (event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED) {
++ removedLatch.countDown();
+ }
+- );
++ }
++ });
+
+ client.create().withMode(CreateMode.EPHEMERAL).forPath("/test/me", "data".getBytes());
+ Assert.assertTrue(timing.awaitLatch(childAddedLatch));
+@@ -570,54 +483,48 @@ public class TestPathChildrenCache extends BaseClassForTests
+ Assert.assertTrue(timing.awaitLatch(lostLatch));
+ Assert.assertTrue(timing.awaitLatch(reconnected
<TRUNCATED>