You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2017/06/28 21:28:50 UTC

[bookkeeper] branch master updated: BOOKKEEPER-1086: ZkUnderreplicationManager cache watcher

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 32c2859  BOOKKEEPER-1086: ZkUnderreplicationManager cache watcher
32c2859 is described below

commit 32c2859f0bec9ab74965d954bce038480e4dc2ba
Author: Samuel Just <sj...@salesforce.com>
AuthorDate: Wed Jun 28 14:28:38 2017 -0700

    BOOKKEEPER-1086: ZkUnderreplicationManager cache watcher
    
    Previously, getLedgerToRereplicate left watches each time it traversed the
    tree until it found a suitable replication target.  Since we don't have
    a way of canceling watches, these watches tended to get abandoned,
    particularly on interior nodes, which aren't changed much.  Thus,
    over time, some nodes would build up a very large number of watches.
    
    Instead, introduce a caching mechanism to remember outstanding watches
    and avoid ever creating two watches on the same node.
    
    Author: Samuel Just <sjustsalesforce.com>
    
    Author: Samuel Just <sj...@salesforce.com>
    
    Reviewers: Enrico Olivelli <eo...@gmail.com>, Jia Zhai <None>, Sijie Guo <si...@apache.org>
    
    This closes #193 from athanatos/forupstream/BOOKKEEPER-1098
---
 .../meta/ZkLedgerUnderreplicationManager.java      |  67 +++--
 .../org/apache/bookkeeper/util/SubTreeCache.java   | 166 +++++++++++
 .../apache/bookkeeper/util/SubTreeCacheTest.java   | 321 +++++++++++++++++++++
 3 files changed, 523 insertions(+), 31 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
index b695336..e986317 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
@@ -43,6 +43,7 @@ import org.apache.bookkeeper.replication.ReplicationEnableCb;
 import org.apache.bookkeeper.replication.ReplicationException;
 import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
 import org.apache.bookkeeper.util.BookKeeperConstants;
+import org.apache.bookkeeper.util.SubTreeCache;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -102,6 +103,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
     private final String layoutZNode;
     private final AbstractConfiguration conf;
     private final ZooKeeper zkc;
+    private final SubTreeCache subTreeCache;
 
     public ZkLedgerUnderreplicationManager(AbstractConfiguration conf, ZooKeeper zkc)
             throws KeeperException, InterruptedException, ReplicationException.CompatibilityException {
@@ -114,6 +116,12 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
 
         idExtractionPattern = Pattern.compile("urL(\\d+)$");
         this.zkc = zkc;
+        this.subTreeCache = new SubTreeCache(new SubTreeCache.TreeProvider() {
+            @Override
+            public List<String> getChildren(String path, Watcher watcher) throws InterruptedException, KeeperException {
+                return zkc.getChildren(path, watcher);
+            }
+        });
 
         checkLayout();
     }
@@ -384,12 +392,12 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
         };
     }
 
