You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2019/10/14 12:01:12 UTC

[curator] branch persistent-watcher-functional created (now 797b132)

This is an automated email from the ASF dual-hosted git repository.

randgalt pushed a change to branch persistent-watcher-functional
in repository https://gitbox.apache.org/repos/asf/curator.git.


      at 797b132  wip

This branch includes the following new commits:

     new 797b132  wip

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[curator] 01/01: wip

Posted by ra...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

randgalt pushed a commit to branch persistent-watcher-functional
in repository https://gitbox.apache.org/repos/asf/curator.git

commit 797b13224ca4d1ecda616eb8314f6055a90b008d
Author: randgalt <ra...@apache.org>
AuthorDate: Mon Oct 14 15:01:01 2019 +0300

    wip
---
 .../framework/recipes/watch/PersistentWatcher.java |  18 +++
 .../framework/recipes/watch/WatcherHelpers.java    |  79 +++++++++++++
 .../curator/framework/recipes/watch/Watchers.java  |  93 +++++++++++++++
 .../framework/recipes/watch/WatchersImpl.java      | 110 ++++++++++++++++++
 .../recipes/watch/TestPersistentWatcher.java       |  18 +++
 .../framework/recipes/watch/TestWatchers.java      | 125 +++++++++++++++++++++
 .../x/async/api/AsyncPersistentWatchBuilder.java   |  18 +++
 .../details/AsyncPersistentWatchBuilderImpl.java   |  75 -------------
 src/site/confluence/utilities.confluence           |  17 +++
 9 files changed, 478 insertions(+), 75 deletions(-)

diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
index 2c97490..253c2a6 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
@@ -1,4 +1,22 @@
 /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/WatcherHelpers.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/WatcherHelpers.java
