You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2019/11/12 14:40:10 UTC

[curator] branch CURATOR-549-zk36-persistent-watchers updated (4eab363 -> 0892328)

This is an automated email from the ASF dual-hosted git repository.

randgalt pushed a change to branch CURATOR-549-zk36-persistent-watchers
in repository https://gitbox.apache.org/repos/asf/curator.git.


    omit 4eab363  CURATOR-549
    omit 05b23a6  CURATOR-549
     add 3172ccb  CURATOR-549
     new 7994c59  CURATOR-549
     new 0892328  CURATOR-549

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (4eab363)
            \
             N -- N -- N   refs/heads/CURATOR-549-zk36-persistent-watchers (0892328)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/curator/framework/imps/CuratorMultiTransactionRecord.java    | 2 +-
 .../java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java     | 2 +-
 .../src/test/java/org/apache/curator/framework/imps/TestFramework.java  | 2 +-
 .../test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java | 2 +-
 .../java/org/apache/curator/framework/imps/TestReconfiguration.java     | 2 +-
 .../test/java/org/apache/curator/framework/imps/TestWithCluster.java    | 2 +-
 .../org/apache/curator/framework/state/TestConnectionStateManager.java  | 2 +-
 .../java/org/apache/curator/framework/recipes/cache/TestNodeCache.java  | 2 +-
 .../apache/curator/framework/recipes/cache/TestPathChildrenCache.java   | 2 +-
 .../curator/framework/recipes/cache/TestPathChildrenCacheInCluster.java | 2 +-
 .../java/org/apache/curator/framework/recipes/cache/TestTreeCache.java  | 2 +-
 .../org/apache/curator/framework/recipes/leader/TestLeaderLatch.java    | 2 +-
 .../framework/recipes/locks/TestInterProcessSemaphoreCluster.java       | 2 +-
 curator-test-zk35/pom.xml                                               | 2 +-
 .../java/org/apache/curator/test/compatibility/CuratorTestBase.java     | 2 +-
 15 files changed, 15 insertions(+), 15 deletions(-)


[curator] 02/02: CURATOR-549

Posted by ra...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

randgalt pushed a commit to branch CURATOR-549-zk36-persistent-watchers
in repository https://gitbox.apache.org/repos/asf/curator.git

commit 0892328a78b7783e86af7e35ebbb5db25015c5bc
Author: randgalt <ra...@apache.org>
AuthorDate: Sat Nov 2 11:40:44 2019 -0500

    CURATOR-549
    
    Support persistent watchers in ZK 3.6+ while maintaining backward compatibility with previous versions of ZK. Adds Curator Framework DSL calls to ZooKeeper's addWatch() method. Subsequent PRs will add recipes. Both the older Framework and the async Framework now have methods to add persistent watchers.
---
 .../apache/curator/framework/CuratorFramework.java |   8 +
 .../curator/framework/api/AddWatchBuilder.java     |  32 ++++
 .../curator/framework/api/AddWatchBuilder2.java    |  27 +++
 .../apache/curator/framework/api/AddWatchable.java |  41 +++++
 .../curator/framework/api/WatchesBuilder.java      |  33 ++++
 .../framework/imps/AddWatchBuilderImpl.java        | 197 +++++++++++++++++++++
 .../framework/imps/CuratorFrameworkImpl.java       |   7 +
 .../curator/framework/imps/WatchesBuilderImpl.java |  45 +++++
 .../curator/framework/imps/TestWatchesBuilder.java |  53 ++++++
 .../curator/framework/TestCompatibility.java       |  49 +++++
 .../x/async/api/AsyncCuratorFrameworkDsl.java      |   7 +
 .../curator/x/async/api/AsyncWatchBuilder.java     |  37 ++++
 .../curator/x/async/api/AsyncWatchBuilder2.java    |  29 +++
 .../x/async/details/AsyncCuratorFrameworkImpl.java |   9 +
 .../x/async/details/AsyncWatchBuilderImpl.java     |  79 +++++++++
 .../curator/framework/imps/TestAddWatch.java       |  87 +++++++++
 16 files changed, 740 insertions(+)

diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
index ccafa0b..d279373 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
@@ -196,10 +196,18 @@ public interface CuratorFramework extends Closeable
      * Start a remove watches builder.
      *
      * @return builder object
+     * @deprecated use {@link #watchers()} in ZooKeeper 3.6+
      */
     public RemoveWatchesBuilder watches();
 
     /**
+     * Start a watches builder.
+     *
+     * @return builder object
+     */
+    public WatchesBuilder watchers();
+
+    /**
      * Returns the listenable interface for the Connect State
      *
      * @return listenable
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchBuilder.java b/curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchBuilder.java
new file mode 100644
index 0000000..ad6d434
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchBuilder.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.api;
+
+import org.apache.zookeeper.AddWatchMode;
+
+public interface AddWatchBuilder extends AddWatchBuilder2
+{
+    /**
+     * The mode to use. By default, {@link org.apache.zookeeper.AddWatchMode#PERSISTENT_RECURSIVE} is used
+     *
+     * @param mode mode to use
+     * @return this
+     */
+    AddWatchBuilder2 withMode(AddWatchMode mode);
+}
\ No newline at end of file
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchBuilder2.java b/curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchBuilder2.java
new file mode 100644
index 0000000..9114c00
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchBuilder2.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.api;
+
+public interface AddWatchBuilder2 extends
+    Backgroundable<AddWatchable<Pathable<Void>>>,
+    AddWatchable<Pathable<Void>>,
+    Pathable<Void>
+{
+}
\ No newline at end of file
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchable.java
new file mode 100644
index 0000000..1f0646c
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/AddWatchable.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.api;
+
+import org.apache.zookeeper.Watcher;
+
+public interface AddWatchable<T>
+{
+    /**
+     * Set a watcher for the operation
+     *
+     * @param watcher the watcher
+     * @return this
+     */
+    T usingWatcher(Watcher watcher);
+
+    /**
+     * Set a watcher for the operation
+     *
+     * @param watcher the watcher
+     * @return this
+     */
+    T usingWatcher(CuratorWatcher watcher);
+}
\ No newline at end of file
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/WatchesBuilder.java b/curator-framework/src/main/java/org/apache/curator/framework/api/WatchesBuilder.java
new file mode 100644
index 0000000..3cd5528
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/WatchesBuilder.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.api;
+
+/**
+ * Builder to allow watches to be added or removed
+ */
+public interface WatchesBuilder extends RemoveWatchesBuilder
+{
+    /**
+     * Start an add watch operation
+     *
+     * @return builder
+     */
+    AddWatchBuilder add();
+}
\ No newline at end of file
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/AddWatchBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/AddWatchBuilderImpl.java
new file mode 100644
index 0000000..9f119a3
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/AddWatchBuilderImpl.java
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.imps;
+
+import org.apache.curator.RetryLoop;
+import org.apache.curator.drivers.OperationTrace;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorEventType;
+import org.apache.curator.framework.api.CuratorWatcher;
+import org.apache.curator.framework.api.Pathable;
+import org.apache.curator.framework.api.AddWatchBuilder;
+import org.apache.curator.framework.api.AddWatchBuilder2;
+import org.apache.curator.framework.api.AddWatchable;
+import org.apache.zookeeper.AddWatchMode;
+import org.apache.zookeeper.Watcher;
+import java.util.concurrent.Executor;
+
+public class AddWatchBuilderImpl implements AddWatchBuilder, Pathable<Void>, BackgroundOperation<String>
+{
+    private final CuratorFrameworkImpl client;
+    private Watching watching;
+    private Backgrounding backgrounding = new Backgrounding();
+    private AddWatchMode mode = AddWatchMode.PERSISTENT_RECURSIVE;
+
+    AddWatchBuilderImpl(CuratorFrameworkImpl client)
+    {
+        this.client = client;
+        watching = new Watching(client, true);
+    }
+
+    public AddWatchBuilderImpl(CuratorFrameworkImpl client, Watching watching, Backgrounding backgrounding, AddWatchMode mode)
+    {
+        this.client = client;
+        this.watching = watching;
+        this.backgrounding = backgrounding;
+        this.mode = mode;
+    }
+
+    @Override
+    public AddWatchable<Pathable<Void>> inBackground()
+    {
+        backgrounding = new Backgrounding(true);    // no-arg Backgrounding is the foreground default (see field initializer); pass true so forPath() takes the background branch
+        return this;
+    }
+
+    @Override
+    public AddWatchBuilder2 withMode(AddWatchMode mode)
+    {
+        this.mode = mode;
+        return this;
+    }
+
+    @Override
+    public Pathable<Void> usingWatcher(Watcher watcher)
+    {
+        watching = new Watching(client, watcher);
+        return this;
+    }
+
+    @Override
+    public Pathable<Void> usingWatcher(CuratorWatcher watcher)
+    {
+        watching = new Watching(client, watcher);
+        return this;
+    }
+
+    @Override
+    public AddWatchable<Pathable<Void>> inBackground(Object context)
+    {
+        backgrounding = new Backgrounding(context);
+        return this;
+    }
+
+    @Override
+    public AddWatchable<Pathable<Void>> inBackground(BackgroundCallback callback)
+    {
+        backgrounding = new Backgrounding(callback);
+        return this;
+    }
+
+    @Override
+    public AddWatchable<Pathable<Void>> inBackground(BackgroundCallback callback, Object context)
+    {
+        backgrounding = new Backgrounding(callback, context);
+        return this;
+    }
+
+    @Override
+    public AddWatchable<Pathable<Void>> inBackground(BackgroundCallback callback, Executor executor)
+    {
+        backgrounding = new Backgrounding(callback, executor);
+        return this;
+    }
+
+    @Override
+    public AddWatchable<Pathable<Void>> inBackground(BackgroundCallback callback, Object context, Executor executor)
+    {
+        backgrounding = new Backgrounding(client, callback, context, executor);
+        return this;
+    }
+
+    @Override
+    public Void forPath(String path) throws Exception
+    {
+        if ( backgrounding.inBackground() )
+        {
+            client.processBackgroundOperation(new OperationAndData<>(this, path, backgrounding.getCallback(), null, backgrounding.getContext(), watching), null);
+        }
+        else
+        {
+            pathInForeground(path);
+        }
+        return null;
+    }
+
+    @Override
+    public void performBackgroundOperation(final OperationAndData<String> data) throws Exception
+    {
+        String path = data.getData();
+        String fixedPath = client.fixForNamespace(path);
+        try
+        {
+            final OperationTrace   trace = client.getZookeeperClient().startAdvancedTracer("AddWatchBuilderImpl-Background");
+            if ( watching.isWatched() )
+            {
+                client.getZooKeeper().addWatch
+                    (
+                        fixedPath,
+                        mode,
+                        (rc, path1, ctx) -> {
+                            trace.setReturnCode(rc).setWithWatcher(true).setPath(path1).commit();
+                            CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.ADD_WATCH, rc, path1, null, ctx, null, null, null, null, null, null);
+                            client.processBackgroundOperation(data, event);
+                        },
+                        backgrounding.getContext()
+                    );
+            }
+            else
+            {
+                client.getZooKeeper().addWatch
+                    (
+                        fixedPath,
+                        watching.getWatcher(path),
+                        mode,
+                        (rc, path1, ctx) -> {
+                            trace.setReturnCode(rc).setWithWatcher(true).setPath(path1).commit();
+                            CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.ADD_WATCH, rc, path1, null, ctx, null, null, null, null, null, null);
+                            client.processBackgroundOperation(data, event);
+                        },
+                        backgrounding.getContext()
+                    );
+            }
+        }
+        catch ( Throwable e )
+        {
+            backgrounding.checkError(e, watching);
+        }
+    }
+
+    private void pathInForeground(final String path) throws Exception    // synchronous path used by forPath() when not backgrounded
+    {
+        final String fixedPath = client.fixForNamespace(path);
+        OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("AddWatchBuilderImpl-Foreground");
+        RetryLoop.callWithRetry
+        (
+            client.getZookeeperClient(), () -> {
+                if ( watching.isWatched() )    // NOTE(review): presumably true only when no explicit watcher was supplied via usingWatcher() - confirm against Watching
+                {
+                    client.getZooKeeper().addWatch(fixedPath, mode);    // overload without a watcher argument
+                }
+                else
+                {
+                    client.getZooKeeper().addWatch(fixedPath, watching.getWatcher(path), mode);    // watcher is looked up with the caller's path, the ZK call uses the namespaced path
+                }
+                return null;
+            });
+        trace.setPath(fixedPath).setWithWatcher(true).commit();    // reached only when callWithRetry returns normally
+    }
+}
\ No newline at end of file
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index cab378d..5afc5ce 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -572,6 +572,13 @@ public class CuratorFrameworkImpl implements CuratorFramework
         return new RemoveWatchesBuilderImpl(this);
     }
 
+    @Override
+    public WatchesBuilder watchers()
+    {
+        Preconditions.checkState(Compatibility.hasPersistentWatchers(), "addWatch() is not supported in the ZooKeeper library being used. Use watches() instead.");
+        return new WatchesBuilderImpl(this);
+    }
+
     protected void internalSync(CuratorFrameworkImpl impl, String path, Object context)
     {
         BackgroundOperation<String> operation = new BackgroundSyncImpl(impl, context);
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/WatchesBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatchesBuilderImpl.java
new file mode 100644
index 0000000..4a273c6
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatchesBuilderImpl.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.imps;
+
+import org.apache.curator.framework.api.AddWatchBuilder;
+import org.apache.curator.framework.api.CuratorWatcher;
+import org.apache.curator.framework.api.WatchesBuilder;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.WatcherType;
+
+public class WatchesBuilderImpl extends RemoveWatchesBuilderImpl implements WatchesBuilder
+{
+    public WatchesBuilderImpl(CuratorFrameworkImpl client)
+    {
+        super(client);
+    }
+
+    public WatchesBuilderImpl(CuratorFrameworkImpl client, Watcher watcher, CuratorWatcher curatorWatcher, WatcherType watcherType, boolean guaranteed, boolean local, boolean quietly, Backgrounding backgrounding)
+    {
+        super(client, watcher, curatorWatcher, watcherType, guaranteed, local, quietly, backgrounding);
+    }
+
+    @Override
+    public AddWatchBuilder add()
+    {
+        return new AddWatchBuilderImpl(getClient());
+    }
+}
\ No newline at end of file
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatchesBuilder.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatchesBuilder.java
index 1d1e9bc..26c41f1 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatchesBuilder.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatchesBuilder.java
@@ -33,11 +33,14 @@ import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.Timing;
 import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.utils.ZookeeperFactory;
+import org.apache.zookeeper.AddWatchMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.Watcher.WatcherType;
+import org.apache.zookeeper.ZooKeeper;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 import java.util.concurrent.CountDownLatch;
@@ -611,6 +614,56 @@ public class TestWatchesBuilder extends CuratorTestBase
         }
     }
 
+    @Test(groups = CuratorTestBase.zk36Group)
+    public void testPersistentRecursiveWatch() throws Exception
+    {
+        try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) )
+        {
+            client.start();
+            client.blockUntilConnected();
+
+            CountDownLatch latch = new CountDownLatch(5);
+            Watcher watcher = event -> latch.countDown();
+            client.watchers().add().withMode(AddWatchMode.PERSISTENT_RECURSIVE).usingWatcher(watcher).forPath("/test");
+
+            client.create().forPath("/test");
+            client.create().forPath("/test/a");
+            client.create().forPath("/test/a/b");
+            client.create().forPath("/test/a/b/c");
+            client.create().forPath("/test/a/b/c/d");
+
+            Assert.assertTrue(timing.awaitLatch(latch));
+        }
+    }
+
+    @Test(groups = CuratorTestBase.zk36Group)
+    public void testPersistentRecursiveDefaultWatch() throws Exception
+    {
+        CountDownLatch latch = new CountDownLatch(6);   // 5 creates plus the initial sync
+        ZookeeperFactory zookeeperFactory = (connectString, sessionTimeout, watcher, canBeReadOnly) -> {
+            Watcher actualWatcher = event -> {
+                watcher.process(event);
+                latch.countDown();
+            };
+            return new ZooKeeper(connectString, sessionTimeout, actualWatcher);
+        };
+        try (CuratorFramework client = CuratorFrameworkFactory.builder().connectString(server.getConnectString()).retryPolicy(new RetryOneTime(1)).zookeeperFactory(zookeeperFactory).build() )
+        {
+            client.start();
+            client.blockUntilConnected();
+
+            client.watchers().add().withMode(AddWatchMode.PERSISTENT_RECURSIVE).forPath("/test");
+
+            client.create().forPath("/test");
+            client.create().forPath("/test/a");
+            client.create().forPath("/test/a/b");
+            client.create().forPath("/test/a/b/c");
+            client.create().forPath("/test/a/b/c/d");
+
+            Assert.assertTrue(timing.awaitLatch(latch));
+        }
+    }
+
     private static class CountDownWatcher implements Watcher {
         private String path;
         private EventType eventType;
diff --git a/curator-test-zk35/src/test/java/org/apache/curator/framework/TestCompatibility.java b/curator-test-zk35/src/test/java/org/apache/curator/framework/TestCompatibility.java
new file mode 100644
index 0000000..5112d41
--- /dev/null
+++ b/curator-test-zk35/src/test/java/org/apache/curator/framework/TestCompatibility.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework;
+
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.compatibility.CuratorTestBase;
+import org.apache.curator.x.async.AsyncCuratorFramework;
+import org.testng.annotations.Test;
+
+public class TestCompatibility extends CuratorTestBase
+{
+    @Test(expectedExceptions = IllegalStateException.class)
+    public void testPersistentWatchesNotAvailable() throws Exception
+    {
+        try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) )
+        {
+            client.start();
+            client.watchers().add().forPath("/foo");
+        }
+    }
+
+    @Test(expectedExceptions = IllegalStateException.class)
+    public void testPersistentWatchesNotAvailableAsync()
+    {
+        try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) )
+        {
+            client.start();
+
+            AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client);
+            async.addWatch().forPath("/foo");
+        }
+    }
+}
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java
index bc66bb6..e54b148 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java
@@ -112,4 +112,11 @@ public interface AsyncCuratorFrameworkDsl extends WatchableAsyncCuratorFramework
      * @return builder object
      */
     AsyncRemoveWatchesBuilder removeWatches();
+
+    /**
+     * Start an add watch builder
+     *
+     * @return builder object
+     */
+    AsyncWatchBuilder addWatch();
 }
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncWatchBuilder.java b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncWatchBuilder.java
new file mode 100644
index 0000000..a5e86ec
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncWatchBuilder.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.async.api;
+
+import org.apache.curator.framework.api.AddWatchable;
+import org.apache.curator.x.async.AsyncStage;
+import org.apache.zookeeper.AddWatchMode;
+
+public interface AsyncWatchBuilder extends
+    AddWatchable<AsyncPathable<AsyncStage<Void>>>,
+    AsyncPathable<AsyncStage<Void>>
+{
+    /**
+     * The mode to use. By default, {@link org.apache.zookeeper.AddWatchMode#PERSISTENT_RECURSIVE} is used
+     *
+     * @param mode mode
+     * @return this
+     */
+    AsyncWatchBuilder2 withMode(AddWatchMode mode);
+}
\ No newline at end of file
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncWatchBuilder2.java b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncWatchBuilder2.java
new file mode 100644
index 0000000..a97cd4b
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncWatchBuilder2.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.async.api;
+
+import org.apache.curator.framework.api.AddWatchable;
+import org.apache.curator.x.async.AsyncStage;
+
+public interface AsyncWatchBuilder2 extends
+    AddWatchable<AsyncPathable<AsyncStage<Void>>>,
+    AsyncPathable<AsyncStage<Void>>
+{
+}
\ No newline at end of file
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java
index 07c3398..a248c0e 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java
@@ -18,6 +18,7 @@
  */
 package org.apache.curator.x.async.details;
 
+import com.google.common.base.Preconditions;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.api.UnhandledErrorListener;
@@ -26,6 +27,7 @@ import org.apache.curator.framework.imps.CuratorFrameworkImpl;
 import org.apache.curator.framework.imps.CuratorMultiTransactionImpl;
 import org.apache.curator.framework.imps.GetACLBuilderImpl;
 import org.apache.curator.framework.imps.SyncBuilderImpl;
+import org.apache.curator.utils.Compatibility;
 import org.apache.curator.x.async.AsyncCuratorFramework;
 import org.apache.curator.x.async.AsyncStage;
 import org.apache.curator.x.async.WatchMode;
@@ -152,6 +154,13 @@ public class AsyncCuratorFrameworkImpl implements AsyncCuratorFramework
     }
 
     @Override
+    public AsyncWatchBuilder addWatch()   // entry point of the async add-watch builder chain
+    {
+        Preconditions.checkState(Compatibility.hasPersistentWatchers(), "addWatch() is not supported in the ZooKeeper library being used.");   // fail fast before any builder is created
+        return new AsyncWatchBuilderImpl(client, filters);
+    }
+
+    @Override
     public CuratorFramework unwrap()
     {
         return client;
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncWatchBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncWatchBuilderImpl.java
new file mode 100644
index 0000000..a10540e
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncWatchBuilderImpl.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.async.details;
+
+import org.apache.curator.framework.api.AddWatchable;
+import org.apache.curator.framework.api.CuratorWatcher;
+import org.apache.curator.framework.imps.AddWatchBuilderImpl;
+import org.apache.curator.framework.imps.CuratorFrameworkImpl;
+import org.apache.curator.framework.imps.Watching;
+import org.apache.curator.x.async.AsyncStage;
+import org.apache.curator.x.async.api.AsyncPathable;
+import org.apache.curator.x.async.api.AsyncWatchBuilder;
+import org.apache.curator.x.async.api.AsyncWatchBuilder2;
+import org.apache.zookeeper.AddWatchMode;
+import org.apache.zookeeper.Watcher;
+
+import static org.apache.curator.x.async.details.BackgroundProcs.ignoredProc;
+import static org.apache.curator.x.async.details.BackgroundProcs.safeCall;
+
+class AsyncWatchBuilderImpl implements AsyncWatchBuilder, AsyncWatchBuilder2, AddWatchable<AsyncPathable<AsyncStage<Void>>>, AsyncPathable<AsyncStage<Void>>
+{   // One mutable object backs every stage of the async add-watch chain: withMode()/usingWatcher() return this.
+    private final CuratorFrameworkImpl client;
+    private final Filters filters;
+    private Watching watching;   // replaced whenever usingWatcher() is called
+    private AddWatchMode mode = AddWatchMode.PERSISTENT_RECURSIVE;   // used unless withMode() overrides it
+
+    AsyncWatchBuilderImpl(CuratorFrameworkImpl client, Filters filters)
+    {
+        this.client = client;
+        this.filters = filters;
+        watching = new Watching(client, true);   // NOTE(review): presumably selects the client's default watcher - confirm against Watching
+    }
+
+    @Override
+    public AsyncWatchBuilder2 withMode(AddWatchMode mode)   // records the mode that forPath() will pass on
+    {
+        this.mode = mode;
+        return this;
+    }
+
+    @Override
+    public AsyncPathable<AsyncStage<Void>> usingWatcher(Watcher watcher)   // swaps in a Watching built from the given ZooKeeper Watcher
+    {
+        watching = new Watching(client, watcher);
+        return this;
+    }
+
+    @Override
+    public AsyncPathable<AsyncStage<Void>> usingWatcher(CuratorWatcher watcher)   // same as above, for a CuratorWatcher
+    {
+        watching = new Watching(client, watcher);
+        return this;
+    }
+
+    @Override
+    public AsyncStage<Void> forPath(String path)   // terminal step: runs the framework-level add-watch for the path
+    {
+        BuilderCommon<Void> common = new BuilderCommon<>(filters, ignoredProc);
+        AddWatchBuilderImpl builder = new AddWatchBuilderImpl(client, watching, common.backgrounding, mode);   // carries the watcher and mode chosen so far
+        return safeCall(common.internalCallback, () -> builder.forPath(path));   // NOTE(review): safeCall presumably hands back internalCallback as the stage - confirm
+    }
+}
\ No newline at end of file
diff --git a/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestAddWatch.java b/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestAddWatch.java
new file mode 100644
index 0000000..bdf006a
--- /dev/null
+++ b/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestAddWatch.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.imps;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.compatibility.CuratorTestBase;
+import org.apache.curator.utils.ZookeeperFactory;
+import org.apache.curator.x.async.AsyncCuratorFramework;
+import org.apache.zookeeper.AddWatchMode;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.concurrent.CountDownLatch;
+
+public class TestAddWatch extends CuratorTestBase
+{   // Exercises AsyncCuratorFramework.addWatch() with an explicit watcher and without one.
+    @Test
+    public void testPersistentRecursiveWatch() throws Exception
+    {
+        try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) )
+        {
+            client.start();
+            client.blockUntilConnected();
+
+            CountDownLatch latch = new CountDownLatch(5);   // one event expected per create below
+            Watcher watcher = event -> latch.countDown();
+            AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client);
+            async.addWatch().withMode(AddWatchMode.PERSISTENT_RECURSIVE).usingWatcher(watcher).forPath("/test").toCompletableFuture().get();   // wait for the stage to complete
+
+            client.create().forPath("/test");
+            client.create().forPath("/test/a");
+            client.create().forPath("/test/a/b");
+            client.create().forPath("/test/a/b/c");
+            client.create().forPath("/test/a/b/c/d");
+
+            Assert.assertTrue(timing.awaitLatch(latch));
+        }
+    }
+
+    @Test
+    public void testPersistentRecursiveDefaultWatch() throws Exception
+    {
+        CountDownLatch latch = new CountDownLatch(6);   // 5 creates plus the initial sync
+        ZookeeperFactory zookeeperFactory = (connectString, sessionTimeout, watcher, canBeReadOnly) -> {   // wraps the connection watcher so its events are counted
+            Watcher actualWatcher = event -> {
+                watcher.process(event);   // still forward the event to the watcher supplied to the factory
+                latch.countDown();
+            };
+            return new ZooKeeper(connectString, sessionTimeout, actualWatcher);
+        };
+        try (CuratorFramework client = CuratorFrameworkFactory.builder().connectString(server.getConnectString()).retryPolicy(new RetryOneTime(1)).zookeeperFactory(zookeeperFactory).build() )
+        {
+            client.start();
+            client.blockUntilConnected();
+
+            AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client);
+            async.addWatch().withMode(AddWatchMode.PERSISTENT_RECURSIVE).forPath("/test").toCompletableFuture().get();   // wait for the stage so a failed registration surfaces here, not as a latch timeout
+
+            client.create().forPath("/test");
+            client.create().forPath("/test/a");
+            client.create().forPath("/test/a/b");
+            client.create().forPath("/test/a/b/c");
+            client.create().forPath("/test/a/b/c/d");
+
+            Assert.assertTrue(timing.awaitLatch(latch));
+        }
+    }
+}


[curator] 01/02: CURATOR-549

Posted by ra...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

randgalt pushed a commit to branch CURATOR-549-zk36-persistent-watchers
in repository https://gitbox.apache.org/repos/asf/curator.git

commit 7994c59de8e01cba7116adf7e5d40f1c6f187164
Author: randgalt <ra...@apache.org>
AuthorDate: Sat Nov 2 11:40:44 2019 -0500

    CURATOR-549
    
    Bring Curator up to ZooKeeper 3.5.6 in preparation for supporting persistent recursive watchers while maintaining backward compatibility with previous versions of ZK. Added a new module to make sure we maintain compatibility with ZK 3.5.x. ZooKeeper 3.6.0 has some significant changes from previous versions. The reconfig APIs have moved into a new class, ZooKeeperAdmin. This class existed in 3.5.x but wasn't required. Now it is. A bunch of little things changed in the ZK server code  [...]
    
    There is a new module, curator-test-zk35. It forces ZooKeeper 3.5.6 and performs selected tests from the other modules to ensure compatibility. Tests annotated with TestNG groups zk35 and zk35Compatibility are tested. Group zk36 is excluded. Note: these tests will only run from Maven. I don't think IntelliJ/Eclipse support the Maven syntax I used.
    Support persistent watchers in ZK 3.6+ while maintaining backward compatibility with previous versions of ZK. Added a new module to make sure we maintain compatibility with ZK 3.5.x.
---
 .../org/apache/curator/framework/imps/CuratorFrameworkImpl.java  | 9 +++++++++
 .../org/apache/curator/framework/imps/ReconfigBuilderImpl.java   | 2 +-
 .../org/apache/curator/framework/imps/TestReconfiguration.java   | 2 +-
 3 files changed, 11 insertions(+), 2 deletions(-)

diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index 8fe2770..cab378d 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -621,6 +621,15 @@ public class CuratorFrameworkImpl implements CuratorFramework
         return client.getZooKeeper();
     }
 
+    /**
+     * Returns the ZooKeeper handle for admin use; rejected (IllegalStateException) in ZooKeeper 3.4 compatibility mode.
+     */
+    Object getZooKeeperAdmin() throws Exception
+    {
+        Preconditions.checkState(!isZk34CompatibilityMode(), "getZooKeeperAdmin() is not supported when running in ZooKeeper 3.4 compatibility mode");
+        return client.getZooKeeper();
+    }
+
     CompressionProvider getCompressionProvider()
     {
         return compressionProvider;
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java
index 0386e5e..171b479 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java
@@ -288,7 +288,7 @@ public class ReconfigBuilderImpl implements ReconfigBuilder, BackgroundOperation
                     @Override
                     public byte[] call() throws Exception
                     {
-                        return ((ZooKeeperAdmin)client.getZooKeeper()).reconfigure(joining, leaving, newMembers, fromConfig, responseStat);
+                        return ((ZooKeeperAdmin)client.getZooKeeperAdmin()).reconfigure(joining, leaving, newMembers, fromConfig, responseStat);
                     }
                 }
             );
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
index 8a5d3b6..258428c 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
@@ -56,7 +56,7 @@ import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicReference;
 
-@Test(groups = {CuratorTestBase.zk35Group, CuratorTestBase.zk35TestCompatibilityGroup})
+@Test(groups = CuratorTestBase.zk35TestCompatibilityGroup)
 public class TestReconfiguration extends CuratorTestBase
 {
     private final Timing2 timing = new Timing2();