-    private long getLedgerToRereplicateFromHierarchy(String parent, long depth, Watcher w)
+    private long getLedgerToRereplicateFromHierarchy(String parent, long depth)
             throws KeeperException, InterruptedException {
         if (depth == 4) {
             List<String> children;
             try {
-                children = zkc.getChildren(parent, w);
+                children = subTreeCache.getChildren(parent);
             } catch (KeeperException.NoNodeException nne) {
                 // can occur if another underreplicated ledger's
                 // hierarchy is being cleaned up
@@ -401,8 +409,8 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
             while (children.size() > 0) {
                 String tryChild = children.get(0);
                 try {
-                    String lockPath = urLockPath + "/" + tryChild;
-                    if (zkc.exists(lockPath, w) != null) {
+                    List<String> locks = subTreeCache.getChildren(urLockPath);
+                    if (locks.contains(tryChild)) {
                         children.remove(tryChild);
                         continue;
                     }
@@ -416,6 +424,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
                         continue;
                     }
 
+                    String lockPath = urLockPath + "/" + tryChild;
                     long ledgerId = getLedgerId(tryChild);
                     zkc.create(lockPath, LOCK_DATA, zkAcls, CreateMode.EPHEMERAL);
                     heldLocks.put(ledgerId, new Lock(lockPath, stat.getVersion()));
@@ -431,7 +440,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
 
         List<String> children;
         try {
-            children = zkc.getChildren(parent, w);
+            children = subTreeCache.getChildren(parent);
         } catch (KeeperException.NoNodeException nne) {
             // can occur if another underreplicated ledger's
             // hierarchy is being cleaned up
@@ -443,7 +452,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
         while (children.size() > 0) {
             String tryChild = children.get(0);
             String tryPath = parent + "/" + tryChild;
-            long ledger = getLedgerToRereplicateFromHierarchy(tryPath, depth + 1, w);
+            long ledger = getLedgerToRereplicateFromHierarchy(tryPath, depth + 1);
             if (ledger != -1) {
                 return ledger;
             }
@@ -459,11 +468,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
             LOG.debug("pollLedgerToRereplicate()");
         }
         try {
-            Watcher w = new Watcher() {
-                    public void process(WatchedEvent e) { // do nothing
-                    }
-                };
-            return getLedgerToRereplicateFromHierarchy(urLedgerPath, 0, w);
+            return getLedgerToRereplicateFromHierarchy(urLedgerPath, 0);
         } catch (KeeperException ke) {
             throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
         } catch (InterruptedException ie) {
@@ -477,33 +482,33 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
         if (LOG.isDebugEnabled()) {
             LOG.debug("getLedgerToRereplicate()");
         }
-        try {
-            while (true) {
+        while (true) {
+            final CountDownLatch changedLatch = new CountDownLatch(1);
+            Watcher w = new Watcher() {
+                public void process(WatchedEvent e) {
+                    if (e.getType() == Watcher.Event.EventType.NodeChildrenChanged
+                            || e.getType() == Watcher.Event.EventType.NodeDeleted
+                            || e.getType() == Watcher.Event.EventType.NodeCreated
+                            || e.getState() == Watcher.Event.KeeperState.Expired
+                            || e.getState() == Watcher.Event.KeeperState.Disconnected) {
+                        changedLatch.countDown();
+                    }
+                }
+            };
+            try (SubTreeCache.WatchGuard wg = subTreeCache.registerWatcherWithGuard(w)) {
                 waitIfLedgerReplicationDisabled();
-                final CountDownLatch changedLatch = new CountDownLatch(1);
-                Watcher w = new Watcher() {
-                        public void process(WatchedEvent e) {
-                            if (e.getType() == Watcher.Event.EventType.NodeChildrenChanged
-                                || e.getType() == Watcher.Event.EventType.NodeDeleted
-                                || e.getType() == Watcher.Event.EventType.NodeCreated
-                                || e.getState() == Watcher.Event.KeeperState.Expired
-                                || e.getState() == Watcher.Event.KeeperState.Disconnected) {
-                                changedLatch.countDown();
-                            }
-                        }
-                    };
-                long ledger = getLedgerToRereplicateFromHierarchy(urLedgerPath, 0, w);
+                long ledger = getLedgerToRereplicateFromHierarchy(urLedgerPath, 0);
                 if (ledger != -1) {
                     return ledger;
                 }
                 // nothing found, wait for a watcher to trigger
                 changedLatch.await();
+            } catch (KeeperException ke) {
+                throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
+            } catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+                throw new ReplicationException.UnavailableException("Interrupted while connecting zookeeper", ie);
             }
-        } catch (KeeperException ke) {
-            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
-        } catch (InterruptedException ie) {
-            Thread.currentThread().interrupt();
-            throw new ReplicationException.UnavailableException("Interrupted while connecting zookeeper", ie);
         }
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/SubTreeCache.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/SubTreeCache.java
new file mode 100644
index 0000000..418c458
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/SubTreeCache.java
@@ -0,0 +1,166 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.util;
+
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Caching layer for traversing and monitoring changes on a znode subtree.
+ *
+ * ZooKeeper does not provide a way to perform a recursive watch on a subtree.
+ * In order to detect changes to a subtree, we need to maintain a
+ * cache of nodes which have been listed and have not changed since.  This would
+ * mirror the set of nodes with live watches in ZooKeeper (since we can't
+ * cancel them at the moment).
+ *
+ * In order to avoid having to pre-read the whole subtree up front, we'll weaken
+ * the guarantee: a watcher is only required to fire for an update to a node if that
+ * node was read after the watcher was registered and the update happened after the
+ * read.  We'll also permit spurious events elsewhere in the tree to avoid having to
+ * distinguish between nodes which were read before and after a watch was established.
+ *
+ * Finally, we'll allow (require, even) the user to cancel a registered watcher
+ * once no longer interested.
+ */
+public class SubTreeCache {
+    private static final Logger LOG = LoggerFactory.getLogger(SubTreeCache.class);
+
+    public interface TreeProvider {
+        List<String> getChildren(
+                String path, Watcher watcher) throws InterruptedException, KeeperException;
+    }
+
+    private class SubTreeNode implements Watcher {
+        String path;
+        private List<String> children;
+
+        SubTreeNode(String path) {
+            this.path = path;
+        }
+
+        private void setChildren(List<String> children) {
+            this.children = children;
+        }
+
+        @Override
+        public void process(WatchedEvent event) {
+            synchronized (SubTreeCache.this) {
+                handleEvent(event);
+                cachedNodes.remove(path);
+            }
+        }
+
+        private List<String> getChildren() {
+            return new ArrayList<String>(children);
+        }
+    }
+
+    TreeProvider provider;
+    Set<Watcher> pendingWatchers = new HashSet<>();
+    Map<String, SubTreeNode> cachedNodes = new HashMap<>();
+
+    public SubTreeCache(TreeProvider provider) {
+        this.provider = provider;
+    }
+
+    synchronized private void handleEvent(WatchedEvent event) {
+        Set<Watcher> toReturn = pendingWatchers;
+        for (Watcher watcher: pendingWatchers) {
+            watcher.process(event);
+        }
+        pendingWatchers.clear();
+    }
+
+
+    /**
+     * Returns children of node
+     *
+     * @param path Path of which to get children
+     * @return Children of path
+     */
+    public synchronized List<String> getChildren(String path) throws KeeperException, InterruptedException {
+        SubTreeNode node = cachedNodes.get(path);
+        if (null == node) {
+            node = new SubTreeNode(path);
+            node.setChildren(provider.getChildren(path, node));
+            cachedNodes.put(path, node);
+        }
+        return node.getChildren();
+    }
+
+    /**
+     * Register a watcher
+     * <p>
+     * See class header for semantics.
+     *
+     * @param watcher watcher to register
+     */
+    public synchronized void registerWatcher(Watcher watcher) {
+        pendingWatchers.add(watcher);
+    }
+
+    /**
+     * Cancel a watcher (noop if not registered or already fired)
+     *
+     * @param watcher Watcher object to cancel
+     */
+    public synchronized void cancelWatcher(Watcher watcher) {
+        pendingWatchers.remove(watcher);
+    }
+
+    public class WatchGuard implements AutoCloseable {
+        final Watcher w;
+
+        WatchGuard(Watcher w) {
+            this.w = w;
+        }
+
+        @Override
+        public void close() {
+            cancelWatcher(w);
+        }
+    }
+
+    /**
+     * Register watcher and get interest guard object which can be used with try-with-resources
+     * <p>
+     * It's important not to leak watchers into this structure.  The returned WatchGuard
+     * can be used to ensure that the watch is unregistered upon exiting a scope.
+     *
+     * @param watcher Watcher to register
+     */
+    public synchronized WatchGuard registerWatcherWithGuard(Watcher watcher) {
+        registerWatcher(watcher);
+        return new WatchGuard(watcher);
+    }
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/SubTreeCacheTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/SubTreeCacheTest.java
new file mode 100644
index 0000000..fde101b
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/SubTreeCacheTest.java
@@ -0,0 +1,321 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.util;
+
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+public class SubTreeCacheTest {
+    class TestTreeProvider implements SubTreeCache.TreeProvider {
+        class Node {
+            Watcher watcher = null;
+            public Map<String, Node> children = new HashMap<>();
+        }
+
+        final Node root = new Node();
+
+        Node getNode(String path) throws KeeperException {
+            String[] pathSegments = path.split("/");
+            Node cur = root;
+            for (String segment : pathSegments) {
+                if (segment.length() == 0)
+                    continue; // ignore leading empty one for leading /
+                if (cur.children.containsKey(segment)) {
+                    cur = cur.children.get(segment);
+                } else {
+                    throw KeeperException.create(KeeperException.Code.NONODE);
+                }
+            }
+            return cur;
+        }
+
+        @Override
+        public List<String> getChildren(
+                String path, Watcher watcher) throws InterruptedException, KeeperException {
+            Node node = getNode(path);
+
+            /* Enforce only one live watch per node */
+            Assert.assertTrue(null == node.watcher);
+
+            node.watcher = watcher;
+            return new ArrayList<String>(node.children.keySet());
+        }
+
+        public void createNode(String path) throws KeeperException {
+            String[] segments = path.split("/");
+            if (segments.length == 0) {
+                throw KeeperException.create(KeeperException.Code.NONODE);
+            }
+            String child = segments[segments.length - 1];
+            String[] parentSegments = Arrays.copyOfRange(segments, 0, segments.length - 1);
+            Node parent = getNode(String.join("/", parentSegments));
+            if (parent.children.containsKey(child)) {
+                throw KeeperException.create(KeeperException.Code.NODEEXISTS);
+            } else {
+                parent.children.put(child, new Node());
+                if (null != parent.watcher) {
+                    parent.watcher.process(
+                            new WatchedEvent(
+                                    Watcher.Event.EventType.NodeCreated,
+                                    Watcher.Event.KeeperState.SyncConnected,
+                                    path));
+                    parent.watcher = null;
+                }
+            }
+        }
+
+        public void removeNode(String path) throws KeeperException {
+            String[] segments = path.split("/");
+            if (segments.length == 0) {
+                throw KeeperException.create(KeeperException.Code.NONODE);
+            }
+            String child = segments[segments.length - 1];
+            String[] parentSegments = Arrays.copyOfRange(segments, 0, segments.length - 1);
+            String parentPath = String.join("/", parentSegments);
+            Node parent = getNode(parentPath);
+            if (!parent.children.containsKey(child)) {
+                throw KeeperException.create(KeeperException.Code.NONODE);
+            } else {
+                Node cNode = parent.children.get(child);
+                if (!cNode.children.isEmpty()) {
+                    throw KeeperException.create(KeeperException.Code.NOTEMPTY);
+                } else {
+                    if (null != cNode.watcher) {
+                        cNode.watcher.process(
+                                new WatchedEvent(
+                                        Watcher.Event.EventType.NodeChildrenChanged,
+                                        Watcher.Event.KeeperState.SyncConnected,
+                                        path));
+                        cNode.watcher = null;
+                    }
+                    if (null != parent.watcher) {
+                        parent.watcher.process(
+                                new WatchedEvent(
+                                        Watcher.Event.EventType.NodeDeleted,
+                                        Watcher.Event.KeeperState.SyncConnected,
+                                        parentPath));
+                        parent.watcher = null;
+                    }
+                    parent.children.remove(child);
+                }
+            }
+        }
+    }
+
+    TestTreeProvider tree = new TestTreeProvider();
+    SubTreeCache cache = new SubTreeCache(tree);
+
+    class TestWatch implements Watcher {
+        boolean fired = false;
+
+        @Override
+        public void process(WatchedEvent event) {
+            fired = true;
+        }
+
+        public boolean getFired() {
+            return fired;
+        }
+    }
+
+    TestWatch setWatch() {
+        TestWatch watch = new TestWatch();
+        cache.registerWatcher(watch);
+        return watch;
+    }
+
+    void assertFired(TestWatch watch) {
+        Assert.assertTrue(watch.getFired());
+    }
+
+    void assertNotFired(TestWatch watch) {
+        Assert.assertFalse(watch.getFired());
+    }
+
+    class TestWatchGuard extends TestWatch implements AutoCloseable {
+        SubTreeCache.WatchGuard guard;
+
+        void setGuard(SubTreeCache.WatchGuard guard) {
+            this.guard = guard;
+        }
+
+        @Override
+        public void close() throws Exception {
+            guard.close();
+        }
+    }
+
+    TestWatchGuard setWatchWithGuard() {
+        TestWatchGuard watch = new TestWatchGuard();
+        watch.setGuard(cache.registerWatcherWithGuard(watch));
+        return watch;
+    }
+
+    void readAssertChildren(String path, String[] children) throws KeeperException, InterruptedException {
+        SortedSet<String> shouldBe = new TreeSet<String>(Arrays.asList(children));
+        List<String> returned = cache.getChildren(path);
+        SortedSet<String> is = new TreeSet<String>(returned);
+        returned.clear(); // trip up implementations which return an internal reference
+        Assert.assertEquals(shouldBe, is);
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        String[] preCreate =
+                {"/a"
+                        , "/a/a"
+                        , "/a/a/a"
+                        , "/a/a/b"
+                        , "/a/b"
+                        , "/a/c"
+                        , "/b"
+                        , "/b/a"
+                };
+        for (String path : preCreate) {
+            tree.createNode(path);
+        }
+    }
+
+    @Test(timeout=5000)
+    public void testNoUpdate() throws Exception {
+        TestWatch watch = setWatch();
+        readAssertChildren("/a/a", new String[]{"a", "b"});
+        assertNotFired(watch);
+    }
+
+    @Test(timeout=5000)
+    public void testSingleCreate() throws Exception {
+        TestWatch watch = setWatch();
+        readAssertChildren("/a/a", new String[]{"a", "b"});
+        tree.createNode("/a/a/c");
+        assertFired(watch);
+    }
+
+    @Test(timeout=5000)
+    public void testSingleRemoval() throws Exception {
+        TestWatch watch = setWatch();
+        readAssertChildren("/a/a", new String[]{"a", "b"});
+        tree.removeNode("/a/a/b");
+        assertFired(watch);
+    }
+
+    @Test(timeout=5000)
+    public void testCancelation() throws Exception {
+        TestWatch watch = setWatch();
+        readAssertChildren("/a/a", new String[]{"a", "b"});
+        cache.cancelWatcher(watch);
+        tree.createNode("/a/a/c");
+        assertNotFired(watch);
+    }
+
+    @Test(timeout=5000)
+    public void testGuardCancelation() throws Exception {
+        TestWatch watch;
+        try (TestWatchGuard guard = setWatchWithGuard()) {
+            readAssertChildren("/a/a", new String[]{"a", "b"});
+            watch = guard;
+        }
+        tree.createNode("/a/a/c");
+        assertNotFired(watch);
+    }
+
+    @Test(timeout=5000)
+    public void testGuardCancelationExceptional() throws Exception {
+        TestWatch watch = null;
+        try (TestWatchGuard guard = setWatchWithGuard()) {
+            watch = guard;
+            readAssertChildren("/z/a", new String[]{});
+        } catch (Exception e) {
+        }
+        tree.createNode("/a/a/c");
+        assertNotFired(watch);
+    }
+
+    @Test(timeout=5000)
+    public void testDuplicateWatch() throws Exception {
+        try (TestWatchGuard watch = setWatchWithGuard()) {
+            readAssertChildren("/a/a", new String[]{"a", "b"});
+        }
+        try (TestWatchGuard watch = setWatchWithGuard()) {
+            readAssertChildren("/a/a", new String[]{"a", "b"});
+            assertNotFired(watch);
+            tree.createNode("/a/a/e");
+            assertFired(watch);
+        }
+    }
+
+    @Test(timeout=5000, expected = KeeperException.class)
+    public void testNoNode() throws Exception {
+        try (TestWatchGuard watch = setWatchWithGuard()) {
+            readAssertChildren("/z/a", new String[]{});
+        }
+    }
+
+    @Test(timeout=5000)
+    public void testRemoveEmptyNode() throws Exception {
+        try (TestWatchGuard watch = setWatchWithGuard()) {
+            readAssertChildren("/a/a/a", new String[]{});
+            tree.removeNode("/a/a/a");
+            assertFired(watch);
+        }
+    }
+
+    @Test(timeout=5000)
+    public void doubleWatch() throws Exception {
+        try (TestWatchGuard watch1 = setWatchWithGuard()) {
+            readAssertChildren("/a/a", new String[]{"a", "b"});
+            try (TestWatchGuard watch2 = setWatchWithGuard()) {
+                tree.createNode("/a/a/e");
+                assertFired(watch1);
+                readAssertChildren("/a/b", new String[]{});
+                tree.createNode("/a/b/e");
+                assertFired(watch2);
+            }
+        }
+    }
+
+    @Test(timeout=5000)
+    public void sequentialWatch() throws Exception {
+        try (TestWatchGuard watch = setWatchWithGuard()) {
+            readAssertChildren("/a/a", new String[]{"a", "b"});
+            tree.removeNode("/a/a/a");
+            assertFired(watch);
+        }
+        try (TestWatchGuard watch = setWatchWithGuard()) {
+            readAssertChildren("/a/a", new String[]{"b"});
+            tree.removeNode("/a/a/b");
+            assertFired(watch);
+        }
+    }
+}

-- 
To stop receiving notification emails like this one, please contact
['"commits@bookkeeper.apache.org" <co...@bookkeeper.apache.org>'].