You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ha...@apache.org on 2018/09/28 21:38:33 UTC

[1/2] zookeeper git commit: ZOOKEEPER-1177: Add the memory optimized watch manager for concentrate watches scenario

Repository: zookeeper
Updated Branches:
  refs/heads/master 4ebb847bc -> fdde8b006


http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/main/org/apache/zookeeper/server/watch/WatcherOrBitSet.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/watch/WatcherOrBitSet.java b/src/java/main/org/apache/zookeeper/server/watch/WatcherOrBitSet.java
new file mode 100644
index 0000000..83b186f
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/server/watch/WatcherOrBitSet.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.watch;
+
+import java.util.Set;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.server.util.BitHashSet;
+
+public class WatcherOrBitSet {
+
+    private Set<Watcher> watchers;
+    private BitHashSet watcherBits;
+
+    public WatcherOrBitSet(final Set<Watcher> watchers) {
+        this.watchers = watchers;
+    }
+
+    public WatcherOrBitSet(final BitHashSet watcherBits) {
+        this.watcherBits = watcherBits;
+    }
+
+    public boolean contains(Watcher watcher) {
+        if (watchers == null) {
+            return false;
+        }
+        return watchers.contains(watcher);
+    }
+
+    public boolean contains(int watcherBit) {
+        if (watcherBits == null) {
+            return false;
+        }
+        return watcherBits.contains(watcherBit);
+    }
+
+    public int size() {
+        if (watchers != null) {
+            return watchers.size();
+        }
+        if (watcherBits != null) {
+            return watcherBits.size();
+        }
+        return 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/main/org/apache/zookeeper/server/watch/WatchesPathReport.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/watch/WatchesPathReport.java b/src/java/main/org/apache/zookeeper/server/watch/WatchesPathReport.java
new file mode 100644
index 0000000..38f02de
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/server/watch/WatchesPathReport.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.watch;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A watch report, essentially a mapping of path to session IDs of sessions that
+ * have set a watch on that path. This class is immutable.
+ */
+public class WatchesPathReport {
+
+    private final Map<String, Set<Long>> path2Ids;
+
+    /**
+     * Creates a new report.
+     *
+     * @param path2Ids map of paths to session IDs of sessions that have set a
+     * watch on that path
+     */
+    WatchesPathReport(Map<String, Set<Long>> path2Ids) {
+        this.path2Ids = Collections.unmodifiableMap(deepCopy(path2Ids));
+    }
+
+    private static Map<String, Set<Long>> deepCopy(Map<String, Set<Long>> m) {
+        Map<String, Set<Long>> m2 = new HashMap<String, Set<Long>>();
+        for (Map.Entry<String, Set<Long>> e : m.entrySet()) {
+            m2.put(e.getKey(), new HashSet<Long>(e.getValue()));
+        }
+        return m2;
+    }
+
+    /**
+     * Checks if the given path has watches set.
+     *
+     * @param path path
+     * @return true if path has watch set
+     */
+    public boolean hasSessions(String path) {
+        return path2Ids.containsKey(path);
+    }
+    /**
+     * Gets the session IDs of sessions that have set watches on the given path.
+     * The returned set is immutable.
+     *
+     * @param path path
+     * @return session IDs of sessions that have set watches on the path, or
+     * null if none
+     */
+    public Set<Long> getSessions(String path) {
+        Set<Long> s = path2Ids.get(path);
+        return s != null ? Collections.unmodifiableSet(s) : null;
+    }
+
+    /**
+     * Converts this report to a map. The returned map is mutable, and changes
+     * to it do not reflect back into this report.
+     *
+     * @return map representation of report
+     */
+    public Map<String, Set<Long>> toMap() {
+        return deepCopy(path2Ids);
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/main/org/apache/zookeeper/server/watch/WatchesReport.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/watch/WatchesReport.java b/src/java/main/org/apache/zookeeper/server/watch/WatchesReport.java
new file mode 100644
index 0000000..ac888d3
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/server/watch/WatchesReport.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.watch;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A watch report, essentially a mapping of session ID to paths that the session
+ * has set a watch on. This class is immutable.
+ */
+public class WatchesReport {
+
+    private final Map<Long, Set<String>> id2paths;
+
+    /**
+     * Creates a new report.
+     *
+     * @param id2paths map of session IDs to paths that each session has set
+     * a watch on
+     */
+    WatchesReport(Map<Long, Set<String>> id2paths) {
+        this.id2paths = Collections.unmodifiableMap(deepCopy(id2paths));
+    }
+
+    private static Map<Long, Set<String>> deepCopy(Map<Long, Set<String>> m) {
+        Map<Long, Set<String>> m2 = new HashMap<Long, Set<String>>();
+        for (Map.Entry<Long, Set<String>> e : m.entrySet()) {
+            m2.put(e.getKey(), new HashSet<String>(e.getValue()));
+        }
+        return m2;
+    }
+
+    /**
+     * Checks if the given session has watches set.
+     *
+     * @param sessionId session ID
+     * @return true if session has paths with watches set
+     */
+    public boolean hasPaths(long sessionId) {
+        return id2paths.containsKey(sessionId);
+    }
+
+    /**
+     * Gets the paths that the given session has set watches on. The returned
+     * set is immutable.
+     *
+     * @param sessionId session ID
+     * @return paths that have watches set by the session, or null if none
+     */
+    public Set<String> getPaths(long sessionId) {
+        Set<String> s = id2paths.get(sessionId);
+        return s != null ? Collections.unmodifiableSet(s) : null;
+    }
+
+    /**
+     * Converts this report to a map. The returned map is mutable, and changes
+     * to it do not reflect back into this report.
+     *
+     * @return map representation of report
+     */
+    public Map<Long, Set<String>> toMap() {
+        return deepCopy(id2paths);
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/main/org/apache/zookeeper/server/watch/WatchesSummary.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/watch/WatchesSummary.java b/src/java/main/org/apache/zookeeper/server/watch/WatchesSummary.java
new file mode 100644
index 0000000..b2449ba
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/server/watch/WatchesSummary.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.watch;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * A summary of watch information. This class is immutable.
+ */
+public class WatchesSummary {
+
+    /**
+     * The key in the map returned by {@link #toMap()} for the number of
+     * connections.
+     */
+    public static final String KEY_NUM_CONNECTIONS = "num_connections";
+    /**
+     * The key in the map returned by {@link #toMap()} for the number of paths.
+     */
+    public static final String KEY_NUM_PATHS = "num_paths";
+    /**
+     * The key in the map returned by {@link #toMap()} for the total number of
+     * watches.
+     */
+    public static final String KEY_NUM_TOTAL_WATCHES = "num_total_watches";
+
+    private final int numConnections;
+    private final int numPaths;
+    private final int totalWatches;
+
+    /**
+     * Creates a new summary.
+     *
+     * @param numConnections the number of sessions that have set watches
+     * @param numPaths the number of paths that have watches set on them
+     * @param totalWatches the total number of watches set
+     */
+    WatchesSummary(int numConnections, int numPaths, int totalWatches) {
+        this.numConnections = numConnections;
+        this.numPaths = numPaths;
+        this.totalWatches = totalWatches;
+    }
+
+    /**
+     * Gets the number of connections (sessions) that have set watches.
+     *
+     * @return number of connections
+     */
+    public int getNumConnections() {
+        return numConnections;
+    }
+    /**
+     * Gets the number of paths that have watches set on them.
+     *
+     * @return number of paths
+     */
+    public int getNumPaths() {
+        return numPaths;
+    }
+    /**
+     * Gets the total number of watches set.
+     *
+     * @return total watches
+     */
+    public int getTotalWatches() {
+        return totalWatches;
+    }
+
+    /**
+     * Converts this summary to a map. The returned map is mutable, and changes
+     * to it do not reflect back into this summary.
+     *
+     * @return map representation of summary
+     */
+    public Map<String, Object> toMap() {
+        Map<String, Object> summary = new LinkedHashMap<String, Object>();
+        summary.put(KEY_NUM_CONNECTIONS, numConnections);
+        summary.put(KEY_NUM_PATHS, numPaths);
+        summary.put(KEY_NUM_TOTAL_WATCHES, totalWatches);
+        return summary;
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/test/config/findbugsExcludeFile.xml
----------------------------------------------------------------------
diff --git a/src/java/test/config/findbugsExcludeFile.xml b/src/java/test/config/findbugsExcludeFile.xml
index 4ab5a5e..a3f45a6 100644
--- a/src/java/test/config/findbugsExcludeFile.xml
+++ b/src/java/test/config/findbugsExcludeFile.xml
@@ -53,7 +53,13 @@
     <Method name="run" />
     <Bug pattern="DM_EXIT" />
   </Match>
-  
+
+   <!-- Failed to create watch manager is an unrecoverable error -->
+   <Match>
+     <Class name="org.apache.zookeeper.server.DataTree" />
+     <Bug pattern="DM_EXIT" />
+   </Match>
+
   <Match>
     <Package name="org.apache.jute.compiler.generated" />
   </Match>
@@ -85,7 +91,7 @@
 
   <Match>
     <Class name="org.apache.zookeeper.server.DataNode"/>
-      <Field name="children"/> 
+      <Field name="children"/>
       <Bug code="IS"/>
   </Match>
  <Match>
@@ -98,6 +104,15 @@
     <Field name="serverStats"/>
     <Bug code="IS"/>
   </Match>
+
+  <!-- The iterate function is not thread-safe; the caller will synchronize
+       on the BitHashSet object -->
+  <Match>
+    <Class name="org.apache.zookeeper.server.util.BitHashSet" />
+    <Field name="elementCount" />
+    <Bug code="IS" />
+  </Match>
+
   <Match>
      <Class name="org.apache.zookeeper.server.quorum.LearnerSessionTracker"/>
        <Bug code="UrF"/>
@@ -111,7 +126,7 @@
   <!-- these are old classes just for upgrading and should go away -->
   <Match>
     <Class name="org.apache.zookeeper.server.upgrade.DataNodeV1"/>
-  </Match> 
+  </Match>
 
   <Match>
     <Class name="org.apache.zookeeper.server.upgrade.DataTreeV1"/>
@@ -134,6 +149,23 @@
     </Or>
   </Match>
 
+  <!-- Synchronize on the AtomicInteger to do wait/notify, but not relying
+       on the synchronization to control the AtomicInteger value update,
+       so it's not a problem -->
+  <Match>
+    <Class name="org.apache.zookeeper.server.watch.WatcherCleaner" />
+    <Bug code="JLM" />
+    <Method name="addDeadWatcher" />
+  </Match>
+
+  <Match>
+    <Class name="org.apache.zookeeper.server.watch.WatcherCleaner$1" />
+    <Bug code="JLM" />
+    <Method name="doWork" />
+  </Match>
+
+
+
   <Match>
     <Class name="org.apache.zookeeper.server.quorum.QuorumPeer"/>
     <Bug pattern="OS_OPEN_STREAM" />

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/test/org/apache/zookeeper/server/WatchesPathReportTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/WatchesPathReportTest.java b/src/java/test/org/apache/zookeeper/server/WatchesPathReportTest.java
deleted file mode 100644
index c0b107d..0000000
--- a/src/java/test/org/apache/zookeeper/server/WatchesPathReportTest.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zookeeper.server;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import org.apache.zookeeper.ZKTestCase;
-import org.junit.Before;
-import org.junit.Test;
-import static org.junit.Assert.*;
-
-public class WatchesPathReportTest extends ZKTestCase {
-    private Map<String, Set<Long>> m;
-    private WatchesPathReport r;
-    @Before public void setUp() {
-        m = new HashMap<String, Set<Long>>();
-        Set<Long> s = new HashSet<Long>();
-        s.add(101L);
-        s.add(102L);
-        m.put("path1", s);
-        s = new HashSet<Long>();
-        s.add(201L);
-        m.put("path2", s);
-        r = new WatchesPathReport(m);
-    }
-    @Test public void testHasSessions() {
-        assertTrue(r.hasSessions("path1"));
-        assertTrue(r.hasSessions("path2"));
-        assertFalse(r.hasSessions("path3"));
-    }
-    @Test public void testGetSessions() {
-        Set<Long> s = r.getSessions("path1");
-        assertEquals(2, s.size());
-        assertTrue(s.contains(101L));
-        assertTrue(s.contains(102L));
-        s = r.getSessions("path2");
-        assertEquals(1, s.size());
-        assertTrue(s.contains(201L));
-        assertNull(r.getSessions("path3"));
-    }
-    @Test public void testToMap() {
-        assertEquals(m, r.toMap());
-    }
-}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/test/org/apache/zookeeper/server/WatchesReportTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/WatchesReportTest.java b/src/java/test/org/apache/zookeeper/server/WatchesReportTest.java
deleted file mode 100644
index 7f0343b..0000000
--- a/src/java/test/org/apache/zookeeper/server/WatchesReportTest.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zookeeper.server;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import org.apache.zookeeper.ZKTestCase;
-import org.junit.Before;
-import org.junit.Test;
-import static org.junit.Assert.*;
-
-public class WatchesReportTest extends ZKTestCase {
-    private Map<Long, Set<String>> m;
-    private WatchesReport r;
-    @Before public void setUp() {
-        m = new HashMap<Long, Set<String>>();
-        Set<String> s = new HashSet<String>();
-        s.add("path1a");
-        s.add("path1b");
-        m.put(1L, s);
-        s = new HashSet<String>();
-        s.add("path2a");
-        m.put(2L, s);
-        r = new WatchesReport(m);
-    }
-    @Test public void testHasPaths() {
-        assertTrue(r.hasPaths(1L));
-        assertTrue(r.hasPaths(2L));
-        assertFalse(r.hasPaths(3L));
-    }
-    @Test public void testGetPaths() {
-        Set<String> s = r.getPaths(1L);
-        assertEquals(2, s.size());
-        assertTrue(s.contains("path1a"));
-        assertTrue(s.contains("path1b"));
-        s = r.getPaths(2L);
-        assertEquals(1, s.size());
-        assertTrue(s.contains("path2a"));
-        assertNull(r.getPaths(3L));
-    }
-    @Test public void testToMap() {
-        assertEquals(m, r.toMap());
-    }
-}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/test/org/apache/zookeeper/server/WatchesSummaryTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/WatchesSummaryTest.java b/src/java/test/org/apache/zookeeper/server/WatchesSummaryTest.java
deleted file mode 100644
index d679065..0000000
--- a/src/java/test/org/apache/zookeeper/server/WatchesSummaryTest.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zookeeper.server;
-
-import java.util.Map;
-import org.apache.zookeeper.ZKTestCase;
-import org.junit.Before;
-import org.junit.Test;
-import static org.junit.Assert.*;
-
-public class WatchesSummaryTest extends ZKTestCase {
-    private WatchesSummary s;
-    @Before public void setUp() {
-        s = new WatchesSummary(1, 2, 3);
-    }
-    @Test public void testGetters() {
-        assertEquals(1, s.getNumConnections());
-        assertEquals(2, s.getNumPaths());
-        assertEquals(3, s.getTotalWatches());
-    }
-    @Test public void testToMap() {
-        Map<String, Object> m = s.toMap();
-        assertEquals(3, m.size());
-        assertEquals(Integer.valueOf(1), m.get(WatchesSummary.KEY_NUM_CONNECTIONS));
-        assertEquals(Integer.valueOf(2), m.get(WatchesSummary.KEY_NUM_PATHS));
-        assertEquals(Integer.valueOf(3), m.get(WatchesSummary.KEY_NUM_TOTAL_WATCHES));
-    }
-}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/test/org/apache/zookeeper/server/util/BitHashSetTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/util/BitHashSetTest.java b/src/java/test/org/apache/zookeeper/server/util/BitHashSetTest.java
new file mode 100644
index 0000000..a70eaa5
--- /dev/null
+++ b/src/java/test/org/apache/zookeeper/server/util/BitHashSetTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server.util;
+
+import java.util.Set;
+import java.util.HashSet;
+import java.util.List;
+import java.util.ArrayList;
+
+import org.apache.zookeeper.ZKTestCase;
+import org.junit.Test;
+import org.junit.Assert;
+
+public class BitHashSetTest extends ZKTestCase {
+
+    @Test
+    public void testAddWatchBit() {
+        int watcherCacheSize = 1;
+        BitHashSet ws = new BitHashSet(watcherCacheSize);
+        Assert.assertTrue(ws.add(1));
+        Assert.assertEquals(1, ws.size());
+        Assert.assertEquals(1, ws.cachedSize());
+
+        List<Integer> actualBits = new ArrayList<Integer>();
+
+        for (int bit: ws) {
+            actualBits.add(bit);
+        }
+        Assert.assertArrayEquals(
+            new Integer[] {1},
+            actualBits.toArray(new Integer[actualBits.size()]));
+
+        // add the same bit again
+        Assert.assertFalse(ws.add(1));
+        Assert.assertEquals(1, ws.size());
+        Assert.assertEquals(1, ws.cachedSize());
+
+        // add another bit, make sure there is only 1 bit cached
+        Assert.assertTrue(ws.add(2));
+        Assert.assertEquals(2, ws.size());
+        Assert.assertEquals(1, ws.cachedSize());
+
+        Assert.assertTrue(ws.contains(1));
+
+        actualBits.clear();
+        for (int bit: ws) {
+            actualBits.add(bit);
+        }
+        Assert.assertArrayEquals(
+            new Integer[] {1, 2},
+            actualBits.toArray(new Integer[actualBits.size()]));
+    }
+
+    @Test
+    public void testRemoveWatchBit() {
+        int watcherCacheSize = 1;
+        BitHashSet ws = new BitHashSet(watcherCacheSize);
+        ws.add(1);
+        ws.add(2);
+
+        Assert.assertTrue(ws.contains(1));
+        Assert.assertTrue(ws.contains(2));
+
+        ws.remove(1);
+        Assert.assertFalse(ws.contains(1));
+        Assert.assertEquals(1, ws.size());
+        Assert.assertEquals(0, ws.cachedSize());
+
+        List<Integer> actualBits = new ArrayList<Integer>();
+
+        for (int bit: ws) {
+            actualBits.add(bit);
+        }
+        Assert.assertArrayEquals(
+            new Integer[] {2},
+            actualBits.toArray(new Integer[actualBits.size()]));
+
+        ws.add(3);
+        Assert.assertEquals(2, ws.size());
+        Assert.assertEquals(1, ws.cachedSize());
+
+        actualBits.clear();
+        for (int bit: ws) {
+            actualBits.add(bit);
+        }
+        Assert.assertArrayEquals(
+            new Integer[] {2, 3},
+            actualBits.toArray(new Integer[actualBits.size()]));
+
+        ws.remove(2);
+        ws.remove(3);
+
+        Assert.assertEquals(0, ws.size());
+        Assert.assertEquals(0, ws.cachedSize());
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/test/org/apache/zookeeper/server/util/BitMapTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/util/BitMapTest.java b/src/java/test/org/apache/zookeeper/server/util/BitMapTest.java
new file mode 100644
index 0000000..eca0f2d
--- /dev/null
+++ b/src/java/test/org/apache/zookeeper/server/util/BitMapTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server.util;
+
+import org.apache.zookeeper.ZKTestCase;
+import org.junit.Test;
+import org.junit.Assert;
+
+public class BitMapTest extends ZKTestCase {
+
+    @Test
+    public void testAddAndRemove() {
+        BitMap<String> bitMap = new BitMap<String>();
+        String v1 = new String("v1");
+        Integer bit = bitMap.add(v1);
+
+        Assert.assertEquals(1, bitMap.size());
+        Assert.assertTrue(bit >= 0);
+        Assert.assertEquals(v1, bitMap.get(bit));
+        Assert.assertEquals(bit, bitMap.getBit(v1));
+
+        // add the same value again
+        Integer newBit = bitMap.add(v1);
+        Assert.assertEquals(bit, newBit);
+        Assert.assertEquals(1, bitMap.size());
+
+        String v2 = new String("v2");
+        Integer v2Bit = bitMap.add(v2);
+        Assert.assertEquals(2, bitMap.size());
+        Assert.assertNotEquals(v2Bit, bit);
+
+        // remove by value
+        bitMap.remove(v1);
+        Assert.assertEquals(1, bitMap.size());
+        Assert.assertNull(bitMap.get(bit));
+        Assert.assertNull(bitMap.getBit(v1));
+
+        // remove by bit
+        bitMap.remove(v2Bit);
+        Assert.assertEquals(0, bitMap.size());
+        Assert.assertNull(bitMap.get(v2Bit));
+        Assert.assertNull(bitMap.getBit(v2));
+    }
+
+    @Test
+    public void testBitReuse() {
+        BitMap<String> bitMap = new BitMap<String>();
+        int v1Bit = bitMap.add("v1");
+        int v2Bit = bitMap.add("v2");
+        int v3Bit = bitMap.add("v3");
+        bitMap.remove(v2Bit);
+
+        int v4Bit = bitMap.add("v4");
+
+        Assert.assertEquals(v4Bit, v2Bit);
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/test/org/apache/zookeeper/server/watch/WatchManagerTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/watch/WatchManagerTest.java b/src/java/test/org/apache/zookeeper/server/watch/WatchManagerTest.java
new file mode 100644
index 0000000..f6a229b
--- /dev/null
+++ b/src/java/test/org/apache/zookeeper/server/watch/WatchManagerTest.java
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server.watch;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.server.DumbWatcher;
+import org.apache.zookeeper.server.ServerCnxn;
+
+import org.apache.zookeeper.ZKTestCase;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RunWith(Parameterized.class)
+public class WatchManagerTest extends ZKTestCase {
+    protected static final Logger LOG = LoggerFactory.getLogger(WatchManagerTest.class);
+
+    private static final String PATH_PREFIX = "path";
+
+    private ConcurrentHashMap<Integer, DumbWatcher> watchers;
+    private Random r;
+    private String className;
+
+    /**
+     * Invoked by the parameterized runner with one row from {@link #data()}.
+     *
+     * @param className name of the watch manager class to test
+     */
+    public WatchManagerTest(String className) {
+        this.className = className;
+    }
+
+    @Parameterized.Parameters
+    public static List<Object []> data() {
+        return Arrays.asList(new Object [][] {
+            {WatchManager.class.getName()},
+            {WatchManagerOptimized.class.getName()}
+        });
+    }
+
+    @Before
+    public void setUp() {
+        watchers = new ConcurrentHashMap<Integer, DumbWatcher>();
+        r = new Random(System.nanoTime());
+    }
+
+    public IWatchManager getWatchManager() throws IOException {
+        System.setProperty(WatchManagerFactory.ZOOKEEPER_WATCH_MANAGER_NAME, className);
+        return WatchManagerFactory.createWatchManager();
+    }
+
+    public DumbWatcher createOrGetWatcher(int watcherId) {
+        if (!watchers.containsKey(watcherId)) {
+            DumbWatcher watcher = new DumbWatcher(watcherId);
+            watchers.putIfAbsent(watcherId, watcher);
+        }
+        return watchers.get(watcherId);
+    }
+
+    public class AddWatcherWorker extends Thread {
+
+        private final IWatchManager manager;
+        private final int paths;
+        private final int watchers;
+        private final AtomicInteger watchesAdded;
+        private volatile boolean stopped = false;
+
+        public AddWatcherWorker(IWatchManager manager,
+                int paths, int watchers, AtomicInteger watchesAdded) {
+            this.manager = manager;
+            this.paths = paths;
+            this.watchers = watchers;
+            this.watchesAdded = watchesAdded;
+        }
+
+        @Override
+        public void run() {
+            while (!stopped) {
+                String path = PATH_PREFIX + r.nextInt(paths);
+                Watcher watcher = createOrGetWatcher(r.nextInt(watchers));
+                if (manager.addWatch(path, watcher)) {
+                    watchesAdded.addAndGet(1);
+                }
+            }
+        }
+
+        public void shutdown() {
+            stopped = true;
+        }
+    }
+
+    /**
+     * Worker thread that keeps triggering a NodeDeleted event on a randomly
+     * chosen path until {@link #shutdown()} is called, adding the size of
+     * every non-null result of triggerWatch to a shared counter.
+     */
+    public class WatcherTriggerWorker extends Thread {
+
+        private final IWatchManager manager;
+        private final int paths;
+        private final AtomicInteger triggeredCount;
+        private volatile boolean stopped = false;
+
+        /**
+         * @param manager watch manager the events are triggered on
+         * @param paths upper bound (exclusive) of the random path suffix
+         * @param triggeredCount counter of triggered watchers
+         */
+        public WatcherTriggerWorker(IWatchManager manager,
+                int paths, AtomicInteger triggeredCount) {
+            this.manager = manager;
+            this.paths = paths;
+            this.triggeredCount = triggeredCount;
+        }
+
+        @Override
+        public void run() {
+            while (!stopped) {
+                String path = PATH_PREFIX + r.nextInt(paths);
+                WatcherOrBitSet s = manager.triggerWatch(
+                        path, EventType.NodeDeleted);
+                if (s != null) {
+                    triggeredCount.addAndGet(s.size());
+                }
+                try {
+                    Thread.sleep(r.nextInt(10));
+                } catch (InterruptedException e) {
+                    // Keep the interrupt visible and stop, instead of
+                    // swallowing it and carrying on.
+                    Thread.currentThread().interrupt();
+                    return;
+                }
+            }
+        }
+
+        /** Asks the worker to leave its loop before the next iteration. */
+        public void shutdown() {
+            stopped = true;
+        }
+    }
+
+    /**
+     * Worker thread that keeps removing a randomly chosen watcher from a
+     * randomly chosen path until {@link #shutdown()} is called, counting
+     * every removeWatcher call that returns true.
+     */
+    public class RemoveWatcherWorker extends Thread {
+
+        private final IWatchManager manager;
+        private final int paths;
+        private final int watchers;
+        private final AtomicInteger watchesRemoved;
+        private volatile boolean stopped = false;
+
+        /**
+         * @param manager watch manager the watches are removed from
+         * @param paths upper bound (exclusive) of the random path suffix
+         * @param watchers upper bound (exclusive) of the random watcher id
+         * @param watchesRemoved counter incremented for each successful removal
+         */
+        public RemoveWatcherWorker(IWatchManager manager,
+                int paths, int watchers, AtomicInteger watchesRemoved) {
+            this.manager = manager;
+            this.paths = paths;
+            this.watchers = watchers;
+            this.watchesRemoved = watchesRemoved;
+        }
+
+        @Override
+        public void run() {
+            while (!stopped) {
+                String path = PATH_PREFIX + r.nextInt(paths);
+                Watcher watcher = createOrGetWatcher(r.nextInt(watchers));
+                if (manager.removeWatcher(path, watcher)) {
+                    watchesRemoved.addAndGet(1);
+                }
+                try {
+                    Thread.sleep(r.nextInt(10));
+                } catch (InterruptedException e) {
+                    // Keep the interrupt visible and stop, instead of
+                    // swallowing it and carrying on.
+                    Thread.currentThread().interrupt();
+                    return;
+                }
+            }
+        }
+
+        /** Asks the worker to leave its loop before the next iteration. */
+        public void shutdown() {
+            stopped = true;
+        }
+
+    }
+
+    /**
+     * Worker thread that keeps picking a random watcher, marking it stale,
+     * removing it from the watch manager and recording it in a shared set,
+     * until {@link #shutdown()} is called.
+     */
+    public class CreateDeadWatchersWorker extends Thread {
+
+        private final IWatchManager manager;
+        private final int watchers;
+        private final Set<Watcher> removedWatchers;
+        private volatile boolean stopped = false;
+
+        /**
+         * @param manager watch manager the watchers are removed from
+         * @param watchers upper bound (exclusive) of the random watcher id
+         * @param removedWatchers shared set collecting the removed watchers;
+         *                        it is only modified while holding its monitor
+         */
+        public CreateDeadWatchersWorker(IWatchManager manager,
+                int watchers, Set<Watcher> removedWatchers) {
+            this.manager = manager;
+            this.watchers = watchers;
+            this.removedWatchers = removedWatchers;
+        }
+
+        @Override
+        public void run() {
+            while (!stopped) {
+                DumbWatcher watcher = createOrGetWatcher(r.nextInt(watchers));
+                watcher.setStale();
+                manager.removeWatcher(watcher);
+                synchronized (removedWatchers) {
+                    removedWatchers.add(watcher);
+                }
+                try {
+                    Thread.sleep(r.nextInt(10));
+                } catch (InterruptedException e) {
+                    // Keep the interrupt visible and stop, instead of
+                    // swallowing it and carrying on.
+                    Thread.currentThread().interrupt();
+                    return;
+                }
+            }
+        }
+
+        /** Asks the worker to leave its loop before the next iteration. */
+        public void shutdown() {
+            stopped = true;
+        }
+
+    }
+
+    /**
+     * Concurrently add and trigger watch, make sure the watches triggered
+     * are the same as the number added.
+     */
+    @Test(timeout = 90000)
+    public void testAddAndTriggerWatcher() throws IOException {
+        IWatchManager manager = getWatchManager();
+        int paths = 1;
+        int watchers = 10000;
+
+        // 1. start 5 workers to trigger watchers on that path
+        //    count all the watchers have been fired
+        AtomicInteger watchTriggered = new AtomicInteger();
+        List<WatcherTriggerWorker> triggerWorkers =
+                new ArrayList<WatcherTriggerWorker>();
+        for (int i = 0; i < 5; i++) {
+            WatcherTriggerWorker worker =
+                    new WatcherTriggerWorker(manager, paths, watchTriggered);
+            triggerWorkers.add(worker);
+            worker.start();
+        }
+
+        // 2. start 5 workers to add different watchers on the same path
+        //    count all the watchers being added
+        AtomicInteger watchesAdded = new AtomicInteger();
+        List<AddWatcherWorker> addWorkers = new ArrayList<AddWatcherWorker>();
+        for (int i = 0; i < 5; i++) {
+            AddWatcherWorker worker = new AddWatcherWorker(
+                    manager, paths, watchers, watchesAdded);
+            addWorkers.add(worker);
+            worker.start();
+        }
+
+        while (watchesAdded.get() < 100000) {
+            try {
+                Thread.sleep(100);
+            } catch (InterruptedException e) {}
+        }
+
+        // 3. stop all the addWorkers
+        for (AddWatcherWorker worker: addWorkers) {
+            worker.shutdown();
+        }
+
+        // 4. running the trigger worker a bit longer to make sure
+        //    all watchers added are fired
+        try {
+            Thread.sleep(500);
+        } catch (InterruptedException e) {}
+
+        // 5. stop all triggerWorkers
+        for (WatcherTriggerWorker worker: triggerWorkers) {
+            worker.shutdown();
+        }
+
+        // 6. make sure the total watch triggered is same as added
+        Assert.assertTrue(watchesAdded.get() > 0);
+        Assert.assertEquals(watchesAdded.get(), watchTriggered.get());
+    }
+
+    /**
+     * Concurrently add and remove watch, make sure the watches left +
+     * the watches removed are equal to the total added watches.
+     */
+    @Test(timeout = 90000)
+    public void testRemoveWatcherOnPath() throws IOException {
+        IWatchManager manager = getWatchManager();
+        int paths = 10;
+        int watchers = 10000;
+
+        // 1. start 5 workers to remove watchers on those path
+        //    record the watchers have been removed
+        AtomicInteger watchesRemoved = new AtomicInteger();
+        List<RemoveWatcherWorker> removeWorkers =
+                new ArrayList<RemoveWatcherWorker>();
+        for (int i = 0; i < 5; i++) {
+            RemoveWatcherWorker worker =
+                    new RemoveWatcherWorker(manager, paths, watchers, watchesRemoved);
+            removeWorkers.add(worker);
+            worker.start();
+        }
+
+        // 2. start 5 workers to add different watchers on different path
+        //    record the watchers have been added
+        AtomicInteger watchesAdded = new AtomicInteger();
+        List<AddWatcherWorker> addWorkers = new ArrayList<AddWatcherWorker>();
+        for (int i = 0; i < 5; i++) {
+            AddWatcherWorker worker = new AddWatcherWorker(
+                    manager, paths, watchers, watchesAdded);
+            addWorkers.add(worker);
+            worker.start();
+        }
+
+        while (watchesAdded.get() < 100000) {
+            try {
+                Thread.sleep(100);
+            } catch (InterruptedException e) {}
+        }
+
+        // 3. stop all workers
+        for (RemoveWatcherWorker worker: removeWorkers) {
+            worker.shutdown();
+        }
+        for (AddWatcherWorker worker: addWorkers) {
+            worker.shutdown();
+        }
+
+        // 4. sleep for a while to make sure all the thread exited
+        try {
+            Thread.sleep(500);
+        } catch (InterruptedException e) {}
+
+        // 5. make sure left watches + removed watches = added watches
+        Assert.assertTrue(watchesAdded.get() > 0);
+        Assert.assertTrue(watchesRemoved.get() > 0);
+        Assert.assertTrue(manager.size() > 0);
+        Assert.assertEquals(
+                watchesAdded.get(), watchesRemoved.get() + manager.size());
+    }
+
+    /**
+     * Concurrently add watch while close the watcher to simulate the
+     * client connections closed on prod.
+     *
+     * The two cleaner system properties set here are cleared again when
+     * the test ends, and the workers are joined before the result is
+     * inspected.
+     */
+    @Test(timeout = 90000)
+    public void testDeadWatchers() throws IOException {
+        System.setProperty("zookeeper.watcherCleanThreshold", "10");
+        System.setProperty("zookeeper.watcherCleanIntervalInSeconds", "1");
+
+        Set<Watcher> deadWatchers = new HashSet<Watcher>();
+        List<CreateDeadWatchersWorker> deadWorkers =
+                new ArrayList<CreateDeadWatchersWorker>();
+        AtomicInteger watchesAdded = new AtomicInteger();
+        List<AddWatcherWorker> addWorkers = new ArrayList<AddWatcherWorker>();
+
+        try {
+            IWatchManager manager = getWatchManager();
+            int paths = 1;
+            int watchers = 100000;
+
+            // 1. start 5 workers to randomly mark those watcher as dead
+            //    and remove them from watch manager
+            for (int i = 0; i < 5; i++) {
+                CreateDeadWatchersWorker worker = new CreateDeadWatchersWorker(
+                        manager, watchers, deadWatchers);
+                deadWorkers.add(worker);
+                worker.start();
+            }
+
+            // 2. start 5 workers to add different watchers on the same path
+            for (int i = 0; i < 5; i++) {
+                AddWatcherWorker worker = new AddWatcherWorker(
+                        manager, paths, watchers, watchesAdded);
+                addWorkers.add(worker);
+                worker.start();
+            }
+
+            while (watchesAdded.get() < 50000) {
+                Thread.sleep(100);
+            }
+
+            // 3. stop all workers and wait until they have exited
+            for (CreateDeadWatchersWorker worker: deadWorkers) {
+                worker.shutdown();
+            }
+            for (AddWatcherWorker worker: addWorkers) {
+                worker.shutdown();
+            }
+            for (CreateDeadWatchersWorker worker: deadWorkers) {
+                worker.join();
+            }
+            for (AddWatcherWorker worker: addWorkers) {
+                worker.join();
+            }
+
+            // 4. keep the original grace period before inspecting the manager
+            //    NOTE(review): presumably gives a background cleanup of the
+            //    dead watchers time to run - confirm against the managers
+            Thread.sleep(1000);
+
+            // 5. make sure the dead watchers are not in the existing watchers
+            WatchesReport existingWatchers = manager.getWatches();
+            for (Watcher w: deadWatchers) {
+                Assert.assertFalse(
+                        existingWatchers.hasPaths(((ServerCnxn) w).getSessionId()));
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            Assert.fail("Interrupted while waiting for the watch workers");
+        } finally {
+            // never leave a worker looping when the test bails out early
+            for (CreateDeadWatchersWorker worker: deadWorkers) {
+                worker.shutdown();
+            }
+            for (AddWatcherWorker worker: addWorkers) {
+                worker.shutdown();
+            }
+            // do not leak the test-only settings into the rest of the JVM
+            System.clearProperty("zookeeper.watcherCleanThreshold");
+            System.clearProperty("zookeeper.watcherCleanIntervalInSeconds");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/test/org/apache/zookeeper/server/watch/WatcherCleanerTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/watch/WatcherCleanerTest.java b/src/java/test/org/apache/zookeeper/server/watch/WatcherCleanerTest.java
new file mode 100644
index 0000000..d315232
--- /dev/null
+++ b/src/java/test/org/apache/zookeeper/server/watch/WatcherCleanerTest.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server.watch;
+
+import java.util.Set;
+import java.util.HashSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.common.Time;
+import org.junit.Test;
+import org.junit.Assert;
+
+/**
+ * Tests for {@code WatcherCleaner} using a listener that records the batches
+ * of dead watchers it is handed.
+ */
+public class WatcherCleanerTest extends ZKTestCase {
+
+    /**
+     * Listener that remembers the last batch of dead watchers it was given
+     * and counts down a latch for every batch, optionally after a delay.
+     */
+    public static class MyDeadWatcherListener implements IDeadWatcherListener {
+
+        // Both are set by the test thread; the callback presumably runs on
+        // another thread (the tests wait for it through the latch), so the
+        // values are published through volatile fields.
+        private volatile CountDownLatch latch;
+        private volatile int delayMs;
+        private Set<Integer> deadWatchers = new HashSet<Integer>();
+
+        /** Installs the latch counted down by each processed batch. */
+        public void setCountDownLatch(CountDownLatch latch) {
+            this.latch = latch;
+        }
+
+        /** Sets how long each batch is delayed before being recorded. */
+        public void setDelayMs(int delayMs) {
+            this.delayMs = delayMs;
+        }
+
+        /**
+         * Sleeps for the configured delay, replaces the recorded batch with
+         * the given one and counts down the latch if one is installed.
+         */
+        @Override
+        public void processDeadWatchers(Set<Integer> deadWatchers) {
+            if (delayMs > 0) {
+                try {
+                    Thread.sleep(delayMs);
+                } catch (InterruptedException e) {
+                    // keep the interrupt status for the calling thread
+                    Thread.currentThread().interrupt();
+                }
+            }
+            this.deadWatchers.clear();
+            this.deadWatchers.addAll(deadWatchers);
+            // a batch may arrive before the test has installed a latch
+            CountDownLatch current = latch;
+            if (current != null) {
+                current.countDown();
+            }
+        }
+
+        /** Returns the live set holding the most recently processed batch. */
+        public Set<Integer> getDeadWatchers() {
+            return deadWatchers;
+        }
+
+        /**
+         * Waits for the installed latch to reach zero.
+         *
+         * @param maxWaitMs maximum time to wait in milliseconds
+         * @return true if the latch reached zero in time, false on timeout
+         *         or interruption
+         */
+        public boolean wait(int maxWaitMs) {
+            try {
+                return latch.await(maxWaitMs, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+            return false;
+        }
+    }
+
+    /**
+     * The listener is not called below the threshold and receives the whole
+     * batch once the threshold is reached.
+     */
+    @Test
+    public void testProcessDeadWatchersBasedOnThreshold() {
+        MyDeadWatcherListener listener = new MyDeadWatcherListener();
+        int threshold = 3;
+        WatcherCleaner cleaner = new WatcherCleaner(listener, threshold, 60, 1, 10);
+        cleaner.start();
+
+        // add one dead watcher less than the threshold (ids 1..threshold-1)
+        for (int id = 1; id < threshold; id++) {
+            cleaner.addDeadWatcher(id);
+        }
+        // not trigger processDeadWatchers yet
+        Assert.assertEquals(0, listener.getDeadWatchers().size());
+
+        listener.setCountDownLatch(new CountDownLatch(1));
+        // add another dead watcher to trigger the process
+        cleaner.addDeadWatcher(threshold);
+        Assert.assertTrue(listener.wait(1000));
+        Assert.assertEquals(threshold, listener.getDeadWatchers().size());
+    }
+
+    /**
+     * A single dead watcher below the threshold is still processed after a
+     * while, and nothing is processed when there are no dead watchers.
+     */
+    @Test
+    public void testProcessDeadWatchersBasedOnTime() {
+        MyDeadWatcherListener listener = new MyDeadWatcherListener();
+        WatcherCleaner cleaner = new WatcherCleaner(listener, 10, 1, 1, 10);
+        cleaner.start();
+
+        cleaner.addDeadWatcher(1);
+        // not trigger processDeadWatchers yet
+        Assert.assertEquals(0, listener.getDeadWatchers().size());
+
+        listener.setCountDownLatch(new CountDownLatch(1));
+        Assert.assertTrue(listener.wait(2000));
+        Assert.assertEquals(1, listener.getDeadWatchers().size());
+
+        // won't trigger event if there is no dead watchers
+        listener.setCountDownLatch(new CountDownLatch(1));
+        Assert.assertFalse(listener.wait(2000));
+    }
+
+    /**
+     * With a slow listener, adding two dead watchers takes at least as long
+     * as the listener's delay, and both batches are eventually processed.
+     */
+    @Test
+    public void testMaxInProcessingDeadWatchers() {
+        MyDeadWatcherListener listener = new MyDeadWatcherListener();
+        int delayMs = 1000;
+        listener.setDelayMs(delayMs);
+        WatcherCleaner cleaner = new WatcherCleaner(listener, 1, 60, 1, 1);
+        cleaner.start();
+
+        listener.setCountDownLatch(new CountDownLatch(2));
+
+        long startTime = Time.currentElapsedTime();
+        cleaner.addDeadWatcher(1);
+        cleaner.addDeadWatcher(2);
+        long time = Time.currentElapsedTime() - startTime;
+        System.out.println("time used " + time);
+        // assert on the measured value rather than reading the clock again
+        Assert.assertTrue("adding took only " + time + " ms", time >= delayMs);
+        Assert.assertTrue(listener.wait(5000));
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/test/org/apache/zookeeper/server/watch/WatcherOrBitSetTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/watch/WatcherOrBitSetTest.java b/src/java/test/org/apache/zookeeper/server/watch/WatcherOrBitSetTest.java
new file mode 100644
index 0000000..4b7fbd5
--- /dev/null
+++ b/src/java/test/org/apache/zookeeper/server/watch/WatcherOrBitSetTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server.watch;
+
+import java.util.Set;
+import java.util.HashSet;
+
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.server.DumbWatcher;
+import org.apache.zookeeper.server.util.BitHashSet;
+
+import org.apache.zookeeper.ZKTestCase;
+import org.junit.Test;
+import org.junit.Assert;
+
+/**
+ * Tests for {@code WatcherOrBitSet} when it wraps a set of watchers and
+ * when it wraps a {@code BitHashSet}.
+ */
+public class WatcherOrBitSetTest extends ZKTestCase {
+
+    /**
+     * A wrapper around a watcher set reflects additions to that set and
+     * does not report containing a bit.
+     */
+    @Test
+    public void testWatcherSet() {
+        Set<Watcher> wset = new HashSet<Watcher>();
+        WatcherOrBitSet hashSet = new WatcherOrBitSet(wset);
+        Assert.assertEquals(0, hashSet.size());
+
+        DumbWatcher w1 = new DumbWatcher();
+        Assert.assertFalse(hashSet.contains(w1));
+        wset.add(w1);
+        Assert.assertTrue(hashSet.contains(w1));
+        Assert.assertEquals(1, hashSet.size());
+        Assert.assertFalse(hashSet.contains(1));
+    }
+
+    /**
+     * A wrapper around a bit set reflects additions to that bit set, both
+     * for a primitive and for a boxed bit argument.
+     */
+    @Test
+    public void testBitSet() {
+        BitHashSet bset = new BitHashSet(0);
+        WatcherOrBitSet bitSet = new WatcherOrBitSet(bset);
+        Assert.assertEquals(0, bitSet.size());
+
+        // valueOf instead of the deprecated Integer constructor
+        Integer bit = Integer.valueOf(1);
+        Assert.assertFalse(bitSet.contains(1));
+        Assert.assertFalse(bitSet.contains(bit));
+
+        bset.add(bit);
+        Assert.assertTrue(bitSet.contains(1));
+        Assert.assertTrue(bitSet.contains(bit));
+        Assert.assertEquals(1, bitSet.size());
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/test/org/apache/zookeeper/server/watch/WatchesPathReportTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/watch/WatchesPathReportTest.java b/src/java/test/org/apache/zookeeper/server/watch/WatchesPathReportTest.java
new file mode 100644
index 0000000..34e3789
--- /dev/null
+++ b/src/java/test/org/apache/zookeeper/server/watch/WatchesPathReportTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server.watch;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.zookeeper.ZKTestCase;
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+/**
+ * Tests for {@code WatchesPathReport} built from a fixed path-to-sessions map.
+ */
+public class WatchesPathReportTest extends ZKTestCase {
+    private Map<String, Set<Long>> sessionsByPath;
+    private WatchesPathReport report;
+
+    /** Builds a report with two sessions on "path1" and one on "path2". */
+    @Before
+    public void setUp() {
+        Set<Long> path1Sessions = new HashSet<Long>();
+        path1Sessions.add(101L);
+        path1Sessions.add(102L);
+
+        Set<Long> path2Sessions = new HashSet<Long>();
+        path2Sessions.add(201L);
+
+        sessionsByPath = new HashMap<String, Set<Long>>();
+        sessionsByPath.put("path1", path1Sessions);
+        sessionsByPath.put("path2", path2Sessions);
+        report = new WatchesPathReport(sessionsByPath);
+    }
+
+    /** Known paths report sessions, an unknown path does not. */
+    @Test
+    public void testHasSessions() {
+        assertTrue(report.hasSessions("path1"));
+        assertTrue(report.hasSessions("path2"));
+        assertFalse(report.hasSessions("path3"));
+    }
+
+    /** Known paths return their sessions, an unknown path returns null. */
+    @Test
+    public void testGetSessions() {
+        Set<Long> onPath1 = report.getSessions("path1");
+        assertEquals(2, onPath1.size());
+        assertTrue(onPath1.contains(101L));
+        assertTrue(onPath1.contains(102L));
+
+        Set<Long> onPath2 = report.getSessions("path2");
+        assertEquals(1, onPath2.size());
+        assertTrue(onPath2.contains(201L));
+
+        assertNull(report.getSessions("path3"));
+    }
+
+    /** The map view equals the map the report was built from. */
+    @Test
+    public void testToMap() {
+        assertEquals(sessionsByPath, report.toMap());
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/test/org/apache/zookeeper/server/watch/WatchesReportTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/watch/WatchesReportTest.java b/src/java/test/org/apache/zookeeper/server/watch/WatchesReportTest.java
new file mode 100644
index 0000000..237583a
--- /dev/null
+++ b/src/java/test/org/apache/zookeeper/server/watch/WatchesReportTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server.watch;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.zookeeper.ZKTestCase;
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+/**
+ * Tests for {@code WatchesReport} built from a fixed session-to-paths map.
+ */
+public class WatchesReportTest extends ZKTestCase {
+    private Map<Long, Set<String>> pathsBySession;
+    private WatchesReport report;
+
+    /** Builds a report with two paths for session 1 and one for session 2. */
+    @Before
+    public void setUp() {
+        Set<String> session1Paths = new HashSet<String>();
+        session1Paths.add("path1a");
+        session1Paths.add("path1b");
+
+        Set<String> session2Paths = new HashSet<String>();
+        session2Paths.add("path2a");
+
+        pathsBySession = new HashMap<Long, Set<String>>();
+        pathsBySession.put(1L, session1Paths);
+        pathsBySession.put(2L, session2Paths);
+        report = new WatchesReport(pathsBySession);
+    }
+
+    /** Known sessions report paths, an unknown session does not. */
+    @Test
+    public void testHasPaths() {
+        assertTrue(report.hasPaths(1L));
+        assertTrue(report.hasPaths(2L));
+        assertFalse(report.hasPaths(3L));
+    }
+
+    /** Known sessions return their paths, an unknown session returns null. */
+    @Test
+    public void testGetPaths() {
+        Set<String> ofSession1 = report.getPaths(1L);
+        assertEquals(2, ofSession1.size());
+        assertTrue(ofSession1.contains("path1a"));
+        assertTrue(ofSession1.contains("path1b"));
+
+        Set<String> ofSession2 = report.getPaths(2L);
+        assertEquals(1, ofSession2.size());
+        assertTrue(ofSession2.contains("path2a"));
+
+        assertNull(report.getPaths(3L));
+    }
+
+    /** The map view equals the map the report was built from. */
+    @Test
+    public void testToMap() {
+        assertEquals(pathsBySession, report.toMap());
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/test/org/apache/zookeeper/server/watch/WatchesSummaryTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/watch/WatchesSummaryTest.java b/src/java/test/org/apache/zookeeper/server/watch/WatchesSummaryTest.java
new file mode 100644
index 0000000..35956f1
--- /dev/null
+++ b/src/java/test/org/apache/zookeeper/server/watch/WatchesSummaryTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server.watch;
+
+import java.util.Map;
+import org.apache.zookeeper.ZKTestCase;
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+/**
+ * Tests for {@code WatchesSummary} built from the fixed values 1, 2 and 3.
+ */
+public class WatchesSummaryTest extends ZKTestCase {
+    private WatchesSummary summary;
+
+    /** Creates the summary under test. */
+    @Before
+    public void setUp() {
+        summary = new WatchesSummary(1, 2, 3);
+    }
+
+    /** Each getter returns the matching constructor argument. */
+    @Test
+    public void testGetters() {
+        assertEquals(1, summary.getNumConnections());
+        assertEquals(2, summary.getNumPaths());
+        assertEquals(3, summary.getTotalWatches());
+    }
+
+    /** The map view holds exactly the three values under their keys. */
+    @Test
+    public void testToMap() {
+        Map<String, Object> asMap = summary.toMap();
+        assertEquals(3, asMap.size());
+
+        Object connections = asMap.get(WatchesSummary.KEY_NUM_CONNECTIONS);
+        Object paths = asMap.get(WatchesSummary.KEY_NUM_PATHS);
+        Object totalWatches = asMap.get(WatchesSummary.KEY_NUM_TOTAL_WATCHES);
+        assertEquals(Integer.valueOf(1), connections);
+        assertEquals(Integer.valueOf(2), paths);
+        assertEquals(Integer.valueOf(3), totalWatches);
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/test/java/bench/org/apache/zookeeper/BenchMain.java
----------------------------------------------------------------------
diff --git a/src/test/java/bench/org/apache/zookeeper/BenchMain.java b/src/test/java/bench/org/apache/zookeeper/BenchMain.java
new file mode 100644
index 0000000..8e370c0
--- /dev/null
+++ b/src/test/java/bench/org/apache/zookeeper/BenchMain.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper;
+
+import java.io.File;
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.net.URLClassLoader;
+
+/**
+ * Command line entry point for the benchmarks; it only forwards to the JMH
+ * launcher.
+ */
+public class BenchMain {
+    /**
+     * Passes the command line arguments through to the JMH main class.
+     *
+     * @param args arguments handed unchanged to JMH
+     * @throws Exception whatever the JMH main method throws
+     */
+    public static void main(String[] args) throws Exception {
+        org.openjdk.jmh.Main.main(args);
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/test/java/bench/org/apache/zookeeper/server/watch/WatchBench.java
----------------------------------------------------------------------
diff --git a/src/test/java/bench/org/apache/zookeeper/server/watch/WatchBench.java b/src/test/java/bench/org/apache/zookeeper/server/watch/WatchBench.java
new file mode 100644
index 0000000..0510df7
--- /dev/null
+++ b/src/test/java/bench/org/apache/zookeeper/server/watch/WatchBench.java
@@ -0,0 +1,300 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.watch;
+
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.server.watch.IWatchManager;
+import org.apache.zookeeper.server.DumbWatcher;
+
+import org.openjdk.jmh.annotations.*;
+
+import java.util.concurrent.TimeUnit;
+
+@Fork(3)
+public class WatchBench {
+
+    static final String pathPrefix = "/reasonably/long/path/";
+    static final EventType event = EventType.NodeDataChanged;
+
+    static IWatchManager createWatchManager(String className) throws Exception {
+        Class clazz = Class.forName(
+                "org.apache.zookeeper.server.watch." + className);
+        return (IWatchManager) clazz.newInstance();
+    }
+
+    static void forceGC() {
+        int gcTimes = 3;
+        for (int i = 0; i < gcTimes; i++) {
+            try {
+                System.gc();
+                Thread.currentThread().sleep(1000);
+
+                System.runFinalization();
+                Thread.currentThread().sleep(1000);
+            } catch (InterruptedException ex) { /* ignore */ }
+        }
+    }
+
+    static long getMemoryUse() {
+        forceGC();
+        long totalMem = Runtime.getRuntime().totalMemory();
+
+        forceGC();
+        long freeMem = Runtime.getRuntime().freeMemory();
+        return totalMem - freeMem;
+    }
+
+    @State(Scope.Benchmark)
+    public static class IterationState {
+
+        @Param({"WatchManager", "WatchManagerOptimized"})
+        public String watchManagerClass;
+
+        @Param({"10000"})
+        public int pathCount;
+
+        String[] paths;
+
+        long watchesAdded = 0;
+        IWatchManager watchManager;
+
+        long memWhenSetup = 0;
+
+        @Setup(Level.Iteration)
+        public void setup() throws Exception {
+            paths = new String[pathCount];
+            for (int i = 0; i < paths.length; i++) {
+                paths[i] = pathPrefix + i;
+            }
+
+            watchesAdded = 0;
+            watchManager = createWatchManager(watchManagerClass);
+
+            memWhenSetup = getMemoryUse();
+        }
+
+        @TearDown(Level.Iteration)
+        public void tearDown() {
+            long memUsed = getMemoryUse() - memWhenSetup;
+            System.out.println("Memory used: " + watchesAdded + " " + memUsed);
+
+            double memPerMillionWatchesMB = memUsed * 1.0 / watchesAdded ;
+            System.out.println(
+                    "Memory used per million watches " +
+                    String.format("%.2f", memPerMillionWatchesMB) + "MB");
+        }
+    }
+
+    /**
+     * Test concentrate watch case where the watcher watches all paths.
+     *
+     * The output of this test will be the average time used to add the
+     * watch to all paths.
+     */
+    @Benchmark
+    @BenchmarkMode(Mode.AverageTime)
+    @OutputTimeUnit(TimeUnit.MILLISECONDS)
+    @Warmup(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS)
+    @Measurement(iterations = 3, time = 10, timeUnit = TimeUnit.SECONDS)
+    public void testAddConcentrateWatch(IterationState state) throws Exception {
+        Watcher watcher = new DumbWatcher();
+
+        // watch all paths
+        for (String path : state.paths) {
+            if (state.watchManager.addWatch(path, watcher)) {
+                state.watchesAdded++;
+            }
+        }
+    }
+
+    @State(Scope.Benchmark)
+    public static class InvocationState {
+
+        @Param({"WatchManager", "WatchManagerOptimized"})
+        public String watchManagerClass;
+
+        @Param({"1", "1000"})
+        public int pathCount;
+
+        @Param({"1", "1000"})
+        public int watcherCount;
+
+        String[] paths;
+        Watcher[] watchers;
+
+        IWatchManager watchManager;
+
+        @Setup(Level.Invocation)
+        public void setup() throws Exception {
+            initialize();
+            prepare();
+        }
+
+        void initialize() throws Exception {
+            if (paths == null || paths.length != pathCount) {
+                paths = new String[pathCount];
+                for (int i = 0; i < pathCount; i++) {
+                    paths[i] = pathPrefix + i;
+                }
+            }
+
+            if (watchers == null || watchers.length != watcherCount) {
+                watchers = new Watcher[watcherCount];
+                for (int i = 0; i < watcherCount; i++) {
+                    watchers[i] = new DumbWatcher();
+                }
+            }
+            if (watchManager == null ||
+                    !watchManager.getClass().getSimpleName().contains(
+                            watchManagerClass)) {
+                watchManager = createWatchManager(watchManagerClass);
+            }
+        }
+
+        void prepare() {
+            for (String path : paths) {
+                for (Watcher watcher : watchers) {
+                    watchManager.addWatch(path, watcher);
+                }
+            }
+        }
+    }
+
+    /**
+     * Test trigger watches in concentrate case.
+     *
+     * The output of this test is the time used to trigger those watches on
+     * all paths.
+     */
+    @Benchmark
+    @BenchmarkMode(Mode.AverageTime)
+    @OutputTimeUnit(TimeUnit.MILLISECONDS)
+    @Warmup(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS)
+    @Measurement(iterations = 3, time = 10, timeUnit = TimeUnit.SECONDS)
+    public void testTriggerConcentrateWatch(InvocationState state) throws Exception {
+        for (String path : state.paths) {
+            state.watchManager.triggerWatch(path, event);
+        }
+    }
+
+    @State(Scope.Benchmark)
+    public static class AddSparseWatchState extends InvocationState {
+
+        @Param({"10000"})
+        public int pathCount;
+
+        @Param({"10000"})
+        public int watcherCount;
+
+        long watchesAdded = 0;
+        long memWhenSetup = 0;
+
+        @Override
+        public void prepare() {
+            watchesAdded = 0;
+            memWhenSetup = getMemoryUse();
+        }
+
+        @TearDown(Level.Invocation)
+        public void tearDown() {
+            long memUsed = getMemoryUse() - memWhenSetup;
+            System.out.println("Memory used: " + watchesAdded + " " + memUsed);
+
+            double memPerMillionWatchesMB = memUsed * 1.0 / watchesAdded ;
+            System.out.println(
+                    "Memory used per million sparse watches " +
+                    String.format("%.2f", memPerMillionWatchesMB) + "MB");
+
+            // clear all the watches
+            for (String path : paths) {
+                watchManager.triggerWatch(path, event);
+            }
+        }
+    }
+
+    /**
+     * Test sparse watch case where only one watcher watches all paths, and
+     * only one path is watched by all watchers.
+     *
+     * The output of this test will be the average time used to add those
+     * sparse watches.
+     */
+    @Benchmark
+    @BenchmarkMode(Mode.AverageTime)
+    @OutputTimeUnit(TimeUnit.MILLISECONDS)
+    @Warmup(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS)
+    @Measurement(iterations = 3, time = 10, timeUnit = TimeUnit.SECONDS)
+    public void testAddSparseWatch(AddSparseWatchState state) throws Exception {
+        // All watchers are watching the 1st path
+        for (Watcher watcher : state.watchers) {
+            if (state.watchManager.addWatch(state.paths[0], watcher)) {
+                state.watchesAdded++;
+            }
+        }
+        // The 1st watcher is watching all paths
+        for (String path : state.paths) {
+            if (state.watchManager.addWatch(path, state.watchers[0])) {
+                state.watchesAdded++;
+            }
+        }
+    }
+
+    @State(Scope.Benchmark)
+    public static class TriggerSparseWatchState extends InvocationState {
+
+        @Param({"10000"})
+        public int pathCount;
+
+        @Param({"10000"})
+        public int watcherCount;
+
+        @Override
+        public void prepare() {
+            // All watchers are watching the 1st path
+            for (Watcher watcher : watchers) {
+                watchManager.addWatch(paths[0], watcher);
+            }
+
+            // The 1st watcher is watching all paths
+            for (String path : paths) {
+                watchManager.addWatch(path, watchers[0]);
+            }
+        }
+    }
+
+
+    /**
+     * Test trigger watches in sparse case.
+     *
+     * The output of this test is the time used to trigger those watches on
+     * all paths.
+     */
+    @Benchmark
+    @BenchmarkMode(Mode.AverageTime)
+    @OutputTimeUnit(TimeUnit.MILLISECONDS)
+    @Warmup(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS)
+    @Measurement(iterations = 3, time = 10, timeUnit = TimeUnit.SECONDS)
+    public void testTriggerSparseWatch(TriggerSparseWatchState state) throws Exception {
+        for (String path : state.paths) {
+            state.watchManager.triggerWatch(path, event);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/zookeeper-contrib/zookeeper-contrib-fatjar/src/main/resources/mainClasses
----------------------------------------------------------------------
diff --git a/zookeeper-contrib/zookeeper-contrib-fatjar/src/main/resources/mainClasses b/zookeeper-contrib/zookeeper-contrib-fatjar/src/main/resources/mainClasses
index 2b0fc83..ba29e89 100644
--- a/zookeeper-contrib/zookeeper-contrib-fatjar/src/main/resources/mainClasses
+++ b/zookeeper-contrib/zookeeper-contrib-fatjar/src/main/resources/mainClasses
@@ -8,3 +8,4 @@ quorumBench:org.apache.zookeeper.server.QuorumBenchmark:A benchmark of just the
 abBench:org.apache.zookeeper.server.quorum.AtomicBroadcastBenchmark:A benchmark of just the atomic broadcast
 ic:org.apache.zookeeper.test.system.InstanceContainer:A container that will instantiate classes as directed by an instance manager
 systest:org.apache.zookeeper.test.system.BaseSysTest:Start system test
+jmh:org.apache.zookeeper.BenchMain:Run jmh micro benchmarks

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/zookeeper-docs/src/documentation/content/xdocs/zookeeperAdmin.xml
----------------------------------------------------------------------
diff --git a/zookeeper-docs/src/documentation/content/xdocs/zookeeperAdmin.xml b/zookeeper-docs/src/documentation/content/xdocs/zookeeperAdmin.xml
index 50143d4..b7cd21b 100644
--- a/zookeeper-docs/src/documentation/content/xdocs/zookeeperAdmin.xml
+++ b/zookeeper-docs/src/documentation/content/xdocs/zookeeperAdmin.xml
@@ -1025,6 +1025,102 @@ server.3=zoo3:2888:3888</programlisting>
             </listitem>
           </varlistentry>
 
+
+          <varlistentry>
+            <term>watchManagerName</term>
+
+            <listitem>
+              <para>(Java system property only: <emphasis
+                    role="bold">zookeeper.watchManagerName</emphasis>)</para>
+
+              <para><emphasis role="bold">New in 3.6.0:</emphasis> Added in
+                <ulink url="https://issues.apache.org/jira/browse/ZOOKEEPER-1177">ZOOKEEPER-1177</ulink>. A new watch
+                manager, WatchManagerOptimized, is added to reduce the memory overhead in heavy watch use cases. This
+                config is used to define which watch manager is to be used. Currently, we only support WatchManager and
+                WatchManagerOptimized.</para>
+            </listitem>
+          </varlistentry>
+
+          <varlistentry>
+            <term>watcherCleanThreadsNum</term>
+
+            <listitem>
+              <para>(Java system property only: <emphasis
+                    role="bold">zookeeper.watcherCleanThreadsNum</emphasis>)</para>
+
+              <para><emphasis role="bold">New in 3.6.0:</emphasis> Added in
+                <ulink url="https://issues.apache.org/jira/browse/ZOOKEEPER-1177">ZOOKEEPER-1177</ulink>. The new watcher
+                manager WatchManagerOptimized will clean up the dead watchers lazily; this config is used to decide how
+                many threads are used in the WatcherCleaner. More threads usually mean larger clean up throughput. The
+                default value is 2, which is good enough even for heavy and continuous session closing/recreating cases.</para>
+            </listitem>
+          </varlistentry>
+
+          <varlistentry>
+            <term>watcherCleanThreshold</term>
+
+            <listitem>
+              <para>(Java system property only: <emphasis
+                    role="bold">zookeeper.watcherCleanThreshold</emphasis>)</para>
+
+              <para><emphasis role="bold">New in 3.6.0:</emphasis> Added in
+                <ulink url="https://issues.apache.org/jira/browse/ZOOKEEPER-1177">ZOOKEEPER-1177</ulink>. The new watcher
+                manager WatchManagerOptimized will clean up the dead watchers lazily. The clean up process is relatively
+                heavy, so batch processing will reduce the cost and improve the performance. This setting is used to decide
+                the batch size. The default value is 1000; we don't need to change it if there is no memory or clean up
+                speed issue.</para>
+            </listitem>
+          </varlistentry>
+
+          <varlistentry>
+            <term>watcherCleanIntervalInSeconds</term>
+
+            <listitem>
+              <para>(Java system property only: <emphasis
+                    role="bold">zookeeper.watcherCleanIntervalInSeconds</emphasis>)</para>
+
+              <para><emphasis role="bold">New in 3.6.0:</emphasis> Added in
+                <ulink url="https://issues.apache.org/jira/browse/ZOOKEEPER-1177">ZOOKEEPER-1177</ulink>. The new watcher
+                manager WatchManagerOptimized will clean up the dead watchers lazily. The clean up process is relatively
+                heavy, so batch processing will reduce the cost and improve the performance. Besides watcherCleanThreshold,
+                this setting is used to clean up the dead watchers after a certain time even if the number of dead watchers
+                is not larger than watcherCleanThreshold, so that we won't leave the dead watchers there for too long. The
+                default setting is 10 minutes, which usually doesn't need to be changed.</para>
+            </listitem>
+          </varlistentry>
+
+          <varlistentry>
+            <term>maxInProcessingDeadWatchers</term>
+
+            <listitem>
+              <para>(Java system property only: <emphasis
+                    role="bold">zookeeper.maxInProcessingDeadWatchers</emphasis>)</para>
+
+              <para><emphasis role="bold">New in 3.6.0:</emphasis> Added in
+                <ulink url="https://issues.apache.org/jira/browse/ZOOKEEPER-1177">ZOOKEEPER-1177</ulink>. This is used
+                to control how large a backlog we can have in the WatcherCleaner. When it reaches this number, it will
+                slow down adding the dead watcher to WatcherCleaner, which will in turn slow down adding and closing
+                watchers, so that we can avoid OOM issues. By default there is no limit; you can set it to values like
+                watcherCleanThreshold * 1000.</para>
+            </listitem>
+          </varlistentry>
+
+          <varlistentry>
+            <term>bitHashCacheSize</term>
+
+            <listitem>
+              <para>(Java system property only: <emphasis
+                    role="bold">zookeeper.bitHashCacheSize</emphasis>)</para>
+
+              <para><emphasis role="bold">New in 3.6.0:</emphasis> Added in
+                <ulink url="https://issues.apache.org/jira/browse/ZOOKEEPER-1177">ZOOKEEPER-1177</ulink>. This is the
+                setting used to decide the HashSet cache size in the BitHashSet implementation. Without HashSet, we
+                need to use O(N) time to get the elements, where N is the number of bits in elementBits. But we need to
+                keep the size small to make sure it doesn't cost too much in memory; there is a trade-off between memory
+                and time complexity. The default value is 10, which seems a relatively reasonable cache size.</para>
+            </listitem>
+          </varlistentry>
+
         </variablelist>
       </section>
 


[2/2] zookeeper git commit: ZOOKEEPER-1177: Add the memory optimized watch manager for concentrate watches scenario

Posted by ha...@apache.org.
ZOOKEEPER-1177: Add the memory optimized watch manager for concentrate watches scenario

The current HashSet-based WatchManager consumes more than 40GB of memory when
creating 300M watches.

This patch optimizes the memory and time complexity for the concentrate watches scenario. Compared to WatchManager, both the memory consumption and the time complexity are improved a lot. I'll post more data later with micro benchmark results.

Changes made compared to WatchManager:
* Only keep path to watches map
* Use BitSet to save the memory used to store watches
* Use ConcurrentHashMap and ReadWriteLock instead of synchronized to reduce lock contention
* Lazily clean up the closed watchers

Author: Fangmin Lyu <al...@fb.com>

Reviewers: Andor Molnár <an...@apache.org>, Norbert Kalmar <nk...@yahoo.com>, Michael Han <ha...@apache.org>

Closes #590 from lvfangmin/ZOOKEEPER-1177


Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/fdde8b00
Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/fdde8b00
Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/fdde8b00

Branch: refs/heads/master
Commit: fdde8b006458f7b989c894af0eac7e124d271a1e
Parents: 4ebb847
Author: Fangmin Lyu <al...@fb.com>
Authored: Fri Sep 28 14:38:24 2018 -0700
Committer: Michael Han <ha...@apache.org>
Committed: Fri Sep 28 14:38:24 2018 -0700

----------------------------------------------------------------------
 build.xml                                       |  12 +-
 ivy.xml                                         |   3 +
 .../org/apache/zookeeper/server/DataTree.java   |  25 +-
 .../apache/zookeeper/server/DumbWatcher.java    | 100 +++++
 .../apache/zookeeper/server/NIOServerCnxn.java  |   1 +
 .../zookeeper/server/NettyServerCnxn.java       |   1 +
 .../org/apache/zookeeper/server/ServerCnxn.java |  16 +-
 .../apache/zookeeper/server/WatchManager.java   | 266 ------------
 .../zookeeper/server/WatchesPathReport.java     |  83 ----
 .../apache/zookeeper/server/WatchesReport.java  |  83 ----
 .../apache/zookeeper/server/WatchesSummary.java |  98 -----
 .../org/apache/zookeeper/server/ZKDatabase.java |   1 +
 .../zookeeper/server/util/BitHashSet.java       | 159 ++++++++
 .../apache/zookeeper/server/util/BitMap.java    | 136 +++++++
 .../server/watch/IDeadWatcherListener.java      |  34 ++
 .../zookeeper/server/watch/IWatchManager.java   | 134 ++++++
 .../zookeeper/server/watch/WatchManager.java    | 247 ++++++++++++
 .../server/watch/WatchManagerFactory.java       |  52 +++
 .../server/watch/WatchManagerOptimized.java     | 389 ++++++++++++++++++
 .../zookeeper/server/watch/WatcherCleaner.java  | 182 +++++++++
 .../zookeeper/server/watch/WatcherOrBitSet.java |  61 +++
 .../server/watch/WatchesPathReport.java         |  83 ++++
 .../zookeeper/server/watch/WatchesReport.java   |  83 ++++
 .../zookeeper/server/watch/WatchesSummary.java  |  98 +++++
 src/java/test/config/findbugsExcludeFile.xml    |  38 +-
 .../zookeeper/server/WatchesPathReportTest.java |  60 ---
 .../zookeeper/server/WatchesReportTest.java     |  60 ---
 .../zookeeper/server/WatchesSummaryTest.java    |  42 --
 .../zookeeper/server/util/BitHashSetTest.java   | 110 +++++
 .../zookeeper/server/util/BitMapTest.java       |  71 ++++
 .../server/watch/WatchManagerTest.java          | 404 +++++++++++++++++++
 .../server/watch/WatcherCleanerTest.java        | 127 ++++++
 .../server/watch/WatcherOrBitSetTest.java       |  61 +++
 .../server/watch/WatchesPathReportTest.java     |  60 +++
 .../server/watch/WatchesReportTest.java         |  60 +++
 .../server/watch/WatchesSummaryTest.java        |  42 ++
 .../bench/org/apache/zookeeper/BenchMain.java   |  30 ++
 .../zookeeper/server/watch/WatchBench.java      | 300 ++++++++++++++
 .../src/main/resources/mainClasses              |   1 +
 .../content/xdocs/zookeeperAdmin.xml            |  96 +++++
 40 files changed, 3207 insertions(+), 702 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index 2043e68..6e3b40c 100644
--- a/build.xml
+++ b/build.xml
@@ -91,6 +91,7 @@ xmlns:cs="antlib:com.puppycrawl.tools.checkstyle.ant">
     <property name="src.dir" value="${basedir}/src" />
     <property name="java.src.dir" value="${src.dir}/java/main" />
     <property name="jute.src.dir" value="${basedir}/zookeeper-jute/src/main/java" />
+    <property name="java.test.dir" value="${src.dir}/test/java"/>
 
     <property name="lib.dir" value="${src.dir}/java/lib" />
     <property name="lib.dir.includes" value="**/*.jar" />
@@ -121,6 +122,7 @@ xmlns:cs="antlib:com.puppycrawl.tools.checkstyle.ant">
     <property name="test.src.dir" value="${src.dir}/java/test"/>
     <property name="jute.test.src.dir" value="${basedir}/zookeeper-jute/src/test/java" />
     <property name="systest.src.dir" value="${src.dir}/java/systest"/>
+    <property name="bench.src.dir" value="${java.test.dir}/bench"/>
     <property name="test.log.dir" value="${test.java.build.dir}/logs" />
     <property name="test.data.dir" value="${test.java.build.dir}/data" />
     <property name="test.data.invalid.dir" value="${test.data.dir}/invalidsnap" />
@@ -234,6 +236,8 @@ xmlns:cs="antlib:com.puppycrawl.tools.checkstyle.ant">
 
     <property name="hamcrest.version" value="1.3"/>
 
+    <property name="jmh.version" value="1.19"/>
+
     <!-- ====================================================== -->
     <!-- Macro definitions                                      -->
     <!-- ====================================================== -->
@@ -510,6 +514,10 @@ xmlns:cs="antlib:com.puppycrawl.tools.checkstyle.ant">
              target="${javac.target}" source="${javac.source}" debug="on" encoding="${build.encoding}">
         <classpath refid="test.java.classpath"/>
       </javac>
+      <javac srcdir="${bench.src.dir}" destdir="${test.java.classes}" includeantruntime="false"
+             target="${javac.target}" source="${javac.source}" debug="on" encoding="${build.encoding}">
+          <classpath refid="test.java.classpath"/>
+      </javac>
     </target>
 
     <target name="compile-native" depends="compile_jute" description="Make C binding">
@@ -1967,7 +1975,9 @@ xmlns:cs="antlib:com.puppycrawl.tools.checkstyle.ant">
                    output="${build.dir.eclipse-test-classes}" />
            <source path="${systest.src.dir}"
                    output="${build.dir.eclipse-test-classes}" />
-
+           <source path="${bench.src.dir}"
+                   output="${build.dir.eclipse-test-classes}" />
+                   
            <output path="${build.dir.eclipse-main-classes}" />
            <library pathref="default.path.id" exported="true" />
            <library pathref="junit.path.id" exported="false" />

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/ivy.xml
----------------------------------------------------------------------
diff --git a/ivy.xml b/ivy.xml
index 663216e..fb09ed3 100644
--- a/ivy.xml
+++ b/ivy.xml
@@ -142,6 +142,9 @@
     <dependency org="org.hamcrest" name="hamcrest-all" rev="${hamcrest.version}"
                   conf="test->default" />
 
+    <dependency org="org.openjdk.jmh" name="jmh-core" rev="${jmh.version}" conf="test->default"/>
+    <dependency org="org.openjdk.jmh" name="jmh-generator-annprocess" rev="${jmh.version}" conf="test->default"/>
+
     <conflict manager="strict"/>
 
   </dependencies>

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/main/org/apache/zookeeper/server/DataTree.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/DataTree.java b/src/java/main/org/apache/zookeeper/server/DataTree.java
index ac91c10..c24396a 100644
--- a/src/java/main/org/apache/zookeeper/server/DataTree.java
+++ b/src/java/main/org/apache/zookeeper/server/DataTree.java
@@ -39,6 +39,12 @@ import org.apache.zookeeper.common.PathTrie;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.data.StatPersisted;
+import org.apache.zookeeper.server.watch.IWatchManager;
+import org.apache.zookeeper.server.watch.WatchManagerFactory;
+import org.apache.zookeeper.server.watch.WatcherOrBitSet;
+import org.apache.zookeeper.server.watch.WatchesPathReport;
+import org.apache.zookeeper.server.watch.WatchesReport;
+import org.apache.zookeeper.server.watch.WatchesSummary;
 import org.apache.zookeeper.txn.CheckVersionTxn;
 import org.apache.zookeeper.txn.CreateContainerTxn;
 import org.apache.zookeeper.txn.CreateTTLTxn;
@@ -87,9 +93,9 @@ public class DataTree {
     private final ConcurrentHashMap<String, DataNode> nodes =
         new ConcurrentHashMap<String, DataNode>();
 
-    private final WatchManager dataWatches = new WatchManager();
+    private IWatchManager dataWatches;
 
-    private final WatchManager childWatches = new WatchManager();
+    private IWatchManager childWatches;
 
     /** cached total size of paths and data for all DataNodes */
     private final AtomicLong nodeDataSize = new AtomicLong(0);
@@ -253,6 +259,14 @@ public class DataTree {
         addConfigNode();
 
         nodeDataSize.set(approximateDataSize());
+        try {
+            dataWatches = WatchManagerFactory.createWatchManager();
+            childWatches = WatchManagerFactory.createWatchManager();
+        } catch (Exception e) {
+            LOG.error("Unexpected exception when creating WatchManager, " +
+                    "exiting abnormally", e);
+            System.exit(ExitCode.UNEXPECTED_ERROR.getValue());
+        }
     }
 
     /**
@@ -611,7 +625,7 @@ public class DataTree {
             ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                     "childWatches.triggerWatch " + parentName);
         }
-        Set<Watcher> processed = dataWatches.triggerWatch(path,
+        WatcherOrBitSet processed = dataWatches.triggerWatch(path,
                 EventType.NodeDeleted);
         childWatches.triggerWatch(path, EventType.NodeDeleted, processed);
         childWatches.triggerWatch("".equals(parentName) ? "/" : parentName,
@@ -1361,6 +1375,11 @@ public class DataTree {
         }
     }
 
+    public void shutdownWatcher() {
+        dataWatches.shutdown();
+        childWatches.shutdown();
+    }
+
     /**
      * Returns a mapping of session ID to ephemeral znodes.
      *

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/main/org/apache/zookeeper/server/DumbWatcher.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/DumbWatcher.java b/src/java/main/org/apache/zookeeper/server/DumbWatcher.java
new file mode 100644
index 0000000..ff17181
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/server/DumbWatcher.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.security.cert.Certificate;
+
+import org.apache.jute.Record;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.proto.ReplyHeader;
+import org.apache.zookeeper.server.ServerCnxn;
+import org.apache.zookeeper.server.ServerStats;
+
+/**
+ * An empty watcher implementation used in benchmarks and unit tests.
+ */
+public class DumbWatcher extends ServerCnxn {
+
+    private long sessionId;
+
+    public DumbWatcher() {
+        this(0);
+    }
+
+    public DumbWatcher(long sessionId) {
+        this.sessionId = sessionId;
+    }
+
+    @Override
+    void setSessionTimeout(int sessionTimeout) { }
+
+    @Override
+    public void process(WatchedEvent event) { }
+
+    @Override
+    int getSessionTimeout() { return 0; }
+
+    @Override
+    void close() { }
+
+    @Override
+    public void sendResponse(ReplyHeader h, Record r, String tag) throws IOException { }
+
+    @Override
+    public void sendCloseSession() { }
+
+    @Override
+    public long getSessionId() { return sessionId; }
+
+    @Override
+    void setSessionId(long sessionId) { }
+
+    @Override
+    void sendBuffer(ByteBuffer closeConn) { }
+
+    @Override
+    void enableRecv() { }
+
+    @Override
+    void disableRecv() { }
+
+    @Override
+    protected ServerStats serverStats() { return null; }
+
+    @Override
+    public long getOutstandingRequests() { return 0; }
+
+    @Override
+    public InetSocketAddress getRemoteSocketAddress() { return null; }
+
+    @Override
+    public int getInterestOps() { return 0; }
+
+    @Override
+    public boolean isSecure() { return false; }
+
+    @Override
+    public Certificate[] getClientCertificateChain() { return null; }
+
+    @Override
+    public void setClientCertificateChain(Certificate[] chain) { }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java b/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
index fffb775..c344c65 100644
--- a/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
+++ b/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
@@ -594,6 +594,7 @@ public class NIOServerCnxn extends ServerCnxn {
      */
     @Override
     public void close() {
+        setStale();
         if (!factory.removeCnxn(this)) {
             return;
         }

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java b/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java
index 88aa593..0b27724 100644
--- a/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java
+++ b/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java
@@ -92,6 +92,7 @@ public class NettyServerCnxn extends ServerCnxn {
             LOG.debug("close called for sessionid:0x"
                     + Long.toHexString(sessionId));
         }
+        setStale();
 
         // ZOOKEEPER-2743:
         // Always unregister connection upon close to prevent

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/main/org/apache/zookeeper/server/ServerCnxn.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/ServerCnxn.java b/src/java/main/org/apache/zookeeper/server/ServerCnxn.java
index 917516a..0822f19 100644
--- a/src/java/main/org/apache/zookeeper/server/ServerCnxn.java
+++ b/src/java/main/org/apache/zookeeper/server/ServerCnxn.java
@@ -54,7 +54,7 @@ public abstract class ServerCnxn implements Stats, Watcher {
     // (aka owned by) this class
     final public static Object me = new Object();
     private static final Logger LOG = LoggerFactory.getLogger(ServerCnxn.class);
-    
+
     private Set<Id> authInfo = Collections.newSetFromMap(new ConcurrentHashMap<Id, Boolean>());
 
     private static final byte[] fourBytes = new byte[4];
@@ -66,6 +66,8 @@ public abstract class ServerCnxn implements Stats, Watcher {
      */
     boolean isOldClient = true;
 
+    private volatile boolean stale = false;
+
     abstract int getSessionTimeout();
 
     abstract void close();
@@ -143,6 +145,14 @@ public abstract class ServerCnxn implements Stats, Watcher {
         }
     }
 
+    public boolean isStale() {
+        return stale;
+    }
+
+    public void setStale() {
+        stale = true;
+    }
+
     protected void packetReceived(long bytes) {
         incrPacketsReceived();
         ServerStats serverStats = serverStats();
@@ -196,7 +206,7 @@ public abstract class ServerCnxn implements Stats, Watcher {
     protected long incrPacketsReceived() {
         return packetsReceived.incrementAndGet();
     }
-    
+
     protected void incrOutstandingRequests(RequestHeader h) {
     }
 
@@ -293,7 +303,7 @@ public abstract class ServerCnxn implements Stats, Watcher {
     public abstract boolean isSecure();
     public abstract Certificate[] getClientCertificateChain();
     public abstract void setClientCertificateChain(Certificate[] chain);
-    
+
     /**
      * Print information about the connection.
      * @param brief iff true prints brief details, otw full detail

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/main/org/apache/zookeeper/server/WatchManager.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/WatchManager.java b/src/java/main/org/apache/zookeeper/server/WatchManager.java
deleted file mode 100644
index 076f645..0000000
--- a/src/java/main/org/apache/zookeeper/server/WatchManager.java
+++ /dev/null
@@ -1,266 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server;
-
-import java.io.PrintWriter;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.Watcher.Event.EventType;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class manages watches. It allows watches to be associated with a string
- * and removes watchers and their watches in addition to managing triggers.
- */
-class WatchManager {
-    private static final Logger LOG = LoggerFactory.getLogger(WatchManager.class);
-
-    private final Map<String, Set<Watcher>> watchTable =
-        new HashMap<String, Set<Watcher>>();
-
-    private final Map<Watcher, Set<String>> watch2Paths =
-        new HashMap<Watcher, Set<String>>();
-
-    synchronized int size(){
-        int result = 0;
-        for(Set<Watcher> watches : watchTable.values()) {
-            result += watches.size();
-        }
-        return result;
-    }
-
-    synchronized void addWatch(String path, Watcher watcher) {
-        Set<Watcher> list = watchTable.get(path);
-        if (list == null) {
-            // don't waste memory if there are few watches on a node
-            // rehash when the 4th entry is added, doubling size thereafter
-            // seems like a good compromise
-            list = new HashSet<Watcher>(4);
-            watchTable.put(path, list);
-        }
-        list.add(watcher);
-
-        Set<String> paths = watch2Paths.get(watcher);
-        if (paths == null) {
-            // cnxns typically have many watches, so use default cap here
-            paths = new HashSet<String>();
-            watch2Paths.put(watcher, paths);
-        }
-        paths.add(path);
-    }
-
-    synchronized void removeWatcher(Watcher watcher) {
-        Set<String> paths = watch2Paths.remove(watcher);
-        if (paths == null) {
-            return;
-        }
-        for (String p : paths) {
-            Set<Watcher> list = watchTable.get(p);
-            if (list != null) {
-                list.remove(watcher);
-                if (list.size() == 0) {
-                    watchTable.remove(p);
-                }
-            }
-        }
-    }
-
-    Set<Watcher> triggerWatch(String path, EventType type) {
-        return triggerWatch(path, type, null);
-    }
-
-    Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
-        WatchedEvent e = new WatchedEvent(type,
-                KeeperState.SyncConnected, path);
-        Set<Watcher> watchers;
-        synchronized (this) {
-            watchers = watchTable.remove(path);
-            if (watchers == null || watchers.isEmpty()) {
-                if (LOG.isTraceEnabled()) {
-                    ZooTrace.logTraceMessage(LOG,
-                            ZooTrace.EVENT_DELIVERY_TRACE_MASK,
-                            "No watchers for " + path);
-                }
-                return null;
-            }
-            for (Watcher w : watchers) {
-                Set<String> paths = watch2Paths.get(w);
-                if (paths != null) {
-                    paths.remove(path);
-                }
-            }
-        }
-        for (Watcher w : watchers) {
-            if (supress != null && supress.contains(w)) {
-                continue;
-            }
-            w.process(e);
-        }
-        return watchers;
-    }
-
-    /**
-     * Brief description of this object.
-     */
-    @Override
-    public synchronized String toString() {
-        StringBuilder sb = new StringBuilder();
-
-        sb.append(watch2Paths.size()).append(" connections watching ")
-            .append(watchTable.size()).append(" paths\n");
-
-        int total = 0;
-        for (Set<String> paths : watch2Paths.values()) {
-            total += paths.size();
-        }
-        sb.append("Total watches:").append(total);
-
-        return sb.toString();
-    }
-
-    /**
-     * String representation of watches. Warning, may be large!
-     * @param byPath iff true output watches by paths, otw output
-     * watches by connection
-     * @return string representation of watches
-     */
-    synchronized void dumpWatches(PrintWriter pwriter, boolean byPath) {
-        if (byPath) {
-            for (Entry<String, Set<Watcher>> e : watchTable.entrySet()) {
-                pwriter.println(e.getKey());
-                for (Watcher w : e.getValue()) {
-                    pwriter.print("\t0x");
-                    pwriter.print(Long.toHexString(((ServerCnxn)w).getSessionId()));
-                    pwriter.print("\n");
-                }
-            }
-        } else {
-            for (Entry<Watcher, Set<String>> e : watch2Paths.entrySet()) {
-                pwriter.print("0x");
-                pwriter.println(Long.toHexString(((ServerCnxn)e.getKey()).getSessionId()));
-                for (String path : e.getValue()) {
-                    pwriter.print("\t");
-                    pwriter.println(path);
-                }
-            }
-        }
-    }
-
-    /**
-     * Checks the specified watcher exists for the given path
-     *
-     * @param path
-     *            znode path
-     * @param watcher
-     *            watcher object reference
-     * @return true if the watcher exists, false otherwise
-     */
-    synchronized boolean containsWatcher(String path, Watcher watcher) {
-        Set<String> paths = watch2Paths.get(watcher);
-        if (paths == null || !paths.contains(path)) {
-            return false;
-        }
-        return true;
-    }
-
-    /**
-     * Removes the specified watcher for the given path
-     *
-     * @param path
-     *            znode path
-     * @param watcher
-     *            watcher object reference
-     * @return true if the watcher successfully removed, false otherwise
-     */
-    synchronized boolean removeWatcher(String path, Watcher watcher) {
-        Set<String> paths = watch2Paths.get(watcher);
-        if (paths == null || !paths.remove(path)) {
-            return false;
-        }
-
-        Set<Watcher> list = watchTable.get(path);
-        if (list == null || !list.remove(watcher)) {
-            return false;
-        }
-
-        if (list.size() == 0) {
-            watchTable.remove(path);
-        }
-
-        return true;
-    }
-
-    /**
-     * Returns a watch report.
-     *
-     * @return watch report
-     * @see WatchesReport
-     */
-    synchronized WatchesReport getWatches() {
-        Map<Long, Set<String>> id2paths = new HashMap<Long, Set<String>>();
-        for (Entry<Watcher, Set<String>> e: watch2Paths.entrySet()) {
-            Long id = ((ServerCnxn) e.getKey()).getSessionId();
-            Set<String> paths = new HashSet<String>(e.getValue());
-            id2paths.put(id, paths);
-        }
-        return new WatchesReport(id2paths);
-    }
-
-    /**
-     * Returns a watch report by path.
-     *
-     * @return watch report
-     * @see WatchesPathReport
-     */
-    synchronized WatchesPathReport getWatchesByPath() {
-        Map<String, Set<Long>> path2ids = new HashMap<String, Set<Long>>();
-        for (Entry<String, Set<Watcher>> e : watchTable.entrySet()) {
-            Set<Long> ids = new HashSet<Long>(e.getValue().size());
-            path2ids.put(e.getKey(), ids);
-            for (Watcher watcher : e.getValue()) {
-                ids.add(((ServerCnxn) watcher).getSessionId());
-            }
-        }
-        return new WatchesPathReport(path2ids);
-    }
-
-    /**
-     * Returns a watch summary.
-     *
-     * @return watch summary
-     * @see WatchesSummary
-     */
-    synchronized WatchesSummary getWatchesSummary() {
-        int totalWatches = 0;
-        for (Set<String> paths : watch2Paths.values()) {
-            totalWatches += paths.size();
-        }
-        return new WatchesSummary (watch2Paths.size(), watchTable.size(),
-                                   totalWatches);
-    }
-}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/main/org/apache/zookeeper/server/WatchesPathReport.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/WatchesPathReport.java b/src/java/main/org/apache/zookeeper/server/WatchesPathReport.java
deleted file mode 100644
index 6792ac9..0000000
--- a/src/java/main/org/apache/zookeeper/server/WatchesPathReport.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * A watch report, essentially a mapping of path to session IDs of sessions that
- * have set a watch on that path. This class is immutable.
- */
-public class WatchesPathReport {
-
-    private final Map<String, Set<Long>> path2Ids;
-
-    /**
-     * Creates a new report.
-     *
-     * @param path2Ids map of paths to session IDs of sessions that have set a
-     * watch on that path
-     */
-    WatchesPathReport(Map<String, Set<Long>> path2Ids) {
-        this.path2Ids = Collections.unmodifiableMap(deepCopy(path2Ids));
-    }
-
-    private static Map<String, Set<Long>> deepCopy(Map<String, Set<Long>> m) {
-        Map<String, Set<Long>> m2 = new HashMap<String, Set<Long>>();
-        for (Map.Entry<String, Set<Long>> e : m.entrySet()) {
-            m2.put(e.getKey(), new HashSet<Long>(e.getValue()));
-        }
-        return m2;
-    }
-
-    /**
-     * Checks if the given path has watches set.
-     *
-     * @param path path
-     * @return true if path has watch set
-     */
-    public boolean hasSessions(String path) {
-        return path2Ids.containsKey(path);
-    }
-    /**
-     * Gets the session IDs of sessions that have set watches on the given path.
-     * The returned set is immutable.
-     *
-     * @param path session ID
-     * @return session IDs of sessions that have set watches on the path, or
-     * null if none
-     */
-    public Set<Long> getSessions(String path) {
-        Set<Long> s = path2Ids.get(path);
-        return s != null ? Collections.unmodifiableSet(s) : null;
-    }
-
-    /**
-     * Converts this report to a map. The returned map is mutable, and changes
-     * to it do not reflect back into this report.
-     *
-     * @return map representation of report
-     */
-    public Map<String, Set<Long>> toMap() {
-        return deepCopy(path2Ids);
-    }
-}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/main/org/apache/zookeeper/server/WatchesReport.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/WatchesReport.java b/src/java/main/org/apache/zookeeper/server/WatchesReport.java
deleted file mode 100644
index e4c6dc2..0000000
--- a/src/java/main/org/apache/zookeeper/server/WatchesReport.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * A watch report, essentially a mapping of session ID to paths that the session
- * has set a watch on. This class is immutable.
- */
-public class WatchesReport {
-
-    private final Map<Long, Set<String>> id2paths;
-
-    /**
-     * Creates a new report.
-     *
-     * @param id2paths map of session IDs to paths that each session has set
-     * a watch on
-     */
-    WatchesReport(Map<Long, Set<String>> id2paths) {
-        this.id2paths = Collections.unmodifiableMap(deepCopy(id2paths));
-    }
-
-    private static Map<Long, Set<String>> deepCopy(Map<Long, Set<String>> m) {
-        Map<Long, Set<String>> m2 = new HashMap<Long, Set<String>>();
-        for (Map.Entry<Long, Set<String>> e : m.entrySet()) {
-            m2.put(e.getKey(), new HashSet<String>(e.getValue()));
-        }
-        return m2;
-    }
-
-    /**
-     * Checks if the given session has watches set.
-     *
-     * @param sessionId session ID
-     * @return true if session has paths with watches set
-     */
-    public boolean hasPaths(long sessionId) {
-        return id2paths.containsKey(sessionId);
-    }
-
-    /**
-     * Gets the paths that the given session has set watches on. The returned
-     * set is immutable.
-     *
-     * @param sessionId session ID
-     * @return paths that have watches set by the session, or null if none
-     */
-    public Set<String> getPaths(long sessionId) {
-        Set<String> s = id2paths.get(sessionId);
-        return s != null ? Collections.unmodifiableSet(s) : null;
-    }
-
-    /**
-     * Converts this report to a map. The returned map is mutable, and changes
-     * to it do not reflect back into this report.
-     *
-     * @return map representation of report
-     */
-    public Map<Long, Set<String>> toMap() {
-        return deepCopy(id2paths);
-    }
-}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/main/org/apache/zookeeper/server/WatchesSummary.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/WatchesSummary.java b/src/java/main/org/apache/zookeeper/server/WatchesSummary.java
deleted file mode 100644
index 2053b55..0000000
--- a/src/java/main/org/apache/zookeeper/server/WatchesSummary.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server;
-
-import java.util.LinkedHashMap;
-import java.util.Map;
-
-/**
- * A summary of watch information. This class is immutable.
- */
-public class WatchesSummary {
-
-    /**
-     * The key in the map returned by {@link #toMap()} for the number of
-     * connections.
-     */
-    public static final String KEY_NUM_CONNECTIONS = "num_connections";
-    /**
-     * The key in the map returned by {@link #toMap()} for the number of paths.
-     */
-    public static final String KEY_NUM_PATHS = "num_paths";
-    /**
-     * The key in the map returned by {@link #toMap()} for the total number of
-     * watches.
-     */
-    public static final String KEY_NUM_TOTAL_WATCHES = "num_total_watches";
-
-    private final int numConnections;
-    private final int numPaths;
-    private final int totalWatches;
-
-    /**
-     * Creates a new summary.
-     *
-     * @param numConnections the number of sessions that have set watches
-     * @param numPaths the number of paths that have watches set on them
-     * @param totalWatches the total number of watches set
-     */
-    WatchesSummary(int numConnections, int numPaths, int totalWatches) {
-        this.numConnections = numConnections;
-        this.numPaths = numPaths;
-        this.totalWatches = totalWatches;
-    }
-
-    /**
-     * Gets the number of connections (sessions) that have set watches.
-     *
-     * @return number of connections
-     */
-    public int getNumConnections() {
-        return numConnections;
-    }
-    /**
-     * Gets the number of paths that have watches set on them.
-     *
-     * @return number of paths
-     */
-    public int getNumPaths() {
-        return numPaths;
-    }
-    /**
-     * Gets the total number of watches set.
-     *
-     * @return total watches
-     */
-    public int getTotalWatches() {
-        return totalWatches;
-    }
-
-    /**
-     * Converts this summary to a map. The returned map is mutable, and changes
-     * to it do not reflect back into this summary.
-     *
-     * @return map representation of summary
-     */
-    public Map<String, Object> toMap() {
-        Map<String, Object> summary = new LinkedHashMap<String, Object>();
-        summary.put(KEY_NUM_CONNECTIONS, numConnections);
-        summary.put(KEY_NUM_PATHS, numPaths);
-        summary.put(KEY_NUM_TOTAL_WATCHES, totalWatches);
-        return summary;
-    }
-}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/main/org/apache/zookeeper/server/ZKDatabase.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/ZKDatabase.java b/src/java/main/org/apache/zookeeper/server/ZKDatabase.java
index 86e2c09..753461a 100644
--- a/src/java/main/org/apache/zookeeper/server/ZKDatabase.java
+++ b/src/java/main/org/apache/zookeeper/server/ZKDatabase.java
@@ -138,6 +138,7 @@ public class ZKDatabase {
         /* to be safe we just create a new
          * datatree.
          */
+        dataTree.shutdownWatcher();
         dataTree = createDataTree();
         sessionsWithTimeouts.clear();
         WriteLock lock = logLock.writeLock();

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/main/org/apache/zookeeper/server/util/BitHashSet.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/util/BitHashSet.java b/src/java/main/org/apache/zookeeper/server/util/BitHashSet.java
new file mode 100644
index 0000000..b60f1d4
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/server/util/BitHashSet.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.util;
+
+import java.util.BitSet;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.lang.Iterable;
+
+/**
+ * Using BitSet to store all the elements, and use HashSet to cache limited
+ * number of elements to find a balance between memory and time complexity.
+ *
+ * Without HashSet, we need to use O(N) time to get the elements, N is
+ * the bit numbers in elementBits. But we need to keep the size small to make
+ * sure it doesn't cost too much in memory, there is a trade off between
+ * memory and time complexity.
+ *
+ * Previously, we considered dynamically switching between SparseBitSet
+ * and HashSet based on the memory consumption, but it takes time to copy
+ * data over and may cause a herd effect of repeatedly copying data from
+ * one data structure to another. The current solution can do a very good
+ * job given most of the paths have a limited number of elements.
+ */
+public class BitHashSet implements Iterable<Integer> {
+
+    /**
+     * Change to SparseBitSet if we want to optimize more, the number of
+     * elements on a single server is usually limited, so BitSet should be
+     * fine.
+     */
+    private final BitSet elementBits = new BitSet();
+
+    /**
+     * HashSet is used to optimize iteration: if there is a single
+     * element in this BitHashSet but its bit index is very large, then
+     * without the HashSet we would need to go through all the words
+     * before returning that element, which is not efficient.
+     */
+    private final Set<Integer> cache = new HashSet<Integer>();
+
+    private final int cacheSize;
+
+    // To record how many elements in this set.
+    private int elementCount = 0;
+
+    public BitHashSet() {
+        this(Integer.getInteger("zookeeper.bitHashCacheSize", 10));
+    }
+
+    public BitHashSet(int cacheSize) {
+        this.cacheSize = cacheSize;
+    }
+
+    public synchronized boolean add(Integer elementBit) {
+        if (elementBit == null || elementBits.get(elementBit)) {
+            return false;
+        }
+        if (cache.size() < cacheSize) {
+            cache.add(elementBit);
+        }
+        elementBits.set(elementBit);
+        elementCount++;
+        return true;
+    }
+
+    /**
+     * Remove the watches, and return the number of watches being removed.
+     */
+    public synchronized int remove(Set<Integer> bitSet, BitSet bits) {
+        cache.removeAll(bitSet);
+        elementBits.andNot(bits);
+        int elementCountBefore = elementCount;
+        elementCount = elementBits.cardinality();
+        return elementCountBefore - elementCount;
+    }
+
+    public synchronized boolean remove(Integer elementBit) {
+        if (elementBit == null || !elementBits.get(elementBit)) {
+            return false;
+        }
+
+        cache.remove(elementBit);
+        elementBits.clear(elementBit);
+        elementCount--;
+        return true;
+    }
+
+    public synchronized boolean contains(Integer elementBit) {
+        if (elementBit == null) {
+            return false;
+        }
+        return elementBits.get(elementBit);
+    }
+
+    public synchronized int size() {
+        return elementCount;
+    }
+
+    /**
+     * This function is not thread-safe; callers need to synchronize when
+     * iterating through this set.
+     */
+    @Override
+    public Iterator<Integer> iterator() {
+        if (cache.size() == elementCount) {
+            return cache.iterator();
+        }
+
+        return new Iterator<Integer>() {
+            int returnedCount = 0;
+            int bitIndex = 0;
+
+            @Override
+            public boolean hasNext() {
+                return returnedCount < elementCount;
+            }
+
+            @Override
+            public Integer next() {
+                int bit = elementBits.nextSetBit(bitIndex);
+                bitIndex = bit + 1;
+                returnedCount++;
+                return bit;
+            }
+
+            @Override
+            public void remove() {
+                throw new UnsupportedOperationException();
+            }
+        };
+    }
+
+    // visible for test
+    public synchronized int cachedSize() {
+        return cache.size();
+    }
+
+    public synchronized boolean isEmpty() {
+        return elementCount == 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/main/org/apache/zookeeper/server/util/BitMap.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/util/BitMap.java b/src/java/main/org/apache/zookeeper/server/util/BitMap.java
new file mode 100644
index 0000000..1a0fb3b
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/server/util/BitMap.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.util;
+
+import java.util.Map;
+import java.util.HashMap;
+import java.util.BitSet;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * This is a helper class to maintain the bit to specific value and the
+ * reversed value to bit mapping.
+ */
+public class BitMap<T> {
+
+    private final Map<T, Integer> value2Bit = new HashMap<T, Integer>();
+    private final Map<Integer, T> bit2Value = new HashMap<Integer, T>();
+
+    private final BitSet freedBitSet = new BitSet();
+    private Integer nextBit = Integer.valueOf(0);
+
+    private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
+
+    public Integer add(T value) {
+        /*
+         * Optimized for code which adds the same value again and again.
+         * More specifically, this is used to add a new bit for a watcher,
+         * and the same watcher may be watching thousands or even millions
+         * of nodes, each of which calls add with the same value; checking
+         * for existence under the read lock first improves performance.
+         */
+        Integer bit = getBit(value);
+        if (bit != null) {
+            return bit;
+        }
+
+        rwLock.writeLock().lock();
+        try {
+            bit = value2Bit.get(value);
+            if (bit != null) {
+                return bit;
+            }
+            bit = freedBitSet.nextSetBit(0);
+            if (bit > -1) {
+                freedBitSet.clear(bit);
+            } else {
+                bit = nextBit++;
+            }
+
+            value2Bit.put(value, bit);
+            bit2Value.put(bit, value);
+            return bit;
+        } finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
+    public T get(int bit) {
+        rwLock.readLock().lock();
+        try {
+            return bit2Value.get(bit);
+        } finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    public Integer getBit(T value) {
+        rwLock.readLock().lock();
+        try {
+            return value2Bit.get(value);
+        } finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    public int remove(T value) {
+        /*
+         * remove is only called once when the session is closed, so use
+         * the write lock directly without checking under the read lock.
+         */
+        rwLock.writeLock().lock();
+        try {
+            Integer bit = value2Bit.get(value);
+            if (bit == null) {
+                return -1;
+            }
+            value2Bit.remove(value);
+            bit2Value.remove(bit);
+            freedBitSet.set(bit);
+            return bit;
+        } finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
+    public T remove(int bit) {
+        rwLock.writeLock().lock();
+        try {
+            T value = bit2Value.get(bit);
+            if (value == null) {
+                return null;
+            }
+            value2Bit.remove(value);
+            bit2Value.remove(bit);
+            freedBitSet.set(bit);
+            return value;
+        } finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
+    public int size() {
+        rwLock.readLock().lock();
+        try {
+            return value2Bit.size();
+        } finally {
+            rwLock.readLock().unlock();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/main/org/apache/zookeeper/server/watch/IDeadWatcherListener.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/watch/IDeadWatcherListener.java b/src/java/main/org/apache/zookeeper/server/watch/IDeadWatcherListener.java
new file mode 100644
index 0000000..7de6772
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/server/watch/IDeadWatcherListener.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.watch;
+
+import java.util.Set;
+
+/**
+ * Interface used to process the dead watchers related to closed cnxns.
+ */
+public interface IDeadWatcherListener {
+
+    /**
+     * Process the given dead watchers.
+     *
+     * @param deadWatchers the ids of the watchers whose cnxn has been closed
+     */
+    public void processDeadWatchers(Set<Integer> deadWatchers);
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/main/org/apache/zookeeper/server/watch/IWatchManager.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/watch/IWatchManager.java b/src/java/main/org/apache/zookeeper/server/watch/IWatchManager.java
new file mode 100644
index 0000000..0c18e6a
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/server/watch/IWatchManager.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.watch;
+
+import java.io.PrintWriter;
+
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+
+public interface IWatchManager {
+
+    /**
+     * Add watch to specific path.
+     *
+     * @param path znode path
+     * @param watcher watcher object reference
+     *
+     * @return true if the watcher added is not already present
+     */
+    public boolean addWatch(String path, Watcher watcher);
+
+    /**
+     * Checks the specified watcher exists for the given path.
+     *
+     * @param path znode path
+     * @param watcher watcher object reference
+     *
+     * @return true if the watcher exists, false otherwise
+     */
+    public boolean containsWatcher(String path, Watcher watcher);
+
+    /**
+     * Removes the specified watcher for the given path.
+     *
+     * @param path znode path
+     * @param watcher watcher object reference
+     *
+     * @return true if the watcher successfully removed, false otherwise
+     */
+    public boolean removeWatcher(String path, Watcher watcher);
+
+    /**
+     * The entry to remove the watcher when the cnxn is closed.
+     *
+     * @param watcher watcher object reference
+     */
+    public void removeWatcher(Watcher watcher);
+
+    /**
+     * Distribute the watch event for the given path.
+     *
+     * @param path znode path
+     * @param type the watch event type
+     *
+     * @return the watchers that have been notified
+     */
+    public WatcherOrBitSet triggerWatch(String path, EventType type);
+
+    /**
+     * Distribute the watch event for the given path, but ignore those
+     * suppressed ones.
+     *
+     * @param path znode path
+     * @param type the watch event type
+     * @param suppress the suppressed watcher set
+     *
+     * @return the watchers that have been notified
+     */
+    public WatcherOrBitSet triggerWatch(
+            String path, EventType type, WatcherOrBitSet suppress);
+
+    /**
+     * Get the size of watchers.
+     *
+     * @return the number of watchers managed in this class.
+     */
+    public int size();
+
+    /**
+     * Clean up the watch manager.
+     */
+    public void shutdown();
+
+    /**
+     * Returns a watch summary.
+     *
+     * @return watch summary
+     * @see WatchesSummary
+     */
+    public WatchesSummary getWatchesSummary();
+
+    /**
+     * Returns a watch report.
+     *
+     * @return watch report
+     * @see WatchesReport
+     */
+    public WatchesReport getWatches();
+
+    /**
+     * Returns a watch report by path.
+     *
+     * @return watch report
+     * @see WatchesPathReport
+     */
+    public WatchesPathReport getWatchesByPath();
+
+    /**
+     * Dumps a string representation of the watches to the given writer.
+     * Warning, the output may be large!
+     *
+     * @param pwriter the writer to dump the watches to
+     * @param byPath if true, output the watches grouped by path;
+     *               otherwise, output the watches grouped by
+     *               connection
+     */
+    public void dumpWatches(PrintWriter pwriter, boolean byPath);
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/main/org/apache/zookeeper/server/watch/WatchManager.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/watch/WatchManager.java b/src/java/main/org/apache/zookeeper/server/watch/WatchManager.java
new file mode 100644
index 0000000..3e14f6e
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/server/watch/WatchManager.java
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.watch;
+
+import java.io.PrintWriter;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.server.ServerCnxn;
+import org.apache.zookeeper.server.ZooTrace;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class manages watches. It allows watches to be associated with a string
+ * and removes watchers and their watches in addition to managing triggers.
+ */
+public class WatchManager implements IWatchManager {
+    private static final Logger LOG = LoggerFactory.getLogger(WatchManager.class);
+
+    private final Map<String, Set<Watcher>> watchTable =
+        new HashMap<String, Set<Watcher>>();
+
+    private final Map<Watcher, Set<String>> watch2Paths =
+        new HashMap<Watcher, Set<String>>();
+
+    @Override
+    public synchronized int size(){
+        int result = 0;
+        for(Set<Watcher> watches : watchTable.values()) {
+            result += watches.size();
+        }
+        return result;
+    }
+
+    boolean isDeadWatcher(Watcher watcher) {
+        return watcher instanceof ServerCnxn && ((ServerCnxn) watcher).isStale();
+    }
+
+    @Override
+    public synchronized boolean addWatch(String path, Watcher watcher) {
+        if (isDeadWatcher(watcher)) {
+            LOG.debug("Ignoring addWatch with closed cnxn");
+            return false;
+        }
+
+        Set<Watcher> list = watchTable.get(path);
+        if (list == null) {
+            // don't waste memory if there are few watches on a node
+            // rehash when the 4th entry is added, doubling size thereafter
+            // seems like a good compromise
+            list = new HashSet<Watcher>(4);
+            watchTable.put(path, list);
+        }
+        list.add(watcher);
+
+        Set<String> paths = watch2Paths.get(watcher);
+        if (paths == null) {
+            // cnxns typically have many watches, so use default cap here
+            paths = new HashSet<String>();
+            watch2Paths.put(watcher, paths);
+        }
+        return paths.add(path);
+    }
+
+    @Override
+    public synchronized void removeWatcher(Watcher watcher) {
+        Set<String> paths = watch2Paths.remove(watcher);
+        if (paths == null) {
+            return;
+        }
+        for (String p : paths) {
+            Set<Watcher> list = watchTable.get(p);
+            if (list != null) {
+                list.remove(watcher);
+                if (list.isEmpty()) {
+                    watchTable.remove(p);
+                }
+            }
+        }
+    }
+
+    @Override
+    public WatcherOrBitSet triggerWatch(String path, EventType type) {
+        return triggerWatch(path, type, null);
+    }
+
+    @Override
+    public WatcherOrBitSet triggerWatch(
+            String path, EventType type, WatcherOrBitSet supress) {
+        WatchedEvent e = new WatchedEvent(type,
+                KeeperState.SyncConnected, path);
+        Set<Watcher> watchers;
+        synchronized (this) {
+            watchers = watchTable.remove(path);
+            if (watchers == null || watchers.isEmpty()) {
+                if (LOG.isTraceEnabled()) {
+                    ZooTrace.logTraceMessage(LOG,
+                            ZooTrace.EVENT_DELIVERY_TRACE_MASK,
+                            "No watchers for " + path);
+                }
+                return null;
+            }
+            for (Watcher w : watchers) {
+                Set<String> paths = watch2Paths.get(w);
+                if (paths != null) {
+                    paths.remove(path);
+                }
+            }
+        }
+        for (Watcher w : watchers) {
+            if (supress != null && supress.contains(w)) {
+                continue;
+            }
+            w.process(e);
+        }
+        return new WatcherOrBitSet(watchers);
+    }
+
+    @Override
+    public synchronized String toString() {
+        StringBuilder sb = new StringBuilder();
+
+        sb.append(watch2Paths.size()).append(" connections watching ")
+            .append(watchTable.size()).append(" paths\n");
+
+        int total = 0;
+        for (Set<String> paths : watch2Paths.values()) {
+            total += paths.size();
+        }
+        sb.append("Total watches:").append(total);
+
+        return sb.toString();
+    }
+
+    @Override
+    public synchronized void dumpWatches(PrintWriter pwriter, boolean byPath) {
+        if (byPath) {
+            for (Entry<String, Set<Watcher>> e : watchTable.entrySet()) {
+                pwriter.println(e.getKey());
+                for (Watcher w : e.getValue()) {
+                    pwriter.print("\t0x");
+                    pwriter.print(Long.toHexString(((ServerCnxn)w).getSessionId()));
+                    pwriter.print("\n");
+                }
+            }
+        } else {
+            for (Entry<Watcher, Set<String>> e : watch2Paths.entrySet()) {
+                pwriter.print("0x");
+                pwriter.println(Long.toHexString(((ServerCnxn)e.getKey()).getSessionId()));
+                for (String path : e.getValue()) {
+                    pwriter.print("\t");
+                    pwriter.println(path);
+                }
+            }
+        }
+    }
+
+    @Override
+    public synchronized boolean containsWatcher(String path, Watcher watcher) {
+        Set<String> paths = watch2Paths.get(watcher);
+        if (paths == null || !paths.contains(path)) {
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public synchronized boolean removeWatcher(String path, Watcher watcher) {
+        Set<String> paths = watch2Paths.get(watcher);
+        if (paths == null || !paths.remove(path)) {
+            return false;
+        }
+
+        Set<Watcher> list = watchTable.get(path);
+        if (list == null || !list.remove(watcher)) {
+            return false;
+        }
+
+        if (list.isEmpty()) {
+            watchTable.remove(path);
+        }
+
+        return true;
+    }
+
+    @Override
+    public synchronized WatchesReport getWatches() {
+        Map<Long, Set<String>> id2paths = new HashMap<Long, Set<String>>();
+        for (Entry<Watcher, Set<String>> e: watch2Paths.entrySet()) {
+            Long id = ((ServerCnxn) e.getKey()).getSessionId();
+            Set<String> paths = new HashSet<String>(e.getValue());
+            id2paths.put(id, paths);
+        }
+        return new WatchesReport(id2paths);
+    }
+
+    @Override
+    public synchronized WatchesPathReport getWatchesByPath() {
+        Map<String, Set<Long>> path2ids = new HashMap<String, Set<Long>>();
+        for (Entry<String, Set<Watcher>> e : watchTable.entrySet()) {
+            Set<Long> ids = new HashSet<Long>(e.getValue().size());
+            path2ids.put(e.getKey(), ids);
+            for (Watcher watcher : e.getValue()) {
+                ids.add(((ServerCnxn) watcher).getSessionId());
+            }
+        }
+        return new WatchesPathReport(path2ids);
+    }
+
+    @Override
+    public synchronized WatchesSummary getWatchesSummary() {
+        int totalWatches = 0;
+        for (Set<String> paths : watch2Paths.values()) {
+            totalWatches += paths.size();
+        }
+        return new WatchesSummary (watch2Paths.size(), watchTable.size(),
+                                   totalWatches);
+    }
+
+    @Override
+    public void shutdown() { /* do nothing */ }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/main/org/apache/zookeeper/server/watch/WatchManagerFactory.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/watch/WatchManagerFactory.java b/src/java/main/org/apache/zookeeper/server/watch/WatchManagerFactory.java
new file mode 100644
index 0000000..5f8834e
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/server/watch/WatchManagerFactory.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.watch;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A factory used to produce the actual watch manager based on the
+ * zookeeper.watchManagerName option.
+ */
+public class WatchManagerFactory {
+    private static final Logger LOG = LoggerFactory.getLogger(WatchManagerFactory.class);
+
+    public static final String ZOOKEEPER_WATCH_MANAGER_NAME = "zookeeper.watchManagerName";
+
+    public static IWatchManager createWatchManager() throws IOException {
+        String watchManagerName = System.getProperty(ZOOKEEPER_WATCH_MANAGER_NAME);
+        if (watchManagerName == null) {
+            watchManagerName = WatchManager.class.getName();
+        }
+        try {
+            IWatchManager watchManager =
+                    (IWatchManager) Class.forName(watchManagerName).newInstance();
+            LOG.info("Using {} as watch manager", watchManagerName);
+            return watchManager;
+        } catch (Exception e) {
+            IOException ioe = new IOException("Couldn't instantiate "
+                    + watchManagerName);
+            ioe.initCause(e);
+            throw ioe;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/main/org/apache/zookeeper/server/watch/WatchManagerOptimized.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/watch/WatchManagerOptimized.java b/src/java/main/org/apache/zookeeper/server/watch/WatchManagerOptimized.java
new file mode 100644
index 0000000..6abb760
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/server/watch/WatchManagerOptimized.java
@@ -0,0 +1,389 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.watch;
+
+import java.io.PrintWriter;
+import java.util.HashMap;
+import java.util.BitSet;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.server.ServerCnxn;
+import org.apache.zookeeper.server.util.BitHashSet;
+import org.apache.zookeeper.server.util.BitMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A watch manager optimized for memory and time complexity. Compared to
+ * WatchManager, both memory consumption and time complexity are improved a
+ * lot, but it cannot efficiently remove a watcher when its session or socket
+ * is closed. For the majority of use cases this is not a problem.
+ *
+ * Changes made compared to WatchManager:
+ *
+ * - Use HashSet and BitSet to store the watchers to find a balance between
+ *   memory usage and time complexity
+ * - Use ReadWriteLock instead of synchronized to reduce lock contention
+ * - Lazily clean up the closed watchers
+ */
+public class WatchManagerOptimized
+        implements IWatchManager, IDeadWatcherListener {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(WatchManagerOptimized.class);
+
+    private final ConcurrentHashMap<String, BitHashSet> pathWatches =
+            new ConcurrentHashMap<String, BitHashSet>();
+
+    // watcher to bit id mapping
+    private final BitMap<Watcher> watcherBitIdMap = new BitMap<Watcher>();
+
+    // used to lazily remove the dead watchers
+    private final WatcherCleaner watcherCleaner;
+
+    private final ReentrantReadWriteLock addRemovePathRWLock = new ReentrantReadWriteLock();
+
+    public WatchManagerOptimized() {
+        watcherCleaner = new WatcherCleaner(this);
+        watcherCleaner.start();
+    }
+
+    @Override
+    public boolean addWatch(String path, Watcher watcher) {
+        boolean result = false;
+        // Need readLock to exclusively lock with removeWatcher, otherwise we 
+        // may add a dead watch whose connection was just closed. 
+        //
+        // Creating new watcher bit and adding it to the BitHashSet has its
+        // own lock to minimize the write lock scope
+        addRemovePathRWLock.readLock().lock();
+        try {
+            // avoid the race condition of adding an in-flight dead watcher
+            if (isDeadWatcher(watcher)) {
+                LOG.debug("Ignoring addWatch with closed cnxn");
+            } else {
+                Integer bit = watcherBitIdMap.add(watcher);
+                BitHashSet watchers = pathWatches.get(path);
+                if (watchers == null) {
+                    watchers = new BitHashSet();
+                    BitHashSet existingWatchers = pathWatches.putIfAbsent(path, watchers);
+                    // it's possible multiple threads might add to pathWatches
+                    // while we're holding read lock, so we need this check
+                    // here
+                    if (existingWatchers != null) {
+                        watchers = existingWatchers;
+                    }
+                }
+                result = watchers.add(bit);
+            }
+        } finally {
+            addRemovePathRWLock.readLock().unlock();
+        }
+        return result;
+    }
+
+    /**
+     * Used in the OpCode.checkWatches, which is a read operation, since read
+     * and write requests are exclusively processed, we don't need to hold
+     * lock here. 
+     *
+     * Different from addWatch this method doesn't mutate any state, so we don't
+     * need to hold read lock to avoid dead watcher (cnxn closed) being added 
+     * to the watcher manager. 
+     *
+     * It's possible that before we lazily clean up the dead watcher, this will
+     * return true, but since the cnxn is closed, the response will be dropped
+     * as well, so it doesn't matter.
+     */
+    @Override
+    public boolean containsWatcher(String path, Watcher watcher) {
+        BitHashSet watchers = pathWatches.get(path);
+        if (watchers == null || !watchers.contains(watcherBitIdMap.getBit(watcher))) {
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public boolean removeWatcher(String path, Watcher watcher) {
+        // Hold write lock directly because removeWatcher request is more
+        // likely to be invoked when the watcher actually exists and
+        // hasn't fired yet, so instead of having read lock to check existence
+        // before switching to write one, it's actually cheaper to hold write
+        // lock directly here.
+        addRemovePathRWLock.writeLock().lock();
+        try {
+            BitHashSet list = pathWatches.get(path);
+            if (list == null || !list.remove(watcherBitIdMap.getBit(watcher))) {
+                return false;
+            }
+            if (list.isEmpty()) {
+                pathWatches.remove(path);
+            }
+            return true;
+        } finally {
+            addRemovePathRWLock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public void removeWatcher(Watcher watcher) {
+        Integer watcherBit;
+        // Use exclusive lock with addWatch to guarantee that we won't add
+        // watch for a cnxn which is already closed.
+        addRemovePathRWLock.writeLock().lock();
+        try {
+            // do nothing if the watcher is not tracked
+            watcherBit = watcherBitIdMap.getBit(watcher);
+            if (watcherBit == null) {
+                return;
+            }
+        } finally {
+            addRemovePathRWLock.writeLock().unlock();
+        }
+
+        // We can guarantee that when this line is executed, the cnxn of this 
+        // watcher has already been marked as stale (this method is only called 
+        // from ServerCnxn.close after we set stale), which means no watches 
+        // will be added to the watcher manager with this watcher, so that we
+        // can safely clean up this dead watcher. 
+        //
+        // So it's not necessary to have this line in the addRemovePathRWLock.
+        // Moving the addDeadWatcher call out of the locking block also avoids
+        // holding the write lock while we're blocked on adding dead watchers
+        // into the watcherCleaner.
+        watcherCleaner.addDeadWatcher(watcherBit);
+    }
+
+    /**
+     * Entry for WatcherCleaner to remove dead watchers
+     *
+     * @param deadWatchers the watchers that need to be removed
+     */
+    @Override
+    public void processDeadWatchers(Set<Integer> deadWatchers) {
+        // All the watchers being processed here are guaranteed to be dead, 
+        // no watches will be added for those dead watchers, that's why I 
+        // don't need to have addRemovePathRWLock here.
+        BitSet bits = new BitSet();
+        for (int dw: deadWatchers) {
+            bits.set(dw);
+        }
+        // The value iterator will reflect the state when it was
+        // created, don't need to synchronize.
+        for (BitHashSet watchers: pathWatches.values()) {
+            watchers.remove(deadWatchers, bits);
+        }
+        // Better to remove the empty path from pathWatches, but it will add
+        // a lot of lock contention and affect the throughput of addWatch,
+        // let's rely on the triggerWatch to delete it.
+        for (Integer wbit: deadWatchers) {
+            watcherBitIdMap.remove(wbit);
+        }
+    }
+
+    @Override
+    public WatcherOrBitSet triggerWatch(String path, EventType type) {
+        return triggerWatch(path, type, null);
+    }
+
+    @Override
+    public WatcherOrBitSet triggerWatch(
+            String path, EventType type, WatcherOrBitSet suppress) {
+        WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
+
+        BitHashSet watchers = remove(path);
+        if (watchers == null) {
+            return null;
+        }
+
+        // Avoid race condition between dead watcher cleaner in
+        // WatcherCleaner and iterating here
+        synchronized (watchers) {
+            for (Integer wBit : watchers) {
+                if (suppress != null && suppress.contains(wBit)) {
+                    continue;
+                }
+
+                Watcher w = watcherBitIdMap.get(wBit);
+
+                // skip dead watcher
+                if (w == null || isDeadWatcher(w)) {
+                    continue;
+                }
+
+                w.process(e);
+            }
+        }
+
+        return new WatcherOrBitSet(watchers);
+    }
+
+    @Override
+    public int size() {
+        int size = 0;
+        for(BitHashSet watches : pathWatches.values()) {
+            size += watches.size();
+        }
+        return size;
+    }
+
+    @Override
+    public void shutdown() {
+        if (watcherCleaner != null) {
+            watcherCleaner.shutdown();
+        }
+    }
+
+    private BitHashSet remove(String path) {
+        addRemovePathRWLock.writeLock().lock();
+        try {
+            return pathWatches.remove(path);
+        } finally {
+            addRemovePathRWLock.writeLock().unlock();
+        }
+    }
+
+    boolean isDeadWatcher(Watcher watcher) {
+        return watcher instanceof ServerCnxn && ((ServerCnxn) watcher).isStale();
+    }
+
+    int pathSize() {
+        return pathWatches.size();
+    }
+
+    @Override
+    public WatchesSummary getWatchesSummary() {
+        return new WatchesSummary(
+                watcherBitIdMap.size(), pathSize(), size());
+    }
+
+    @Override
+    public WatchesReport getWatches() {
+        Map<Long, Set<String>> id2paths = new HashMap<Long, Set<String>>();
+        for (Entry<Watcher, Set<String>> e: getWatcher2PathesMap().entrySet()) {
+            Long id = ((ServerCnxn) e.getKey()).getSessionId();
+            Set<String> paths = new HashSet<String>(e.getValue());
+            id2paths.put(id, paths);
+        }
+        return new WatchesReport(id2paths);
+    }
+
+    /**
+     * Iterating through a ConcurrentHashMap is 'safe': it reflects the state
+     * of the map at some point at or since the iteration began and may miss
+     * updates made while iterating. Given this is used in the commands to get
+     * a general idea of the watches state, we don't care about missing some.
+     */
+    @Override
+    public WatchesPathReport getWatchesByPath() {
+        Map<String, Set<Long>> path2ids = new HashMap<String, Set<Long>>();
+        for (Entry<String, BitHashSet> e : pathWatches.entrySet()) {
+            BitHashSet watchers = e.getValue();
+            synchronized (watchers) {
+                Set<Long> ids = new HashSet<Long>(watchers.size());
+                path2ids.put(e.getKey(), ids);
+                for (Integer wbit : watchers) {
+                    Watcher watcher = watcherBitIdMap.get(wbit);
+                    if (watcher instanceof ServerCnxn) {
+                        ids.add(((ServerCnxn) watcher).getSessionId());
+                    }
+                }
+            }
+        }
+        return new WatchesPathReport(path2ids);
+    }
+
+    /**
+     * May cause OOM if there are lots of watches, it might be better to
+     * forbid it in this class.
+     */
+    public Map<Watcher, Set<String>> getWatcher2PathesMap() {
+        Map<Watcher, Set<String>> watcher2paths =
+                new HashMap<Watcher, Set<String>>();
+        for (Entry<String, BitHashSet> e : pathWatches.entrySet()) {
+            String path = e.getKey();
+            BitHashSet watchers = e.getValue();
+            // avoid race condition with add/remove
+            synchronized (watchers) {
+                for (Integer wbit: watchers) {
+                    Watcher w = watcherBitIdMap.get(wbit);
+                    if (w == null) {
+                        continue;
+                    }
+                    if (!watcher2paths.containsKey(w)) {
+                        watcher2paths.put(w, new HashSet<String>());
+                    }
+                    watcher2paths.get(w).add(path);
+                }
+            }
+        }
+        return watcher2paths;
+    }
+
+    @Override
+    public void dumpWatches(PrintWriter pwriter, boolean byPath) {
+        if (byPath) {
+            for (Entry<String, BitHashSet> e : pathWatches.entrySet()) {
+                pwriter.println(e.getKey());
+                BitHashSet watchers = e.getValue();
+                synchronized (watchers) {
+                    for (Integer wbit : watchers) {
+                        Watcher w = watcherBitIdMap.get(wbit);
+                        if (!(w instanceof ServerCnxn)) {
+                            continue;
+                        }
+                        pwriter.print("\t0x");
+                        pwriter.print(
+                                Long.toHexString(((ServerCnxn)w).getSessionId()));
+                        pwriter.print("\n");
+                    }
+                }
+            }
+        } else {
+            for (Entry<Watcher, Set<String>> e : getWatcher2PathesMap().entrySet()) {
+                pwriter.print("0x");
+                pwriter.println(Long.toHexString(((ServerCnxn)e.getKey()).getSessionId()));
+                for (String path : e.getValue()) {
+                    pwriter.print("\t");
+                    pwriter.println(path);
+                }
+            }
+        }
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append(watcherBitIdMap.size())
+            .append(" connections watching ")
+            .append(pathSize()).append(" paths\n");
+        sb.append("Total watches:").append(size());
+        return sb.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/main/org/apache/zookeeper/server/watch/WatcherCleaner.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/watch/WatcherCleaner.java b/src/java/main/org/apache/zookeeper/server/watch/WatcherCleaner.java
new file mode 100644
index 0000000..2bfb5aa
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/server/watch/WatcherCleaner.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.watch;
+
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.zookeeper.common.Time;
+import org.apache.zookeeper.server.RateLogger;
+import org.apache.zookeeper.server.WorkerService;
+import org.apache.zookeeper.server.WorkerService.WorkRequest;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Thread used to lazily clean up closed watchers. It triggers a clean up
+ * when the number of dead watchers reaches a certain threshold or when a
+ * certain number of seconds has elapsed since the last clean up.
+ *
+ * Cost of running it:
+ *
+ * - needs to go through all the paths even if a watcher may only be
+ *   watching a single path
+ * - blocks on the path BitHashSet while checking for the dead watcher,
+ *   which won't block other stuff
+ */
+public class WatcherCleaner extends Thread {
+
+    private static final Logger LOG = LoggerFactory.getLogger(WatcherCleaner.class);
+    private final RateLogger RATE_LOGGER = new RateLogger(LOG);
+
+    private volatile boolean stopped = false;
+    private final Object cleanEvent = new Object();
+    private final Random r = new Random(System.nanoTime());
+    private final WorkerService cleaners;
+
+    private final Set<Integer> deadWatchers;
+    private final IDeadWatcherListener listener;
+    private final int watcherCleanThreshold;
+    private final int watcherCleanIntervalInSeconds;
+    private final int maxInProcessingDeadWatchers;
+    private final AtomicInteger totalDeadWatchers = new AtomicInteger();
+
+    public WatcherCleaner(IDeadWatcherListener listener) {
+        this(listener,
+            Integer.getInteger("zookeeper.watcherCleanThreshold", 1000),
+            Integer.getInteger("zookeeper.watcherCleanIntervalInSeconds", 600),
+            Integer.getInteger("zookeeper.watcherCleanThreadsNum", 2),
+            Integer.getInteger("zookeeper.maxInProcessingDeadWatchers", -1));
+    }
+
+    public WatcherCleaner(IDeadWatcherListener listener,
+            int watcherCleanThreshold, int watcherCleanIntervalInSeconds,
+            int watcherCleanThreadsNum, int maxInProcessingDeadWatchers) {
+        this.listener = listener;
+        this.watcherCleanThreshold = watcherCleanThreshold;
+        this.watcherCleanIntervalInSeconds = watcherCleanIntervalInSeconds;
+        int suggestedMaxInProcessingThreshold =
+                watcherCleanThreshold * watcherCleanThreadsNum;
+        if (maxInProcessingDeadWatchers > 0 &&
+                maxInProcessingDeadWatchers < suggestedMaxInProcessingThreshold) {
+            maxInProcessingDeadWatchers = suggestedMaxInProcessingThreshold;
+            LOG.info("The maxInProcessingDeadWatchers config is smaller " +
+                    "than the suggested one, change it to use {}",
+                    maxInProcessingDeadWatchers);
+        }
+        this.maxInProcessingDeadWatchers = maxInProcessingDeadWatchers;
+        this.deadWatchers = new HashSet<Integer>();
+        this.cleaners = new WorkerService("DeadWatcherCleanner",
+                watcherCleanThreadsNum, false);
+
+        LOG.info("watcherCleanThreshold={}, watcherCleanIntervalInSeconds={}" +
+                ", watcherCleanThreadsNum={}, maxInProcessingDeadWatchers={}",
+                watcherCleanThreshold, watcherCleanIntervalInSeconds,
+                watcherCleanThreadsNum, maxInProcessingDeadWatchers);
+    }
+
+    public void addDeadWatcher(int watcherBit) {
+        // Wait if there are too many watchers waiting to be closed,
+        // this is will slow down the socket packet processing and
+        // the adding watches in the ZK pipeline.
+        while (maxInProcessingDeadWatchers > 0 && !stopped &&
+                totalDeadWatchers.get() >= maxInProcessingDeadWatchers) {
+            try {
+                RATE_LOGGER.rateLimitLog("Waiting for dead watchers cleaning");
+                synchronized(totalDeadWatchers) {
+                    totalDeadWatchers.wait(100);
+                }
+            } catch (InterruptedException e) {
+                LOG.info("Got interrupted while waiting for dead watches " +
+                        "queue size");
+            }
+        }
+        synchronized (this) {
+            if (deadWatchers.add(watcherBit)) {
+                totalDeadWatchers.incrementAndGet();
+                if (deadWatchers.size() >= watcherCleanThreshold) {
+                    synchronized (cleanEvent) {
+                        cleanEvent.notifyAll();
+                    }
+                }
+            }
+        }
+    }
+
+    @Override
+    public void run() {
+        while (!stopped) {
+            synchronized (cleanEvent) {
+                try {
+                    // add some jitter to avoid cleaning dead watchers at the
+                    // same time in the quorum
+                    if (deadWatchers.size() < watcherCleanThreshold) {
+                        int maxWaitMs = (watcherCleanIntervalInSeconds +
+                            r.nextInt(watcherCleanIntervalInSeconds / 2 + 1)) * 1000;
+                        cleanEvent.wait(maxWaitMs);
+                    }
+                } catch (InterruptedException e) {
+                    LOG.info("Received InterruptedException while " +
+                            "waiting for cleanEvent");
+                    break;
+                }
+            }
+
+            if (deadWatchers.isEmpty()) {
+                continue;
+            }
+
+            synchronized (this) {
+                // Clean the dead watchers need to go through all the current 
+                // watches, which is pretty heavy and may take a second if 
+                // there are millions of watches, that's why we're doing lazily 
+                // batch clean up in a separate thread with a snapshot of the 
+                // current dead watchers.
+                final Set<Integer> snapshot = new HashSet<Integer>(deadWatchers);
+                deadWatchers.clear();
+                int total = snapshot.size();
+                LOG.info("Processing {} dead watchers", total);
+                cleaners.schedule(new WorkRequest() {
+                    @Override
+                    public void doWork() throws Exception {
+                        long startTime = Time.currentElapsedTime();
+                        listener.processDeadWatchers(snapshot);
+                        long latency = Time.currentElapsedTime() - startTime;
+                        LOG.info("Takes {} to process {} watches", latency, total);
+                        totalDeadWatchers.addAndGet(-total);
+                        synchronized(totalDeadWatchers) {
+                            totalDeadWatchers.notifyAll();
+                        }
+                    }
+                });
+            }
+        }
+        LOG.info("WatcherCleaner thread exited");
+    }
+
+    public void shutdown() {
+        stopped = true;
+        deadWatchers.clear();
+        cleaners.stop();
+    }
+
+}