You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ha...@apache.org on 2018/09/28 21:38:34 UTC

[2/2] zookeeper git commit: ZOOKEEPER-1177: Add the memory optimized watch manager for concentrate watches scenario

ZOOKEEPER-1177: Add the memory optimized watch manager for concentrate watches scenario

The current HashSet-based WatchManager consumes more than 40GB of memory when
creating 300M watches.

This patch optimizes memory usage and time complexity for the concentrated-watches scenario (many watchers registered on a relatively small set of paths). Compared to WatchManager, both memory consumption and time complexity are improved significantly. I'll post more data later with micro-benchmark results.

Changes made compared to WatchManager:
* Only keep path to watches map
* Use BitSet to save the memory used to store watches
* Use ConcurrentHashMap and ReadWriteLock instead of synchronized to reduce lock contention
* Lazily clean up the closed watchers

Author: Fangmin Lyu <al...@fb.com>

Reviewers: Andor Molnár <an...@apache.org>, Norbert Kalmar <nk...@yahoo.com>, Michael Han <ha...@apache.org>

Closes #590 from lvfangmin/ZOOKEEPER-1177


Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/fdde8b00
Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/fdde8b00
Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/fdde8b00

Branch: refs/heads/master
Commit: fdde8b006458f7b989c894af0eac7e124d271a1e
Parents: 4ebb847
Author: Fangmin Lyu <al...@fb.com>
Authored: Fri Sep 28 14:38:24 2018 -0700
Committer: Michael Han <ha...@apache.org>
Committed: Fri Sep 28 14:38:24 2018 -0700

----------------------------------------------------------------------
 build.xml                                       |  12 +-
 ivy.xml                                         |   3 +
 .../org/apache/zookeeper/server/DataTree.java   |  25 +-
 .../apache/zookeeper/server/DumbWatcher.java    | 100 +++++
 .../apache/zookeeper/server/NIOServerCnxn.java  |   1 +
 .../zookeeper/server/NettyServerCnxn.java       |   1 +
 .../org/apache/zookeeper/server/ServerCnxn.java |  16 +-
 .../apache/zookeeper/server/WatchManager.java   | 266 ------------
 .../zookeeper/server/WatchesPathReport.java     |  83 ----
 .../apache/zookeeper/server/WatchesReport.java  |  83 ----
 .../apache/zookeeper/server/WatchesSummary.java |  98 -----
 .../org/apache/zookeeper/server/ZKDatabase.java |   1 +
 .../zookeeper/server/util/BitHashSet.java       | 159 ++++++++
 .../apache/zookeeper/server/util/BitMap.java    | 136 +++++++
 .../server/watch/IDeadWatcherListener.java      |  34 ++
 .../zookeeper/server/watch/IWatchManager.java   | 134 ++++++
 .../zookeeper/server/watch/WatchManager.java    | 247 ++++++++++++
 .../server/watch/WatchManagerFactory.java       |  52 +++
 .../server/watch/WatchManagerOptimized.java     | 389 ++++++++++++++++++
 .../zookeeper/server/watch/WatcherCleaner.java  | 182 +++++++++
 .../zookeeper/server/watch/WatcherOrBitSet.java |  61 +++
 .../server/watch/WatchesPathReport.java         |  83 ++++
 .../zookeeper/server/watch/WatchesReport.java   |  83 ++++
 .../zookeeper/server/watch/WatchesSummary.java  |  98 +++++
 src/java/test/config/findbugsExcludeFile.xml    |  38 +-
 .../zookeeper/server/WatchesPathReportTest.java |  60 ---
 .../zookeeper/server/WatchesReportTest.java     |  60 ---
 .../zookeeper/server/WatchesSummaryTest.java    |  42 --
 .../zookeeper/server/util/BitHashSetTest.java   | 110 +++++
 .../zookeeper/server/util/BitMapTest.java       |  71 ++++
 .../server/watch/WatchManagerTest.java          | 404 +++++++++++++++++++
 .../server/watch/WatcherCleanerTest.java        | 127 ++++++
 .../server/watch/WatcherOrBitSetTest.java       |  61 +++
 .../server/watch/WatchesPathReportTest.java     |  60 +++
 .../server/watch/WatchesReportTest.java         |  60 +++
 .../server/watch/WatchesSummaryTest.java        |  42 ++
 .../bench/org/apache/zookeeper/BenchMain.java   |  30 ++
 .../zookeeper/server/watch/WatchBench.java      | 300 ++++++++++++++
 .../src/main/resources/mainClasses              |   1 +
 .../content/xdocs/zookeeperAdmin.xml            |  96 +++++
 40 files changed, 3207 insertions(+), 702 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index 2043e68..6e3b40c 100644
--- a/build.xml
+++ b/build.xml
@@ -91,6 +91,7 @@ xmlns:cs="antlib:com.puppycrawl.tools.checkstyle.ant">
     <property name="src.dir" value="${basedir}/src" />
     <property name="java.src.dir" value="${src.dir}/java/main" />
     <property name="jute.src.dir" value="${basedir}/zookeeper-jute/src/main/java" />
+    <property name="java.test.dir" value="${src.dir}/test/java"/>
 
     <property name="lib.dir" value="${src.dir}/java/lib" />
     <property name="lib.dir.includes" value="**/*.jar" />
@@ -121,6 +122,7 @@ xmlns:cs="antlib:com.puppycrawl.tools.checkstyle.ant">
     <property name="test.src.dir" value="${src.dir}/java/test"/>
     <property name="jute.test.src.dir" value="${basedir}/zookeeper-jute/src/test/java" />
     <property name="systest.src.dir" value="${src.dir}/java/systest"/>
+    <property name="bench.src.dir" value="${java.test.dir}/bench"/>
     <property name="test.log.dir" value="${test.java.build.dir}/logs" />
     <property name="test.data.dir" value="${test.java.build.dir}/data" />
     <property name="test.data.invalid.dir" value="${test.data.dir}/invalidsnap" />
@@ -234,6 +236,8 @@ xmlns:cs="antlib:com.puppycrawl.tools.checkstyle.ant">
 
     <property name="hamcrest.version" value="1.3"/>
 
+    <property name="jmh.version" value="1.19"/>
+
     <!-- ====================================================== -->
     <!-- Macro definitions                                      -->
     <!-- ====================================================== -->
@@ -510,6 +514,10 @@ xmlns:cs="antlib:com.puppycrawl.tools.checkstyle.ant">
              target="${javac.target}" source="${javac.source}" debug="on" encoding="${build.encoding}">
         <classpath refid="test.java.classpath"/>
       </javac>
+      <javac srcdir="${bench.src.dir}" destdir="${test.java.classes}" includeantruntime="false"
+             target="${javac.target}" source="${javac.source}" debug="on" encoding="${build.encoding}">
+          <classpath refid="test.java.classpath"/>
+      </javac>
     </target>
 
     <target name="compile-native" depends="compile_jute" description="Make C binding">
@@ -1967,7 +1975,9 @@ xmlns:cs="antlib:com.puppycrawl.tools.checkstyle.ant">
                    output="${build.dir.eclipse-test-classes}" />
            <source path="${systest.src.dir}"
                    output="${build.dir.eclipse-test-classes}" />
-
+           <source path="${bench.src.dir}"
+                   output="${build.dir.eclipse-test-classes}" />
+                   
            <output path="${build.dir.eclipse-main-classes}" />
            <library pathref="default.path.id" exported="true" />
            <library pathref="junit.path.id" exported="false" />

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/ivy.xml
----------------------------------------------------------------------
diff --git a/ivy.xml b/ivy.xml
index 663216e..fb09ed3 100644
--- a/ivy.xml
+++ b/ivy.xml
@@ -142,6 +142,9 @@
     <dependency org="org.hamcrest" name="hamcrest-all" rev="${hamcrest.version}"
                   conf="test->default" />
 
+    <dependency org="org.openjdk.jmh" name="jmh-core" rev="${jmh.version}" conf="test->default"/>
+    <dependency org="org.openjdk.jmh" name="jmh-generator-annprocess" rev="${jmh.version}" conf="test->default"/>
+
     <conflict manager="strict"/>
 
   </dependencies>

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/main/org/apache/zookeeper/server/DataTree.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/DataTree.java b/src/java/main/org/apache/zookeeper/server/DataTree.java
index ac91c10..c24396a 100644
--- a/src/java/main/org/apache/zookeeper/server/DataTree.java
+++ b/src/java/main/org/apache/zookeeper/server/DataTree.java
@@ -39,6 +39,12 @@ import org.apache.zookeeper.common.PathTrie;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.data.StatPersisted;
+import org.apache.zookeeper.server.watch.IWatchManager;
+import org.apache.zookeeper.server.watch.WatchManagerFactory;
+import org.apache.zookeeper.server.watch.WatcherOrBitSet;
+import org.apache.zookeeper.server.watch.WatchesPathReport;
+import org.apache.zookeeper.server.watch.WatchesReport;
+import org.apache.zookeeper.server.watch.WatchesSummary;
 import org.apache.zookeeper.txn.CheckVersionTxn;
 import org.apache.zookeeper.txn.CreateContainerTxn;
 import org.apache.zookeeper.txn.CreateTTLTxn;
