You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ti...@apache.org on 2023/05/05 07:59:00 UTC

[zookeeper] branch master updated: ZOOKEEPER-4466: Support different watch modes on same path (#1859)

This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new a64dbf5b0 ZOOKEEPER-4466: Support different watch modes on same path (#1859)
a64dbf5b0 is described below

commit a64dbf5b06ca1a73dc2ad6c7d31e679e312082aa
Author: Kezhu Wang <ke...@gmail.com>
AuthorDate: Fri May 5 15:58:42 2023 +0800

    ZOOKEEPER-4466: Support different watch modes on same path (#1859)
    
    Signed-off-by: Kezhu Wang <ke...@gmail.com>
    Co-authored-by: tison <wa...@gmail.com>
---
 .../java/org/apache/zookeeper/server/DataTree.java |   5 +-
 .../zookeeper/server/watch/IWatchManager.java      |   9 --
 .../zookeeper/server/watch/WatchManager.java       | 115 ++++++++++++---------
 .../apache/zookeeper/server/watch/WatchStats.java  |  89 ++++++++++++++++
 .../apache/zookeeper/server/watch/WatcherMode.java |   2 +-
 .../zookeeper/server/watch/WatcherModeManager.java |  96 -----------------
 .../server/watch/RecursiveWatchQtyTest.java        |  50 +++------
 .../test/PersistentRecursiveWatcherTest.java       |  48 +++++++++
 8 files changed, 219 insertions(+), 195 deletions(-)

diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java
index a6f605390..603cb0b38 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java
@@ -675,7 +675,9 @@ public class DataTree {
     public void addWatch(String basePath, Watcher watcher, int mode) {
         WatcherMode watcherMode = WatcherMode.fromZooDef(mode);
         dataWatches.addWatch(basePath, watcher, watcherMode);
-        childWatches.addWatch(basePath, watcher, watcherMode);
+        if (watcherMode != WatcherMode.PERSISTENT_RECURSIVE) {
+            childWatches.addWatch(basePath, watcher, watcherMode);
+        }
     }
 
     public byte[] getData(String path, Stat stat, Watcher watcher) throws NoNodeException {
@@ -1511,7 +1513,6 @@ public class DataTree {
             this.dataWatches.addWatch(path, watcher, WatcherMode.PERSISTENT);
         }
         for (String path : persistentRecursiveWatches) {
-            this.childWatches.addWatch(path, watcher, WatcherMode.PERSISTENT_RECURSIVE);
             this.dataWatches.addWatch(path, watcher, WatcherMode.PERSISTENT_RECURSIVE);
         }
     }
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/IWatchManager.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/IWatchManager.java
index 1bc44c805..4eea5eca0 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/IWatchManager.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/IWatchManager.java
@@ -144,13 +144,4 @@ public interface IWatchManager {
      *
      */
     void dumpWatches(PrintWriter pwriter, boolean byPath);
-
-    /**
-     * Return the current number of recursive watchers
-     *
-     * @return qty
-     */
-    default int getRecursiveWatchQty() {
-        return 0;
-    }
 }
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManager.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManager.java
index c5b133059..c85c3d846 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManager.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManager.java
@@ -19,6 +19,7 @@
 package org.apache.zookeeper.server.watch;
 
 import java.io.PrintWriter;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -45,9 +46,9 @@ public class WatchManager implements IWatchManager {
 
     private final Map<String, Set<Watcher>> watchTable = new HashMap<>();
 
-    private final Map<Watcher, Set<String>> watch2Paths = new HashMap<>();
+    private final Map<Watcher, Map<String, WatchStats>> watch2Paths = new HashMap<>();
 
-    private final WatcherModeManager watcherModeManager = new WatcherModeManager();
+    private int recursiveWatchQty = 0;
 
     @Override
     public synchronized int size() {
@@ -84,25 +85,34 @@ public class WatchManager implements IWatchManager {
         }
         list.add(watcher);
 
-        Set<String> paths = watch2Paths.get(watcher);
+        Map<String, WatchStats> paths = watch2Paths.get(watcher);
         if (paths == null) {
             // cnxns typically have many watches, so use default cap here
-            paths = new HashSet<>();
+            paths = new HashMap<>();
             watch2Paths.put(watcher, paths);
         }
 
-        watcherModeManager.setWatcherMode(watcher, path, watcherMode);
+        WatchStats stats = paths.getOrDefault(path, WatchStats.NONE);
+        WatchStats newStats = stats.addMode(watcherMode);
 
-        return paths.add(path);
+        if (newStats != stats) {
+            paths.put(path, newStats);
+            if (watcherMode.isRecursive()) {
+                ++recursiveWatchQty;
+            }
+            return true;
+        }
+
+        return false;
     }
 
     @Override
     public synchronized void removeWatcher(Watcher watcher) {
-        Set<String> paths = watch2Paths.remove(watcher);
+        Map<String, WatchStats> paths = watch2Paths.remove(watcher);
         if (paths == null) {
             return;
         }
-        for (String p : paths) {
+        for (String p : paths.keySet()) {
             Set<Watcher> list = watchTable.get(p);
             if (list != null) {
                 list.remove(watcher);
@@ -110,7 +120,11 @@ public class WatchManager implements IWatchManager {
                     watchTable.remove(p);
                 }
             }
-            watcherModeManager.removeWatcher(watcher, p);
+        }
+        for (WatchStats stats : paths.values()) {
+            if (stats.hasMode(WatcherMode.PERSISTENT_RECURSIVE)) {
+                --recursiveWatchQty;
+            }
         }
     }
 
@@ -123,8 +137,8 @@ public class WatchManager implements IWatchManager {
     public WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet supress) {
         WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
         Set<Watcher> watchers = new HashSet<>();
-        PathParentIterator pathParentIterator = getPathParentIterator(path);
         synchronized (this) {
+            PathParentIterator pathParentIterator = getPathParentIterator(path);
             for (String localPath : pathParentIterator.asIterable()) {
                 Set<Watcher> thisWatchers = watchTable.get(localPath);
                 if (thisWatchers == null || thisWatchers.isEmpty()) {
@@ -133,20 +147,23 @@ public class WatchManager implements IWatchManager {
                 Iterator<Watcher> iterator = thisWatchers.iterator();
                 while (iterator.hasNext()) {
                     Watcher watcher = iterator.next();
-                    WatcherMode watcherMode = watcherModeManager.getWatcherMode(watcher, localPath);
-                    if (watcherMode.isRecursive()) {
-                        if (type != EventType.NodeChildrenChanged) {
-                            watchers.add(watcher);
-                        }
-                    } else if (!pathParentIterator.atParentPath()) {
+                    Map<String, WatchStats> paths = watch2Paths.getOrDefault(watcher, Collections.emptyMap());
+                    WatchStats stats = paths.get(localPath);
+                    if (stats == null) {
+                        LOG.warn("inconsistent watch table for watcher {}, {} not in path list", watcher, localPath);
+                        continue;
+                    }
+                    if (!pathParentIterator.atParentPath()) {
                         watchers.add(watcher);
-                        if (!watcherMode.isPersistent()) {
+                        WatchStats newStats = stats.removeMode(WatcherMode.STANDARD);
+                        if (newStats == WatchStats.NONE) {
                             iterator.remove();
-                            Set<String> paths = watch2Paths.get(watcher);
-                            if (paths != null) {
-                                paths.remove(localPath);
-                            }
+                            paths.remove(localPath);
+                        } else if (newStats != stats) {
+                            paths.put(localPath, newStats);
                         }
+                    } else if (stats.hasMode(WatcherMode.PERSISTENT_RECURSIVE)) {
+                        watchers.add(watcher);
                     }
                 }
                 if (thisWatchers.isEmpty()) {
@@ -199,7 +216,7 @@ public class WatchManager implements IWatchManager {
         sb.append(watch2Paths.size()).append(" connections watching ").append(watchTable.size()).append(" paths\n");
 
         int total = 0;
-        for (Set<String> paths : watch2Paths.values()) {
+        for (Map<String, WatchStats> paths : watch2Paths.values()) {
             total += paths.size();
         }
         sb.append("Total watches:").append(total);
@@ -219,10 +236,10 @@ public class WatchManager implements IWatchManager {
                 }
             }
         } else {
-            for (Entry<Watcher, Set<String>> e : watch2Paths.entrySet()) {
+            for (Entry<Watcher, Map<String, WatchStats>> e : watch2Paths.entrySet()) {
                 pwriter.print("0x");
                 pwriter.println(Long.toHexString(((ServerCnxn) e.getKey()).getSessionId()));
-                for (String path : e.getValue()) {
+                for (String path : e.getValue().keySet()) {
                     pwriter.print("\t");
                     pwriter.println(path);
                 }
@@ -232,31 +249,28 @@ public class WatchManager implements IWatchManager {
 
     @Override
     public synchronized boolean containsWatcher(String path, Watcher watcher) {
-        WatcherMode watcherMode = watcherModeManager.getWatcherMode(watcher, path);
-        PathParentIterator pathParentIterator = getPathParentIterator(path);
-        for (String localPath : pathParentIterator.asIterable()) {
-            Set<Watcher> watchers = watchTable.get(localPath);
-            if (!pathParentIterator.atParentPath()) {
-                if (watchers != null) {
-                    return true;    // at the leaf node, all watcher types match
-                }
-            }
-            if (watcherMode.isRecursive()) {
-                return true;
-            }
-        }
-        return false;
+        Set<Watcher> list = watchTable.get(path);
+        return list != null && list.contains(watcher);
     }
 
     @Override
     public synchronized boolean removeWatcher(String path, Watcher watcher) {
-        Set<String> paths = watch2Paths.get(watcher);
-        if (paths == null || !paths.remove(path)) {
+        Map<String, WatchStats> paths = watch2Paths.get(watcher);
+        if (paths == null) {
             return false;
         }
 
+        WatchStats stats = paths.remove(path);
+        if (stats == null) {
+            return false;
+        }
+        if (stats.hasMode(WatcherMode.PERSISTENT_RECURSIVE)) {
+            --recursiveWatchQty;
+        }
+
         Set<Watcher> list = watchTable.get(path);
         if (list == null || !list.remove(watcher)) {
+            LOG.warn("inconsistent watch table for path {}, {} not in watcher list", path, watcher);
             return false;
         }
 
@@ -264,17 +278,20 @@ public class WatchManager implements IWatchManager {
             watchTable.remove(path);
         }
 
-        watcherModeManager.removeWatcher(watcher, path);
-
         return true;
     }
 
+    // VisibleForTesting
+    Map<Watcher, Map<String, WatchStats>> getWatch2Paths() {
+        return watch2Paths;
+    }
+
     @Override
     public synchronized WatchesReport getWatches() {
         Map<Long, Set<String>> id2paths = new HashMap<>();
-        for (Entry<Watcher, Set<String>> e : watch2Paths.entrySet()) {
+        for (Entry<Watcher, Map<String, WatchStats>> e : watch2Paths.entrySet()) {
             Long id = ((ServerCnxn) e.getKey()).getSessionId();
-            Set<String> paths = new HashSet<>(e.getValue());
+            Set<String> paths = new HashSet<>(e.getValue().keySet());
             id2paths.put(id, paths);
         }
         return new WatchesReport(id2paths);
@@ -296,7 +313,7 @@ public class WatchManager implements IWatchManager {
     @Override
     public synchronized WatchesSummary getWatchesSummary() {
         int totalWatches = 0;
-        for (Set<String> paths : watch2Paths.values()) {
+        for (Map<String, WatchStats> paths : watch2Paths.values()) {
             totalWatches += paths.size();
         }
         return new WatchesSummary(watch2Paths.size(), watchTable.size(), totalWatches);
@@ -305,13 +322,13 @@ public class WatchManager implements IWatchManager {
     @Override
     public void shutdown() { /* do nothing */ }
 
-    @Override
-    public int getRecursiveWatchQty() {
-        return watcherModeManager.getRecursiveQty();
+    // VisibleForTesting
+    synchronized int getRecursiveWatchQty() {
+        return recursiveWatchQty;
     }
 
     private PathParentIterator getPathParentIterator(String path) {
-        if (watcherModeManager.getRecursiveQty() == 0) {
+        if (getRecursiveWatchQty() == 0) {
             return PathParentIterator.forPathOnly(path);
         }
         return PathParentIterator.forAll(path);
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchStats.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchStats.java
new file mode 100644
index 000000000..fd0c0259e
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchStats.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.watch;
+
+/**
+ * Statistics for multiple different watches on one node.
+ */
+public final class WatchStats {
+    private static final WatchStats[] WATCH_STATS = new WatchStats[] {
+            new WatchStats(0), // NONE
+            new WatchStats(1), // STANDARD
+            new WatchStats(2), // PERSISTENT
+            new WatchStats(3), // STANDARD + PERSISTENT
+            new WatchStats(4), // PERSISTENT_RECURSIVE
+            new WatchStats(5), // STANDARD + PERSISTENT_RECURSIVE
+            new WatchStats(6), // PERSISTENT + PERSISTENT_RECURSIVE
+            new WatchStats(7), // STANDARD + PERSISTENT + PERSISTENT_RECURSIVE
+    };
+
+    /**
+     * Stats that have no watchers attached.
+     *
+     * <p>This could be used as start point to compute new stats using {@link #addMode(WatcherMode)}.
+     */
+    public static final WatchStats NONE = WATCH_STATS[0];
+
+    private final int flags;
+
+    private WatchStats(int flags) {
+        this.flags = flags;
+    }
+
+    private static int modeToFlag(WatcherMode mode) {
+        return 1 << mode.ordinal();
+    }
+
+    /**
+     * Compute stats after given mode attached to node.
+     *
+     * @param mode watcher mode
+     * @return a new stats if given mode is not attached to this node before, otherwise old stats
+     */
+    public WatchStats addMode(WatcherMode mode) {
+        int flags = this.flags | modeToFlag(mode);
+        return WATCH_STATS[flags];
+    }
+
+    /**
+     * Compute stats after given mode removed from node.
+     *
+     * @param mode watcher mode
+     * @return null if given mode is the last attached mode, otherwise a new stats
+     */
+    public WatchStats removeMode(WatcherMode mode) {
+        int mask = ~modeToFlag(mode);
+        int flags = this.flags & mask;
+        if (flags == 0) {
+            return NONE;
+        }
+        return WATCH_STATS[flags];
+    }
+
+    /**
+     * Check whether given mode is attached to this node.
+     *
+     * @param mode watcher mode
+     * @return true if given mode is attached to this node.
+     */
+    public boolean hasMode(WatcherMode mode) {
+        int flags = modeToFlag(mode);
+        return (this.flags & flags) != 0;
+    }
+}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatcherMode.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatcherMode.java
index b8a1dda74..e05ba900e 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatcherMode.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatcherMode.java
@@ -23,7 +23,7 @@ import org.apache.zookeeper.ZooDefs;
 public enum WatcherMode {
     STANDARD(false, false),
     PERSISTENT(true, false),
-    PERSISTENT_RECURSIVE(true, true)
+    PERSISTENT_RECURSIVE(true, true),
     ;
 
     public static final WatcherMode DEFAULT_WATCHER_MODE = WatcherMode.STANDARD;
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatcherModeManager.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatcherModeManager.java
deleted file mode 100644
index c1a8225f8..000000000
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatcherModeManager.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server.watch;
-
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.zookeeper.Watcher;
-
-class WatcherModeManager {
-    private final Map<Key, WatcherMode> watcherModes = new ConcurrentHashMap<>();
-    private final AtomicInteger recursiveQty = new AtomicInteger(0);
-
-    private static class Key {
-        private final Watcher watcher;
-        private final String path;
-
-        Key(Watcher watcher, String path) {
-            this.watcher = watcher;
-            this.path = path;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) {
-                return true;
-            }
-            if (o == null || getClass() != o.getClass()) {
-                return false;
-            }
-            Key key = (Key) o;
-            return watcher.equals(key.watcher) && path.equals(key.path);
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(watcher, path);
-        }
-    }
-
-    // VisibleForTesting
-    Map<Key, WatcherMode> getWatcherModes() {
-        return watcherModes;
-    }
-
-    void setWatcherMode(Watcher watcher, String path, WatcherMode mode) {
-        if (mode == WatcherMode.DEFAULT_WATCHER_MODE) {
-            removeWatcher(watcher, path);
-        } else {
-            adjustRecursiveQty(watcherModes.put(new Key(watcher, path), mode), mode);
-        }
-    }
-
-    WatcherMode getWatcherMode(Watcher watcher, String path) {
-        return watcherModes.getOrDefault(new Key(watcher, path), WatcherMode.DEFAULT_WATCHER_MODE);
-    }
-
-    void removeWatcher(Watcher watcher, String path) {
-        adjustRecursiveQty(watcherModes.remove(new Key(watcher, path)), WatcherMode.DEFAULT_WATCHER_MODE);
-    }
-
-    int getRecursiveQty() {
-        return recursiveQty.get();
-    }
-
-    // recursiveQty is an optimization to avoid having to walk the map every time this value is needed
-    private void adjustRecursiveQty(WatcherMode oldMode, WatcherMode newMode) {
-        if (oldMode == null) {
-            oldMode = WatcherMode.DEFAULT_WATCHER_MODE;
-        }
-        if (oldMode.isRecursive() != newMode.isRecursive()) {
-            if (newMode.isRecursive()) {
-                recursiveQty.incrementAndGet();
-            } else {
-                recursiveQty.decrementAndGet();
-            }
-        }
-    }
-}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/RecursiveWatchQtyTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/RecursiveWatchQtyTest.java
index 78b13bb33..0582ddafc 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/RecursiveWatchQtyTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/RecursiveWatchQtyTest.java
@@ -51,28 +51,6 @@ public class RecursiveWatchQtyTest {
         watchManager = new WatchManager();
     }
 
-    @Test
-    public void testRecursiveQty() {
-        WatcherModeManager manager = new WatcherModeManager();
-        DummyWatcher watcher = new DummyWatcher();
-        manager.setWatcherMode(watcher, "/a", WatcherMode.DEFAULT_WATCHER_MODE);
-        assertEquals(0, manager.getRecursiveQty());
-        manager.setWatcherMode(watcher, "/a", WatcherMode.PERSISTENT_RECURSIVE);
-        assertEquals(1, manager.getRecursiveQty());
-        manager.setWatcherMode(watcher, "/a/b", WatcherMode.PERSISTENT_RECURSIVE);
-        assertEquals(2, manager.getRecursiveQty());
-        manager.setWatcherMode(watcher, "/a", WatcherMode.PERSISTENT_RECURSIVE);
-        assertEquals(2, manager.getRecursiveQty());
-        manager.setWatcherMode(watcher, "/a/b", WatcherMode.PERSISTENT);
-        assertEquals(1, manager.getRecursiveQty());
-        manager.setWatcherMode(watcher, "/a/b", WatcherMode.PERSISTENT_RECURSIVE);
-        assertEquals(2, manager.getRecursiveQty());
-        manager.setWatcherMode(watcher, "/a/b", WatcherMode.DEFAULT_WATCHER_MODE);
-        assertEquals(1, manager.getRecursiveQty());
-        manager.setWatcherMode(watcher, "/a", WatcherMode.PERSISTENT);
-        assertEquals(0, manager.getRecursiveQty());
-    }
-
     @Test
     public void testAddRemove() {
         Watcher watcher1 = new DummyWatcher();
@@ -125,7 +103,7 @@ public class RecursiveWatchQtyTest {
     }
 
     @Test
-    public void testChangeType() {
+    public void testDifferentWatchModes() {
         Watcher watcher = new DummyWatcher();
 
         watchManager.addWatch("/a", watcher, WatcherMode.PERSISTENT);
@@ -133,15 +111,14 @@ public class RecursiveWatchQtyTest {
         watchManager.addWatch("/a", watcher, WatcherMode.PERSISTENT_RECURSIVE);
         assertEquals(1, watchManager.getRecursiveWatchQty());
         watchManager.addWatch("/a", watcher, WatcherMode.STANDARD);
-        assertEquals(0, watchManager.getRecursiveWatchQty());
+        assertEquals(1, watchManager.getRecursiveWatchQty());
         assertTrue(watchManager.removeWatcher("/a", watcher));
         assertEquals(0, watchManager.getRecursiveWatchQty());
     }
 
     @Test
-    public void testRecursiveQtyConcurrency() {
-        ThreadLocalRandom random = ThreadLocalRandom.current();
-        WatcherModeManager manager = new WatcherModeManager();
+    public void testRecursiveQtyConcurrency() throws Exception {
+        WatchManager manager = new WatchManager();
         ExecutorService threadPool = Executors.newFixedThreadPool(clientQty);
         List<Future<?>> tasks = null;
         CountDownLatch completedLatch = new CountDownLatch(clientQty);
@@ -149,11 +126,7 @@ public class RecursiveWatchQtyTest {
             tasks = IntStream.range(0, clientQty)
                     .mapToObj(__ -> threadPool.submit(() -> iterate(manager, completedLatch)))
                     .collect(Collectors.toList());
-            try {
-                completedLatch.await();
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-            }
+            completedLatch.await();
         } finally {
             if (tasks != null) {
                 tasks.forEach(t -> t.cancel(true));
@@ -161,14 +134,15 @@ public class RecursiveWatchQtyTest {
             threadPool.shutdownNow();
         }
 
-        int expectedRecursiveQty = (int) manager.getWatcherModes().values()
+        int expectedRecursiveQty = (int) manager.getWatch2Paths().values()
                 .stream()
-                .filter(mode -> mode == WatcherMode.PERSISTENT_RECURSIVE)
+                .flatMap(paths -> paths.values().stream())
+                .filter(stats -> stats.hasMode(WatcherMode.PERSISTENT_RECURSIVE))
                 .count();
-        assertEquals(expectedRecursiveQty, manager.getRecursiveQty());
+        assertEquals(expectedRecursiveQty, manager.getRecursiveWatchQty());
     }
 
-    private void iterate(WatcherModeManager manager, CountDownLatch completedLatch) {
+    private void iterate(WatchManager manager, CountDownLatch completedLatch) {
         ThreadLocalRandom random = ThreadLocalRandom.current();
         try {
             for (int i = 0; i < iterations; ++i) {
@@ -176,9 +150,9 @@ public class RecursiveWatchQtyTest {
                 boolean doSet = random.nextInt(100) > 33;    // 2/3 will be sets
                 if (doSet) {
                     WatcherMode mode = WatcherMode.values()[random.nextInt(WatcherMode.values().length)];
-                    manager.setWatcherMode(new DummyWatcher(), path, mode);
+                    manager.addWatch(path, new DummyWatcher(), mode);
                 } else {
-                    manager.removeWatcher(new DummyWatcher(), path);
+                    manager.removeWatcher(path, new DummyWatcher());
                 }
 
                 int sleepMillis = random.nextInt(2);
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/PersistentRecursiveWatcherTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/PersistentRecursiveWatcherTest.java
index e74ee2fd6..077af3c45 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/PersistentRecursiveWatcherTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/PersistentRecursiveWatcherTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.zookeeper.test;
 
+import static org.apache.zookeeper.AddWatchMode.PERSISTENT;
 import static org.apache.zookeeper.AddWatchMode.PERSISTENT_RECURSIVE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -171,6 +172,53 @@ public class PersistentRecursiveWatcherTest extends ClientBase {
         }
     }
 
+    @Test
+    public void testSamePathWithDifferentWatchModes() throws Exception {
+        try (ZooKeeper zk = createClient()) {
+            BlockingQueue<WatchedEvent> dataEvents = new LinkedBlockingQueue<>();
+            BlockingQueue<WatchedEvent> childEvents = new LinkedBlockingQueue<>();
+            BlockingQueue<WatchedEvent> persistentEvents = new LinkedBlockingQueue<>();
+            BlockingQueue<WatchedEvent> recursiveEvents = new LinkedBlockingQueue<>();
+
+            zk.addWatch("/a", persistentEvents::add, PERSISTENT);
+            zk.addWatch("/a", recursiveEvents::add, PERSISTENT_RECURSIVE);
+            zk.exists("/a", dataEvents::add);
+
+            zk.create("/a", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            assertEvent(dataEvents, Watcher.Event.EventType.NodeCreated, "/a");
+            assertEvent(persistentEvents, Watcher.Event.EventType.NodeCreated, "/a");
+            assertEvent(recursiveEvents, Watcher.Event.EventType.NodeCreated, "/a");
+
+            zk.getData("/a", dataEvents::add, null);
+            zk.setData("/a", new byte[0], -1);
+            assertEvent(dataEvents, Watcher.Event.EventType.NodeDataChanged, "/a");
+            assertEvent(persistentEvents, Watcher.Event.EventType.NodeDataChanged, "/a");
+            assertEvent(recursiveEvents, Watcher.Event.EventType.NodeDataChanged, "/a");
+
+            zk.getChildren("/a", childEvents::add);
+            zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            assertEvent(childEvents, Watcher.Event.EventType.NodeChildrenChanged, "/a");
+            assertEvent(persistentEvents, Watcher.Event.EventType.NodeChildrenChanged, "/a");
+            assertEvent(recursiveEvents, Watcher.Event.EventType.NodeCreated, "/a/b");
+
+            zk.getChildren("/a", childEvents::add);
+            zk.delete("/a/b", -1);
+            assertEvent(childEvents, Watcher.Event.EventType.NodeChildrenChanged, "/a");
+            assertEvent(persistentEvents, Watcher.Event.EventType.NodeChildrenChanged, "/a");
+            assertEvent(recursiveEvents, Watcher.Event.EventType.NodeDeleted, "/a/b");
+
+            zk.getChildren("/a", childEvents::add);
+            zk.getData("/a", dataEvents::add, null);
+            zk.exists("/a", dataEvents::add);
+            zk.delete("/a", -1);
+            assertEvent(childEvents, Watcher.Event.EventType.NodeDeleted, "/a");
+            assertEvent(dataEvents, Watcher.Event.EventType.NodeDeleted, "/a");
+            assertEvent(dataEvents, Watcher.Event.EventType.NodeDeleted, "/a");
+            assertEvent(persistentEvents, Watcher.Event.EventType.NodeDeleted, "/a");
+            assertEvent(recursiveEvents, Watcher.Event.EventType.NodeDeleted, "/a");
+        }
+    }
+
     @Test
     public void testRootWatcher()
             throws IOException, InterruptedException, KeeperException {