You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2017/06/28 21:28:50 UTC
[bookkeeper] branch master updated: BOOKKEEPER-1086:
ZkUnderreplicationManager cache watcher
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 32c2859 BOOKKEEPER-1086: ZkUnderreplicationManager cache watcher
32c2859 is described below
commit 32c2859f0bec9ab74965d954bce038480e4dc2ba
Author: Samuel Just <sj...@salesforce.com>
AuthorDate: Wed Jun 28 14:28:38 2017 -0700
BOOKKEEPER-1086: ZkUnderreplicationManager cache watcher
Previously, getLedgerToReplicate left watches each time it traversed the
tree until it found a suitable replication target. Since we don't have
a way of canceling watches, these watches tended to get abandoned,
particularly on interior nodes, which aren't changed much. Thus,
over time, some nodes would build up a very large number of watches.
Instead, introduce a caching mechanism to remember outstanding watches
and avoid ever creating two watches on the same node.
Author: Samuel Just <sjustsalesforce.com>
Author: Samuel Just <sj...@salesforce.com>
Reviewers: Enrico Olivelli <eo...@gmail.com>, Jia Zhai <None>, Sijie Guo <si...@apache.org>
This closes #193 from athanatos/forupstream/BOOKKEEPER-1098
---
.../meta/ZkLedgerUnderreplicationManager.java | 67 +++--
.../org/apache/bookkeeper/util/SubTreeCache.java | 166 +++++++++++
.../apache/bookkeeper/util/SubTreeCacheTest.java | 321 +++++++++++++++++++++
3 files changed, 523 insertions(+), 31 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
index b695336..e986317 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
@@ -43,6 +43,7 @@ import org.apache.bookkeeper.replication.ReplicationEnableCb;
import org.apache.bookkeeper.replication.ReplicationException;
import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
import org.apache.bookkeeper.util.BookKeeperConstants;
+import org.apache.bookkeeper.util.SubTreeCache;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
@@ -102,6 +103,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
private final String layoutZNode;
private final AbstractConfiguration conf;
private final ZooKeeper zkc;
+ private final SubTreeCache subTreeCache;
public ZkLedgerUnderreplicationManager(AbstractConfiguration conf, ZooKeeper zkc)
throws KeeperException, InterruptedException, ReplicationException.CompatibilityException {
@@ -114,6 +116,12 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
idExtractionPattern = Pattern.compile("urL(\\d+)$");
this.zkc = zkc;
+ this.subTreeCache = new SubTreeCache(new SubTreeCache.TreeProvider() {
+ @Override
+ public List<String> getChildren(String path, Watcher watcher) throws InterruptedException, KeeperException {
+ return zkc.getChildren(path, watcher);
+ }
+ });
checkLayout();
}
@@ -384,12 +392,12 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
};
}
- private long getLedgerToRereplicateFromHierarchy(String parent, long depth, Watcher w)
+ private long getLedgerToRereplicateFromHierarchy(String parent, long depth)
throws KeeperException, InterruptedException {
if (depth == 4) {
List<String> children;
try {
- children = zkc.getChildren(parent, w);
+ children = subTreeCache.getChildren(parent);
} catch (KeeperException.NoNodeException nne) {
// can occur if another underreplicated ledger's
// hierarchy is being cleaned up
@@ -401,8 +409,8 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
while (children.size() > 0) {
String tryChild = children.get(0);
try {
- String lockPath = urLockPath + "/" + tryChild;
- if (zkc.exists(lockPath, w) != null) {
+ List<String> locks = subTreeCache.getChildren(urLockPath);
+ if (locks.contains(tryChild)) {
children.remove(tryChild);
continue;
}
@@ -416,6 +424,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
continue;
}
+ String lockPath = urLockPath + "/" + tryChild;
long ledgerId = getLedgerId(tryChild);
zkc.create(lockPath, LOCK_DATA, zkAcls, CreateMode.EPHEMERAL);
heldLocks.put(ledgerId, new Lock(lockPath, stat.getVersion()));
@@ -431,7 +440,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
List<String> children;
try {
- children = zkc.getChildren(parent, w);
+ children = subTreeCache.getChildren(parent);
} catch (KeeperException.NoNodeException nne) {
// can occur if another underreplicated ledger's
// hierarchy is being cleaned up
@@ -443,7 +452,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
while (children.size() > 0) {
String tryChild = children.get(0);
String tryPath = parent + "/" + tryChild;
- long ledger = getLedgerToRereplicateFromHierarchy(tryPath, depth + 1, w);
+ long ledger = getLedgerToRereplicateFromHierarchy(tryPath, depth + 1);
if (ledger != -1) {
return ledger;
}
@@ -459,11 +468,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
LOG.debug("pollLedgerToRereplicate()");
}
try {
- Watcher w = new Watcher() {
- public void process(WatchedEvent e) { // do nothing
- }
- };
- return getLedgerToRereplicateFromHierarchy(urLedgerPath, 0, w);
+ return getLedgerToRereplicateFromHierarchy(urLedgerPath, 0);
} catch (KeeperException ke) {
throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
} catch (InterruptedException ie) {
@@ -477,33 +482,33 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
if (LOG.isDebugEnabled()) {
LOG.debug("getLedgerToRereplicate()");
}
- try {
- while (true) {
+ while (true) {
+ final CountDownLatch changedLatch = new CountDownLatch(1);
+ Watcher w = new Watcher() {
+ public void process(WatchedEvent e) {
+ if (e.getType() == Watcher.Event.EventType.NodeChildrenChanged
+ || e.getType() == Watcher.Event.EventType.NodeDeleted
+ || e.getType() == Watcher.Event.EventType.NodeCreated
+ || e.getState() == Watcher.Event.KeeperState.Expired
+ || e.getState() == Watcher.Event.KeeperState.Disconnected) {
+ changedLatch.countDown();
+ }
+ }
+ };
+ try (SubTreeCache.WatchGuard wg = subTreeCache.registerWatcherWithGuard(w)) {
waitIfLedgerReplicationDisabled();
- final CountDownLatch changedLatch = new CountDownLatch(1);
- Watcher w = new Watcher() {
- public void process(WatchedEvent e) {
- if (e.getType() == Watcher.Event.EventType.NodeChildrenChanged
- || e.getType() == Watcher.Event.EventType.NodeDeleted
- || e.getType() == Watcher.Event.EventType.NodeCreated
- || e.getState() == Watcher.Event.KeeperState.Expired
- || e.getState() == Watcher.Event.KeeperState.Disconnected) {
- changedLatch.countDown();
- }
- }
- };
- long ledger = getLedgerToRereplicateFromHierarchy(urLedgerPath, 0, w);
+ long ledger = getLedgerToRereplicateFromHierarchy(urLedgerPath, 0);
if (ledger != -1) {
return ledger;
}
// nothing found, wait for a watcher to trigger
changedLatch.await();
+ } catch (KeeperException ke) {
+ throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ throw new ReplicationException.UnavailableException("Interrupted while connecting zookeeper", ie);
}
- } catch (KeeperException ke) {
- throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- throw new ReplicationException.UnavailableException("Interrupted while connecting zookeeper", ie);
}
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/SubTreeCache.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/SubTreeCache.java
new file mode 100644
index 0000000..418c458
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/SubTreeCache.java
@@ -0,0 +1,166 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.util;
+
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Caching layer for traversing and monitoring changes on a znode subtree.
+ *
+ * <p>ZooKeeper does not provide a way to perform a recursive watch on a subtree.
+ * In order to detect changes to a subtree, we maintain a cache of nodes which
+ * have been listed and have not changed since. This mirrors the set of nodes
+ * with live watches in ZooKeeper (since we can't cancel them at the moment), so
+ * that a node which is already being watched is served from the cache instead of
+ * being listed (and watched) again.
+ *
+ * <p>In order to avoid having to pre-read the whole subtree up front, the
+ * guarantee is weakened: a registered watcher is only required to fire for an
+ * update to a node which was read after the watcher was registered, where the
+ * update happened after that read. Spurious events from elsewhere in the tree
+ * are also permitted, which avoids having to distinguish between nodes which
+ * were read before and after a watcher was registered.
+ *
+ * <p>A registered watcher fires at most once: the first event delivered for any
+ * cached node notifies and unregisters every pending watcher.
+ *
+ * <p>Finally, the user is allowed (required, even) to cancel a registered
+ * watcher once no longer interested.
+ *
+ * <p>All operations synchronize on the cache instance, and watchers are invoked
+ * while that monitor is held, so watcher callbacks should be short.
+ */
+public class SubTreeCache {
+    private static final Logger LOG = LoggerFactory.getLogger(SubTreeCache.class);
+
+    /**
+     * Source of child listings for the tree being cached.
+     */
+    public interface TreeProvider {
+        /**
+         * Lists the children of a node and leaves a watcher on it.
+         *
+         * @param path    Path of which to get children
+         * @param watcher Watcher to notify when the listed node changes
+         * @return Children of path
+         */
+        List<String> getChildren(
+                String path, Watcher watcher) throws InterruptedException, KeeperException;
+    }
+
+    /**
+     * Cache entry for a single listed node. The entry is itself the watcher
+     * handed to the provider for that node, so an event on the node both
+     * invalidates the entry and notifies the pending watchers.
+     */
+    private class SubTreeNode implements Watcher {
+        final String path;
+        private List<String> children;
+
+        SubTreeNode(String path) {
+            this.path = path;
+        }
+
+        private void setChildren(List<String> children) {
+            this.children = children;
+        }
+
+        @Override
+        public void process(WatchedEvent event) {
+            synchronized (SubTreeCache.this) {
+                // Invalidate before notifying: the listing is stale as soon as the
+                // watch fires, and a watcher which re-reads this path from inside
+                // its callback must fetch a fresh listing rather than hit the cache.
+                // Only drop the mapping if it still refers to this entry, so that a
+                // late or repeated event for an old entry cannot evict a newer one.
+                cachedNodes.remove(path, this);
+                handleEvent(event);
+            }
+        }
+
+        /** Returns a private copy so callers cannot mutate the cached listing. */
+        private List<String> getChildren() {
+            return new ArrayList<>(children);
+        }
+    }
+
+    final TreeProvider provider;
+    final Set<Watcher> pendingWatchers = new HashSet<>();
+    final Map<String, SubTreeNode> cachedNodes = new HashMap<>();
+
+    public SubTreeCache(TreeProvider provider) {
+        this.provider = provider;
+    }
+
+    /**
+     * Notifies every pending watcher of the event and unregisters them all.
+     *
+     * <p>The pending set is snapshotted and cleared before dispatching. The
+     * monitor is reentrant, so a watcher may call back into
+     * {@link #registerWatcher(Watcher)} or {@link #cancelWatcher(Watcher)} from
+     * its callback; doing so must neither break the iteration nor have a fresh
+     * registration wiped once dispatch completes.
+     *
+     * @param event Event to deliver to the pending watchers
+     */
+    private synchronized void handleEvent(WatchedEvent event) {
+        List<Watcher> toNotify = new ArrayList<>(pendingWatchers);
+        pendingWatchers.clear();
+        for (Watcher watcher : toNotify) {
+            watcher.process(event);
+        }
+    }
+
+
+    /**
+     * Returns children of node
+     * <p>
+     * The listing is served from the cache when the node has been read before and
+     * no event has been delivered for it since; otherwise it is fetched from the
+     * provider, which also places the watch for the node. A failed fetch is not
+     * cached.
+     *
+     * @param path Path of which to get children
+     * @return Children of path (a copy which the caller is free to modify)
+     * @throws KeeperException if the provider fails to list the node
+     * @throws InterruptedException if the provider is interrupted
+     */
+    public synchronized List<String> getChildren(String path) throws KeeperException, InterruptedException {
+        SubTreeNode node = cachedNodes.get(path);
+        if (null == node) {
+            node = new SubTreeNode(path);
+            node.setChildren(provider.getChildren(path, node));
+            cachedNodes.put(path, node);
+        }
+        return node.getChildren();
+    }
+
+    /**
+     * Register a watcher
+     * <p>
+     * See class header for semantics.
+     *
+     * @param watcher watcher to register
+     */
+    public synchronized void registerWatcher(Watcher watcher) {
+        pendingWatchers.add(watcher);
+    }
+
+    /**
+     * Cancel a watcher (noop if not registered or already fired)
+     *
+     * @param watcher Watcher object to cancel
+     */
+    public synchronized void cancelWatcher(Watcher watcher) {
+        pendingWatchers.remove(watcher);
+    }
+
+    /**
+     * Handle for a registered watcher which cancels it when closed.
+     */
+    public class WatchGuard implements AutoCloseable {
+        final Watcher w;
+
+        WatchGuard(Watcher w) {
+            this.w = w;
+        }
+
+        @Override
+        public void close() {
+            cancelWatcher(w);
+        }
+    }
+
+    /**
+     * Register watcher and get interest guard object which can be used with try-with-resources
+     * <p>
+     * It's important not to leak watchers into this structure. The returned WatchGuard
+     * can be used to ensure that the watch is unregistered upon exiting a scope.
+     *
+     * @param watcher Watcher to register
+     * @return guard which cancels the watcher when closed
+     */
+    public synchronized WatchGuard registerWatcherWithGuard(Watcher watcher) {
+        registerWatcher(watcher);
+        return new WatchGuard(watcher);
+    }
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/SubTreeCacheTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/SubTreeCacheTest.java
new file mode 100644
index 0000000..fde101b
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/SubTreeCacheTest.java
@@ -0,0 +1,321 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.util;
+
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+/**
+ * Unit tests for {@link SubTreeCache}, run against an in-memory tree which
+ * asserts that no node ever carries more than one live watch.
+ */
+public class SubTreeCacheTest {
+    /**
+     * In-memory tree with one-shot watches, standing in for ZooKeeper.
+     */
+    class TestTreeProvider implements SubTreeCache.TreeProvider {
+        class Node {
+            Watcher watcher = null;
+            public Map<String, Node> children = new HashMap<>();
+        }
+
+        final Node root = new Node();
+
+        /** Resolves a slash-separated path from the root, or throws NONODE. */
+        Node getNode(String path) throws KeeperException {
+            Node cur = root;
+            for (String segment : path.split("/")) {
+                if (segment.isEmpty()) {
+                    continue; // ignore leading empty one for leading /
+                }
+                Node next = cur.children.get(segment);
+                if (null == next) {
+                    throw KeeperException.create(KeeperException.Code.NONODE);
+                }
+                cur = next;
+            }
+            return cur;
+        }
+
+        @Override
+        public List<String> getChildren(
+                String path, Watcher watcher) throws InterruptedException, KeeperException {
+            Node node = getNode(path);
+
+            /* Enforce only one live watch per node */
+            Assert.assertNull(node.watcher);
+
+            node.watcher = watcher;
+            return new ArrayList<String>(node.children.keySet());
+        }
+
+        /**
+         * Fires the watch on a node, if one is set. The watch is cleared before
+         * the callback runs so that it is one-shot even if the callback reads
+         * the node again.
+         */
+        private void fireWatch(Node node, Watcher.Event.EventType type, String path) {
+            Watcher watcher = node.watcher;
+            if (null != watcher) {
+                node.watcher = null;
+                watcher.process(
+                        new WatchedEvent(type, Watcher.Event.KeeperState.SyncConnected, path));
+            }
+        }
+
+        /**
+         * Creates a node, then notifies the parent's watch that its children
+         * changed.
+         */
+        public void createNode(String path) throws KeeperException {
+            String[] segments = path.split("/");
+            if (segments.length == 0) {
+                throw KeeperException.create(KeeperException.Code.NONODE);
+            }
+            String child = segments[segments.length - 1];
+            String[] parentSegments = Arrays.copyOfRange(segments, 0, segments.length - 1);
+            String parentPath = String.join("/", parentSegments);
+            Node parent = getNode(parentPath);
+            if (parent.children.containsKey(child)) {
+                throw KeeperException.create(KeeperException.Code.NODEEXISTS);
+            }
+            parent.children.put(child, new Node());
+            fireWatch(parent, Watcher.Event.EventType.NodeChildrenChanged, parentPath);
+        }
+
+        /**
+         * Removes an empty node, then notifies the node's own watch that it was
+         * deleted and the parent's watch that its children changed. The tree is
+         * updated first so that callbacks observe the new state.
+         */
+        public void removeNode(String path) throws KeeperException {
+            String[] segments = path.split("/");
+            if (segments.length == 0) {
+                throw KeeperException.create(KeeperException.Code.NONODE);
+            }
+            String child = segments[segments.length - 1];
+            String[] parentSegments = Arrays.copyOfRange(segments, 0, segments.length - 1);
+            String parentPath = String.join("/", parentSegments);
+            Node parent = getNode(parentPath);
+            Node cNode = parent.children.get(child);
+            if (null == cNode) {
+                throw KeeperException.create(KeeperException.Code.NONODE);
+            }
+            if (!cNode.children.isEmpty()) {
+                throw KeeperException.create(KeeperException.Code.NOTEMPTY);
+            }
+            parent.children.remove(child);
+            fireWatch(cNode, Watcher.Event.EventType.NodeDeleted, path);
+            fireWatch(parent, Watcher.Event.EventType.NodeChildrenChanged, parentPath);
+        }
+    }
+
+    TestTreeProvider tree = new TestTreeProvider();
+    SubTreeCache cache = new SubTreeCache(tree);
+
+    /** Watcher which records whether it has been invoked. */
+    class TestWatch implements Watcher {
+        boolean fired = false;
+
+        @Override
+        public void process(WatchedEvent event) {
+            fired = true;
+        }
+
+        public boolean getFired() {
+            return fired;
+        }
+    }
+
+    TestWatch setWatch() {
+        TestWatch watch = new TestWatch();
+        cache.registerWatcher(watch);
+        return watch;
+    }
+
+    void assertFired(TestWatch watch) {
+        Assert.assertTrue(watch.getFired());
+    }
+
+    void assertNotFired(TestWatch watch) {
+        Assert.assertFalse(watch.getFired());
+    }
+
+    /** Recording watcher which also owns the guard returned at registration. */
+    class TestWatchGuard extends TestWatch implements AutoCloseable {
+        SubTreeCache.WatchGuard guard;
+
+        void setGuard(SubTreeCache.WatchGuard guard) {
+            this.guard = guard;
+        }
+
+        @Override
+        public void close() throws Exception {
+            guard.close();
+        }
+    }
+
+    TestWatchGuard setWatchWithGuard() {
+        TestWatchGuard watch = new TestWatchGuard();
+        watch.setGuard(cache.registerWatcherWithGuard(watch));
+        return watch;
+    }
+
+    /** Reads a path through the cache and checks the listing, ignoring order. */
+    void readAssertChildren(String path, String[] children) throws KeeperException, InterruptedException {
+        SortedSet<String> shouldBe = new TreeSet<String>(Arrays.asList(children));
+        List<String> returned = cache.getChildren(path);
+        SortedSet<String> is = new TreeSet<String>(returned);
+        returned.clear(); // trip up implementations which return an internal reference
+        Assert.assertEquals(shouldBe, is);
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        String[] preCreate =
+                {"/a"
+                        , "/a/a"
+                        , "/a/a/a"
+                        , "/a/a/b"
+                        , "/a/b"
+                        , "/a/c"
+                        , "/b"
+                        , "/b/a"
+                };
+        for (String path : preCreate) {
+            tree.createNode(path);
+        }
+    }
+
+    @Test(timeout=5000)
+    public void testNoUpdate() throws Exception {
+        TestWatch watch = setWatch();
+        readAssertChildren("/a/a", new String[]{"a", "b"});
+        assertNotFired(watch);
+    }
+
+    @Test(timeout=5000)
+    public void testSingleCreate() throws Exception {
+        TestWatch watch = setWatch();
+        readAssertChildren("/a/a", new String[]{"a", "b"});
+        tree.createNode("/a/a/c");
+        assertFired(watch);
+    }
+
+    @Test(timeout=5000)
+    public void testSingleRemoval() throws Exception {
+        TestWatch watch = setWatch();
+        readAssertChildren("/a/a", new String[]{"a", "b"});
+        tree.removeNode("/a/a/b");
+        assertFired(watch);
+    }
+
+    @Test(timeout=5000)
+    public void testCancelation() throws Exception {
+        TestWatch watch = setWatch();
+        readAssertChildren("/a/a", new String[]{"a", "b"});
+        cache.cancelWatcher(watch);
+        tree.createNode("/a/a/c");
+        assertNotFired(watch);
+    }
+
+    @Test(timeout=5000)
+    public void testGuardCancelation() throws Exception {
+        TestWatch watch;
+        try (TestWatchGuard guard = setWatchWithGuard()) {
+            readAssertChildren("/a/a", new String[]{"a", "b"});
+            watch = guard;
+        }
+        tree.createNode("/a/a/c");
+        assertNotFired(watch);
+    }
+
+    @Test(timeout=5000)
+    public void testGuardCancelationExceptional() throws Exception {
+        TestWatch watch = null;
+        try (TestWatchGuard guard = setWatchWithGuard()) {
+            watch = guard;
+            readAssertChildren("/z/a", new String[]{});
+            Assert.fail("reading a missing node should have thrown");
+        } catch (KeeperException expected) {
+            // expected: /z does not exist. The guard must still have cancelled
+            // the watch on the way out of the try block, which is checked below.
+        }
+        tree.createNode("/a/a/c");
+        assertNotFired(watch);
+    }
+
+    @Test(timeout=5000)
+    public void testDuplicateWatch() throws Exception {
+        try (TestWatchGuard watch = setWatchWithGuard()) {
+            readAssertChildren("/a/a", new String[]{"a", "b"});
+        }
+        try (TestWatchGuard watch = setWatchWithGuard()) {
+            readAssertChildren("/a/a", new String[]{"a", "b"});
+            assertNotFired(watch);
+            tree.createNode("/a/a/e");
+            assertFired(watch);
+        }
+    }
+
+    @Test(timeout=5000, expected = KeeperException.class)
+    public void testNoNode() throws Exception {
+        try (TestWatchGuard watch = setWatchWithGuard()) {
+            readAssertChildren("/z/a", new String[]{});
+        }
+    }
+
+    @Test(timeout=5000)
+    public void testRemoveEmptyNode() throws Exception {
+        try (TestWatchGuard watch = setWatchWithGuard()) {
+            readAssertChildren("/a/a/a", new String[]{});
+            tree.removeNode("/a/a/a");
+            assertFired(watch);
+        }
+    }
+
+    @Test(timeout=5000)
+    public void doubleWatch() throws Exception {
+        try (TestWatchGuard watch1 = setWatchWithGuard()) {
+            readAssertChildren("/a/a", new String[]{"a", "b"});
+            try (TestWatchGuard watch2 = setWatchWithGuard()) {
+                tree.createNode("/a/a/e");
+                assertFired(watch1);
+                readAssertChildren("/a/b", new String[]{});
+                tree.createNode("/a/b/e");
+                assertFired(watch2);
+            }
+        }
+    }
+
+    @Test(timeout=5000)
+    public void sequentialWatch() throws Exception {
+        try (TestWatchGuard watch = setWatchWithGuard()) {
+            readAssertChildren("/a/a", new String[]{"a", "b"});
+            tree.removeNode("/a/a/a");
+            assertFired(watch);
+        }
+        try (TestWatchGuard watch = setWatchWithGuard()) {
+            readAssertChildren("/a/a", new String[]{"b"});
+            tree.removeNode("/a/a/b");
+            assertFired(watch);
+        }
+    }
+}
--
To stop receiving notification emails like this one, please contact
['"commits@bookkeeper.apache.org" <co...@bookkeeper.apache.org>'].