new file mode 100644
index 0000000..4db368a
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/WatcherHelpers.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+import org.apache.curator.utils.ZKPaths;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.IntPredicate;
+import java.util.function.Predicate;
+
+public class WatcherHelpers
+{
+    private static final ZKPaths.PathAndNode NULL_PATH_AND_NODE = new ZKPaths.PathAndNode("/", "");
+
+    public static ZKPaths.PathAndNode mapToPathAndNode(WatchedEvent event)
+    {
+        if ( event.getType() == Watcher.Event.EventType.None )
+        {
+            return NULL_PATH_AND_NODE;
+        }
+        return ZKPaths.getPathAndNode(event.getPath());
+    }
+
+    public static List<String> mapToParts(WatchedEvent event)
+    {
+        if ( event.getType() == Watcher.Event.EventType.None )
+        {
+            return Collections.emptyList();
+        }
+        return ZKPaths.split(event.getPath());
+    }
+
+    public static Predicate<WatchedEvent> filterNode(Predicate<String> nodeFilter)
+    {
+        return event -> nodeFilter.test(mapToPathAndNode(event).getNode());
+    }
+
+    public static Predicate<WatchedEvent> filterPath(Predicate<String> pathFilter)
+    {
+        return event -> pathFilter.test(mapToPathAndNode(event).getPath());
+    }
+
+    public static Predicate<WatchedEvent> filterDepth(IntPredicate depthFilter)
+    {
+        return event -> depthFilter.test(mapToParts(event).size());
+    }
+
+    public static Predicate<WatchedEvent> filterSystemEvents()
+    {
+        return event -> event.getType() == Watcher.Event.EventType.None;
+    }
+
+    public static Predicate<WatchedEvent> filterNonSystemEvents()
+    {
+        return event -> event.getType() != Watcher.Event.EventType.None;
+    }
+
+    private WatcherHelpers()
+    {
+    }
+}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/Watchers.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/Watchers.java
new file mode 100644
index 0000000..600a7ec
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/Watchers.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Predicate;
+
+/**
+ * Functional builders to wrap a ZooKeeper Watcher. Call the filter/map/etc. methods
+ * as needed and complete by calling {@link Next#process(java.util.function.Consumer)} which
+ * returns a Watcher that can be passed to any Curator/ZooKeeper method that accepts Watchers.
+ */
+public interface Watchers
+{
+    /**
+     * Filter the chained watched event. If the predicate returns false, the chain ends and no further processing
+     * occurs.
+     *
+     * @param filter predicate
+     * @return chain
+     */
+    static Next<WatchedEvent> filter(Predicate<WatchedEvent> filter)
+    {
+        return new WatchersImpl<>(filter);
+    }
+
+    /**
+     * Map the chained watched event to a new object
+     *
+     * @param mapper mapper
+     * @return chain
+     */
+    static <R> Next<R> map(Function<WatchedEvent, ? super R> mapper)
+    {
+        return new WatchersImpl<>(mapper);
+    }
+
+    interface Next<T>
+    {
+        /**
+         * Filter the chained value. If the predicate returns false, the chain ends and no further processing
+         * occurs.
+         *
+         * @param filter predicate
+         * @return chain
+         */
+        Next<T> filter(Predicate<? super T> filter);
+
+        /**
+         * Peek at the chained value
+         *
+         * @param consumer consumer
+         * @return chain
+         */
+        Next<T> peek(Consumer<? super T> consumer);
+
+        /**
+         * Map the chained value to a new object
+         *
+         * @param mapper mapper
+         * @return chain
+         */
+        <R> Next<R> map(Function<? super T, ? extends R> mapper);
+
+        /**
+         * Complete the chain and return a new Watcher that consists of all the previous steps in the
+         * chain. When the Watcher is called by ZooKeeper, all steps of the chain are called in order.
+         *
+         * @param handler consumer for the final value of the chain
+         * @return new Watcher
+         */
+        Watcher process(Consumer<T> handler);
+    }
+}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/WatchersImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/WatchersImpl.java
new file mode 100644
index 0000000..ed0e02f
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/WatchersImpl.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Predicate;
+
+public class WatchersImpl<T> implements Watchers.Next<T>
+{
+    private final Step step;
+
+    private static class Step
+    {
+        private final Predicate filter;
+        private final Function mapper;
+
+        private final Step head;
+        private Step next;
+
+        Step(Predicate filter, Function mapper)
+        {
+            this(filter, mapper, null);
+        }
+
+        Step(Predicate filter, Function mapper, Step head)
+        {
+            this.filter = (filter != null) ? filter : (__ -> true);
+            this.mapper = (mapper != null) ? mapper : Function.identity();
+            this.head = (head != null) ? head : this;
+        }
+    }
+
+    WatchersImpl(Predicate<WatchedEvent> filter)
+    {
+        step = new Step(filter, null);
+    }
+
+    <R> WatchersImpl(Function<WatchedEvent, ? super R> mapper)
+    {
+        step = new Step(null, mapper);
+    }
+
+    private WatchersImpl(Step step)
+    {
+        this.step = step;
+    }
+
+    @Override
+    public Watchers.Next<T> filter(Predicate<? super T> filter)
+    {
+        this.step.next = new Step(filter, null, step.head);
+        return new WatchersImpl<>(this.step.next);
+    }
+
+    @Override
+    public Watchers.Next<T> peek(Consumer<? super T> consumer)
+    {
+        return filter(value -> {
+            consumer.accept(value);
+            return true;
+        });
+    }
+
+    @Override
+    public <R> Watchers.Next<R> map(Function<? super T, ? extends R> mapper)
+    {
+        this.step.next = new Step(null, mapper, step.head);
+        return new WatchersImpl<>(this.step.next);
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")  // all public APIs are correctly typed so this is safe
+    public Watcher process(Consumer<T> handler)
+    {
+        return event -> {
+            Object value = event;
+            Step currentStep = step.head;
+            while ( currentStep != null )
+            {
+                if ( !currentStep.filter.test(value) )
+                {
+                    return; // exit lambda
+                }
+                value = currentStep.mapper.apply(value);
+                currentStep = currentStep.next;
+            }
+
+            ((Consumer)handler).accept(value);
+        };
+    }
+}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java
index df18de5..ad9703b 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.curator.framework.recipes.watch;
 
 import org.apache.curator.framework.CuratorFramework;
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestWatchers.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestWatchers.java
new file mode 100644
index 0000000..45185b2
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestWatchers.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.compatibility.Timing2;
+import org.apache.zookeeper.Watcher;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+
+import static org.apache.curator.framework.recipes.watch.WatcherHelpers.*;
+import static org.apache.curator.framework.recipes.watch.Watchers.filter;
+import static org.apache.curator.framework.recipes.watch.Watchers.map;
+
+public class TestWatchers extends BaseClassForTests
+{
+    private final Timing2 timing = new Timing2();
+
+    @Test
+    public void testFilterFirst() throws Exception
+    {
+        try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) )
+        {
+            client.start();
+
+            Semaphore latch = new Semaphore(0);
+            Watcher watcher = filter(filterNonSystemEvents())
+                .filter(event -> event.getType() == Watcher.Event.EventType.NodeCreated)
+                .map(WatcherHelpers::mapToPathAndNode)
+                .filter(z -> z.getPath().equals("/one/two"))
+                .process(__ -> latch.release());
+            client.checkExists().usingWatcher(watcher).forPath("/one/two/three");
+
+            client.create().forPath("/one");
+            timing.sleepABit();
+            Assert.assertEquals(latch.availablePermits(), 0);
+
+            client.create().forPath("/one/two");
+            timing.sleepABit();
+            Assert.assertEquals(latch.availablePermits(), 0);
+
+            client.create().forPath("/one/two/three");
+            Assert.assertTrue(timing.acquireSemaphore(latch, 1));
+        }
+    }
+
+    @Test
+    public void testMapFirst() throws Exception
+    {
+        try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) )
+        {
+            client.start();
+
+            BlockingQueue<String> result = new LinkedBlockingQueue<>();
+            BlockingQueue<String> peekedResult = new LinkedBlockingQueue<>();
+            Watcher watcher = map(event -> "The path is: " + event.getPath())
+                .peek(peekedResult::add)
+                .process(result::add);
+            client.checkExists().usingWatcher(watcher).forPath("/one");
+
+            client.create().forPath("/one");
+            Assert.assertEquals(timing.takeFromQueue(peekedResult), "The path is: /one");
+            Assert.assertEquals(timing.takeFromQueue(result), "The path is: /one");
+        }
+    }
+
+    @Test
+    public void testSystemEvents() throws Exception
+    {
+        try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) )
+        {
+            client.start();
+
+            CountDownLatch latch = new CountDownLatch(1);
+            Watcher watcher = filter(filterSystemEvents())
+                .process(__ -> latch.countDown());
+            client.checkExists().usingWatcher(watcher).forPath("/one");
+            server.stop();
+            Assert.assertTrue(timing.awaitLatch(latch));
+        }
+    }
+
+    @Test
+    public void testHelpers() throws Exception
+    {
+        try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) )
+        {
+            client.start();
+
+            BlockingQueue<String> result = new LinkedBlockingQueue<>();
+            Watcher watcher = filter(filterPath(node -> node.startsWith("/a")))
+                .filter(filterNode(node -> node.equals("c")))
+                .filter(filterDepth(depth -> depth > 1))
+                .map(WatcherHelpers::mapToParts)
+                .map(parts -> String.join("|", parts))
+                .process(result::add);
+            client.checkExists().usingWatcher(watcher).forPath("/a/b/c");
+            client.create().creatingParentsIfNeeded().forPath("/a/b/c");
+            Assert.assertEquals(timing.takeFromQueue(result), "a|b|c");
+        }
+    }
+}
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncPersistentWatchBuilder.java b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncPersistentWatchBuilder.java
index 0f29233..143a2c8 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncPersistentWatchBuilder.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncPersistentWatchBuilder.java
@@ -1,4 +1,22 @@
 /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncPersistentWatchBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncPersistentWatchBuilderImpl.java
deleted file mode 100644
index 14f3e30..0000000
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncPersistentWatchBuilderImpl.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one
-  * or more contributor license agreements.  See the NOTICE file
-  * distributed with this work for additional information
-  * regarding copyright ownership.  The ASF licenses this file
-  * to you under the Apache License, Version 2.0 (the
-  * "License"); you may not use this file except in compliance
-  * with the License.  You may obtain a copy of the License at
-  *
-  *   http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing,
-  * software distributed under the License is distributed on an
-  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-  * KIND, either express or implied.  See the License for the
-  * specific language governing permissions and limitations
-  * under the License.
-  */
- package org.apache.curator.x.async.details;
-
- import org.apache.curator.framework.api.AddPersistentWatchable;
- import org.apache.curator.framework.api.CuratorWatcher;
- import org.apache.curator.framework.imps.AddPersistentWatchBuilderImpl;
- import org.apache.curator.framework.imps.CuratorFrameworkImpl;
- import org.apache.curator.framework.imps.Watching;
- import org.apache.curator.x.async.AsyncStage;
- import org.apache.curator.x.async.api.AsyncPathable;
- import org.apache.curator.x.async.api.AsyncPersistentWatchBuilder;
- import org.apache.zookeeper.Watcher;
-
- import static org.apache.curator.x.async.details.BackgroundProcs.ignoredProc;
- import static org.apache.curator.x.async.details.BackgroundProcs.safeCall;
-
- class AsyncPersistentWatchBuilderImpl implements AsyncPersistentWatchBuilder, AddPersistentWatchable<AsyncPathable<AsyncStage<Void>>>, AsyncPathable<AsyncStage<Void>>
- {
-     private final CuratorFrameworkImpl client;
-     private final Filters filters;
-     private Watching watching = null;
-     private boolean recursive = false;
-
-     AsyncPersistentWatchBuilderImpl(CuratorFrameworkImpl client, Filters filters)
-     {
-         this.client = client;
-         this.filters = filters;
-     }
-
-     @Override
-     public AddPersistentWatchable<AsyncPathable<AsyncStage<Void>>> recursive()
-     {
-         recursive = true;
-         return this;
-     }
-
-     @Override
-     public AsyncPathable<AsyncStage<Void>> usingWatcher(Watcher watcher)
-     {
-         watching = new Watching(client, watcher);
-         return this;
-     }
-
-     @Override
-     public AsyncPathable<AsyncStage<Void>> usingWatcher(CuratorWatcher watcher)
-     {
-         watching = new Watching(client, watcher);
-         return this;
-     }
-
-     @Override
-     public AsyncStage<Void> forPath(String path)
-     {
-         BuilderCommon<Void> common = new BuilderCommon<>(filters, ignoredProc);
-         AddPersistentWatchBuilderImpl builder = new AddPersistentWatchBuilderImpl(client, watching, common.backgrounding, recursive);
-         return safeCall(common.internalCallback, () -> builder.forPath(path));
-     }
- }
\ No newline at end of file
diff --git a/src/site/confluence/utilities.confluence b/src/site/confluence/utilities.confluence
index 2bd7ac1..c44efa8 100644
--- a/src/site/confluence/utilities.confluence
+++ b/src/site/confluence/utilities.confluence
@@ -42,6 +42,23 @@ CuratorFramework client = CuratorFrameworkFactory.builder()
 // all connection state listeners set for "client" will get circuit breaking behavior
 {code}
 
+h2. Functional Watcher Builder and Helpers
+
+See: [[Watchers|curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/Watchers.java]] and
+[[WatcherHelpers|curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/WatcherHelpers.java]]
+
+Especially useful with the new Persistent/Recursive Watcher support, this builder and helpers allow creating Watchers from
+functional methods and utilities that chain together. E.g.
+
+{code}
+Watcher watcher = Watchers.filter(WatcherHelpers.filterNonSystemEvents())
+                          .map(WatcherHelpers::mapToPathAndNode)
+                          .process(pathAndNode -> {
+                              ... etc ...
+                          });
+... use the watcher in a Curator method ...
+{code}
+
 h2. Locker
 
 Curator's Locker uses Java 7's try\-with\-resources feature to making using Curator locks safer: