You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2019/10/14 13:37:27 UTC
[curator] 01/03: Support ZK 3.6 and add support for upcoming
Persistent Recursive Watch APIs.
This is an automated email from the ASF dual-hosted git repository.
randgalt pushed a commit to branch persistent-watcher-functional
in repository https://gitbox.apache.org/repos/asf/curator.git
commit 8b6e92d1018b21019979c79bf7359bbd6b2310c6
Author: randgalt <ra...@apache.org>
AuthorDate: Wed Oct 2 20:09:32 2019 -0500
Support ZK 3.6 and add support for upcoming Persistent Recursive Watch APIs.
---
.../apache/curator/framework/CuratorFramework.java | 6 +
.../framework/api/AddPersistentWatchBuilder.java | 30 ++++
.../framework/api/AddPersistentWatchBuilder2.java | 25 +++
.../framework/api/AddPersistentWatchable.java | 40 +++++
.../imps/AddPersistentWatchBuilderImpl.java | 169 +++++++++++++++++++++
.../framework/imps/CuratorFrameworkImpl.java | 7 +
.../x/async/api/AsyncPersistentWatchBuilder.java | 33 ++++
.../details/AsyncPersistentWatchBuilderImpl.java | 75 +++++++++
8 files changed, 385 insertions(+)
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
index a803e63..89054a2 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
@@ -199,6 +199,12 @@ public interface CuratorFramework extends Closeable
public WatchesBuilder watches();
/**
+ * Start an add watch builder
+ *
+ * NOTE: the {@code CuratorFrameworkImpl} implementation guards this call with a
+ * {@code Preconditions.checkState()} and rejects it when the client is running in
+ * ZooKeeper 3.4 compatibility mode.
+ *
+ * @return builder object
+ */
+ public AddWatchBuilder addWatch();
+ /**
* Returns the listenable interface for the Connect State
*
* @return listenable
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchBuilder.java b/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchBuilder.java
new file mode 100644
index 0000000..a167174
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchBuilder.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.api;
+
+/**
+ * First stage of the add-persistent-watch builder chain. It offers everything in
+ * {@link AddPersistentWatchBuilder2} plus the optional {@link #recursive()} step.
+ */
+public interface AddPersistentWatchBuilder extends AddPersistentWatchBuilder2
+{
+ /**
+ * ZooKeeper persistent watches can optionally be recursive. See
+ * {@link org.apache.zookeeper.ZooKeeper#addPersistentWatch(String, org.apache.zookeeper.Watcher, boolean)}
+ *
+ * @return this, typed as the next builder stage (which no longer exposes {@code recursive()})
+ */
+ AddPersistentWatchBuilder2 recursive();
+}
\ No newline at end of file
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchBuilder2.java b/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchBuilder2.java
new file mode 100644
index 0000000..15cea4f
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchBuilder2.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.api;
+
+/**
+ * Second stage of the add-persistent-watch builder chain: optionally choose a
+ * background mode (via {@link Backgroundable}) and then set the watcher (via
+ * {@link AddPersistentWatchable}). Both routes end at a {@code Pathable<Void>}
+ * on which the path is finally supplied.
+ */
+public interface AddPersistentWatchBuilder2 extends
+ Backgroundable<AddPersistentWatchable<Pathable<Void>>>,
+ AddPersistentWatchable<Pathable<Void>>
+{
+}
\ No newline at end of file
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchable.java
new file mode 100644
index 0000000..faa8906
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchable.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.api;
+
+import org.apache.zookeeper.Watcher;
+
+/**
+ * Builder step that supplies the watcher for an add-persistent-watch operation.
+ *
+ * @param <T> the type of the next builder stage returned once the watcher is set
+ */
+public interface AddPersistentWatchable<T>
+{
+ /**
+ * Set a watcher for the operation, given as a native ZooKeeper {@link Watcher}
+ *
+ * @param watcher the watcher
+ * @return this
+ */
+ T usingWatcher(Watcher watcher);
+
+ /**
+ * Set a watcher for the operation, given as a Curator {@link CuratorWatcher}
+ *
+ * @param watcher the watcher
+ * @return this
+ */
+ T usingWatcher(CuratorWatcher watcher);
+}
\ No newline at end of file
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java
new file mode 100644
index 0000000..acb70c8
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.imps;
+
+import org.apache.curator.RetryLoop;
+import org.apache.curator.drivers.OperationTrace;
+import org.apache.curator.framework.api.AddPersistentWatchBuilder;
+import org.apache.curator.framework.api.AddPersistentWatchBuilder2;
+import org.apache.curator.framework.api.AddPersistentWatchable;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorEventType;
+import org.apache.curator.framework.api.CuratorWatcher;
+import org.apache.curator.framework.api.Pathable;
+import org.apache.zookeeper.Watcher;
+import java.util.concurrent.Executor;
+
+/**
+ * Builder that registers a ZooKeeper persistent watch (optionally recursive) on a path,
+ * either in the foreground (with retries) or as a Curator background operation.
+ * <p>
+ * The builder is mutable: each fluent call records a setting on this instance and returns
+ * {@code this} typed as the next stage of the chain.
+ */
+public class AddPersistentWatchBuilderImpl implements AddPersistentWatchBuilder, Pathable<Void>, BackgroundOperation<String>
+{
+    private final CuratorFrameworkImpl client;
+    private Watching watching = null;
+    // default-constructed Backgrounding means "run in the foreground" (see forPath())
+    private Backgrounding backgrounding = new Backgrounding();
+    private boolean recursive = false;
+
+    /**
+     * Create a builder with default settings: foreground, non-recursive, no watcher yet.
+     *
+     * @param client the owning Curator client
+     */
+    AddPersistentWatchBuilderImpl(CuratorFrameworkImpl client)
+    {
+        this.client = client;
+    }
+
+    /**
+     * Create a fully configured builder. Used by callers (e.g. the async DSL) that have
+     * already collected the settings themselves.
+     *
+     * @param client        the owning Curator client
+     * @param watching      the watcher wrapper to register
+     * @param backgrounding the foreground/background configuration
+     * @param recursive     {@code true} to add a recursive persistent watch
+     */
+    public AddPersistentWatchBuilderImpl(CuratorFrameworkImpl client, Watching watching, Backgrounding backgrounding, boolean recursive)
+    {
+        this.client = client;
+        this.watching = watching;
+        this.backgrounding = backgrounding;
+        this.recursive = recursive;
+    }
+
+    @Override
+    public AddPersistentWatchable<Pathable<Void>> inBackground()
+    {
+        // Must explicitly request background mode: the no-arg Backgrounding() is the
+        // foreground default used by the field initializer, so using it here would make
+        // inBackground() a silent no-op.
+        backgrounding = new Backgrounding(true);
+        return this;
+    }
+
+    @Override
+    public AddPersistentWatchBuilder2 recursive()
+    {
+        recursive = true;
+        return this;
+    }
+
+    @Override
+    public Pathable<Void> usingWatcher(Watcher watcher)
+    {
+        watching = new Watching(client, watcher);
+        return this;
+    }
+
+    @Override
+    public Pathable<Void> usingWatcher(CuratorWatcher watcher)
+    {
+        watching = new Watching(client, watcher);
+        return this;
+    }
+
+    @Override
+    public AddPersistentWatchable<Pathable<Void>> inBackground(Object context)
+    {
+        backgrounding = new Backgrounding(context);
+        return this;
+    }
+
+    @Override
+    public AddPersistentWatchable<Pathable<Void>> inBackground(BackgroundCallback callback)
+    {
+        backgrounding = new Backgrounding(callback);
+        return this;
+    }
+
+    @Override
+    public AddPersistentWatchable<Pathable<Void>> inBackground(BackgroundCallback callback, Object context)
+    {
+        backgrounding = new Backgrounding(callback, context);
+        return this;
+    }
+
+    @Override
+    public AddPersistentWatchable<Pathable<Void>> inBackground(BackgroundCallback callback, Executor executor)
+    {
+        // Use the executor-aware constructor (consistent with the 4-arg variant below).
+        // Backgrounding(callback, executor) would bind to the (callback, Object context)
+        // overload and store the executor as the context instead of running the callback on it.
+        backgrounding = new Backgrounding(client, callback, executor);
+        return this;
+    }
+
+    @Override
+    public AddPersistentWatchable<Pathable<Void>> inBackground(BackgroundCallback callback, Object context, Executor executor)
+    {
+        backgrounding = new Backgrounding(client, callback, context, executor);
+        return this;
+    }
+
+    /**
+     * Add the persistent watch to the given path, in the background or foreground
+     * depending on how this builder was configured.
+     *
+     * @param path the (un-namespaced) path to watch
+     * @return always {@code null}
+     * @throws Exception errors from the foreground operation or from queuing the background one
+     */
+    @Override
+    public Void forPath(String path) throws Exception
+    {
+        if ( backgrounding.inBackground() )
+        {
+            client.processBackgroundOperation(new OperationAndData<>(this, path, backgrounding.getCallback(), null, backgrounding.getContext(), watching), null);
+        }
+        else
+        {
+            pathInForeground(path);
+        }
+        return null;
+    }
+
+    /**
+     * Issue the asynchronous ZooKeeper call for a queued background operation. The result is
+     * turned into an {@code ADD_PERSISTENT_WATCH} event and handed back to the client.
+     *
+     * @param data the queued operation; its data is the un-namespaced path
+     */
+    @Override
+    public void performBackgroundOperation(final OperationAndData<String> data) throws Exception
+    {
+        String path = data.getData();
+        String fixedPath = client.fixForNamespace(path);
+        try
+        {
+            final OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("AddPersistentWatchBuilderImpl-Background");
+            client.getZooKeeper().addPersistentWatch
+                (
+                    fixedPath,
+                    watching.getWatcher(path),
+                    recursive, (rc, path1, ctx) -> {
+                        trace.setReturnCode(rc).setWithWatcher(true).setPath(path1).commit();
+                        CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.ADD_PERSISTENT_WATCH, rc, path1, null, ctx, null, null, null, null, null, null);
+                        client.processBackgroundOperation(data, event);
+                    },
+                    backgrounding.getContext()
+                );
+        }
+        catch ( Throwable e )
+        {
+            // delegate to the backgrounding's error handling rather than propagating directly
+            backgrounding.checkError(e, watching);
+        }
+    }
+
+    /**
+     * Add the watch synchronously inside Curator's retry loop and record a trace on success.
+     *
+     * @param path the (un-namespaced) path to watch
+     */
+    private void pathInForeground(final String path) throws Exception
+    {
+        final String fixedPath = client.fixForNamespace(path);
+        OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("AddPersistentWatchBuilderImpl-Foreground");
+        RetryLoop.callWithRetry
+            (
+                client.getZookeeperClient(), () -> {
+                    client.getZooKeeper().addPersistentWatch(fixedPath, watching.getWatcher(path), recursive);
+                    return null;
+                });
+        trace.setPath(fixedPath).setWithWatcher(true).commit();
+    }
+}
\ No newline at end of file
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index c8ebbb6..6cd3d63 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -571,6 +571,13 @@ public class CuratorFrameworkImpl implements CuratorFramework
return new WatchesBuilderImpl(this);
}
+ /**
+  * Start an add watch builder.
+  *
+  * @return builder object
+  * @throws IllegalStateException (via the checkState precondition) if the client is running
+  *         in ZooKeeper 3.4 compatibility mode
+  */
+ @Override
+ public AddWatchBuilder addWatch()
+ {
+     Preconditions.checkState(!isZk34CompatibilityMode(), "Persistent watches APIs are not supported when running in ZooKeeper 3.4 compatibility mode");
+     return new AddWatchBuilderImpl(this);
+ }
+
protected void internalSync(CuratorFrameworkImpl impl, String path, Object context)
{
BackgroundOperation<String> operation = new BackgroundSyncImpl(impl, context);
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncPersistentWatchBuilder.java b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncPersistentWatchBuilder.java
new file mode 100644
index 0000000..0f29233
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncPersistentWatchBuilder.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ package org.apache.curator.x.async.api;
+
+ import org.apache.curator.framework.api.AddPersistentWatchable;
+ import org.apache.curator.x.async.AsyncStage;
+
+ /**
+ * Async DSL counterpart of the add-persistent-watch builder: optionally mark the
+ * watch as recursive, set the watcher, then supply the path to obtain an
+ * {@code AsyncStage<Void>}.
+ */
+ public interface AsyncPersistentWatchBuilder extends AddPersistentWatchable<AsyncPathable<AsyncStage<Void>>>
+ {
+ /**
+ * ZooKeeper persistent watches can optionally be recursive. See
+ * {@link org.apache.zookeeper.ZooKeeper#addPersistentWatch(String, org.apache.zookeeper.Watcher, boolean)}
+ *
+ * @return this, typed as the watcher-setting stage
+ */
+ AddPersistentWatchable<AsyncPathable<AsyncStage<Void>>> recursive();
+ }
\ No newline at end of file
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncPersistentWatchBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncPersistentWatchBuilderImpl.java
new file mode 100644
index 0000000..14f3e30
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncPersistentWatchBuilderImpl.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ package org.apache.curator.x.async.details;
+
+ import org.apache.curator.framework.api.AddPersistentWatchable;
+ import org.apache.curator.framework.api.CuratorWatcher;
+ import org.apache.curator.framework.imps.AddPersistentWatchBuilderImpl;
+ import org.apache.curator.framework.imps.CuratorFrameworkImpl;
+ import org.apache.curator.framework.imps.Watching;
+ import org.apache.curator.x.async.AsyncStage;
+ import org.apache.curator.x.async.api.AsyncPathable;
+ import org.apache.curator.x.async.api.AsyncPersistentWatchBuilder;
+ import org.apache.zookeeper.Watcher;
+
+ import static org.apache.curator.x.async.details.BackgroundProcs.ignoredProc;
+ import static org.apache.curator.x.async.details.BackgroundProcs.safeCall;
+
+ /**
+  * Async DSL builder for persistent watches. It only gathers the settings (watcher and
+  * recursive flag); the actual work is delegated to {@link AddPersistentWatchBuilderImpl}
+  * when the path is supplied.
+  */
+ class AsyncPersistentWatchBuilderImpl implements AsyncPersistentWatchBuilder, AddPersistentWatchable<AsyncPathable<AsyncStage<Void>>>, AsyncPathable<AsyncStage<Void>>
+ {
+     private final CuratorFrameworkImpl client;
+     private final Filters filters;
+     // settings collected by the fluent calls; defaults are "no watcher" and "not recursive"
+     private Watching watching;
+     private boolean recursive;
+
+     /**
+      * @param client  the Curator client the watch is added through
+      * @param filters filters handed to the common async builder plumbing
+      */
+     AsyncPersistentWatchBuilderImpl(CuratorFrameworkImpl client, Filters filters)
+     {
+         this.client = client;
+         this.filters = filters;
+     }
+
+     /** Marks the watch as recursive and moves on to the watcher-setting stage. */
+     @Override
+     public AddPersistentWatchable<AsyncPathable<AsyncStage<Void>>> recursive()
+     {
+         this.recursive = true;
+         return this;
+     }
+
+     /** Records a native ZooKeeper watcher for the operation. */
+     @Override
+     public AsyncPathable<AsyncStage<Void>> usingWatcher(Watcher watcher)
+     {
+         this.watching = new Watching(client, watcher);
+         return this;
+     }
+
+     /** Records a Curator watcher for the operation. */
+     @Override
+     public AsyncPathable<AsyncStage<Void>> usingWatcher(CuratorWatcher watcher)
+     {
+         this.watching = new Watching(client, watcher);
+         return this;
+     }
+
+     /**
+      * Builds the synchronous-API builder with the collected settings and runs it for
+      * {@code path} through {@code safeCall}.
+      *
+      * @param path the path to watch
+      * @return the stage produced by {@code safeCall} for this operation
+      */
+     @Override
+     public AsyncStage<Void> forPath(String path)
+     {
+         BuilderCommon<Void> plumbing = new BuilderCommon<>(filters, ignoredProc);
+         AddPersistentWatchBuilderImpl delegate =
+             new AddPersistentWatchBuilderImpl(client, watching, plumbing.backgrounding, recursive);
+         return safeCall(plumbing.internalCallback, () -> delegate.forPath(path));
+     }
+ }
\ No newline at end of file