You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2017/10/04 14:02:08 UTC

[8/8] curator git commit: 1. Updates from latest ZK changes. 2. Added async version for creating persistent watch

1. Updates from latest ZK changes. 2. Added async version for creating persistent watch


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/d7bf1a24
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/d7bf1a24
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/d7bf1a24

Branch: refs/heads/persistent-watch
Commit: d7bf1a2461ecfa0bf708b3890dc4b3a019cacdb0
Parents: a27f876
Author: randgalt <ra...@apache.org>
Authored: Wed Oct 4 16:01:51 2017 +0200
Committer: randgalt <ra...@apache.org>
Committed: Wed Oct 4 16:01:51 2017 +0200

----------------------------------------------------------------------
 .../curator/framework/CuratorFramework.java       |  5 +++++
 .../imps/AddPersistentWatchBuilderImpl.java       | 10 +++++++++-
 .../apache/curator/framework/imps/Watching.java   | 10 +++++-----
 .../framework/recipes/cache/TreeCacheBridge.java  | 18 ++++++++++++++++++
 .../recipes/cache/TreeCacheBridgeImpl.java        | 18 ++++++++++++++++++
 .../framework/recipes/watch/CacheSelectors.java   |  6 ++----
 .../framework/recipes/watch/CachedNode.java       | 18 ++++++++++++++++++
 .../x/async/api/AsyncCuratorFrameworkDsl.java     |  7 +++++++
 .../async/details/AsyncCuratorFrameworkImpl.java  |  6 ++++++
 .../curator/x/async/TestBasicOperations.java      | 14 ++++++++++++++
 10 files changed, 102 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/d7bf1a24/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
index ce31d08..f075daa 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
@@ -193,6 +193,11 @@ public interface CuratorFramework extends Closeable
      */
     public SyncBuilder sync();
 
+    /**
+     * Start a persistent watch builder
+     *
+     * @return builder object
+     */
     public AddPersistentWatchBuilder addPersistentWatch();
 
     /**

http://git-wip-us.apache.org/repos/asf/curator/blob/d7bf1a24/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java
index 56f8f79..4f51f39 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java
@@ -33,7 +33,7 @@ import org.apache.zookeeper.Watcher;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executor;
 
-class AddPersistentWatchBuilderImpl implements AddPersistentWatchBuilder, Pathable<Void>, BackgroundOperation<String>
+public class AddPersistentWatchBuilderImpl implements AddPersistentWatchBuilder, Pathable<Void>, BackgroundOperation<String>
 {
     private final CuratorFrameworkImpl client;
     private Watching watching = null;
@@ -45,6 +45,14 @@ class AddPersistentWatchBuilderImpl implements AddPersistentWatchBuilder, Pathab
         this.client = client;
     }
 
+    public AddPersistentWatchBuilderImpl(CuratorFrameworkImpl client, Watching watching, Backgrounding backgrounding, boolean recursive)
+    {
+        this.client = client;
+        this.watching = watching;
+        this.backgrounding = backgrounding;
+        this.recursive = recursive;
+    }
+
     @Override
     public AddPersistentWatchable<Pathable<Void>> inBackground()
     {

http://git-wip-us.apache.org/repos/asf/curator/blob/d7bf1a24/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java
index daa5dd3..5bad7e7 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java
@@ -23,7 +23,7 @@ import org.apache.curator.framework.api.CuratorWatcher;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.Watcher;
 
-class Watching
+public class Watching
 {
     private final Watcher watcher;
     private final CuratorWatcher curatorWatcher;
@@ -31,7 +31,7 @@ class Watching
     private final CuratorFrameworkImpl client;
     private NamespaceWatcher namespaceWatcher;
 
-    Watching(CuratorFrameworkImpl client, boolean watched)
+    public Watching(CuratorFrameworkImpl client, boolean watched)
     {
         this.client = client;
         this.watcher = null;
@@ -39,7 +39,7 @@ class Watching
         this.watched = watched;
     }
 
-    Watching(CuratorFrameworkImpl client, Watcher watcher)
+    public Watching(CuratorFrameworkImpl client, Watcher watcher)
     {
         this.client = client;
         this.watcher = watcher;
@@ -47,7 +47,7 @@ class Watching
         this.watched = false;
     }
 
-    Watching(CuratorFrameworkImpl client, CuratorWatcher watcher)
+    public Watching(CuratorFrameworkImpl client, CuratorWatcher watcher)
     {
         this.client = client;
         this.watcher = null;
@@ -55,7 +55,7 @@ class Watching
         this.watched = false;
     }
 
-    Watching(CuratorFrameworkImpl client)
+    public Watching(CuratorFrameworkImpl client)
     {
         this.client = client;
         watcher = null;

http://git-wip-us.apache.org/repos/asf/curator/blob/d7bf1a24/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridge.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridge.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridge.java
index 8b6f37a..4a0eed9 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridge.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridge.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.curator.framework.recipes.cache;
 
 import org.apache.curator.framework.listen.Listenable;

http://git-wip-us.apache.org/repos/asf/curator/blob/d7bf1a24/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridgeImpl.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridgeImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridgeImpl.java
index 0198aa4..35fcac4 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridgeImpl.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridgeImpl.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.curator.framework.recipes.cache;
 
 import com.google.common.util.concurrent.MoreExecutors;

http://git-wip-us.apache.org/repos/asf/curator/blob/d7bf1a24/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheSelectors.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheSelectors.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheSelectors.java
index 8814e57..ed6c6fa 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheSelectors.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheSelectors.java
@@ -21,9 +21,8 @@ package org.apache.curator.framework.recipes.watch;
 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.common.PathUtils;
-import org.apache.zookeeper.server.PathIterator;
+import org.apache.zookeeper.server.PathParentIterator;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
@@ -315,10 +314,9 @@ public class CacheSelectors
 
             private CacheSelector getSelector(String fullPath)
             {
-                String parent = ZKPaths.getPathAndNode(fullPath).getPath();
                 for ( CompositeEntry entry : entries )
                 {
-                    PathIterator pathIterator = new PathIterator(fullPath);
+                    PathParentIterator pathIterator = PathParentIterator.forAll(fullPath);
                     while ( pathIterator.hasNext() )
                     {
                         if ( pathIterator.next().equals(entry.path) )

http://git-wip-us.apache.org/repos/asf/curator/blob/d7bf1a24/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNode.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNode.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNode.java
index b07993f..18131cf 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNode.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNode.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.curator.framework.recipes.watch;
 
 import org.apache.zookeeper.data.Stat;

http://git-wip-us.apache.org/repos/asf/curator/blob/d7bf1a24/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java
index bc66bb6..c1748d0 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java
@@ -84,6 +84,13 @@ public interface AsyncCuratorFrameworkDsl extends WatchableAsyncCuratorFramework
     AsyncReconfigBuilder reconfig();
 
     /**
+     * Start a persistent watch builder
+     *
+     * @return builder object
+     */
+    AsyncPersistentWatchBuilder addPersistentWatch();
+
+    /**
      * Start a transaction builder
      *
      * @return builder object

http://git-wip-us.apache.org/repos/asf/curator/blob/d7bf1a24/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java
index 167cf50..afa1de0 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java
@@ -124,6 +124,12 @@ public class AsyncCuratorFrameworkImpl implements AsyncCuratorFramework
     }
 
     @Override
+    public AsyncPersistentWatchBuilderImpl addPersistentWatch()
+    {
+        return new AsyncPersistentWatchBuilderImpl(client, filters);
+    }
+
+    @Override
     public AsyncMultiTransaction transaction()
     {
         return operations -> {

http://git-wip-us.apache.org/repos/asf/curator/blob/d7bf1a24/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java b/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java
index f814146..aed1385 100644
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java
@@ -32,8 +32,10 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import static java.util.EnumSet.of;
 import static org.apache.curator.x.async.api.CreateOption.compress;
@@ -199,4 +201,16 @@ public class TestBasicOperations extends CompletableBaseClassForTests
         complete(client.getData().storingStatIn(stat).forPath("/test"));
         Assert.assertEquals(stat.getDataLength(), "hey".length());
     }
+
+    @Test
+    public void testPersistentRecursiveWatch() throws Exception
+    {
+        BlockingQueue<Watcher.Event.EventType> events = new LinkedBlockingQueue<>();
+        Watcher watcher = event -> events.add(event.getType());
+        complete(client.addPersistentWatch().recursive().usingWatcher(watcher).forPath("/a/b"));
+        client.unwrap().create().creatingParentContainersIfNeeded().forPath("/a/b/c");
+        client.unwrap().create().creatingParentContainersIfNeeded().forPath("/a/b/d");
+        Assert.assertEquals(timing.takeFromQueue(events), Watcher.Event.EventType.NodeCreated);
+        Assert.assertEquals(timing.takeFromQueue(events), Watcher.Event.EventType.NodeCreated);
+    }
 }