@@ -87,9 +93,9 @@ public class DataTree {
     private final ConcurrentHashMap<String, DataNode> nodes =
         new ConcurrentHashMap<String, DataNode>();
 
-    private final WatchManager dataWatches = new WatchManager();
+    private IWatchManager dataWatches;
 
-    private final WatchManager childWatches = new WatchManager();
+    private IWatchManager childWatches;
 
     /** cached total size of paths and data for all DataNodes */
     private final AtomicLong nodeDataSize = new AtomicLong(0);
@@ -253,6 +259,14 @@ public class DataTree {
         addConfigNode();
 
         nodeDataSize.set(approximateDataSize());
+        try {
+            dataWatches = WatchManagerFactory.createWatchManager();
+            childWatches = WatchManagerFactory.createWatchManager();
+        } catch (Exception e) {
+            LOG.error("Unexpected exception when creating WatchManager, " +
+                    "exiting abnormally", e);
+            System.exit(ExitCode.UNEXPECTED_ERROR.getValue());
+        }
     }
 
     /**
@@ -611,7 +625,7 @@ public class DataTree {
             ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                     "childWatches.triggerWatch " + parentName);
         }
-        Set<Watcher> processed = dataWatches.triggerWatch(path,
+        WatcherOrBitSet processed = dataWatches.triggerWatch(path,
                 EventType.NodeDeleted);
         childWatches.triggerWatch(path, EventType.NodeDeleted, processed);
         childWatches.triggerWatch("".equals(parentName) ? "/" : parentName,
@@ -1361,6 +1375,11 @@ public class DataTree {
         }
     }
 
+    public void shutdownWatcher() {
+        dataWatches.shutdown();
+        childWatches.shutdown();
+    }
+
     /**
      * Returns a mapping of session ID to ephemeral znodes.
      *

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/main/org/apache/zookeeper/server/DumbWatcher.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/DumbWatcher.java b/src/java/main/org/apache/zookeeper/server/DumbWatcher.java
new file mode 100644
index 0000000..ff17181
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/server/DumbWatcher.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.security.cert.Certificate;
+
+import org.apache.jute.Record;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.proto.ReplyHeader;
+import org.apache.zookeeper.server.ServerCnxn;
+import org.apache.zookeeper.server.ServerStats;
+
+/**
+ * A empty watcher implementation used in bench and unit test.
+ */
+public class DumbWatcher extends ServerCnxn {
+
+    private long sessionId;
+
+    public DumbWatcher() {
+        this(0);
+    }
+
+    public DumbWatcher(long sessionId) {
+        this.sessionId = sessionId;
+    }
+
+    @Override
+    void setSessionTimeout(int sessionTimeout) { }
+
+    @Override
+    public void process(WatchedEvent event) { }
+
+    @Override
+    int getSessionTimeout() { return 0; }
+
+    @Override
+    void close() { }
+
+    @Override
+    public void sendResponse(ReplyHeader h, Record r, String tag) throws IOException { }
+
+    @Override
+    public void sendCloseSession() { }
+
+    @Override
+    public long getSessionId() { return sessionId; }
+
+    @Override
+    void setSessionId(long sessionId) { }
+
+    @Override
+    void sendBuffer(ByteBuffer closeConn) { }
+
+    @Override
+    void enableRecv() { }
+
+    @Override
+    void disableRecv() { }
+
+    @Override
+    protected ServerStats serverStats() { return null; }
+
+    @Override
+    public long getOutstandingRequests() { return 0; }
+
+    @Override
+    public InetSocketAddress getRemoteSocketAddress() { return null; }
+
+    @Override
+    public int getInterestOps() { return 0; }
+
+    @Override
+    public boolean isSecure() { return false; }
+
+    @Override
+    public Certificate[] getClientCertificateChain() { return null; }
+
+    @Override
+    public void setClientCertificateChain(Certificate[] chain) { }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java b/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
index fffb775..c344c65 100644
--- a/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
+++ b/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
@@ -594,6 +594,7 @@ public class NIOServerCnxn extends ServerCnxn {
      */
     @Override
     public void close() {
+        setStale();
         if (!factory.removeCnxn(this)) {
             return;
         }

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java b/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java
index 88aa593..0b27724 100644
--- a/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java
+++ b/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java
@@ -92,6 +92,7 @@ public class NettyServerCnxn extends ServerCnxn {
             LOG.debug("close called for sessionid:0x"
                     + Long.toHexString(sessionId));
         }
+        setStale();
 
         // ZOOKEEPER-2743:
         // Always unregister connection upon close to prevent

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/main/org/apache/zookeeper/server/ServerCnxn.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/ServerCnxn.java b/src/java/main/org/apache/zookeeper/server/ServerCnxn.java
index 917516a..0822f19 100644
--- a/src/java/main/org/apache/zookeeper/server/ServerCnxn.java
+++ b/src/java/main/org/apache/zookeeper/server/ServerCnxn.java
@@ -54,7 +54,7 @@ public abstract class ServerCnxn implements Stats, Watcher {
     // (aka owned by) this class
     final public static Object me = new Object();
     private static final Logger LOG = LoggerFactory.getLogger(ServerCnxn.class);
-    
+
     private Set<Id> authInfo = Collections.newSetFromMap(new ConcurrentHashMap<Id, Boolean>());
 
     private static final byte[] fourBytes = new byte[4];
@@ -66,6 +66,8 @@ public abstract class ServerCnxn implements Stats, Watcher {
      */
     boolean isOldClient = true;
 
+    private volatile boolean stale = false;
+
     abstract int getSessionTimeout();
 
     abstract void close();
@@ -143,6 +145,14 @@ public abstract class ServerCnxn implements Stats, Watcher {
         }
     }
 
+    public boolean isStale() {
+        return stale;
+    }
+
+    public void setStale() {
+        stale = true;
+    }
+
     protected void packetReceived(long bytes) {
         incrPacketsReceived();
         ServerStats serverStats = serverStats();
@@ -196,7 +206,7 @@ public abstract class ServerCnxn implements Stats, Watcher {
     protected long incrPacketsReceived() {
         return packetsReceived.incrementAndGet();
     }
-    
+
     protected void incrOutstandingRequests(RequestHeader h) {
     }
 
@@ -293,7 +303,7 @@ public abstract class ServerCnxn implements Stats, Watcher {
     public abstract boolean isSecure();
     public abstract Certificate[] getClientCertificateChain();
     public abstract void setClientCertificateChain(Certificate[] chain);
-    
+
     /**
      * Print information about the connection.
      * @param brief iff true prints brief details, otw full detail

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/main/org/apache/zookeeper/server/WatchManager.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/WatchManager.java b/src/java/main/org/apache/zookeeper/server/WatchManager.java
deleted file mode 100644
index 076f645..0000000
--- a/src/java/main/org/apache/zookeeper/server/WatchManager.java
+++ /dev/null
@@ -1,266 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server;
-
-import java.io.PrintWriter;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.Watcher.Event.EventType;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class manages watches. It allows watches to be associated with a string
- * and removes watchers and their watches in addition to managing triggers.
- */
-class WatchManager {
-    private static final Logger LOG = LoggerFactory.getLogger(WatchManager.class);
-
-    private final Map<String, Set<Watcher>> watchTable =
-        new HashMap<String, Set<Watcher>>();
-
-    private final Map<Watcher, Set<String>> watch2Paths =
-        new HashMap<Watcher, Set<String>>();
-
-    synchronized int size(){
-        int result = 0;
-        for(Set<Watcher> watches : watchTable.values()) {
-            result += watches.size();
-        }
-        return result;
-    }
-
-    synchronized void addWatch(String path, Watcher watcher) {
-        Set<Watcher> list = watchTable.get(path);
-        if (list == null) {
-            // don't waste memory if there are few watches on a node
-            // rehash when the 4th entry is added, doubling size thereafter
-            // seems like a good compromise
-            list = new HashSet<Watcher>(4);
-            watchTable.put(path, list);
-        }
-        list.add(watcher);
-
-        Set<String> paths = watch2Paths.get(watcher);
-        if (paths == null) {
-            // cnxns typically have many watches, so use default cap here
-            paths = new HashSet<String>();
-            watch2Paths.put(watcher, paths);
-        }
-        paths.add(path);
-    }
-
-    synchronized void removeWatcher(Watcher watcher) {
-        Set<String> paths = watch2Paths.remove(watcher);
-        if (paths == null) {
-            return;
-        }
-        for (String p : paths) {
-            Set<Watcher> list = watchTable.get(p);
-            if (list != null) {
-                list.remove(watcher);
-                if (list.size() == 0) {
-                    watchTable.remove(p);
-                }
-            }
-        }
-    }
-
-    Set<Watcher> triggerWatch(String path, EventType type) {
-        return triggerWatch(path, type, null);
-    }
-
-    Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
-        WatchedEvent e = new WatchedEvent(type,
-                KeeperState.SyncConnected, path);
-        Set<Watcher> watchers;
-        synchronized (this) {
-            watchers = watchTable.remove(path);
-            if (watchers == null || watchers.isEmpty()) {
-                if (LOG.isTraceEnabled()) {
-                    ZooTrace.logTraceMessage(LOG,
-                            ZooTrace.EVENT_DELIVERY_TRACE_MASK,
-                            "No watchers for " + path);
-                }
-                return null;
-            }
-            for (Watcher w : watchers) {
-                Set<String> paths = watch2Paths.get(w);
-                if (paths != null) {
-                    paths.remove(path);
-                }
-            }
-        }
-        for (Watcher w : watchers) {
-            if (supress != null && supress.contains(w)) {
-                continue;
-            }
-            w.process(e);
-        }
-        return watchers;
-    }
-
-    /**
-     * Brief description of this object.
-     */
-    @Override
-    public synchronized String toString() {
-        StringBuilder sb = new StringBuilder();
-
-        sb.append(watch2Paths.size()).append(" connections watching ")
-            .append(watchTable.size()).append(" paths\n");
-
-        int total = 0;
-        for (Set<String> paths : watch2Paths.values()) {
-            total += paths.size();
-        }
-        sb.append("Total watches:").append(total);
-
-        return sb.toString();
-    }
-
-    /**
-     * String representation of watches. Warning, may be large!
-     * @param byPath iff true output watches by paths, otw output
-     * watches by connection
-     * @return string representation of watches
-     */
-    synchronized void dumpWatches(PrintWriter pwriter, boolean byPath) {
-        if (byPath) {
-            for (Entry<String, Set<Watcher>> e : watchTable.entrySet()) {
-                pwriter.println(e.getKey());
-                for (Watcher w : e.getValue()) {
-                    pwriter.print("\t0x");
-                    pwriter.print(Long.toHexString(((ServerCnxn)w).getSessionId()));
-                    pwriter.print("\n");
-                }
-            }
-        } else {
-            for (Entry<Watcher, Set<String>> e : watch2Paths.entrySet()) {
-                pwriter.print("0x");
-                pwriter.println(Long.toHexString(((ServerCnxn)e.getKey()).getSessionId()));
-                for (String path : e.getValue()) {
-                    pwriter.print("\t");
-                    pwriter.println(path);
-                }
-            }
-        }
-    }
-
-    /**
-     * Checks the specified watcher exists for the given path
-     *
-     * @param path
-     *            znode path
-     * @param watcher
-     *            watcher object reference
-     * @return true if the watcher exists, false otherwise
-     */
-    synchronized boolean containsWatcher(String path, Watcher watcher) {
-        Set<String> paths = watch2Paths.get(watcher);
-        if (paths == null || !paths.contains(path)) {
-            return false;
-        }
-        return true;
-    }
-
-    /**
-     * Removes the specified watcher for the given path
-     *
-     * @param path
-     *            znode path
-     * @param watcher
-     *            watcher object reference
-     * @return true if the watcher successfully removed, false otherwise
-     */
-    synchronized boolean removeWatcher(String path, Watcher watcher) {
-        Set<String> paths = watch2Paths.get(watcher);
-        if (paths == null || !paths.remove(path)) {
-            return false;
-        }
-
-        Set<Watcher> list = watchTable.get(path);
-        if (list == null || !list.remove(watcher)) {
-            return false;
-        }
-
-        if (list.size() == 0) {
-            watchTable.remove(path);
-        }
-
-        return true;
-    }
-
-    /**
-     * Returns a watch report.
-     *
-     * @return watch report
-     * @see WatchesReport
-     */
-    synchronized WatchesReport getWatches() {
-        Map<Long, Set<String>> id2paths = new HashMap<Long, Set<String>>();
-        for (Entry<Watcher, Set<String>> e: watch2Paths.entrySet()) {
-            Long id = ((ServerCnxn) e.getKey()).getSessionId();
-            Set<String> paths = new HashSet<String>(e.getValue());
-            id2paths.put(id, paths);
-        }
-        return new WatchesReport(id2paths);
-    }
-
-    /**
-     * Returns a watch report by path.
-     *
-     * @return watch report
-     * @see WatchesPathReport
-     */
-    synchronized WatchesPathReport getWatchesByPath() {
-        Map<String, Set<Long>> path2ids = new HashMap<String, Set<Long>>();
-        for (Entry<String, Set<Watcher>> e : watchTable.entrySet()) {
-            Set<Long> ids = new HashSet<Long>(e.getValue().size());
-            path2ids.put(e.getKey(), ids);
-            for (Watcher watcher : e.getValue()) {
-                ids.add(((ServerCnxn) watcher).getSessionId());
-            }
-        }
-        return new WatchesPathReport(path2ids);
-    }
-
-    /**
-     * Returns a watch summary.
-     *
-     * @return watch summary
-     * @see WatchesSummary
-     */
-    synchronized WatchesSummary getWatchesSummary() {
-        int totalWatches = 0;
-        for (Set<String> paths : watch2Paths.values()) {
-            totalWatches += paths.size();
-        }
-        return new WatchesSummary (watch2Paths.size(), watchTable.size(),
-                                   totalWatches);
-    }
-}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/main/org/apache/zookeeper/server/WatchesPathReport.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/WatchesPathReport.java b/src/java/main/org/apache/zookeeper/server/WatchesPathReport.java
deleted file mode 100644
index 6792ac9..0000000
--- a/src/java/main/org/apache/zookeeper/server/WatchesPathReport.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * A watch report, essentially a mapping of path to session IDs of sessions that
- * have set a watch on that path. This class is immutable.
- */
-public class WatchesPathReport {
-
-    private final Map<String, Set<Long>> path2Ids;
-
-    /**
-     * Creates a new report.
-     *
-     * @param path2Ids map of paths to session IDs of sessions that have set a
-     * watch on that path
-     */
-    WatchesPathReport(Map<String, Set<Long>> path2Ids) {
-        this.path2Ids = Collections.unmodifiableMap(deepCopy(path2Ids));
-    }
-
-    private static Map<String, Set<Long>> deepCopy(Map<String, Set<Long>> m) {
-        Map<String, Set<Long>> m2 = new HashMap<String, Set<Long>>();
-        for (Map.Entry<String, Set<Long>> e : m.entrySet()) {
-            m2.put(e.getKey(), new HashSet<Long>(e.getValue()));
-        }
-        return m2;
-    }
-
-    /**
-     * Checks if the given path has watches set.
-     *
-     * @param path path
-     * @return true if path has watch set
-     */
-    public boolean hasSessions(String path) {
-        return path2Ids.containsKey(path);
-    }
-    /**
-     * Gets the session IDs of sessions that have set watches on the given path.
-     * The returned set is immutable.
-     *
-     * @param path session ID
-     * @return session IDs of sessions that have set watches on the path, or
-     * null if none
-     */
-    public Set<Long> getSessions(String path) {
-        Set<Long> s = path2Ids.get(path);
-        return s != null ? Collections.unmodifiableSet(s) : null;
-    }
-
-    /**
-     * Converts this report to a map. The returned map is mutable, and changes
-     * to it do not reflect back into this report.
-     *
-     * @return map representation of report
-     */
-    public Map<String, Set<Long>> toMap() {
-        return deepCopy(path2Ids);
-    }
-}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/main/org/apache/zookeeper/server/WatchesReport.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/WatchesReport.java b/src/java/main/org/apache/zookeeper/server/WatchesReport.java
deleted file mode 100644
index e4c6dc2..0000000
--- a/src/java/main/org/apache/zookeeper/server/WatchesReport.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * A watch report, essentially a mapping of session ID to paths that the session
- * has set a watch on. This class is immutable.
- */
-public class WatchesReport {
-
-    private final Map<Long, Set<String>> id2paths;
-
-    /**
-     * Creates a new report.
-     *
-     * @param id2paths map of session IDs to paths that each session has set
-     * a watch on
-     */
-    WatchesReport(Map<Long, Set<String>> id2paths) {
-        this.id2paths = Collections.unmodifiableMap(deepCopy(id2paths));
-    }
-
-    private static Map<Long, Set<String>> deepCopy(Map<Long, Set<String>> m) {
-        Map<Long, Set<String>> m2 = new HashMap<Long, Set<String>>();
-        for (Map.Entry<Long, Set<String>> e : m.entrySet()) {
-            m2.put(e.getKey(), new HashSet<String>(e.getValue()));
-        }
-        return m2;
-    }
-
-    /**
-     * Checks if the given session has watches set.
-     *
-     * @param sessionId session ID
-     * @return true if session has paths with watches set
-     */
-    public boolean hasPaths(long sessionId) {
-        return id2paths.containsKey(sessionId);
-    }
-
-    /**
-     * Gets the paths that the given session has set watches on. The returned
-     * set is immutable.
-     *
-     * @param sessionId session ID
-     * @return paths that have watches set by the session, or null if none
-     */
-    public Set<String> getPaths(long sessionId) {
-        Set<String> s = id2paths.get(sessionId);
-        return s != null ? Collections.unmodifiableSet(s) : null;
-    }
-
-    /**
-     * Converts this report to a map. The returned map is mutable, and changes
-     * to it do not reflect back into this report.
-     *
-     * @return map representation of report
-     */
-    public Map<Long, Set<String>> toMap() {
-        return deepCopy(id2paths);
-    }
-}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/main/org/apache/zookeeper/server/WatchesSummary.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/WatchesSummary.java b/src/java/main/org/apache/zookeeper/server/WatchesSummary.java
deleted file mode 100644
index 2053b55..0000000
--- a/src/java/main/org/apache/zookeeper/server/WatchesSummary.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server;
-
-import java.util.LinkedHashMap;
-import java.util.Map;
-
-/**
- * A summary of watch information. This class is immutable.
- */
-public class WatchesSummary {
-
-    /**
-     * The key in the map returned by {@link #toMap()} for the number of
-     * connections.
-     */
-    public static final String KEY_NUM_CONNECTIONS = "num_connections";
-    /**
-     * The key in the map returned by {@link #toMap()} for the number of paths.
-     */
-    public static final String KEY_NUM_PATHS = "num_paths";
-    /**
-     * The key in the map returned by {@link #toMap()} for the total number of
-     * watches.
-     */
-    public static final String KEY_NUM_TOTAL_WATCHES = "num_total_watches";
-
-    private final int numConnections;
-    private final int numPaths;
-    private final int totalWatches;
-
-    /**
-     * Creates a new summary.
-     *
-     * @param numConnections the number of sessions that have set watches
-     * @param numPaths the number of paths that have watches set on them
-     * @param totalWatches the total number of watches set
-     */
-    WatchesSummary(int numConnections, int numPaths, int totalWatches) {
-        this.numConnections = numConnections;
-        this.numPaths = numPaths;
-        this.totalWatches = totalWatches;
-    }
-
-    /**
-     * Gets the number of connections (sessions) that have set watches.
-     *
-     * @return number of connections
-     */
-    public int getNumConnections() {
-        return numConnections;
-    }
-    /**
-     * Gets the number of paths that have watches set on them.
-     *
-     * @return number of paths
-     */
-    public int getNumPaths() {
-        return numPaths;
-    }
-    /**
-     * Gets the total number of watches set.
-     *
-     * @return total watches
-     */
-    public int getTotalWatches() {
-        return totalWatches;
-    }
-
-    /**
-     * Converts this summary to a map. The returned map is mutable, and changes
-     * to it do not reflect back into this summary.
-     *
-     * @return map representation of summary
-     */
-    public Map<String, Object> toMap() {
-        Map<String, Object> summary = new LinkedHashMap<String, Object>();
-        summary.put(KEY_NUM_CONNECTIONS, numConnections);
-        summary.put(KEY_NUM_PATHS, numPaths);
-        summary.put(KEY_NUM_TOTAL_WATCHES, totalWatches);
-        return summary;
-    }
-}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/main/org/apache/zookeeper/server/ZKDatabase.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/ZKDatabase.java b/src/java/main/org/apache/zookeeper/server/ZKDatabase.java
index 86e2c09..753461a 100644
--- a/src/java/main/org/apache/zookeeper/server/ZKDatabase.java
+++ b/src/java/main/org/apache/zookeeper/server/ZKDatabase.java
@@ -138,6 +138,7 @@ public class ZKDatabase {
         /* to be safe we just create a new
          * datatree.
          */
+        dataTree.shutdownWatcher();
         dataTree = createDataTree();
         sessionsWithTimeouts.clear();
         WriteLock lock = logLock.writeLock();

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/main/org/apache/zookeeper/server/util/BitHashSet.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/util/BitHashSet.java b/src/java/main/org/apache/zookeeper/server/util/BitHashSet.java
new file mode 100644
index 0000000..b60f1d4
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/server/util/BitHashSet.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.util;
+
+import java.util.BitSet;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.lang.Iterable;
+
+/**
+ * A set of bit positions that stores every element in a BitSet and
+ * additionally caches a limited number of them in a HashSet, to find a
+ * balance between memory and time complexity.
+ *
+ * Without the HashSet, we need O(N) time to get the elements, where N is
+ * the number of bits in elementBits. But we need to keep the cache small to
+ * make sure it doesn't cost too much memory; there is a trade-off between
+ * memory and time complexity.
+ *
+ * An earlier design dynamically switched between SparseBitSet and HashSet
+ * based on the memory consumption, but copying the data over takes time and
+ * may cause a herd effect of repeatedly copying data from one data structure
+ * to another. The current solution can do a very good job given that most
+ * of the paths have a limited number of elements.
+ *
+ * All methods except {@link #iterator()} are synchronized on this object.
+ */
+public class BitHashSet implements Iterable<Integer> {
+
+    /**
+     * Change to SparseBitSet if we want to optimize more; the number of
+     * elements on a single server is usually limited, so BitSet should be
+     * fine.
+     */
+    private final BitSet elementBits = new BitSet();
+
+    /**
+     * HashSet is used to optimize the iterating: if there is a single
+     * element in this BitHashSet but its bit is very large, without the
+     * HashSet we need to go through all the words before returning that
+     * element, which is not efficient.
+     */
+    private final Set<Integer> cache = new HashSet<Integer>();
+
+    // Upper bound on the number of elements kept in the cache.
+    private final int cacheSize;
+
+    // To record how many elements in this set.
+    private int elementCount = 0;
+
+    /**
+     * Creates a set whose cache size is taken from the
+     * zookeeper.bitHashCacheSize system property (10 when unset).
+     */
+    public BitHashSet() {
+        this(Integer.getInteger("zookeeper.bitHashCacheSize", 10));
+    }
+
+    /**
+     * Creates a set with the given cache size.
+     *
+     * @param cacheSize maximum number of elements held in the HashSet cache
+     */
+    public BitHashSet(int cacheSize) {
+        this.cacheSize = cacheSize;
+    }
+
+    /**
+     * Adds the given bit to this set.
+     *
+     * @param elementBit the bit to add
+     * @return true if the bit was added, false if it is null or was
+     * already present
+     */
+    public synchronized boolean add(Integer elementBit) {
+        if (elementBit == null || elementBits.get(elementBit)) {
+            return false;
+        }
+        // Only the first cacheSize elements are mirrored into the cache.
+        if (cache.size() < cacheSize) {
+            cache.add(elementBit);
+        }
+        elementBits.set(elementBit);
+        elementCount++;
+        return true;
+    }
+
+    /**
+     * Remove the watches, and return the number of watches being removed.
+     *
+     * @param bitSet the bits to remove, applied to the cache
+     * @param bits the bits to remove, applied to the stored bits; callers
+     * are expected to pass the same elements as in {@code bitSet}
+     * @return the number of elements removed from this set
+     */
+    public synchronized int remove(Set<Integer> bitSet, BitSet bits) {
+        cache.removeAll(bitSet);
+        elementBits.andNot(bits);
+        int elementCountBefore = elementCount;
+        elementCount = elementBits.cardinality();
+        return elementCountBefore - elementCount;
+    }
+
+    /**
+     * Removes the given bit from this set.
+     *
+     * @param elementBit the bit to remove
+     * @return true if the bit was removed, false if it is null or was not
+     * present
+     */
+    public synchronized boolean remove(Integer elementBit) {
+        if (elementBit == null || !elementBits.get(elementBit)) {
+            return false;
+        }
+
+        cache.remove(elementBit);
+        elementBits.clear(elementBit);
+        elementCount--;
+        return true;
+    }
+
+    /**
+     * Checks whether the given bit is in this set.
+     *
+     * @param elementBit the bit to look up
+     * @return true if the bit is present, false if it is null or absent
+     */
+    public synchronized boolean contains(Integer elementBit) {
+        if (elementBit == null) {
+            return false;
+        }
+        return elementBits.get(elementBit);
+    }
+
+    /**
+     * @return the number of elements in this set
+     */
+    public synchronized int size() {
+        return elementCount;
+    }
+
+    /**
+     * Returns a read-only iterator over the elements of this set.
+     *
+     * This function is not thread-safe; callers need to synchronize on this
+     * set while iterating through it.
+     *
+     * @return an iterator whose remove() always throws
+     * UnsupportedOperationException and whose next() throws
+     * NoSuchElementException once all elements have been returned
+     */
+    @Override
+    public Iterator<Integer> iterator() {
+        if (cache.size() == elementCount) {
+            // Every element is cached. Hand out a read-only view so that
+            // Iterator.remove() cannot drop an element from the cache while
+            // leaving elementBits and elementCount untouched.
+            return java.util.Collections.unmodifiableSet(cache).iterator();
+        }
+
+        return new Iterator<Integer>() {
+            int returnedCount = 0;
+            int bitIndex = 0;
+
+            @Override
+            public boolean hasNext() {
+                return returnedCount < elementCount;
+            }
+
+            @Override
+            public Integer next() {
+                // nextSetBit returns -1 when no set bit is left; never leak
+                // that sentinel to the caller as if it were an element.
+                int bit = hasNext() ? elementBits.nextSetBit(bitIndex) : -1;
+                if (bit < 0) {
+                    throw new java.util.NoSuchElementException();
+                }
+                bitIndex = bit + 1;
+                returnedCount++;
+                return bit;
+            }
+
+            @Override
+            public void remove() {
+                throw new UnsupportedOperationException();
+            }
+        };
+    }
+
+    // visible for test
+    public synchronized int cachedSize() {
+        return cache.size();
+    }
+
+    /**
+     * @return true if this set has no elements
+     */
+    public synchronized boolean isEmpty() {
+        return elementCount == 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/main/org/apache/zookeeper/server/util/BitMap.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/util/BitMap.java b/src/java/main/org/apache/zookeeper/server/util/BitMap.java
new file mode 100644
index 0000000..1a0fb3b
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/server/util/BitMap.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.util;
+
+import java.util.Map;
+import java.util.HashMap;
+import java.util.BitSet;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * This is a helper class to maintain the bit to specific value and the
+ * reversed value to bit mapping.
+ *
+ * Bits freed by a removal are handed out again, lowest first, before a new
+ * bit is allocated. All access to the maps is guarded by a read/write lock.
+ */
+public class BitMap<T> {
+
+    private final Map<T, Integer> value2Bit = new HashMap<T, Integer>();
+    private final Map<Integer, T> bit2Value = new HashMap<Integer, T>();
+
+    // Bits that were assigned once and have since been released.
+    private final BitSet freedBitSet = new BitSet();
+    // Next never-used bit; a primitive so that incrementing does not box.
+    private int nextBit = 0;
+
+    private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
+
+    /**
+     * Returns the bit assigned to the given value, assigning one first if
+     * the value has none yet.
+     *
+     * @param value the value to register
+     * @return the bit that maps to the value
+     */
+    public Integer add(T value) {
+        /*
+         * Optimized for code which will add the same value again and again,
+         * more specifically this is used to add new bit for watcher, and
+         * the same watcher may be watching thousands or even millions of
+         * nodes, which will call this function with the same value; checking
+         * for existence under the read lock first optimizes that case.
+         */
+        Integer bit = getBit(value);
+        if (bit != null) {
+            return bit;
+        }
+
+        rwLock.writeLock().lock();
+        try {
+            // Re-check: another thread may have added the value between the
+            // read-locked lookup above and taking the write lock.
+            bit = value2Bit.get(value);
+            if (bit != null) {
+                return bit;
+            }
+            int freed = freedBitSet.nextSetBit(0);
+            if (freed > -1) {
+                freedBitSet.clear(freed);
+                bit = freed;
+            } else {
+                bit = nextBit++;
+            }
+
+            value2Bit.put(value, bit);
+            bit2Value.put(bit, value);
+            return bit;
+        } finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Looks up the value assigned to the given bit.
+     *
+     * @param bit the bit to look up
+     * @return the value mapped to the bit, or null if there is none
+     */
+    public T get(int bit) {
+        rwLock.readLock().lock();
+        try {
+            return bit2Value.get(bit);
+        } finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Looks up the bit assigned to the given value.
+     *
+     * @param value the value to look up
+     * @return the bit mapped to the value, or null if there is none
+     */
+    public Integer getBit(T value) {
+        rwLock.readLock().lock();
+        try {
+            return value2Bit.get(value);
+        } finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Removes the given value and frees its bit for reuse.
+     *
+     * @param value the value to remove
+     * @return the bit the value was mapped to, or -1 if it was not present
+     */
+    public int remove(T value) {
+        /*
+         * remove only called once when the session is closed, so use write
+         * lock directly without checking read lock.
+         */
+        rwLock.writeLock().lock();
+        try {
+            Integer bit = value2Bit.get(value);
+            if (bit == null) {
+                return -1;
+            }
+            release(value, bit);
+            return bit;
+        } finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Removes the value mapped to the given bit and frees the bit for reuse.
+     *
+     * @param bit the bit to remove
+     * @return the value the bit was mapped to, or null if there was none
+     */
+    public T remove(int bit) {
+        rwLock.writeLock().lock();
+        try {
+            T value = bit2Value.get(bit);
+            if (value == null) {
+                return null;
+            }
+            release(value, bit);
+            return value;
+        } finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * @return the number of values currently mapped
+     */
+    public int size() {
+        rwLock.readLock().lock();
+        try {
+            return value2Bit.size();
+        } finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    // Drops both directions of the mapping and marks the bit as reusable.
+    // The caller must hold the write lock.
+    private void release(T value, Integer bit) {
+        value2Bit.remove(value);
+        bit2Value.remove(bit);
+        freedBitSet.set(bit);
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/main/org/apache/zookeeper/server/watch/IDeadWatcherListener.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/watch/IDeadWatcherListener.java b/src/java/main/org/apache/zookeeper/server/watch/IDeadWatcherListener.java
new file mode 100644
index 0000000..7de6772
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/server/watch/IDeadWatcherListener.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.watch;
+
+import java.util.Set;
+
+/**
+ * Callback through which the dead watchers, i.e. those related to closed
+ * cnxns, are handed over for processing.
+ */
+public interface IDeadWatcherListener {
+
+    /**
+     * Handles the given batch of dead watchers.
+     *
+     * @param deadWatchers the watchers whose cnxn has been closed
+     */
+    void processDeadWatchers(Set<Integer> deadWatchers);
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/main/org/apache/zookeeper/server/watch/IWatchManager.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/watch/IWatchManager.java b/src/java/main/org/apache/zookeeper/server/watch/IWatchManager.java
new file mode 100644
index 0000000..0c18e6a
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/server/watch/IWatchManager.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.watch;
+
+import java.io.PrintWriter;
+
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+
+public interface IWatchManager {
+
+    /**
+     * Adds a watch to the specified path.
+     *
+     * @param path znode path
+     * @param watcher watcher object reference
+     *
+     * @return true if the watcher added is not already present
+     */
+    public boolean addWatch(String path, Watcher watcher);
+
+    /**
+     * Checks whether the specified watcher exists for the given path.
+     *
+     * @param path znode path
+     * @param watcher watcher object reference
+     *
+     * @return true if the watcher exists, false otherwise
+     */
+    public boolean containsWatcher(String path, Watcher watcher);
+
+    /**
+     * Removes the specified watcher for the given path.
+     *
+     * @param path znode path
+     * @param watcher watcher object reference
+     *
+     * @return true if the watcher was successfully removed, false otherwise
+     */
+    public boolean removeWatcher(String path, Watcher watcher);
+
+    /**
+     * The entry point used to remove the watcher when its cnxn is closed.
+     *
+     * @param watcher watcher object reference
+     */
+    public void removeWatcher(Watcher watcher);
+
+    /**
+     * Distributes the watch event for the given path.
+     *
+     * @param path znode path
+     * @param type the watch event type
+     *
+     * @return the watchers that have been notified
+     */
+    public WatcherOrBitSet triggerWatch(String path, EventType type);
+
+    /**
+     * Distributes the watch event for the given path, but skips the
+     * suppressed watchers.
+     *
+     * @param path znode path
+     * @param type the watch event type
+     * @param suppress the suppressed watcher set
+     *
+     * @return the watchers that have been notified
+     */
+    public WatcherOrBitSet triggerWatch(
+            String path, EventType type, WatcherOrBitSet suppress);
+
+    /**
+     * Gets the number of watchers.
+     *
+     * @return the number of watchers managed by this class.
+     */
+    public int size();
+
+    /**
+     * Cleans up the watch manager.
+     */
+    public void shutdown();
+
+    /**
+     * Returns a watch summary.
+     *
+     * @return watch summary
+     * @see WatchesSummary
+     */
+    public WatchesSummary getWatchesSummary();
+
+    /**
+     * Returns a watch report.
+     *
+     * @return watch report
+     * @see WatchesReport
+     */
+    public WatchesReport getWatches();
+
+    /**
+     * Returns a watch report by path.
+     *
+     * @return watch report
+     * @see WatchesPathReport
+     */
+    public WatchesPathReport getWatchesByPath();
+
+    /**
+     * Writes a string representation of the watches to the given writer.
+     * Nothing is returned; all output goes to {@code pwriter}.
+     * Warning, the output may be large!
+     *
+     * @param pwriter the writer to dump the watches to
+     * @param byPath if true, output watches by path; otherwise output
+     * watches by connection
+     */
+    public void dumpWatches(PrintWriter pwriter, boolean byPath);
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/main/org/apache/zookeeper/server/watch/WatchManager.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/watch/WatchManager.java b/src/java/main/org/apache/zookeeper/server/watch/WatchManager.java
new file mode 100644
index 0000000..3e14f6e
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/server/watch/WatchManager.java
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.watch;
+
+import java.io.PrintWriter;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.server.ServerCnxn;
+import org.apache.zookeeper.server.ZooTrace;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class manages watches. It allows watches to be associated with a string
+ * and removes watchers and their watches in addition to managing triggers.
+ *
+ * Two indexes are maintained, path to watchers and watcher to paths; every
+ * access to them happens while holding this object's monitor.
+ */
+public class WatchManager implements IWatchManager {
+    private static final Logger LOG = LoggerFactory.getLogger(WatchManager.class);
+
+    // path -> watchers registered on that path
+    private final Map<String, Set<Watcher>> watchTable =
+        new HashMap<String, Set<Watcher>>();
+
+    // watcher -> paths that watcher is registered on
+    private final Map<Watcher, Set<String>> watch2Paths =
+        new HashMap<Watcher, Set<String>>();
+
+    /**
+     * @return the number of (path, watcher) pairs held in the path index
+     */
+    @Override
+    public synchronized int size(){
+        int result = 0;
+        for(Set<Watcher> watches : watchTable.values()) {
+            result += watches.size();
+        }
+        return result;
+    }
+
+    /**
+     * @param watcher the watcher to check
+     * @return true if the watcher is a ServerCnxn that reports itself stale
+     */
+    boolean isDeadWatcher(Watcher watcher) {
+        return watcher instanceof ServerCnxn && ((ServerCnxn) watcher).isStale();
+    }
+
+    /**
+     * Registers the watcher on the path in both indexes. Dead watchers are
+     * ignored.
+     *
+     * @param path znode path
+     * @param watcher watcher object reference
+     * @return true if the path was not yet recorded for the watcher, false
+     * if it already was or the watcher is dead
+     */
+    @Override
+    public synchronized boolean addWatch(String path, Watcher watcher) {
+        if (isDeadWatcher(watcher)) {
+            LOG.debug("Ignoring addWatch with closed cnxn");
+            return false;
+        }
+
+        Set<Watcher> list = watchTable.get(path);
+        if (list == null) {
+            // don't waste memory if there are few watches on a node
+            // rehash when the 4th entry is added, doubling size thereafter
+            // seems like a good compromise
+            list = new HashSet<Watcher>(4);
+            watchTable.put(path, list);
+        }
+        list.add(watcher);
+
+        Set<String> paths = watch2Paths.get(watcher);
+        if (paths == null) {
+            // cnxns typically have many watches, so use default cap here
+            paths = new HashSet<String>();
+            watch2Paths.put(watcher, paths);
+        }
+        return paths.add(path);
+    }
+
+    /**
+     * Removes the watcher from every path it is registered on, dropping
+     * paths that are left without watchers.
+     *
+     * @param watcher watcher object reference
+     */
+    @Override
+    public synchronized void removeWatcher(Watcher watcher) {
+        Set<String> paths = watch2Paths.remove(watcher);
+        if (paths == null) {
+            return;
+        }
+        for (String p : paths) {
+            Set<Watcher> list = watchTable.get(p);
+            if (list != null) {
+                list.remove(watcher);
+                if (list.isEmpty()) {
+                    watchTable.remove(p);
+                }
+            }
+        }
+    }
+
+    /**
+     * Triggers the watches on the path without suppressing any watcher.
+     *
+     * @param path znode path
+     * @param type the watch event type
+     * @return the watchers that were registered on the path, or null if
+     * there were none
+     */
+    @Override
+    public WatcherOrBitSet triggerWatch(String path, EventType type) {
+        return triggerWatch(path, type, null);
+    }
+
+    /**
+     * Removes all watches on the path and delivers the event to each of
+     * their watchers, except those contained in {@code suppress}.
+     *
+     * @param path znode path
+     * @param type the watch event type
+     * @param suppress watchers that must not be notified, may be null
+     * @return the watchers that were registered on the path (suppressed
+     * ones included), or null if there were none
+     */
+    @Override
+    public WatcherOrBitSet triggerWatch(
+            String path, EventType type, WatcherOrBitSet suppress) {
+        WatchedEvent e = new WatchedEvent(type,
+                KeeperState.SyncConnected, path);
+        Set<Watcher> watchers;
+        synchronized (this) {
+            watchers = watchTable.remove(path);
+            if (watchers == null || watchers.isEmpty()) {
+                if (LOG.isTraceEnabled()) {
+                    ZooTrace.logTraceMessage(LOG,
+                            ZooTrace.EVENT_DELIVERY_TRACE_MASK,
+                            "No watchers for " + path);
+                }
+                return null;
+            }
+            for (Watcher w : watchers) {
+                Set<String> paths = watch2Paths.get(w);
+                if (paths != null) {
+                    paths.remove(path);
+                }
+            }
+        }
+        // Deliver outside the monitor so that watcher callbacks do not run
+        // while this manager is locked.
+        for (Watcher w : watchers) {
+            if (suppress != null && suppress.contains(w)) {
+                continue;
+            }
+            w.process(e);
+        }
+        return new WatcherOrBitSet(watchers);
+    }
+
+    /**
+     * @return a short, human-readable summary of connection, path and
+     * watch counts
+     */
+    @Override
+    public synchronized String toString() {
+        StringBuilder sb = new StringBuilder();
+
+        sb.append(watch2Paths.size()).append(" connections watching ")
+            .append(watchTable.size()).append(" paths\n");
+        sb.append("Total watches:").append(countTotalWatches());
+
+        return sb.toString();
+    }
+
+    /**
+     * Writes all watches to the writer, grouped by path or by session.
+     * Every watcher is cast to ServerCnxn to obtain its session id.
+     *
+     * @param pwriter the writer to dump the watches to
+     * @param byPath if true, group by path; otherwise group by session
+     */
+    @Override
+    public synchronized void dumpWatches(PrintWriter pwriter, boolean byPath) {
+        if (byPath) {
+            for (Entry<String, Set<Watcher>> e : watchTable.entrySet()) {
+                pwriter.println(e.getKey());
+                for (Watcher w : e.getValue()) {
+                    pwriter.print("\t0x");
+                    pwriter.print(Long.toHexString(((ServerCnxn)w).getSessionId()));
+                    pwriter.print("\n");
+                }
+            }
+        } else {
+            for (Entry<Watcher, Set<String>> e : watch2Paths.entrySet()) {
+                pwriter.print("0x");
+                pwriter.println(Long.toHexString(((ServerCnxn)e.getKey()).getSessionId()));
+                for (String path : e.getValue()) {
+                    pwriter.print("\t");
+                    pwriter.println(path);
+                }
+            }
+        }
+    }
+
+    /**
+     * @param path znode path
+     * @param watcher watcher object reference
+     * @return true if the path is recorded for the watcher
+     */
+    @Override
+    public synchronized boolean containsWatcher(String path, Watcher watcher) {
+        Set<String> paths = watch2Paths.get(watcher);
+        return paths != null && paths.contains(path);
+    }
+
+    /**
+     * Removes the watcher from the given path in both indexes.
+     *
+     * @param path znode path
+     * @param watcher watcher object reference
+     * @return true if the watcher was registered on the path in both
+     * indexes and has been removed, false otherwise
+     */
+    @Override
+    public synchronized boolean removeWatcher(String path, Watcher watcher) {
+        Set<String> paths = watch2Paths.get(watcher);
+        if (paths == null || !paths.remove(path)) {
+            return false;
+        }
+
+        Set<Watcher> list = watchTable.get(path);
+        if (list == null || !list.remove(watcher)) {
+            return false;
+        }
+
+        if (list.isEmpty()) {
+            watchTable.remove(path);
+        }
+
+        return true;
+    }
+
+    /**
+     * @return a report of session id to watched paths; every watcher is
+     * cast to ServerCnxn to obtain its session id
+     */
+    @Override
+    public synchronized WatchesReport getWatches() {
+        Map<Long, Set<String>> id2paths = new HashMap<Long, Set<String>>();
+        for (Entry<Watcher, Set<String>> e: watch2Paths.entrySet()) {
+            Long id = ((ServerCnxn) e.getKey()).getSessionId();
+            Set<String> paths = new HashSet<String>(e.getValue());
+            id2paths.put(id, paths);
+        }
+        return new WatchesReport(id2paths);
+    }
+
+    /**
+     * @return a report of path to watching session ids; every watcher is
+     * cast to ServerCnxn to obtain its session id
+     */
+    @Override
+    public synchronized WatchesPathReport getWatchesByPath() {
+        Map<String, Set<Long>> path2ids = new HashMap<String, Set<Long>>();
+        for (Entry<String, Set<Watcher>> e : watchTable.entrySet()) {
+            Set<Long> ids = new HashSet<Long>(e.getValue().size());
+            path2ids.put(e.getKey(), ids);
+            for (Watcher watcher : e.getValue()) {
+                ids.add(((ServerCnxn) watcher).getSessionId());
+            }
+        }
+        return new WatchesPathReport(path2ids);
+    }
+
+    /**
+     * @return a summary holding the number of watchers, the number of
+     * watched paths and the total number of watches
+     */
+    @Override
+    public synchronized WatchesSummary getWatchesSummary() {
+        return new WatchesSummary (watch2Paths.size(), watchTable.size(),
+                                   countTotalWatches());
+    }
+
+    @Override
+    public void shutdown() { /* do nothing */ }
+
+    // Sums the number of paths recorded per watcher. The caller must hold
+    // this object's monitor.
+    private int countTotalWatches() {
+        int total = 0;
+        for (Set<String> paths : watch2Paths.values()) {
+            total += paths.size();
+        }
+        return total;
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/main/org/apache/zookeeper/server/watch/WatchManagerFactory.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/watch/WatchManagerFactory.java b/src/java/main/org/apache/zookeeper/server/watch/WatchManagerFactory.java
new file mode 100644
index 0000000..5f8834e
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/server/watch/WatchManagerFactory.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.watch;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A factory used to produce the actual watch manager based on the
+ * zookeeper.watchManagerName option.
+ */
+public class WatchManagerFactory {
+    private static final Logger LOG = LoggerFactory.getLogger(WatchManagerFactory.class);
+
+    /** System property naming the IWatchManager implementation class to load. */
+    public static final String ZOOKEEPER_WATCH_MANAGER_NAME = "zookeeper.watchManagerName";
+
+    /**
+     * Instantiates the watch manager class named by the
+     * {@value #ZOOKEEPER_WATCH_MANAGER_NAME} system property through its
+     * no-arg constructor, falling back to {@link WatchManager} when the
+     * property is not set.
+     *
+     * @return a new watch manager instance
+     * @throws IOException if the class cannot be loaded, instantiated or
+     *         cast to IWatchManager; the original failure is kept as cause
+     */
+    public static IWatchManager createWatchManager() throws IOException {
+        String watchManagerName = System.getProperty(ZOOKEEPER_WATCH_MANAGER_NAME);
+        if (watchManagerName == null) {
+            watchManagerName = WatchManager.class.getName();
+        }
+        try {
+            // Class#newInstance is deprecated because it rethrows checked
+            // constructor exceptions undeclared; going through the
+            // Constructor wraps them in InvocationTargetException instead.
+            IWatchManager watchManager = (IWatchManager) Class.forName(watchManagerName)
+                    .getDeclaredConstructor().newInstance();
+            LOG.info("Using {} as watch manager", watchManagerName);
+            return watchManager;
+        } catch (Exception e) {
+            // single boundary: any load/instantiate/cast failure is reported
+            // as an IOException carrying the root cause
+            throw new IOException("Couldn't instantiate " + watchManagerName, e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/main/org/apache/zookeeper/server/watch/WatchManagerOptimized.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/watch/WatchManagerOptimized.java b/src/java/main/org/apache/zookeeper/server/watch/WatchManagerOptimized.java
new file mode 100644
index 0000000..6abb760
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/server/watch/WatchManagerOptimized.java
@@ -0,0 +1,389 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.watch;
+
+import java.io.PrintWriter;
+import java.util.HashMap;
+import java.util.BitSet;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.server.ServerCnxn;
+import org.apache.zookeeper.server.util.BitHashSet;
+import org.apache.zookeeper.server.util.BitMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Optimized in memory and time complexity, compared to WatchManager, both the
+ * memory consumption and time complexity improved a lot, but it cannot
+ * efficiently remove the watcher when the session or socket is closed, for
+ * majority use case this is not a problem.
+ *
+ * Changes made compared to WatchManager:
+ *
+ * - Use HashSet and BitSet to store the watchers to find a balance between
+ *   memory usage and time complexity
+ * - Use ReadWriteLock instead of synchronized to reduce lock contention
+ * - Lazily clean up the closed watchers
+ */
+public class WatchManagerOptimized
+        implements IWatchManager, IDeadWatcherListener {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(WatchManagerOptimized.class);
+
+    private final ConcurrentHashMap<String, BitHashSet> pathWatches =
+            new ConcurrentHashMap<String, BitHashSet>();
+
+    // watcher to bit id mapping
+    private final BitMap<Watcher> watcherBitIdMap = new BitMap<Watcher>();
+
+    // used to lazily remove the dead watchers
+    private final WatcherCleaner watcherCleaner;
+
+    private final ReentrantReadWriteLock addRemovePathRWLock = new ReentrantReadWriteLock();
+
+    /**
+     * Creates the manager together with its WatcherCleaner, which is
+     * started immediately with this manager as its dead-watcher listener.
+     */
+    public WatchManagerOptimized() {
+        this.watcherCleaner = new WatcherCleaner(this);
+        this.watcherCleaner.start();
+    }
+
+    /**
+     * Registers the watcher on the given path.
+     *
+     * The read lock is held for the whole operation so that it is mutually
+     * exclusive with the write-locked remove operations; otherwise a watch
+     * could be added for a connection that was just closed. Allocating the
+     * watcher bit and adding it to the BitHashSet rely on those structures'
+     * own locking, which keeps the write lock scope small.
+     *
+     * @param path the path to watch
+     * @param watcher the watcher to register
+     * @return the result of adding the watcher bit to the path's set, or
+     *         false when the watcher is dead and was ignored
+     */
+    @Override
+    public boolean addWatch(String path, Watcher watcher) {
+        addRemovePathRWLock.readLock().lock();
+        try {
+            // guard against registering a watcher whose cnxn is closing
+            if (isDeadWatcher(watcher)) {
+                LOG.debug("Ignoring addWatch with closed cnxn");
+                return false;
+            }
+
+            Integer watcherBit = watcherBitIdMap.add(watcher);
+            BitHashSet pathWatchers = pathWatches.get(path);
+            if (pathWatchers == null) {
+                // other threads holding the read lock may create the entry
+                // concurrently, so keep whichever set won the race
+                BitHashSet fresh = new BitHashSet();
+                BitHashSet raced = pathWatches.putIfAbsent(path, fresh);
+                pathWatchers = (raced != null) ? raced : fresh;
+            }
+            return pathWatchers.add(watcherBit);
+        } finally {
+            addRemovePathRWLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Checks whether the watcher is currently registered on the path.
+     *
+     * Used in the OpCode.checkWatches, which is a read operation; since read
+     * and write requests are exclusively processed, no lock is held here.
+     *
+     * Unlike addWatch this method does not mutate any state, so the read
+     * lock is not needed to keep a dead watcher (cnxn closed) from being
+     * added to the watch manager.
+     *
+     * Before the dead watcher has been lazily cleaned up this may still
+     * return true, but since the cnxn is closed the response will be
+     * dropped as well, so it does not matter.
+     *
+     * @param path the path to look up
+     * @param watcher the watcher to look for
+     * @return true if the path has watchers and the watcher's bit is among them
+     */
+    @Override
+    public boolean containsWatcher(String path, Watcher watcher) {
+        BitHashSet pathWatchers = pathWatches.get(path);
+        return pathWatchers != null
+                && pathWatchers.contains(watcherBitIdMap.getBit(watcher));
+    }
+
+    @Override
+    public boolean removeWatcher(String path, Watcher watcher) {
+        // Hold write lock directly because removeWatcher request is more 
+        // likely to be invoked when the watcher is actually exist and 
+        // haven't fired yet, so instead of having read lock to check existence 
+        // before switching to write one, it's actually cheaper to hold write 
+        // lock directly here.
+        addRemovePathRWLock.writeLock().lock();
+        try {
+            BitHashSet list = pathWatches.get(path);
+            if (list == null || !list.remove(watcherBitIdMap.getBit(watcher))) {
+                return false;
+            }
+            if (list.isEmpty()) {
+                pathWatches.remove(path);
+            }
+            return true;
+        } finally {
+            addRemovePathRWLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Hands the watcher over to the WatcherCleaner for lazy removal; does
+     * nothing if the watcher is not tracked.
+     *
+     * @param watcher the watcher whose connection is being closed
+     */
+    @Override
+    public void removeWatcher(Watcher watcher) {
+        final Integer deadBit;
+        // The write lock makes this exclusive with addWatch, which
+        // guarantees no watch is added for a cnxn that is already closed.
+        addRemovePathRWLock.writeLock().lock();
+        try {
+            deadBit = watcherBitIdMap.getBit(watcher);
+        } finally {
+            addRemovePathRWLock.writeLock().unlock();
+        }
+
+        if (deadBit == null) {
+            // not tracked, nothing to clean up
+            return;
+        }
+
+        // By the time we get here the cnxn of this watcher has already been
+        // marked as stale (this method is only called from ServerCnxn.close
+        // after stale is set), so no more watches will be added with it and
+        // the dead watcher can be cleaned up safely.
+        //
+        // That is why this call does not need addRemovePathRWLock; keeping
+        // it outside the locked section also avoids holding the write lock
+        // while blocked on handing the watcher to the watcherCleaner.
+        watcherCleaner.addDeadWatcher(deadBit);
+    }
+
+    /**
+     * Entry point for the WatcherCleaner: strips the given dead watcher
+     * bits from every path's watcher set and then releases the bits.
+     *
+     * Every watcher handled here is guaranteed to be dead and no watches
+     * will be added for it any more, so addRemovePathRWLock is not taken.
+     *
+     * @param deadWatchers bits of the watchers that need to be removed
+     */
+    @Override
+    public void processDeadWatchers(Set<Integer> deadWatchers) {
+        BitSet deadBits = new BitSet();
+        for (Integer deadBit : deadWatchers) {
+            deadBits.set(deadBit);
+        }
+
+        // ConcurrentHashMap value iteration is safe without extra
+        // synchronization on the map.
+        for (BitHashSet pathWatchers : pathWatches.values()) {
+            pathWatchers.remove(deadWatchers, deadBits);
+        }
+
+        // Paths left with an empty set are intentionally not removed here:
+        // doing so would add a lot of lock contention and hurt addWatch
+        // throughput, so triggerWatch is relied on to delete them.
+        for (Integer deadBit : deadWatchers) {
+            watcherBitIdMap.remove(deadBit);
+        }
+    }
+
+    /**
+     * Triggers the watches on the path without suppressing any watcher.
+     *
+     * @param path the path whose watches are triggered
+     * @param type the event type to deliver
+     * @return the triggered watchers, or null if the path had none
+     */
+    @Override
+    public WatcherOrBitSet triggerWatch(String path, EventType type) {
+        final WatcherOrBitSet nothingSuppressed = null;
+        return triggerWatch(path, type, nothingSuppressed);
+    }
+
+    @Override
+    public WatcherOrBitSet triggerWatch(
+            String path, EventType type, WatcherOrBitSet suppress) {
+        WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
+
+        BitHashSet watchers = remove(path);
+        if (watchers == null) {
+            return null;
+        }
+
+        // Avoid race condition between dead watcher cleaner in
+        // WatcherCleaner and iterating here
+        synchronized (watchers) {
+            for (Integer wBit : watchers) {
+                if (suppress != null && suppress.contains(wBit)) {
+                    continue;
+                }
+
+                Watcher w = watcherBitIdMap.get(wBit);
+
+                // skip dead watcher
+                if (w == null || isDeadWatcher(w)) {
+                    continue;
+                }
+
+                w.process(e);
+            }
+        }
+
+        return new WatcherOrBitSet(watchers);
+    }
+
+    /**
+     * Sums the sizes of the watcher sets of all paths.
+     *
+     * @return the total number of watches across all paths
+     */
+    @Override
+    public int size() {
+        int total = 0;
+        for (BitHashSet pathWatchers : pathWatches.values()) {
+            total += pathWatchers.size();
+        }
+        return total;
+    }
+
+    /** Shuts down the watcher cleaner, if there is one. */
+    @Override
+    public void shutdown() {
+        if (watcherCleaner == null) {
+            return;
+        }
+        watcherCleaner.shutdown();
+    }
+
+    private BitHashSet remove(String path) {
+        addRemovePathRWLock.writeLock().lock();
+        try {
+            return pathWatches.remove(path);
+        } finally {
+            addRemovePathRWLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Tells whether the watcher is a ServerCnxn that has been marked stale.
+     *
+     * @param watcher the watcher to check
+     * @return true only for a stale ServerCnxn; false for anything else
+     */
+    boolean isDeadWatcher(Watcher watcher) {
+        if (!(watcher instanceof ServerCnxn)) {
+            return false;
+        }
+        return ((ServerCnxn) watcher).isStale();
+    }
+
+    int pathSize() {
+        return pathWatches.size();
+    }
+
+    @Override
+    public WatchesSummary getWatchesSummary() {
+        return new WatchesSummary(
+                watcherBitIdMap.size(), pathSize(), size());
+    }
+
+    /**
+     * Builds a report mapping each session id to the paths watched by that
+     * session's connection.
+     *
+     * Watchers that are not a ServerCnxn have no session id and are
+     * skipped, which matches how getWatchesByPath and the by-path branch
+     * of dumpWatches treat them.
+     *
+     * @return the per-session watches report
+     */
+    @Override
+    public WatchesReport getWatches() {
+        Map<Long, Set<String>> id2paths = new HashMap<Long, Set<String>>();
+        for (Entry<Watcher, Set<String>> e : getWatcher2PathesMap().entrySet()) {
+            Watcher watcher = e.getKey();
+            if (!(watcher instanceof ServerCnxn)) {
+                continue;
+            }
+            Long id = ((ServerCnxn) watcher).getSessionId();
+            // getWatcher2PathesMap builds a fresh set per watcher, so it is
+            // handed over as is instead of paying for a second full copy.
+            id2paths.put(id, e.getValue());
+        }
+        return new WatchesReport(id2paths);
+    }
+
+    /**
+     * Builds a report mapping each watched path to the session ids of its
+     * ServerCnxn watchers.
+     *
+     * Iterating a ConcurrentHashMap is safe but only weakly consistent, so
+     * updates made while iterating may be missed. This is used by the
+     * commands to get a general idea of the watches state, so missing some
+     * updates is acceptable.
+     *
+     * @return the per-path watches report
+     */
+    @Override
+    public WatchesPathReport getWatchesByPath() {
+        Map<String, Set<Long>> sessionsByPath = new HashMap<String, Set<Long>>();
+        for (Entry<String, BitHashSet> entry : pathWatches.entrySet()) {
+            BitHashSet pathWatchers = entry.getValue();
+            synchronized (pathWatchers) {
+                Set<Long> sessionIds = new HashSet<Long>(pathWatchers.size());
+                for (Integer watcherBit : pathWatchers) {
+                    Watcher watcher = watcherBitIdMap.get(watcherBit);
+                    if (!(watcher instanceof ServerCnxn)) {
+                        continue;
+                    }
+                    sessionIds.add(((ServerCnxn) watcher).getSessionId());
+                }
+                sessionsByPath.put(entry.getKey(), sessionIds);
+            }
+        }
+        return new WatchesPathReport(sessionsByPath);
+    }
+
+    /**
+     * Inverts pathWatches into a watcher to watched-paths map, skipping
+     * bits that no longer resolve to a watcher.
+     *
+     * May cause OOM if there are lots of watches, might better to forbid
+     * it in this class.
+     *
+     * @return a newly built map from watcher to the paths it watches
+     */
+    public Map<Watcher, Set<String>> getWatcher2PathesMap() {
+        Map<Watcher, Set<String>> pathsByWatcher =
+                new HashMap<Watcher, Set<String>>();
+        for (Entry<String, BitHashSet> entry : pathWatches.entrySet()) {
+            String path = entry.getKey();
+            BitHashSet pathWatchers = entry.getValue();
+            // hold the set's monitor to avoid racing with add/remove
+            synchronized (pathWatchers) {
+                for (Integer watcherBit : pathWatchers) {
+                    Watcher watcher = watcherBitIdMap.get(watcherBit);
+                    if (watcher == null) {
+                        continue;
+                    }
+                    Set<String> watchedPaths = pathsByWatcher.get(watcher);
+                    if (watchedPaths == null) {
+                        watchedPaths = new HashSet<String>();
+                        pathsByWatcher.put(watcher, watchedPaths);
+                    }
+                    watchedPaths.add(path);
+                }
+            }
+        }
+        return pathsByWatcher;
+    }
+
+    /**
+     * Writes a textual dump of the watches to the given writer.
+     *
+     * With byPath set, each watched path is printed followed by the
+     * tab-indented hex session ids of its watchers; otherwise each
+     * watcher's hex session id is printed followed by its tab-indented
+     * paths. Watchers that are not a ServerCnxn have no session id and are
+     * skipped in both modes.
+     *
+     * @param pwriter the writer receiving the dump
+     * @param byPath true to group by path, false to group by session
+     */
+    @Override
+    public void dumpWatches(PrintWriter pwriter, boolean byPath) {
+        if (byPath) {
+            for (Entry<String, BitHashSet> e : pathWatches.entrySet()) {
+                pwriter.println(e.getKey());
+                BitHashSet watchers = e.getValue();
+                // hold the set's monitor while iterating it
+                synchronized (watchers) {
+                    for (Integer wbit : watchers) {
+                        Watcher w = watcherBitIdMap.get(wbit);
+                        if (!(w instanceof ServerCnxn)) {
+                            continue;
+                        }
+                        pwriter.print("\t0x");
+                        pwriter.print(
+                                Long.toHexString(((ServerCnxn)w).getSessionId()));
+                        pwriter.print("\n");
+                    }
+                }
+            }
+        } else {
+            for (Entry<Watcher, Set<String>> e : getWatcher2PathesMap().entrySet()) {
+                Watcher w = e.getKey();
+                // same guard as the by-path branch: an unchecked cast would
+                // abort the whole dump with a ClassCastException
+                if (!(w instanceof ServerCnxn)) {
+                    continue;
+                }
+                pwriter.print("0x");
+                pwriter.println(Long.toHexString(((ServerCnxn)w).getSessionId()));
+                for (String path : e.getValue()) {
+                    pwriter.print("\t");
+                    pwriter.println(path);
+                }
+            }
+        }
+    }
+
+    /**
+     * @return a two-line summary: tracked watchers and watched paths on the
+     *         first line, total watches on the second
+     */
+    @Override
+    public String toString() {
+        return watcherBitIdMap.size() + " connections watching "
+                + pathSize() + " paths\n"
+                + "Total watches:" + size();
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/main/org/apache/zookeeper/server/watch/WatcherCleaner.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/watch/WatcherCleaner.java b/src/java/main/org/apache/zookeeper/server/watch/WatcherCleaner.java
new file mode 100644
index 0000000..2bfb5aa
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/server/watch/WatcherCleaner.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.watch;
+
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.zookeeper.common.Time;
+import org.apache.zookeeper.server.RateLogger;
+import org.apache.zookeeper.server.WorkerService;
+import org.apache.zookeeper.server.WorkerService.WorkRequest;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Thread used to lazily clean up the closed watcher, it will trigger the
+ * clean up when the dead watchers get certain number or some number of
+ * seconds has elapsed since last clean up.
+ *
+ * Cost of running it:
+ *
+ * - need to go through all the paths even if the watcher may only
+ *   watching a single path
+ * - block in the path BitHashSet when we try to check the dead watcher
+ *   which won't block other stuff
+ */
+public class WatcherCleaner extends Thread {
+
+    private static final Logger LOG = LoggerFactory.getLogger(WatcherCleaner.class);
+    private final RateLogger RATE_LOGGER = new RateLogger(LOG);
+
+    private volatile boolean stopped = false;
+    private final Object cleanEvent = new Object();
+    private final Random r = new Random(System.nanoTime());
+    private final WorkerService cleaners;
+
+    private final Set<Integer> deadWatchers;
+    private final IDeadWatcherListener listener;
+    private final int watcherCleanThreshold;
+    private final int watcherCleanIntervalInSeconds;
+    private final int maxInProcessingDeadWatchers;
+    private final AtomicInteger totalDeadWatchers = new AtomicInteger();
+
+    /**
+     * Creates a cleaner configured from system properties, using these
+     * defaults when a property is not set:
+     *
+     * - zookeeper.watcherCleanThreshold: 1000
+     * - zookeeper.watcherCleanIntervalInSeconds: 600
+     * - zookeeper.watcherCleanThreadsNum: 2
+     * - zookeeper.maxInProcessingDeadWatchers: -1 (addDeadWatcher only
+     *   throttles when this value is positive)
+     *
+     * @param listener receives each batch of dead watchers to process
+     */
+    public WatcherCleaner(IDeadWatcherListener listener) {
+        this(listener,
+            Integer.getInteger("zookeeper.watcherCleanThreshold", 1000),
+            Integer.getInteger("zookeeper.watcherCleanIntervalInSeconds", 600),
+            Integer.getInteger("zookeeper.watcherCleanThreadsNum", 2),
+            Integer.getInteger("zookeeper.maxInProcessingDeadWatchers", -1));
+    }
+
+    public WatcherCleaner(IDeadWatcherListener listener,
+            int watcherCleanThreshold, int watcherCleanIntervalInSeconds,
+            int watcherCleanThreadsNum, int maxInProcessingDeadWatchers) {
+        this.listener = listener;
+        this.watcherCleanThreshold = watcherCleanThreshold;
+        this.watcherCleanIntervalInSeconds = watcherCleanIntervalInSeconds;
+        int suggestedMaxInProcessingThreshold =
+                watcherCleanThreshold * watcherCleanThreadsNum;
+        if (maxInProcessingDeadWatchers > 0 &&
+                maxInProcessingDeadWatchers < suggestedMaxInProcessingThreshold) {
+            maxInProcessingDeadWatchers = suggestedMaxInProcessingThreshold;
+            LOG.info("The maxInProcessingDeadWatchers config is smaller " +
+                    "than the suggested one, change it to use {}",
+                    maxInProcessingDeadWatchers);
+        }
+        this.maxInProcessingDeadWatchers = maxInProcessingDeadWatchers;
+        this.deadWatchers = new HashSet<Integer>();
+        this.cleaners = new WorkerService("DeadWatcherCleanner",
+                watcherCleanThreadsNum, false);
+
+        LOG.info("watcherCleanThreshold={}, watcherCleanIntervalInSeconds={}" +
+                ", watcherCleanThreadsNum={}, maxInProcessingDeadWatchers={}",
+                watcherCleanThreshold, watcherCleanIntervalInSeconds,
+                watcherCleanThreadsNum, maxInProcessingDeadWatchers);
+    }
+
+    /**
+     * Queues a dead watcher bit for lazy clean up and wakes the cleaner
+     * thread once the pending set reaches watcherCleanThreshold.
+     *
+     * When maxInProcessingDeadWatchers is positive the caller is throttled
+     * while that many dead watchers are outstanding. If the caller is
+     * interrupted while throttled, the interrupt flag is restored and the
+     * watcher is queued right away instead of continuing to block.
+     *
+     * @param watcherBit the bit id of the dead watcher
+     */
+    public void addDeadWatcher(int watcherBit) {
+        // Wait if there are too many watchers waiting to be closed,
+        // this will slow down the socket packet processing and
+        // the adding of watches in the ZK pipeline.
+        while (maxInProcessingDeadWatchers > 0 && !stopped &&
+                totalDeadWatchers.get() >= maxInProcessingDeadWatchers) {
+            try {
+                RATE_LOGGER.rateLimitLog("Waiting for dead watchers cleaning");
+                synchronized(totalDeadWatchers) {
+                    totalDeadWatchers.wait(100);
+                }
+            } catch (InterruptedException e) {
+                LOG.info("Got interrupted while waiting for dead watches " +
+                        "queue size");
+                // keep the interrupt visible to the caller and stop
+                // throttling; looping again would swallow the interrupt
+                Thread.currentThread().interrupt();
+                break;
+            }
+        }
+        synchronized (this) {
+            if (deadWatchers.add(watcherBit)) {
+                totalDeadWatchers.incrementAndGet();
+                if (deadWatchers.size() >= watcherCleanThreshold) {
+                    synchronized (cleanEvent) {
+                        cleanEvent.notifyAll();
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Cleaner loop: waits until enough dead watchers are pending or the
+     * jittered interval elapses, then hands a snapshot of the pending set
+     * to the cleaners WorkerService, which calls the listener.
+     *
+     * The loop ends when stopped is set or when the thread is interrupted
+     * while waiting on cleanEvent.
+     */
+    @Override
+    public void run() {
+        while (!stopped) {
+            synchronized (cleanEvent) {
+                try {
+                    // add some jitter to avoid cleaning dead watchers at the
+                    // same time in the quorum
+                    // NOTE(review): deadWatchers is a plain HashSet that is
+                    // mutated under synchronized (this) elsewhere; this
+                    // size() read and the isEmpty() read below happen
+                    // without that monitor - confirm this is acceptable.
+                    if (deadWatchers.size() < watcherCleanThreshold) {
+                        // wait between 1x and 1.5x the interval, in ms
+                        int maxWaitMs = (watcherCleanIntervalInSeconds +
+                            r.nextInt(watcherCleanIntervalInSeconds / 2 + 1)) * 1000;
+                        cleanEvent.wait(maxWaitMs);
+                    }
+                } catch (InterruptedException e) {
+                    LOG.info("Received InterruptedException while " +
+                            "waiting for cleanEvent");
+                    break;
+                }
+            }
+
+            // nothing queued since the last round, go back to waiting
+            if (deadWatchers.isEmpty()) {
+                continue;
+            }
+
+            synchronized (this) {
+                // Cleaning the dead watchers needs to go through all the
+                // current watches, which is pretty heavy and may take a
+                // second if there are millions of watches, that's why we're
+                // doing lazy batch clean up in a separate thread with a
+                // snapshot of the current dead watchers.
+                final Set<Integer> snapshot = new HashSet<Integer>(deadWatchers);
+                deadWatchers.clear();
+                int total = snapshot.size();
+                LOG.info("Processing {} dead watchers", total);
+                cleaners.schedule(new WorkRequest() {
+                    @Override
+                    public void doWork() throws Exception {
+                        long startTime = Time.currentElapsedTime();
+                        listener.processDeadWatchers(snapshot);
+                        long latency = Time.currentElapsedTime() - startTime;
+                        LOG.info("Takes {} to process {} watches", latency, total);
+                        // release the processed watchers from the throttle
+                        // count and wake callers blocked in addDeadWatcher
+                        totalDeadWatchers.addAndGet(-total);
+                        synchronized(totalDeadWatchers) {
+                            totalDeadWatchers.notifyAll();
+                        }
+                    }
+                });
+            }
+        }
+        LOG.info("WatcherCleaner thread exited");
+    }
+
+    /**
+     * Stops the cleaner: marks it stopped, drops the pending dead
+     * watchers, stops the worker service and wakes the cleaner thread so
+     * it exits promptly instead of sleeping out its clean interval.
+     */
+    public void shutdown() {
+        stopped = true;
+        // deadWatchers is guarded by this monitor in addDeadWatcher and run
+        synchronized (this) {
+            deadWatchers.clear();
+        }
+        cleaners.stop();
+        // run() may be blocked in cleanEvent.wait for up to 1.5x the clean
+        // interval. An interrupt is used rather than notifyAll because it
+        // is not lost if the thread has not reached the wait yet.
+        this.interrupt();
+    }
+
+}