You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2019/10/03 04:24:40 UTC

[curator] branch persistent-watcher-recipe created (now 3068468)

This is an automated email from the ASF dual-hosted git repository.

randgalt pushed a change to branch persistent-watcher-recipe
in repository https://gitbox.apache.org/repos/asf/curator.git.


      at 3068468  Added support for a PersistentWatcher recipe based on new persistent watch APIs

This branch includes the following new commits:

     new 3068468  Added support for a PersistentWatcher recipe based on new persistent watch APIs

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[curator] 01/01: Added support for a PersistentWatcher recipe based on new persistent watch APIs

Posted by ra...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

randgalt pushed a commit to branch persistent-watcher-recipe
in repository https://gitbox.apache.org/repos/asf/curator.git

commit 306846871cdf7e7c42aeb96bd54e14ad98348c6d
Author: randgalt <ra...@apache.org>
AuthorDate: Wed Oct 2 23:24:14 2019 -0500

    Added support for a PersistentWatcher recipe based on new persistent watch APIs
---
 .../framework/recipes/watch/PersistentWatcher.java | 135 +++++++++++++++++++++
 .../recipes/watch/TestPersistentWatcher.java       |  86 +++++++++++++
 2 files changed, 221 insertions(+)

diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
new file mode 100644
index 0000000..3fc05c9
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
@@ -0,0 +1,135 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ package org.apache.curator.framework.recipes.watch;
+
+import com.google.common.base.Preconditions;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.AddPersistentWatchBuilder2;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.listen.StandardListenerManager;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.utils.ThreadUtils;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.Closeable;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+  * A managed persistent watcher. The watch will be managed such that it stays set through
+  * connection lapses, etc. Whenever the connection state reports as connected the watch is
+  * re-added, and every event received by the watch is forwarded to the listeners registered
+  * via {@link #getListenable()}.
+  */
+ public class PersistentWatcher implements Closeable
+ {
+     private final Logger log = LoggerFactory.getLogger(getClass());
+     // lifecycle: LATENT -> STARTED (via start()) -> CLOSED (via close())
+     private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
+     private final StandardListenerManager<Watcher> listeners = StandardListenerManager.standard();
+     // re-adds the watch each time the client reports a connected state
+     private final ConnectionStateListener connectionStateListener = (client, newState) -> {
+         if ( newState.isConnected() )
+         {
+             reset();
+         }
+     };
+     // the single watcher instance handed to ZooKeeper; fans each event out to the registered listeners
+     private final Watcher watcher = event -> listeners.forEach(w -> w.process(event));
+     private final CuratorFramework client;
+     private final String basePath;
+     private final boolean recursive;
+
+     private enum State
+     {
+         LATENT,
+         STARTED,
+         CLOSED
+     }
+
+     /**
+      * @param client client
+      * @param basePath path to set the watch on
+      * @param recursive ZooKeeper persistent watches can optionally be recursive
+      * @throws NullPointerException if {@code client} or {@code basePath} is null
+      */
+     public PersistentWatcher(CuratorFramework client, String basePath, boolean recursive)
+     {
+         this.client = Objects.requireNonNull(client, "client cannot be null");
+         this.basePath = Objects.requireNonNull(basePath, "basePath cannot be null");
+         this.recursive = recursive;
+     }
+
+     /**
+      * Start watching. Registers the connection state listener and makes the first
+      * attempt to add the persistent watch.
+      *
+      * @throws IllegalStateException if this instance is not in its initial, un-started state
+      */
+     public void start()
+     {
+         Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Already started");
+         client.getConnectionStateListenable().addListener(connectionStateListener);
+         reset();
+     }
+
+     /**
+      * Remove the watcher. Clears all registered listeners, stops listening for connection
+      * state changes and requests removal of the watch in the background. Has no effect
+      * unless the instance is currently started.
+      */
+     @Override
+     public void close()
+     {
+         if ( state.compareAndSet(State.STARTED, State.CLOSED) )
+         {
+             listeners.clear();
+             client.getConnectionStateListenable().removeListener(connectionStateListener);
+             try
+             {
+                 client.watches().remove(watcher).guaranteed().inBackground().forPath(basePath);
+             }
+             catch ( Exception e )
+             {
+                 // removal is best-effort: restore the interrupt flag if needed and carry on
+                 ThreadUtils.checkInterrupted(e);
+                 log.debug(String.format("Could not remove watcher for path: %s", basePath), e);
+             }
+         }
+     }
+
+     /**
+      * Container for setting listeners
+      *
+      * @return listener container
+      */
+     public Listenable<Watcher> getListenable()
+     {
+         return listeners;
+     }
+
+     /**
+      * Adds (or re-adds) the persistent watch in the background. If the background operation
+      * completes with a non-OK result code another attempt is scheduled through the client.
+      */
+     private void reset()
+     {
+         // A connection state notification or a retry queued via runSafe() may arrive after
+         // close(); without this guard the watch would be re-added after it had been removed.
+         if ( state.get() != State.STARTED )
+         {
+             return;
+         }
+
+         try
+         {
+             AddPersistentWatchBuilder2 builder = recursive ? client.addPersistentWatch().recursive() : client.addPersistentWatch();
+             BackgroundCallback callback = (__, event) -> {
+                 if ( event.getResultCode() != KeeperException.Code.OK.intValue() ) {
+                     client.runSafe(this::reset);
+                 }
+             };
+             builder.inBackground(callback).usingWatcher(watcher).forPath(basePath);
+         }
+         catch ( Exception e )
+         {
+             // same interrupt handling as close(), so an interrupt is not silently swallowed
+             ThreadUtils.checkInterrupted(e);
+             log.error("Could not reset persistent watch at path: " + basePath, e);
+         }
+     }
+ }
\ No newline at end of file
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java
new file mode 100644
index 0000000..df18de5
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java
@@ -0,0 +1,86 @@
+package org.apache.curator.framework.recipes.watch;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.compatibility.Timing2;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class TestPersistentWatcher extends BaseClassForTests
+{
+    private final Timing2 timing = new Timing2();
+
+    @Test
+    public void testConnectionLostRecursive() throws Exception
+    {
+        internalTest(true);
+    }
+
+    @Test
+    public void testConnectionLost() throws Exception
+    {
+        internalTest(false);
+    }
+
+    private void internalTest(boolean recursive) throws Exception
+    {
+        try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)) )
+        {
+            CountDownLatch lostLatch = new CountDownLatch(1);
+            CountDownLatch reconnectedLatch = new CountDownLatch(1);
+            client.start();
+            client.getConnectionStateListenable().addListener((__, newState) -> {
+                if ( newState == ConnectionState.LOST ) {
+                    lostLatch.countDown();
+                } else if ( newState == ConnectionState.RECONNECTED ) {
+                    reconnectedLatch.countDown();
+                }
+            });
+
+            try ( PersistentWatcher persistentWatcher = new PersistentWatcher(client, "/top/main", recursive) )
+            {
+                persistentWatcher.start();
+
+                BlockingQueue<WatchedEvent> events = new LinkedBlockingQueue<>();
+                persistentWatcher.getListenable().addListener(events::add);
+
+                client.create().creatingParentsIfNeeded().forPath("/top/main/a");
+                Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main");
+                if ( recursive )
+                {
+                    Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main/a");
+                }
+                else
+                {
+                    Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main");   // child added
+                }
+
+                server.stop();
+                Assert.assertEquals(timing.takeFromQueue(events).getState(), Watcher.Event.KeeperState.Disconnected);
+                Assert.assertTrue(timing.awaitLatch(lostLatch));
+
+                server.restart();
+                Assert.assertTrue(timing.awaitLatch(reconnectedLatch));
+
+                timing.sleepABit();     // time to allow watcher to get reset
+                events.clear();
+
+                if ( recursive )
+                {
+                    client.setData().forPath("/top/main/a", "foo".getBytes());
+                    Assert.assertEquals(timing.takeFromQueue(events).getType(), Watcher.Event.EventType.NodeDataChanged);
+                }
+                client.setData().forPath("/top/main", "bar".getBytes());
+                Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main");
+            }
+        }
+    }
+}