You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2019/10/14 13:37:29 UTC
[curator] 03/03: Added some functional utilities that will aid
using persistent/recursive watchers
This is an automated email from the ASF dual-hosted git repository.
randgalt pushed a commit to branch persistent-watcher-functional
in repository https://gitbox.apache.org/repos/asf/curator.git
commit 0b52b2c7f2743938a86865dc9898009992b2538e
Author: randgalt <ra...@apache.org>
AuthorDate: Mon Oct 14 15:01:01 2019 +0300
Added some functional utilities that will aid using persistent/recursive watchers
---
.../framework/recipes/watch/PersistentWatcher.java | 18 +++
.../framework/recipes/watch/WatcherHelpers.java | 79 +++++++++++++
.../curator/framework/recipes/watch/Watchers.java | 93 +++++++++++++++
.../framework/recipes/watch/WatchersImpl.java | 110 ++++++++++++++++++
.../recipes/watch/TestPersistentWatcher.java | 18 +++
.../framework/recipes/watch/TestWatchers.java | 125 +++++++++++++++++++++
.../x/async/api/AsyncPersistentWatchBuilder.java | 18 +++
.../details/AsyncPersistentWatchBuilderImpl.java | 75 -------------
src/site/confluence/utilities.confluence | 17 +++
9 files changed, 478 insertions(+), 75 deletions(-)
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
index 9f0f497..201caf9 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
@@ -1,4 +1,22 @@
/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/WatcherHelpers.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/WatcherHelpers.java
new file mode 100644
index 0000000..4db368a
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/WatcherHelpers.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+import org.apache.curator.utils.ZKPaths;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.IntPredicate;
+import java.util.function.Predicate;
+
+public class WatcherHelpers
+{
+ private static final ZKPaths.PathAndNode NULL_PATH_AND_NODE = new ZKPaths.PathAndNode("/", "");
+
+ public static ZKPaths.PathAndNode mapToPathAndNode(WatchedEvent event)
+ {
+ if ( event.getType() == Watcher.Event.EventType.None )
+ {
+ return NULL_PATH_AND_NODE;
+ }
+ return ZKPaths.getPathAndNode(event.getPath());
+ }
+
+ public static List<String> mapToParts(WatchedEvent event)
+ {
+ if ( event.getType() == Watcher.Event.EventType.None )
+ {
+ return Collections.emptyList();
+ }
+ return ZKPaths.split(event.getPath());
+ }
+
+ public static Predicate<WatchedEvent> filterNode(Predicate<String> nodeFilter)
+ {
+ return event -> nodeFilter.test(mapToPathAndNode(event).getNode());
+ }
+
+ public static Predicate<WatchedEvent> filterPath(Predicate<String> pathFilter)
+ {
+ return event -> pathFilter.test(mapToPathAndNode(event).getPath());
+ }
+
+ public static Predicate<WatchedEvent> filterDepth(IntPredicate depthFilter)
+ {
+ return event -> depthFilter.test(mapToParts(event).size());
+ }
+
+ public static Predicate<WatchedEvent> filterSystemEvents()
+ {
+ return event -> event.getType() == Watcher.Event.EventType.None;
+ }
+
+ public static Predicate<WatchedEvent> filterNonSystemEvents()
+ {
+ return event -> event.getType() != Watcher.Event.EventType.None;
+ }
+
+ private WatcherHelpers()
+ {
+ }
+}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/Watchers.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/Watchers.java
new file mode 100644
index 0000000..600a7ec
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/Watchers.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Predicate;
+
+/**
+ * Functional builders to wrap a ZooKeeper Watcher. Call the filter/map/etc. methods
+ * as needed and complete by calling {@link Next#process(java.util.function.Consumer)} which
+ * returns a Watcher that can be passed to any Curator/ZooKeeper method that accepts Watchers.
+ */
+public interface Watchers
+{
+    /**
+     * Start a chain by filtering the watched event. If the predicate returns false, the chain ends
+     * and no further processing occurs.
+     *
+     * @param filter predicate
+     * @return chain
+     */
+    static Next<WatchedEvent> filter(Predicate<WatchedEvent> filter)
+    {
+        return new WatchersImpl<>(filter);
+    }
+
+    /**
+     * Start a chain by mapping the watched event to a new object.
+     *
+     * @param mapper mapper; it may be declared to return {@code R} or any subtype of it
+     * @param <R> type of the value carried by the returned chain
+     * @return chain
+     */
+    // The result wildcard must be "? extends R": the mapper PRODUCES the chained value. With
+    // "? super R" a mapper returning e.g. Object could start a Next<String>, which compiles but
+    // fails with a ClassCastException once the value reaches a typed filter or handler.
+    static <R> Next<R> map(Function<WatchedEvent, ? extends R> mapper)
+    {
+        return new WatchersImpl<>(mapper);
+    }
+
+    /**
+     * A partially built chain whose current value is of type {@code T}.
+     *
+     * @param <T> type of the value at this point of the chain
+     */
+    interface Next<T>
+    {
+        /**
+         * Filter the chained value. If the predicate returns false, the chain ends and no further
+         * processing occurs.
+         *
+         * @param filter predicate
+         * @return chain
+         */
+        Next<T> filter(Predicate<? super T> filter);
+
+        /**
+         * Peek at the chained value
+         *
+         * @param consumer consumer
+         * @return chain
+         */
+        Next<T> peek(Consumer<? super T> consumer);
+
+        /**
+         * Map the chained value to a new object
+         *
+         * @param mapper mapper
+         * @param <R> type of the mapped value
+         * @return chain
+         */
+        <R> Next<R> map(Function<? super T, ? extends R> mapper);
+
+        /**
+         * Complete the chain and return a new Watcher that consists of all the previous steps in the
+         * chain. When the Watcher is called by ZooKeeper, all steps of the chain are called in order.
+         *
+         * @param handler consumer for the final value of the chain
+         * @return new Watcher
+         */
+        Watcher process(Consumer<T> handler);
+    }
+}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/WatchersImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/WatchersImpl.java
new file mode 100644
index 0000000..ed0e02f
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/WatchersImpl.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Predicate;
+
+public class WatchersImpl<T> implements Watchers.Next<T>
+{
+ private final Step step;
+
+ private static class Step
+ {
+ private final Predicate filter;
+ private final Function mapper;
+
+ private final Step head;
+ private Step next;
+
+ Step(Predicate filter, Function mapper)
+ {
+ this(filter, mapper, null);
+ }
+
+ Step(Predicate filter, Function mapper, Step head)
+ {
+ this.filter = (filter != null) ? filter : (__ -> true);
+ this.mapper = (mapper != null) ? mapper : Function.identity();
+ this.head = (head != null) ? head : this;
+ }
+ }
+
+ WatchersImpl(Predicate<WatchedEvent> filter)
+ {
+ step = new Step(filter, null);
+ }
+
+ <R> WatchersImpl(Function<WatchedEvent, ? super R> mapper)
+ {
+ step = new Step(null, mapper);
+ }
+
+ private WatchersImpl(Step step)
+ {
+ this.step = step;
+ }
+
+ @Override
+ public Watchers.Next<T> filter(Predicate<? super T> filter)
+ {
+ this.step.next = new Step(filter, null, step.head);
+ return new WatchersImpl<>(this.step.next);
+ }
+
+ @Override
+ public Watchers.Next<T> peek(Consumer<? super T> consumer)
+ {
+ return filter(value -> {
+ consumer.accept(value);
+ return true;
+ });
+ }
+
+ @Override
+ public <R> Watchers.Next<R> map(Function<? super T, ? extends R> mapper)
+ {
+ this.step.next = new Step(null, mapper, step.head);
+ return new WatchersImpl<>(this.step.next);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked") // all public APIs are correctly typed so this is safe
+ public Watcher process(Consumer<T> handler)
+ {
+ return event -> {
+ Object value = event;
+ Step currentStep = step.head;
+ while ( currentStep != null )
+ {
+ if ( !currentStep.filter.test(value) )
+ {
+ return; // exit lambda
+ }
+ value = currentStep.mapper.apply(value);
+ currentStep = currentStep.next;
+ }
+
+ ((Consumer)handler).accept(value);
+ };
+ }
+}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java
index df18de5..ad9703b 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.curator.framework.recipes.watch;
import org.apache.curator.framework.CuratorFramework;
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestWatchers.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestWatchers.java
new file mode 100644
index 0000000..45185b2
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestWatchers.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.compatibility.Timing2;
+import org.apache.zookeeper.Watcher;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+
+import static org.apache.curator.framework.recipes.watch.WatcherHelpers.*;
+import static org.apache.curator.framework.recipes.watch.Watchers.filter;
+import static org.apache.curator.framework.recipes.watch.Watchers.map;
+
+public class TestWatchers extends BaseClassForTests
+{
+    private final Timing2 timing = new Timing2();
+
+    // A chain that starts with filter(): the handler must run only once the watched node is created
+    @Test
+    public void testFilterFirst() throws Exception
+    {
+        try ( CuratorFramework curator = newClient() )
+        {
+            curator.start();
+
+            Semaphore handled = new Semaphore(0);
+            Watcher chained = filter(filterNonSystemEvents())
+                .filter(watched -> Watcher.Event.EventType.NodeCreated == watched.getType())
+                .map(watched -> mapToPathAndNode(watched))
+                .filter(pathAndNode -> pathAndNode.getPath().equals("/one/two"))
+                .process(ignored -> handled.release());
+            curator.checkExists().usingWatcher(chained).forPath("/one/two/three");
+
+            // creating the ancestors must not reach the handler
+            curator.create().forPath("/one");
+            timing.sleepABit();
+            Assert.assertEquals(handled.availablePermits(), 0);
+
+            curator.create().forPath("/one/two");
+            timing.sleepABit();
+            Assert.assertEquals(handled.availablePermits(), 0);
+
+            // creating the watched node must
+            curator.create().forPath("/one/two/three");
+            Assert.assertTrue(timing.acquireSemaphore(handled, 1));
+        }
+    }
+
+    // A chain that starts with map(): both peek() and the handler see the mapped value
+    @Test
+    public void testMapFirst() throws Exception
+    {
+        try ( CuratorFramework curator = newClient() )
+        {
+            curator.start();
+
+            BlockingQueue<String> handledValues = new LinkedBlockingQueue<>();
+            BlockingQueue<String> peekedValues = new LinkedBlockingQueue<>();
+            Watcher chained = map(watched -> "The path is: " + watched.getPath())
+                .peek(peekedValues::add)
+                .process(handledValues::add);
+            curator.checkExists().usingWatcher(chained).forPath("/one");
+
+            curator.create().forPath("/one");
+            Assert.assertEquals(timing.takeFromQueue(peekedValues), "The path is: /one");
+            Assert.assertEquals(timing.takeFromQueue(handledValues), "The path is: /one");
+        }
+    }
+
+    // Stopping the server must deliver a system event that passes filterSystemEvents()
+    @Test
+    public void testSystemEvents() throws Exception
+    {
+        try ( CuratorFramework curator = newClient() )
+        {
+            curator.start();
+
+            CountDownLatch systemEventSeen = new CountDownLatch(1);
+            Watcher chained = filter(filterSystemEvents())
+                .process(ignored -> systemEventSeen.countDown());
+            curator.checkExists().usingWatcher(chained).forPath("/one");
+            server.stop();
+            Assert.assertTrue(timing.awaitLatch(systemEventSeen));
+        }
+    }
+
+    // The path/node/depth helpers combined in one chain, ending with the split parts of the path
+    @Test
+    public void testHelpers() throws Exception
+    {
+        try ( CuratorFramework curator = newClient() )
+        {
+            curator.start();
+
+            BlockingQueue<String> handledValues = new LinkedBlockingQueue<>();
+            Watcher chained = filter(filterPath(path -> path.startsWith("/a")))
+                .filter(filterNode(name -> name.equals("c")))
+                .filter(filterDepth(partCount -> partCount > 1))
+                .map(watched -> mapToParts(watched))
+                .map(pathParts -> String.join("|", pathParts))
+                .process(handledValues::add);
+            curator.checkExists().usingWatcher(chained).forPath("/a/b/c");
+            curator.create().creatingParentsIfNeeded().forPath("/a/b/c");
+            Assert.assertEquals(timing.takeFromQueue(handledValues), "a|b|c");
+        }
+    }
+
+    // Unstarted client for the test server; each test starts it inside its own try-with-resources
+    private CuratorFramework newClient()
+    {
+        return CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+    }
+}
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncPersistentWatchBuilder.java b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncPersistentWatchBuilder.java
index 0f29233..143a2c8 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncPersistentWatchBuilder.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncPersistentWatchBuilder.java
@@ -1,4 +1,22 @@
/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncPersistentWatchBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncPersistentWatchBuilderImpl.java
deleted file mode 100644
index 14f3e30..0000000
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncPersistentWatchBuilderImpl.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
- package org.apache.curator.x.async.details;
-
- import org.apache.curator.framework.api.AddPersistentWatchable;
- import org.apache.curator.framework.api.CuratorWatcher;
- import org.apache.curator.framework.imps.AddPersistentWatchBuilderImpl;
- import org.apache.curator.framework.imps.CuratorFrameworkImpl;
- import org.apache.curator.framework.imps.Watching;
- import org.apache.curator.x.async.AsyncStage;
- import org.apache.curator.x.async.api.AsyncPathable;
- import org.apache.curator.x.async.api.AsyncPersistentWatchBuilder;
- import org.apache.zookeeper.Watcher;
-
- import static org.apache.curator.x.async.details.BackgroundProcs.ignoredProc;
- import static org.apache.curator.x.async.details.BackgroundProcs.safeCall;
-
- class AsyncPersistentWatchBuilderImpl implements AsyncPersistentWatchBuilder, AddPersistentWatchable<AsyncPathable<AsyncStage<Void>>>, AsyncPathable<AsyncStage<Void>>
- {
- private final CuratorFrameworkImpl client;
- private final Filters filters;
- private Watching watching = null;
- private boolean recursive = false;
-
- AsyncPersistentWatchBuilderImpl(CuratorFrameworkImpl client, Filters filters)
- {
- this.client = client;
- this.filters = filters;
- }
-
- @Override
- public AddPersistentWatchable<AsyncPathable<AsyncStage<Void>>> recursive()
- {
- recursive = true;
- return this;
- }
-
- @Override
- public AsyncPathable<AsyncStage<Void>> usingWatcher(Watcher watcher)
- {
- watching = new Watching(client, watcher);
- return this;
- }
-
- @Override
- public AsyncPathable<AsyncStage<Void>> usingWatcher(CuratorWatcher watcher)
- {
- watching = new Watching(client, watcher);
- return this;
- }
-
- @Override
- public AsyncStage<Void> forPath(String path)
- {
- BuilderCommon<Void> common = new BuilderCommon<>(filters, ignoredProc);
- AddPersistentWatchBuilderImpl builder = new AddPersistentWatchBuilderImpl(client, watching, common.backgrounding, recursive);
- return safeCall(common.internalCallback, () -> builder.forPath(path));
- }
- }
\ No newline at end of file
diff --git a/src/site/confluence/utilities.confluence b/src/site/confluence/utilities.confluence
index 2bd7ac1..c44efa8 100644
--- a/src/site/confluence/utilities.confluence
+++ b/src/site/confluence/utilities.confluence
@@ -42,6 +42,23 @@ CuratorFramework client = CuratorFrameworkFactory.builder()
// all connection state listeners set for "client" will get circuit breaking behavior
{code}
+h2. Functional Watcher Builder and Helpers
+
+See: [[Watchers|curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/Watchers.java]] and
+[[WatcherHelpers|curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/WatcherHelpers.java]]
+
+Especially useful with the new Persistent/Recursive Watcher support, this builder and helpers allow creating Watchers from
+functional methods and utilities that chain together. E.g.
+
+{code}
+Watcher watcher = Watchers.filter(WatcherHelpers.filterNonSystemEvents())
+ .map(WatcherHelpers::mapToPathAndNode)
+ .process(pathAndNode -> {
+ ... etc ...
+ });
+... use the watcher in a Curator method ...
+{code}
+
h2. Locker
Curator's Locker uses Java 7's try\-with\-resources feature to make using Curator locks safer: