You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2019/10/14 13:37:29 UTC

[curator] 03/03: Added some functional utilities that will aid using persistent/recursive watchers

This is an automated email from the ASF dual-hosted git repository.

randgalt pushed a commit to branch persistent-watcher-functional
in repository https://gitbox.apache.org/repos/asf/curator.git

commit 0b52b2c7f2743938a86865dc9898009992b2538e
Author: randgalt <ra...@apache.org>
AuthorDate: Mon Oct 14 15:01:01 2019 +0300

    Added some functional utilities that will aid using persistent/recursive watchers
---
 .../framework/recipes/watch/PersistentWatcher.java |  18 +++
 .../framework/recipes/watch/WatcherHelpers.java    |  79 +++++++++++++
 .../curator/framework/recipes/watch/Watchers.java  |  93 +++++++++++++++
 .../framework/recipes/watch/WatchersImpl.java      | 110 ++++++++++++++++++
 .../recipes/watch/TestPersistentWatcher.java       |  18 +++
 .../framework/recipes/watch/TestWatchers.java      | 125 +++++++++++++++++++++
 .../x/async/api/AsyncPersistentWatchBuilder.java   |  18 +++
 .../details/AsyncPersistentWatchBuilderImpl.java   |  75 -------------
 src/site/confluence/utilities.confluence           |  17 +++
 9 files changed, 478 insertions(+), 75 deletions(-)

diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
index 9f0f497..201caf9 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
@@ -1,4 +1,22 @@
 /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/WatcherHelpers.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/WatcherHelpers.java
