You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2019/10/14 13:34:20 UTC

[curator] 01/05: Support ZK 3.6 and add support for upcoming Persistent Recursive Watch APIs.

This is an automated email from the ASF dual-hosted git repository.

randgalt pushed a commit to branch persistent-watcher-cache
in repository https://gitbox.apache.org/repos/asf/curator.git

commit 7f1f644bfe5a9de95c921fc1f9a714f3f75d7027
Author: randgalt <ra...@apache.org>
AuthorDate: Wed Oct 2 20:09:32 2019 -0500

    Support ZK 3.6 and add support for upcoming Persistent Recursive Watch APIs.
---
 .../framework/api/AddPersistentWatchBuilder.java   |  30 ++++
 .../framework/api/AddPersistentWatchBuilder2.java  |  25 +++
 .../framework/api/AddPersistentWatchable.java      |  40 +++++
 .../imps/AddPersistentWatchBuilderImpl.java        | 169 +++++++++++++++++++++
 .../framework/imps/CuratorFrameworkImpl.java       |   7 +
 .../x/async/api/AsyncPersistentWatchBuilder.java   |  33 ++++
 .../details/AsyncPersistentWatchBuilderImpl.java   |  75 +++++++++
 7 files changed, 379 insertions(+)

diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchBuilder.java b/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchBuilder.java
new file mode 100644
index 0000000..a167174
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchBuilder.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.api;
+
+/**
+ * First step of the add-persistent-watch builder chain. It offers the optional
+ * {@link #recursive()} choice in addition to the background and watcher steps
+ * inherited from {@link AddPersistentWatchBuilder2}.
+ */
+public interface AddPersistentWatchBuilder extends AddPersistentWatchBuilder2
+{
+    /**
+     * ZooKeeper persistent watches can optionally be recursive. See
+     * {@link org.apache.zookeeper.ZooKeeper#addPersistentWatch(String, org.apache.zookeeper.Watcher, boolean)}
+     *
+     * @return the next builder step, typed so that {@code recursive()} is no longer offered
+     */
+    AddPersistentWatchBuilder2 recursive();
+}
\ No newline at end of file
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchBuilder2.java b/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchBuilder2.java
new file mode 100644
index 0000000..15cea4f
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchBuilder2.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.api;
+
+/**
+ * Builder step that follows the optional recursive choice. It combines
+ * {@link Backgroundable} (choose background execution, then supply the watcher)
+ * with {@link AddPersistentWatchable} (supply the watcher directly); once the
+ * watcher is supplied the chain continues with a {@code Pathable<Void>}.
+ */
+public interface AddPersistentWatchBuilder2 extends
+    Backgroundable<AddPersistentWatchable<Pathable<Void>>>,
+    AddPersistentWatchable<Pathable<Void>>
+{
+}
\ No newline at end of file
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchable.java
new file mode 100644
index 0000000..faa8906
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchable.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.api;
+
+import org.apache.zookeeper.Watcher;
+
+/**
+ * Builder step for supplying the watcher of an add-persistent-watch operation.
+ *
+ * @param <T> type of the builder step returned once a watcher has been supplied
+ */
+public interface AddPersistentWatchable<T>
+{
+    /**
+     * Set a watcher for the operation
+     *
+     * @param watcher the watcher, given as a plain ZooKeeper {@link Watcher}
+     * @return the next builder step
+     */
+    T usingWatcher(Watcher watcher);
+
+    /**
+     * Set a watcher for the operation
+     *
+     * @param watcher the watcher, given as a {@link CuratorWatcher}
+     * @return the next builder step
+     */
+    T usingWatcher(CuratorWatcher watcher);
+}
\ No newline at end of file
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java
new file mode 100644
index 0000000..acb70c8
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.imps;
+
+import org.apache.curator.RetryLoop;
+import org.apache.curator.drivers.OperationTrace;
+import org.apache.curator.framework.api.AddPersistentWatchBuilder;
+import org.apache.curator.framework.api.AddPersistentWatchBuilder2;
+import org.apache.curator.framework.api.AddPersistentWatchable;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorEventType;
+import org.apache.curator.framework.api.CuratorWatcher;
+import org.apache.curator.framework.api.Pathable;
+import org.apache.zookeeper.Watcher;
+import java.util.concurrent.Executor;
+
+/**
+ * Builder implementation behind {@link AddPersistentWatchBuilder}. It collects the
+ * watcher, the recursive flag and the background settings, and
+ * {@link #forPath(String)} then calls {@code ZooKeeper#addPersistentWatch} either
+ * synchronously inside a retry loop or as a background operation.
+ */
+public class AddPersistentWatchBuilderImpl implements AddPersistentWatchBuilder, Pathable<Void>, BackgroundOperation<String>
+{
+    private final CuratorFrameworkImpl client;
+    private Watching watching = null;
+    private Backgrounding backgrounding = new Backgrounding();
+    private boolean recursive = false;
+
+    /**
+     * Creates a builder with no watcher, non-recursive, using the default
+     * {@code Backgrounding}.
+     *
+     * @param client the client the operation is executed with
+     */
+    AddPersistentWatchBuilderImpl(CuratorFrameworkImpl client)
+    {
+        this.client = client;
+    }
+
+    /**
+     * Creates a builder whose state is supplied up front instead of through the
+     * fluent methods.
+     *
+     * @param client the client the operation is executed with
+     * @param watching the watcher wrapper used by {@link #forPath(String)}
+     * @param backgrounding the background settings consulted by {@link #forPath(String)}
+     * @param recursive whether the watch is registered recursively
+     */
+    public AddPersistentWatchBuilderImpl(CuratorFrameworkImpl client, Watching watching, Backgrounding backgrounding, boolean recursive)
+    {
+        this.client = client;
+        this.watching = watching;
+        this.backgrounding = backgrounding;
+        this.recursive = recursive;
+    }
+
+    /**
+     * Requests background execution without a callback or context.
+     *
+     * @return this
+     */
+    @Override
+    public AddPersistentWatchable<Pathable<Void>> inBackground()
+    {
+        // The no-arg Backgrounding() is the value the field already starts with, so
+        // assigning it again could never switch this builder into background mode.
+        backgrounding = new Backgrounding(true);
+        return this;
+    }
+
+    /**
+     * Marks the watch as recursive.
+     *
+     * @return this
+     */
+    @Override
+    public AddPersistentWatchBuilder2 recursive()
+    {
+        recursive = true;
+        return this;
+    }
+
+    /**
+     * Sets the watcher to register.
+     *
+     * @param watcher the watcher
+     * @return this
+     */
+    @Override
+    public Pathable<Void> usingWatcher(Watcher watcher)
+    {
+        watching = new Watching(client, watcher);
+        return this;
+    }
+
+    /**
+     * Sets the watcher to register.
+     *
+     * @param watcher the watcher
+     * @return this
+     */
+    @Override
+    public Pathable<Void> usingWatcher(CuratorWatcher watcher)
+    {
+        watching = new Watching(client, watcher);
+        return this;
+    }
+
+    /**
+     * Requests background execution with a context object.
+     *
+     * @param context the context passed to {@code Backgrounding}
+     * @return this
+     */
+    @Override
+    public AddPersistentWatchable<Pathable<Void>> inBackground(Object context)
+    {
+        backgrounding = new Backgrounding(context);
+        return this;
+    }
+
+    /**
+     * Requests background execution with a callback.
+     *
+     * @param callback the callback passed to {@code Backgrounding}
+     * @return this
+     */
+    @Override
+    public AddPersistentWatchable<Pathable<Void>> inBackground(BackgroundCallback callback)
+    {
+        backgrounding = new Backgrounding(callback);
+        return this;
+    }
+
+    /**
+     * Requests background execution with a callback and a context object.
+     *
+     * @param callback the callback passed to {@code Backgrounding}
+     * @param context the context passed to {@code Backgrounding}
+     * @return this
+     */
+    @Override
+    public AddPersistentWatchable<Pathable<Void>> inBackground(BackgroundCallback callback, Object context)
+    {
+        backgrounding = new Backgrounding(callback, context);
+        return this;
+    }
+
+    /**
+     * Requests background execution with a callback and an executor.
+     *
+     * @param callback the callback passed to {@code Backgrounding}
+     * @param executor the executor passed to {@code Backgrounding}
+     * @return this
+     */
+    @Override
+    public AddPersistentWatchable<Pathable<Void>> inBackground(BackgroundCallback callback, Executor executor)
+    {
+        // Pass the client so the executor-aware constructor is chosen, matching the
+        // (callback, context, executor) variant below. With only (callback, executor)
+        // the executor would bind to the Object context parameter of the
+        // (callback, context) constructor and would be stored as the context.
+        backgrounding = new Backgrounding(client, callback, executor);
+        return this;
+    }
+
+    /**
+     * Requests background execution with a callback, a context object and an executor.
+     *
+     * @param callback the callback passed to {@code Backgrounding}
+     * @param context the context passed to {@code Backgrounding}
+     * @param executor the executor passed to {@code Backgrounding}
+     * @return this
+     */
+    @Override
+    public AddPersistentWatchable<Pathable<Void>> inBackground(BackgroundCallback callback, Object context, Executor executor)
+    {
+        backgrounding = new Backgrounding(client, callback, context, executor);
+        return this;
+    }
+
+    /**
+     * Registers the watch on the given path. When the background settings report
+     * background mode the operation is handed to the client's background
+     * processing; otherwise it is executed in the calling thread.
+     *
+     * @param path the path to watch, before namespace adjustment
+     * @return always {@code null}
+     * @throws Exception errors raised while submitting or executing the operation
+     */
+    @Override
+    public Void forPath(String path) throws Exception
+    {
+        if ( backgrounding.inBackground() )
+        {
+            client.processBackgroundOperation(new OperationAndData<>(this, path, backgrounding.getCallback(), null, backgrounding.getContext(), watching), null);
+        }
+        else
+        {
+            pathInForeground(path);
+        }
+        return null;
+    }
+
+    /**
+     * Issues the asynchronous {@code addPersistentWatch} call for a queued
+     * operation. The completion callback commits the trace and feeds an
+     * {@code ADD_PERSISTENT_WATCH} event back through
+     * {@code client.processBackgroundOperation}.
+     *
+     * @param data the queued operation; its data is the un-namespaced path
+     * @throws Exception if {@code backgrounding.checkError} propagates a failure
+     */
+    @Override
+    public void performBackgroundOperation(final OperationAndData<String> data) throws Exception
+    {
+        String path = data.getData();
+        String fixedPath = client.fixForNamespace(path);
+        try
+        {
+            final OperationTrace   trace = client.getZookeeperClient().startAdvancedTracer("AddPersistentWatchBuilderImpl-Background");
+            client.getZooKeeper().addPersistentWatch
+                (
+                    fixedPath,
+                    watching.getWatcher(path),
+                    recursive, (rc, path1, ctx) -> {
+                        trace.setReturnCode(rc).setWithWatcher(true).setPath(path1).commit();
+                        CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.ADD_PERSISTENT_WATCH, rc, path1, null, ctx, null, null, null, null, null, null);
+                        client.processBackgroundOperation(data, event);
+                    },
+                    backgrounding.getContext()
+                );
+        }
+        catch ( Throwable e )
+        {
+            // Anything thrown while submitting the call is routed through the
+            // background error handling rather than propagated directly.
+            backgrounding.checkError(e, watching);
+        }
+    }
+
+    /**
+     * Executes the synchronous {@code addPersistentWatch} call under
+     * {@code RetryLoop.callWithRetry}. The trace is committed only after the
+     * retry loop returns normally.
+     *
+     * @param path the un-namespaced path; the namespaced form is sent to ZooKeeper
+     * @throws Exception whatever the retry loop propagates
+     */
+    private void pathInForeground(final String path) throws Exception
+    {
+        final String fixedPath = client.fixForNamespace(path);
+        OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("AddPersistentWatchBuilderImpl-Foreground");
+        RetryLoop.callWithRetry
+        (
+            client.getZookeeperClient(), () -> {
+                client.getZooKeeper().addPersistentWatch(fixedPath, watching.getWatcher(path), recursive);
+                return null;
+            });
+        trace.setPath(fixedPath).setWithWatcher(true).commit();
+    }
+}
\ No newline at end of file
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index c8ebbb6..6cd3d63 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -571,6 +571,13 @@ public class CuratorFrameworkImpl implements CuratorFramework
         return new WatchesBuilderImpl(this);
     }
 
+    /**
+     * Starts a builder for adding a watch with this client.
+     *
+     * @return a new builder bound to this instance
+     * @throws IllegalStateException if the client is running in ZooKeeper 3.4 compatibility mode
+     */
+    @Override
+    public AddWatchBuilder addWatch()
+    {
+        Preconditions.checkState(!isZk34CompatibilityMode(), "Persistent watches APIs are not supported when running in ZooKeeper 3.4 compatibility mode");
+        // NOTE(review): the builder types added elsewhere in this change are named
+        // AddPersistentWatchBuilder / AddPersistentWatchBuilderImpl - confirm that
+        // AddWatchBuilder / AddWatchBuilderImpl are the intended types here.
+        return new AddWatchBuilderImpl(this);
+    }
+
     protected void internalSync(CuratorFrameworkImpl impl, String path, Object context)
     {
         BackgroundOperation<String> operation = new BackgroundSyncImpl(impl, context);
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncPersistentWatchBuilder.java b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncPersistentWatchBuilder.java
new file mode 100644
index 0000000..0f29233
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncPersistentWatchBuilder.java
@@ -0,0 +1,33 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ package org.apache.curator.x.async.api;
+
+ import org.apache.curator.framework.api.AddPersistentWatchable;
+ import org.apache.curator.x.async.AsyncStage;
+
+ /**
+  * Async counterpart of the add-persistent-watch builder: optionally choose
+  * {@link #recursive()}, then supply the watcher to obtain an
+  * {@code AsyncPathable<AsyncStage<Void>>}.
+  */
+ public interface AsyncPersistentWatchBuilder extends AddPersistentWatchable<AsyncPathable<AsyncStage<Void>>>
+ {
+     /**
+      * ZooKeeper persistent watches can optionally be recursive. See
+      * {@link org.apache.zookeeper.ZooKeeper#addPersistentWatch(String, org.apache.zookeeper.Watcher, boolean)}
+      *
+      * @return the watcher-selection step, typed so that {@code recursive()} is no longer offered
+      */
+     AddPersistentWatchable<AsyncPathable<AsyncStage<Void>>> recursive();
+ }
\ No newline at end of file
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncPersistentWatchBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncPersistentWatchBuilderImpl.java
new file mode 100644
index 0000000..14f3e30
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncPersistentWatchBuilderImpl.java
@@ -0,0 +1,75 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ package org.apache.curator.x.async.details;
+
+ import org.apache.curator.framework.api.AddPersistentWatchable;
+ import org.apache.curator.framework.api.CuratorWatcher;
+ import org.apache.curator.framework.imps.AddPersistentWatchBuilderImpl;
+ import org.apache.curator.framework.imps.CuratorFrameworkImpl;
+ import org.apache.curator.framework.imps.Watching;
+ import org.apache.curator.x.async.AsyncStage;
+ import org.apache.curator.x.async.api.AsyncPathable;
+ import org.apache.curator.x.async.api.AsyncPersistentWatchBuilder;
+ import org.apache.zookeeper.Watcher;
+
+ import static org.apache.curator.x.async.details.BackgroundProcs.ignoredProc;
+ import static org.apache.curator.x.async.details.BackgroundProcs.safeCall;
+
+ /**
+  * Async builder for adding a persistent watch. It records the watcher and the
+  * recursive flag, then delegates to {@link AddPersistentWatchBuilderImpl} using
+  * the backgrounding and callback supplied by a {@code BuilderCommon}.
+  */
+ class AsyncPersistentWatchBuilderImpl implements AsyncPersistentWatchBuilder, AddPersistentWatchable<AsyncPathable<AsyncStage<Void>>>, AsyncPathable<AsyncStage<Void>>
+ {
+     private final CuratorFrameworkImpl client;
+     private final Filters filters;
+     private Watching watching;
+     private boolean recursive;
+
+     /**
+      * @param client the client the delegate builder is created with
+      * @param filters the filters handed to {@code BuilderCommon}
+      */
+     AsyncPersistentWatchBuilderImpl(CuratorFrameworkImpl client, Filters filters)
+     {
+         this.filters = filters;
+         this.client = client;
+     }
+
+     /**
+      * Marks the watch as recursive.
+      *
+      * @return this
+      */
+     @Override
+     public AddPersistentWatchable<AsyncPathable<AsyncStage<Void>>> recursive()
+     {
+         this.recursive = true;
+         return this;
+     }
+
+     /**
+      * Sets the watcher to register.
+      *
+      * @param watcher the watcher
+      * @return this
+      */
+     @Override
+     public AsyncPathable<AsyncStage<Void>> usingWatcher(Watcher watcher)
+     {
+         return withWatching(new Watching(client, watcher));
+     }
+
+     /**
+      * Sets the watcher to register.
+      *
+      * @param watcher the watcher
+      * @return this
+      */
+     @Override
+     public AsyncPathable<AsyncStage<Void>> usingWatcher(CuratorWatcher watcher)
+     {
+         return withWatching(new Watching(client, watcher));
+     }
+
+     /**
+      * Builds the delegate from the current state and runs its {@code forPath}
+      * through {@code safeCall}.
+      *
+      * @param path the path to watch
+      * @return the stage returned by {@code safeCall}
+      */
+     @Override
+     public AsyncStage<Void> forPath(String path)
+     {
+         final BuilderCommon<Void> shared = new BuilderCommon<>(filters, ignoredProc);
+         final AddPersistentWatchBuilderImpl delegate =
+             new AddPersistentWatchBuilderImpl(client, watching, shared.backgrounding, recursive);
+         return safeCall(shared.internalCallback, () -> delegate.forPath(path));
+     }
+
+     // Stores the wrapped watcher; shared by both usingWatcher overloads.
+     private AsyncPathable<AsyncStage<Void>> withWatching(Watching wrapped)
+     {
+         this.watching = wrapped;
+         return this;
+     }
+ }
\ No newline at end of file