new file mode 100644
index 0000000..4db368a
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/WatcherHelpers.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+import org.apache.curator.utils.ZKPaths;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.IntPredicate;
+import java.util.function.Predicate;
+
+public class WatcherHelpers
+{
+    // stand-in returned by mapToPathAndNode() for system events
+    private static final ZKPaths.PathAndNode NULL_PATH_AND_NODE = new ZKPaths.PathAndNode("/", "");
+
+    /** Convert the event's path via ZKPaths.getPathAndNode(); system events (type None) yield the pair ("/", "") */
+    public static ZKPaths.PathAndNode mapToPathAndNode(WatchedEvent event)
+    {
+        return (event.getType() != Watcher.Event.EventType.None) ? ZKPaths.getPathAndNode(event.getPath()) : NULL_PATH_AND_NODE;
+    }
+
+    /** Convert the event's path via ZKPaths.split(); system events (type None) yield an empty list */
+    public static List<String> mapToParts(WatchedEvent event)
+    {
+        return (event.getType() != Watcher.Event.EventType.None) ? ZKPaths.split(event.getPath()) : Collections.<String>emptyList();
+    }
+
+    /** Build an event predicate that applies nodeFilter to the node part of mapToPathAndNode() */
+    public static Predicate<WatchedEvent> filterNode(Predicate<String> nodeFilter)
+    {
+        return watchedEvent -> nodeFilter.test(mapToPathAndNode(watchedEvent).getNode());
+    }
+
+    /** Build an event predicate that applies pathFilter to the path part of mapToPathAndNode() */
+    public static Predicate<WatchedEvent> filterPath(Predicate<String> pathFilter)
+    {
+        return watchedEvent -> pathFilter.test(mapToPathAndNode(watchedEvent).getPath());
+    }
+
+    /** Build an event predicate that applies depthFilter to the number of parts returned by mapToParts() */
+    public static Predicate<WatchedEvent> filterDepth(IntPredicate depthFilter)
+    {
+        return watchedEvent -> depthFilter.test(mapToParts(watchedEvent).size());
+    }
+
+    /** Build an event predicate that accepts only system events, i.e. events whose type is None */
+    public static Predicate<WatchedEvent> filterSystemEvents()
+    {
+        return watchedEvent -> Watcher.Event.EventType.None == watchedEvent.getType();
+    }
+
+    /** Build an event predicate that accepts everything filterSystemEvents() rejects */
+    public static Predicate<WatchedEvent> filterNonSystemEvents()
+    {
+        return filterSystemEvents().negate();
+    }
+
+    private WatcherHelpers()
+    {
+    }
+}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/Watchers.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/Watchers.java
new file mode 100644
index 0000000..600a7ec
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/Watchers.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Predicate;
+
+/**
+ * Functional builders to wrap a ZooKeeper Watcher. Call the filter/map/etc. methods
+ * as needed and complete by calling {@link Next#process(java.util.function.Consumer)} which
+ * returns a Watcher that can be passed to any Curator/ZooKeeper method that accepts Watchers.
+ */
+public interface Watchers
+{
+    /**
+     * Filter the chained watched event. If the predicate returns false, the chain ends and no further processing
+     * occurs.
+     *
+     * @param filter predicate applied to each event the final Watcher receives
+     * @return chain that continues with the event
+     */
+    static Next<WatchedEvent> filter(Predicate<WatchedEvent> filter)
+    {
+        return new WatchersImpl<>(filter);
+    }
+
+    /**
+     * Map the chained watched event to a new object
+     *
+     * @param mapper function applied to each event; it may be declared to return any subtype of R
+     * @return chain that continues with the mapped value
+     */
+    static <R> Next<R> map(Function<WatchedEvent, ? extends R> mapper)
+    {
+        return new WatchersImpl<>(mapper);
+    }
+
+    interface Next<T>
+    {
+        /**
+         * Filter the chained value. If the predicate returns false, the chain ends and no further processing
+         * occurs.
+         *
+         * @param filter predicate applied to the chained value
+         * @return chain that continues with the same value
+         */
+        Next<T> filter(Predicate<? super T> filter);
+
+        /**
+         * Peek at the chained value
+         *
+         * @param consumer called with the chained value, which then continues down the chain
+         * @return chain that continues with the same value
+         */
+        Next<T> peek(Consumer<? super T> consumer);
+
+        /**
+         * Map the chained value to a new object
+         *
+         * @param mapper function applied to the chained value
+         * @return chain that continues with the mapped value
+         */
+        <R> Next<R> map(Function<? super T, ? extends R> mapper);
+
+        /**
+         * Complete the chain and return a new Watcher that consists of all the previous steps in the
+         * chain. When the Watcher is called by ZooKeeper, all steps of the chain are called in order.
+         *
+         * @param handler consumer for the final value of the chain
+         * @return new Watcher
+         */
+        Watcher process(Consumer<T> handler);
+    }
+}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/WatchersImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/WatchersImpl.java
new file mode 100644
index 0000000..ed0e02f
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/WatchersImpl.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Predicate;
+
+/**
+ * Default {@link Watchers.Next} implementation. Each instance is immutable: filter/peek/map wrap the
+ * current stage in a new instance instead of modifying it, so one stage can start several chains and
+ * a Watcher returned by process() runs exactly the steps that were chained before that call.
+ *
+ * @param <T> type of the value this stage hands to the next step
+ */
+public class WatchersImpl<T> implements Watchers.Next<T>
+{
+    private final Stage<T> stage;
+
+    /**
+     * All steps of a chain up to some point, folded into one callback
+     */
+    private interface Stage<V>
+    {
+        /**
+         * Run the steps on the event and hand the result to downstream unless a filter rejects it
+         */
+        void emit(WatchedEvent event, Consumer<? super V> downstream);
+    }
+
+    @SuppressWarnings("unchecked")  // Watchers.filter() creates this with T == WatchedEvent
+    WatchersImpl(Predicate<WatchedEvent> filter)
+    {
+        // a null filter accepts every event
+        stage = (event, downstream) -> {
+            if ( (filter == null) || filter.test(event) )
+            {
+                downstream.accept((T)event);
+            }
+        };
+    }
+
+    @SuppressWarnings("unchecked")  // Watchers.map() creates this with T == the mapper's result type
+    WatchersImpl(Function<WatchedEvent, ?> mapper)
+    {
+        // a null mapper passes the event through unchanged
+        stage = (event, downstream) -> downstream.accept((T)((mapper != null) ? mapper.apply(event) : event));
+    }
+
+    // continues a chain with an already composed stage
+    private WatchersImpl(Stage<T> stage)
+    {
+        this.stage = stage;
+    }
+
+    @Override
+    public Watchers.Next<T> filter(Predicate<? super T> filter)
+    {
+        // a null filter accepts every value
+        return new WatchersImpl<T>((event, downstream) -> stage.emit(event, value -> {
+            if ( (filter == null) || filter.test(value) )
+            {
+                downstream.accept(value);
+            }
+        }));
+    }
+
+    @Override
+    public Watchers.Next<T> peek(Consumer<? super T> consumer)
+    {
+        return filter(value -> {
+            consumer.accept(value);
+            return true;
+        });
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")  // the cast is only reached for a null mapper, which acts as identity
+    public <R> Watchers.Next<R> map(Function<? super T, ? extends R> mapper)
+    {
+        return new WatchersImpl<R>((event, downstream) -> stage.emit(event, value -> {
+            downstream.accept((mapper != null) ? mapper.apply(value) : (R)value);
+        }));
+    }
+
+    @Override
+    public Watcher process(Consumer<T> handler)
+    {
+        // the returned Watcher only references this stage, which can never change afterwards
+        return event -> stage.emit(event, handler);
+    }
+}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java
index df18de5..ad9703b 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.curator.framework.recipes.watch;
 
 import org.apache.curator.framework.CuratorFramework;
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestWatchers.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestWatchers.java
new file mode 100644
index 0000000..45185b2
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestWatchers.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.compatibility.Timing2;
+import org.apache.zookeeper.Watcher;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+
+import static org.apache.curator.framework.recipes.watch.WatcherHelpers.*;
+import static org.apache.curator.framework.recipes.watch.Watchers.filter;
+import static org.apache.curator.framework.recipes.watch.Watchers.map;
+
+public class TestWatchers extends BaseClassForTests
+{
+    private final Timing2 timing = new Timing2();
+
+    @Test
+    public void testFilterFirst() throws Exception
+    {
+        try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) )
+        {
+            client.start();
+            // a chain that starts with filter(): the handler releases one permit per accepted event
+            Semaphore handled = new Semaphore(0);
+            Watcher watcher = filter(filterNonSystemEvents())
+                .filter(watchedEvent -> watchedEvent.getType() == Watcher.Event.EventType.NodeCreated)
+                .map(WatcherHelpers::mapToPathAndNode)
+                .filter(pathAndNode -> pathAndNode.getPath().equals("/one/two"))
+                .process(ignored -> handled.release());
+            client.checkExists().usingWatcher(watcher).forPath("/one/two/three");
+            // creating the first ancestor must not reach the handler
+            client.create().forPath("/one");
+            timing.sleepABit();
+            Assert.assertEquals(handled.availablePermits(), 0);
+            // neither must creating the second ancestor
+            client.create().forPath("/one/two");
+            timing.sleepABit();
+            Assert.assertEquals(handled.availablePermits(), 0);
+            // creating the watched node itself must reach the handler
+            client.create().forPath("/one/two/three");
+            Assert.assertTrue(timing.acquireSemaphore(handled, 1));
+        }
+    }
+
+    @Test
+    public void testMapFirst() throws Exception
+    {
+        try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) )
+        {
+            client.start();
+            // a chain that starts with map(): both the peek step and the handler record the mapped text
+            BlockingQueue<String> handled = new LinkedBlockingQueue<>();
+            BlockingQueue<String> peeked = new LinkedBlockingQueue<>();
+            Watcher watcher = map(watchedEvent -> "The path is: " + watchedEvent.getPath())
+                .peek(peeked::add)
+                .process(handled::add);
+            client.checkExists().usingWatcher(watcher).forPath("/one");
+            // creating the watched node must deliver the mapped text to the peek step and to the handler
+            client.create().forPath("/one");
+            Assert.assertEquals(timing.takeFromQueue(peeked), "The path is: /one");
+            Assert.assertEquals(timing.takeFromQueue(handled), "The path is: /one");
+        }
+    }
+
+    @Test
+    public void testSystemEvents() throws Exception
+    {
+        try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) )
+        {
+            client.start();
+            // only system events pass the filter; stopping the server is expected to deliver one
+            CountDownLatch systemEventSeen = new CountDownLatch(1);
+            Watcher watcher = filter(filterSystemEvents())
+                .process(ignored -> systemEventSeen.countDown());
+            client.checkExists().usingWatcher(watcher).forPath("/one");
+            server.stop();
+            Assert.assertTrue(timing.awaitLatch(systemEventSeen));
+        }
+    }
+
+    @Test
+    public void testHelpers() throws Exception
+    {
+        try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) )
+        {
+            client.start();
+            // combine the path, node and depth helpers, then join the path parts for the handler
+            BlockingQueue<String> joinedParts = new LinkedBlockingQueue<>();
+            Watcher watcher = filter(filterPath(path -> path.startsWith("/a")))
+                .filter(filterNode(nodeName -> nodeName.equals("c")))
+                .filter(filterDepth(partCount -> partCount > 1))
+                .map(WatcherHelpers::mapToParts)
+                .map(pathParts -> String.join("|", pathParts))
+                .process(joinedParts::add);
+            client.checkExists().usingWatcher(watcher).forPath("/a/b/c");
+            client.create().creatingParentsIfNeeded().forPath("/a/b/c");
+            Assert.assertEquals(timing.takeFromQueue(joinedParts), "a|b|c");
+        }
+    }
+}
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncPersistentWatchBuilder.java b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncPersistentWatchBuilder.java
index 0f29233..143a2c8 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncPersistentWatchBuilder.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncPersistentWatchBuilder.java
@@ -1,4 +1,22 @@
 /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncPersistentWatchBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncPersistentWatchBuilderImpl.java
deleted file mode 100644
index 14f3e30..0000000
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncPersistentWatchBuilderImpl.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one
-  * or more contributor license agreements.  See the NOTICE file
-  * distributed with this work for additional information
-  * regarding copyright ownership.  The ASF licenses this file
-  * to you under the Apache License, Version 2.0 (the
-  * "License"); you may not use this file except in compliance
-  * with the License.  You may obtain a copy of the License at
-  *
-  *   http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing,
-  * software distributed under the License is distributed on an
-  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-  * KIND, either express or implied.  See the License for the
-  * specific language governing permissions and limitations
-  * under the License.
-  */
- package org.apache.curator.x.async.details;
-
- import org.apache.curator.framework.api.AddPersistentWatchable;
- import org.apache.curator.framework.api.CuratorWatcher;
- import org.apache.curator.framework.imps.AddPersistentWatchBuilderImpl;
- import org.apache.curator.framework.imps.CuratorFrameworkImpl;
- import org.apache.curator.framework.imps.Watching;
- import org.apache.curator.x.async.AsyncStage;
- import org.apache.curator.x.async.api.AsyncPathable;
- import org.apache.curator.x.async.api.AsyncPersistentWatchBuilder;
- import org.apache.zookeeper.Watcher;
-
- import static org.apache.curator.x.async.details.BackgroundProcs.ignoredProc;
- import static org.apache.curator.x.async.details.BackgroundProcs.safeCall;
-
- class AsyncPersistentWatchBuilderImpl implements AsyncPersistentWatchBuilder, AddPersistentWatchable<AsyncPathable<AsyncStage<Void>>>, AsyncPathable<AsyncStage<Void>>
- {
-     private final CuratorFrameworkImpl client;
-     private final Filters filters;
-     private Watching watching = null;
-     private boolean recursive = false;
-
-     AsyncPersistentWatchBuilderImpl(CuratorFrameworkImpl client, Filters filters)
-     {
-         this.client = client;
-         this.filters = filters;
-     }
-
-     @Override
-     public AddPersistentWatchable<AsyncPathable<AsyncStage<Void>>> recursive()
-     {
-         recursive = true;
-         return this;
-     }
-
-     @Override
-     public AsyncPathable<AsyncStage<Void>> usingWatcher(Watcher watcher)
-     {
-         watching = new Watching(client, watcher);
-         return this;
-     }
-
-     @Override
-     public AsyncPathable<AsyncStage<Void>> usingWatcher(CuratorWatcher watcher)
-     {
-         watching = new Watching(client, watcher);
-         return this;
-     }
-
-     @Override
-     public AsyncStage<Void> forPath(String path)
-     {
-         BuilderCommon<Void> common = new BuilderCommon<>(filters, ignoredProc);
-         AddPersistentWatchBuilderImpl builder = new AddPersistentWatchBuilderImpl(client, watching, common.backgrounding, recursive);
-         return safeCall(common.internalCallback, () -> builder.forPath(path));
-     }
- }
\ No newline at end of file
diff --git a/src/site/confluence/utilities.confluence b/src/site/confluence/utilities.confluence
index 2bd7ac1..c44efa8 100644
--- a/src/site/confluence/utilities.confluence
+++ b/src/site/confluence/utilities.confluence
@@ -42,6 +42,23 @@ CuratorFramework client = CuratorFrameworkFactory.builder()
 // all connection state listeners set for "client" will get circuit breaking behavior
 {code}
 
+h2. Functional Watcher Builder and Helpers
+
+See: [[Watchers|curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/Watchers.java]] and
+[[WatcherHelpers|curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/WatcherHelpers.java]]
+
+Especially useful with the new Persistent/Recursive Watcher support, this builder and its helpers let you create Watchers by
+chaining functional methods and utilities together. For example:
+
+{code}
+Watcher watcher = Watchers.filter(WatcherHelpers.filterNonSystemEvents())
+                          .map(WatcherHelpers::mapToPathAndNode)
+                          .process(pathAndNode -> {
+                              ... etc ...
+                          });
+... use the watcher in a Curator method ...
+{code}
+
 h2. Locker
 
 Curator's Locker uses Java 7's try\-with\-resources feature to making using Curator locks safer: