Posted to dev@zookeeper.apache.org by lvfangmin <gi...@git.apache.org> on 2018/08/06 23:51:30 UTC

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

GitHub user lvfangmin opened a pull request:

    https://github.com/apache/zookeeper/pull/590

    [ZOOKEEPER-1177] Add the memory optimized watch manager for concentrate watches scenario

    The current HashSet-based WatchManager consumes more than 40GB of memory when
    creating 300M watches.
    
    This patch optimizes memory and time complexity for the concentrated-watches scenario: compared to WatchManager, both memory consumption and time complexity improve significantly. I'll post more data later with micro-benchmark results.
    
    Changes made compared to WatchManager:
    * Only keep the path-to-watches map
    * Use BitSet to reduce the memory used to store watches
    * Use ConcurrentHashMap and ReadWriteLock instead of synchronized to reduce lock contention
    * Lazily clean up the closed watchers
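The first two bullets can be illustrated with a minimal sketch, assuming watchers are usable as hash-map keys. `BitWatchIndex`, `addWatch`, and `trigger` are hypothetical names, not the PR's classes. For simplicity the sketch uses one coarse `synchronized` lock instead of the ConcurrentHashMap/ReadWriteLock scheme listed above, and it never recycles bit ids.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch (not the PR's code): each watcher gets a small integer id,
 * and each path stores its watchers as set bits in a BitSet.
 */
public class BitWatchIndex<W> {
    // watcher <-> bit id (the PR keeps this mapping in a BitMap<Watcher>)
    private final Map<W, Integer> watcherToBit = new HashMap<>();
    private final Map<Integer, W> bitToWatcher = new HashMap<>();
    private int nextBit = 0; // ids are never recycled in this sketch

    // Only the path -> watchers direction is stored; there is no watcher -> paths map.
    private final Map<String, BitSet> pathWatches = new HashMap<>();

    private int bitFor(W watcher) {
        Integer bit = watcherToBit.get(watcher);
        if (bit == null) {
            bit = nextBit++;
            watcherToBit.put(watcher, bit);
            bitToWatcher.put(bit, watcher);
        }
        return bit;
    }

    /** Returns true if the watch was newly added. One coarse lock, for simplicity. */
    public synchronized boolean addWatch(String path, W watcher) {
        BitSet bits = pathWatches.computeIfAbsent(path, p -> new BitSet());
        int bit = bitFor(watcher);
        if (bits.get(bit)) {
            return false;
        }
        bits.set(bit);
        return true;
    }

    /** Removes the path's watches and returns their watchers (watches are one-shot). */
    public synchronized List<W> trigger(String path) {
        List<W> fired = new ArrayList<>();
        BitSet bits = pathWatches.remove(path);
        if (bits == null) {
            return fired;
        }
        // Walk only the set bits, in ascending id order.
        for (int i = bits.nextSetBit(0); i >= 0; i = bits.nextSetBit(i + 1)) {
            fired.add(bitToWatcher.get(i));
        }
        return fired;
    }
}
```

The saving comes from replacing a per-watch HashSet entry (an entry object plus a table slot) with one bit in a per-path BitSet. A BitSet's backing array grows with its highest set bit, so keeping watcher ids small and dense matters.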

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/lvfangmin/zookeeper ZOOKEEPER-1177

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/zookeeper/pull/590.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #590
    
----
commit d4f996fdd760417c90ffb28fd63cc37dc87416c1
Author: Fangmin Lyu <al...@...>
Date:   2018-08-06T21:43:22Z

    add the memory optimized watch manager for concentrate watches

----


---

Posted by hanm <gi...@git.apache.org>.
Github user hanm commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r219659165
  
    --- Diff: src/java/main/org/apache/zookeeper/server/DumbWatcher.java ---
    @@ -0,0 +1,101 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.zookeeper.server;
    +
    +import java.io.IOException;
    +import java.net.InetSocketAddress;
    +import java.nio.ByteBuffer;
    +import java.security.cert.Certificate;
    +
    +import org.apache.jute.Record;
    +import org.apache.zookeeper.Watcher;
    +import org.apache.zookeeper.WatchedEvent;
    +import org.apache.zookeeper.proto.ReplyHeader;
    +import org.apache.zookeeper.server.ServerCnxn;
    +import org.apache.zookeeper.server.ServerStats;
    --- End diff --
    
    unused imports.


---

[GitHub] zookeeper issue #590: [ZOOKEEPER-1177] Add the memory optimized watch manage...

Posted by anmolnar <gi...@git.apache.org>.
Github user anmolnar commented on the issue:

    https://github.com/apache/zookeeper/pull/590
  
    @lvfangmin Thanks, and sorry for the delay. I'd like to check one more thing before accepting. Please bear with me.


---

Posted by hanm <gi...@git.apache.org>.
Github user hanm commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r219660630
  
    --- Diff: src/java/main/org/apache/zookeeper/server/watch/BitHashSet.java ---
    @@ -0,0 +1,156 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.zookeeper.server.watch;
    +
    +import java.util.BitSet;
    +import java.util.Set;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.lang.Iterable;
    +
    +import org.apache.zookeeper.server.util.BitMap;
    +
    +/**
    + * Using BitSet to store all the elements, and use HashSet to cache limited
    + * number of elements to find a balance between memory and time complexity.
    + *
    + * Without HashSet, we need to to use O(N) time to get the elements, N is
    + * the bit numbers in elementBits. But we need to keep the size small to make
    + * sure it doesn't cost too much in memory, there is a tradeoff between
    + * memory and time complexity.
    + *
    + * Previously, was deciding to dynamically switch between SparseBitSet and
    + * HashSet based on the memory consumption, but it will take time to copy
    + * data over and may have some herd effect of keep copying data from one
    + * data structure to anther. The current solution can do a very good job
    + * given most of the paths have limited number of elements.
    + */
    +public class BitHashSet implements Iterable<Integer> {
    +
    +    static final long serialVersionUID = 6382565447128283568L;
    +
    +    /**
    +     * Change to SparseBitSet if we we want to optimize more, the number of
    +     * elements on a single server is usually limited, so BitSet should be
    +     * fine.
    +     */
    +    private final BitSet elementBits = new BitSet();
    +    private final Set<Integer> cache = new HashSet<Integer>();
    +
    +    private final int cacheSize;
    +
    +    // To record how many elements in this set.
    +    private int elementCount = 0;
    +
    +    public BitHashSet() {
    +        this(Integer.getInteger("zookeeper.bitHashCacheSize", 10));
    +    }
    +
    +    public BitHashSet(int cacheSize) {
    +        this.cacheSize = cacheSize;
    +    }
    +
    +    public synchronized boolean add(Integer elementBit) {
    +        if (elementBit == null || elementBits.get(elementBit)) {
    +            return false;
    +        }
    +        if (cache.size() < cacheSize) {
    +            cache.add(elementBit);
    +        }
    +        elementBits.set(elementBit);
    +        elementCount++;
    +        return true;
    +    }
    +
    +    /**
    +     * Remove the watches, and return the number of watches being removed.
    +     */
    +    public synchronized int remove(Set<Integer> bitSet, BitSet bits) {
    +        cache.removeAll(bitSet);
    +        elementBits.andNot(bits);
    +        int elementCountBefore = elementCount;
    +        elementCount = elementBits.cardinality();
    +        return elementCountBefore - elementCount;
    +    }
    +
    +    public synchronized boolean remove(Integer elementBit) {
    +        if (elementBit == null || !elementBits.get(elementBit)) {
    +            return false;
    +        }
    +
    +        cache.remove(elementBit);
    +        elementBits.clear(elementBit);
    +        elementCount--;
    +        return true;
    +    }
    +
    +    public synchronized boolean contains(Integer elementBit) {
    +        if (elementBit == null) {
    +            return false;
    +        }
    +        return elementBits.get(elementBit);
    +    }
    +
    +    public synchronized int size() {
    +        return elementCount;
    +    }
    +
    +    /**
    +     * This function is not thread-safe, need to synchronized when
    +     * iterate through this set.
    +     */
    +    @Override
    +    public Iterator<Integer> iterator() {
    --- End diff --
    
    Curious: what is this `iterator` used for? I did not spot any usage of it just by going over the diff.
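The javadoc in this diff says that without the HashSet cache, retrieving elements costs O(N) in the number of bits. The `iterator()` body is cut off above, so the following is only a minimal sketch, not the PR's code, of how such an iterator might pick between the cache and a `nextSetBit` scan. `CachedBitSet` is a hypothetical name.

```java
import java.util.BitSet;
import java.util.HashSet;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Set;

/** Hypothetical sketch of iterating a BitSet-plus-cache set (not the PR's code). */
public class CachedBitSet implements Iterable<Integer> {
    private final BitSet elementBits = new BitSet();
    private final Set<Integer> cache = new HashSet<>();
    private final int cacheSize;
    private int elementCount = 0;

    public CachedBitSet(int cacheSize) {
        this.cacheSize = cacheSize;
    }

    public boolean add(int bit) {
        if (elementBits.get(bit)) {
            return false;
        }
        if (cache.size() < cacheSize) {
            cache.add(bit); // only the first cacheSize elements are cached
        }
        elementBits.set(bit);
        elementCount++;
        return true;
    }

    // Not thread-safe: callers must synchronize externally while iterating.
    @Override
    public Iterator<Integer> iterator() {
        // Every element is in the cache: iterate it directly, O(elementCount).
        if (cache.size() == elementCount) {
            return cache.iterator();
        }
        // Otherwise scan the BitSet, O(highest set bit).
        return new Iterator<Integer>() {
            private int next = elementBits.nextSetBit(0);

            @Override
            public boolean hasNext() {
                return next >= 0;
            }

            @Override
            public Integer next() {
                if (next < 0) {
                    throw new NoSuchElementException();
                }
                int current = next;
                // Guard against overflow of current + 1.
                next = (current == Integer.MAX_VALUE) ? -1 : elementBits.nextSetBit(current + 1);
                return current;
            }
        };
    }
}
```

For paths with only a few watchers, iteration never touches the BitSet. Larger sets pay for a full bit scan but store just one bit per extra element.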


---

Posted by anmolnar <gi...@git.apache.org>.
Github user anmolnar commented on the issue:

    https://github.com/apache/zookeeper/pull/590
  
    I cannot commit because we still have a -1 from @hanm. Waiting for him to approve.
    Does this make #612 redundant?


---

Posted by hanm <gi...@git.apache.org>.
Github user hanm commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r219660065
  
    --- Diff: zookeeper-docs/src/documentation/content/xdocs/zookeeperAdmin.xml ---
    @@ -1025,6 +1025,102 @@ server.3=zoo3:2888:3888</programlisting>
                 </listitem>
               </varlistentry>
     
    +
    +          <varlistentry>
    +            <term>watchManaggerName</term>
    +
    +            <listitem>
    +              <para>(Java system property only: <emphasis
    +                    role="bold">zookeeper.watchManaggerName</emphasis>)</para>
    +
    +              <para><emphasis role="bold">New in 3.6.0:</emphasis> Added in
    +                <ulink url="https://issues.apache.org/jira/browse/ZOOKEEPER-1179">ZOOKEEPER-1179</ulink> New watcher
    +                manager WatchManagerOptimized is added to optimize the memory overhead in heavy watch use cases. This
    +                config is used define which watch manager to be used. Currently, we only support WatchManager and
    +                WatchManagerOptimized.</para>
    +            </listitem>
    +          </varlistentry>
    +
    +          <varlistentry>
    +            <term>watcherCleanThreadsNum</term>
    +
    +            <listitem>
    +              <para>(Java system property only: <emphasis
    +                    role="bold">zookeeper.watcherCleanThreadsNum</emphasis>)</para>
    +
    +              <para><emphasis role="bold">New in 3.6.0:</emphasis> Added in
    +                <ulink url="https://issues.apache.org/jira/browse/ZOOKEEPER-1179">ZOOKEEPER-1179</ulink> The new watcher
    +                manager WatchManagerOptimized will clean up the dead watchers lazily, this config is used to decide how
    +                many thread is used in the WatcherCleaner. More thread usually means larger clean up throughput. The
    +                default value is 2, which is good enough even for heavy and continuous session closing/receating cases.</para>
    +            </listitem>
    +          </varlistentry>
    +
    +          <varlistentry>
    +            <term>watcherCleanThreshold</term>
    +
    +            <listitem>
    +              <para>(Java system property only: <emphasis
    +                    role="bold">zookeeper.watcherCleanThreshold</emphasis>)</para>
    +
    +              <para><emphasis role="bold">New in 3.6.0:</emphasis> Added in
    +                <ulink url="https://issues.apache.org/jira/browse/ZOOKEEPER-1179">ZOOKEEPER-1179</ulink> The new watcher
    +                manager WatchManagerOptimized will clean up the dead watchers lazily, the clean up process is relatively
    +                heavy, batch processing will reduce the cost and improve the performance. This setting is used to decide
    +                the batch size. The default one is 1000, we don't need to change it if there is no memory or clean up
    +                speed issue.</para>
    +            </listitem>
    +          </varlistentry>
    +
    +          <varlistentry>
    +            <term>watcherCleanIntervalInSeconds</term>
    +
    +            <listitem>
    +              <para>(Java system property only: <emphasis
    +                    role="bold">zookeeper.watcherCleanIntervalInSeconds</emphasis>)</para>
    +
    +              <para><emphasis role="bold">New in 3.6.0:</emphasis> Added in
    +                <ulink url="https://issues.apache.org/jira/browse/ZOOKEEPER-1179">ZOOKEEPER-1179</ulink> The new watcher
    +                manager WatchManagerOptimized will clean up the dead watchers lazily, the clean up process is relatively
    +                heavy, batch processing will reduce the cost and improve the performance. Besides watcherCleanThreshold,
    +                this setting is used to clean up the dead watchers after certain time even the dead watchers are not larger
    +                than watcherCleanThreshold, so that we won't leave the dead watchers there for too long. The default setting
    +                is 10 minutes, which usually don't need to be changed.</para>
    +            </listitem>
    +          </varlistentry>
    +
    +          <varlistentry>
    +            <term>maxInProcessingDeadWatchers</term>
    +
    +            <listitem>
    +              <para>(Java system property only: <emphasis
    +                    role="bold">zookeeper.maxInProcessingDeadWatchers</emphasis>)</para>
    +
    +              <para><emphasis role="bold">New in 3.6.0:</emphasis> Added in
    +                <ulink url="https://issues.apache.org/jira/browse/ZOOKEEPER-1179">ZOOKEEPER-1179</ulink> This is used
    +                to control how many backlog can we have in the WatcherCleaner, when it reaches this number, it will
    +                slow down adding the dead watcher to WatcherCleaner, which will in turn slow down adding and closing
    +                watchers, so that we can avoid OOM issue. By default there is no limit, you can set it to values like
    +                watcherCleanThreshold * 1000.</para>
    +            </listitem>
    +          </varlistentry>
    +
    +          <varlistentry>
    +            <term>bitHashCacheSize</term>
    +
    +            <listitem>
    +              <para>(Java system property only: <emphasis
    +                    role="bold">zookeeper.bitHashCacheSize</emphasis>)</para>
    +
    +              <para><emphasis role="bold">New 3.6.0:</emphasis> Added in
    +                <ulink url="https://issues.apache.org/jira/browse/ZOOKEEPER-1179">ZOOKEEPER-1179</ulink> This is the
    +                settin used to decide the HashSet cache size in the BitHashSet implementation. Without HashSet, we
    +                need to to use O(N) time to get the elements, N is the bit numbers in elementBits. But we need to
    +                keep the size small to make sure it doesn't cost too much in memory, there is a tradeoff between memory
    --- End diff --
    
    Add a space between `trade` and `off`.
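The watcherCleanThreshold and watcherCleanIntervalInSeconds entries in this diff describe a size-or-time batching policy: dead watchers are flushed once the batch reaches the threshold, or once the interval has elapsed. The sketch below shows that policy in a single-threaded, deterministic form. `DeadWatcherBatcher`, `tick`, and the explicit `nowMillis` parameters are hypothetical. The 600-second default is an assumption derived from the documented "10 minutes". The sketch omits the worker threads (watcherCleanThreadsNum) and the maxInProcessingDeadWatchers back-pressure.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

/**
 * Hypothetical sketch of the documented batching policy: flush dead watchers once the
 * batch reaches a size threshold, or once an interval has passed since the last flush.
 */
public class DeadWatcherBatcher {
    private final int threshold;
    private final long intervalMillis;
    private final Consumer<Set<Integer>> processor;
    private Set<Integer> pending = new HashSet<>();
    private long lastFlushMillis;

    public DeadWatcherBatcher(int threshold, long intervalMillis, long startMillis,
                              Consumer<Set<Integer>> processor) {
        this.threshold = threshold;
        this.intervalMillis = intervalMillis;
        this.lastFlushMillis = startMillis;
        this.processor = processor;
    }

    /** Reads the documented system properties, falling back to the documented defaults. */
    public static DeadWatcherBatcher fromSystemProperties(Consumer<Set<Integer>> processor) {
        int threshold = Integer.getInteger("zookeeper.watcherCleanThreshold", 1000);
        int intervalSeconds = Integer.getInteger("zookeeper.watcherCleanIntervalInSeconds", 600);
        return new DeadWatcherBatcher(threshold, intervalSeconds * 1000L,
                System.currentTimeMillis(), processor);
    }

    /** Records a dead watcher's bit id and flushes if either trigger condition holds. */
    public synchronized void addDeadWatcher(int bit, long nowMillis) {
        pending.add(bit);
        maybeFlush(nowMillis);
    }

    /** Called periodically so a small batch is not left pending forever. */
    public synchronized void tick(long nowMillis) {
        maybeFlush(nowMillis);
    }

    private void maybeFlush(long nowMillis) {
        if (pending.isEmpty()) {
            return;
        }
        if (pending.size() >= threshold || nowMillis - lastFlushMillis >= intervalMillis) {
            Set<Integer> batch = pending;
            pending = new HashSet<>();
            lastFlushMillis = nowMillis;
            // In the PR, DeadWatcherListener.processDeadWatchers(Set<Integer>) plays this role.
            processor.accept(batch);
        }
    }
}
```

Here the processor runs while the batcher's lock is held. A real cleaner would more likely hand each batch to its worker threads so that callers adding dead watchers are not blocked.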


---

Posted by anmolnar <gi...@git.apache.org>.
Github user anmolnar commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r214915065
  
    --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatchManagerOptimized.java ---
    @@ -0,0 +1,355 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.zookeeper.server.watch;
    +
    +import java.io.PrintWriter;
    +import java.util.HashMap;
    +import java.util.BitSet;
    +import java.util.HashSet;
    +import java.util.ArrayList;
    +import java.util.LinkedHashMap;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Random;
    +import java.util.Set;
    +import java.util.Iterator;
    +import java.lang.Iterable;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.locks.ReadWriteLock;
    +import java.util.concurrent.locks.ReentrantReadWriteLock;
    +
    +import org.apache.zookeeper.WatchedEvent;
    +import org.apache.zookeeper.Watcher;
    +import org.apache.zookeeper.Watcher.Event.EventType;
    +import org.apache.zookeeper.Watcher.Event.KeeperState;
    +import org.apache.zookeeper.server.ServerCnxn;
    +import org.apache.zookeeper.server.util.BitMap;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Optimized in memory and time complexity, compared to WatchManager, both the
    + * memory consumption and time complexity improved a lot, but it cannot
    + * efficiently remove the watcher when the session or socket is closed, for
    + * majority usecase this is not a problem.
    + *
    + * Changed made compared to WatchManager:
    + *
    + * - Use HashSet and BitSet to store the watchers to find a balance between
    + *   memory usage and time complexity
    + * - Use ReadWriteLock instead of synchronized to reduce lock retention
    + * - Lazily clean up the closed watchers
    + */
    +public class WatchManagerOptimized
    +        implements IWatchManager, DeadWatcherListener {
    +
    +    private static final Logger LOG =
    +            LoggerFactory.getLogger(WatchManagerOptimized.class);
    +
    +    private final ConcurrentHashMap<String, BitHashSet> pathWatches =
    +            new ConcurrentHashMap<String, BitHashSet>();
    +
    +    // watcher to bit id mapping
    +    private final BitMap<Watcher> watcherBitIdMap = new BitMap<Watcher>();
    +
    +    // used to lazily remove the dead watchers
    +    private final WatcherCleaner watcherCleaner;
    +
    +    private final ReentrantReadWriteLock addRemovePathRWLock = new ReentrantReadWriteLock();
    +
    +    public WatchManagerOptimized() {
    +        watcherCleaner = new WatcherCleaner(this);
    +        watcherCleaner.start();
    +    }
    +
    +    @Override
    +    public boolean addWatch(String path, Watcher watcher) {
    +        boolean result = false;
    +        addRemovePathRWLock.readLock().lock();
    +        try {
    +            // avoid race condition of adding a on flying dead watcher
    +            if (isDeadWatcher(watcher)) {
    +                LOG.debug("Ignoring addWatch with closed cnxn");
    +            } else {
    +                Integer bit = watcherBitIdMap.add(watcher);
    +                BitHashSet watchers = pathWatches.get(path);
    +                if (watchers == null) {
    +                    watchers = new BitHashSet();
    +                    BitHashSet existingWatchers = pathWatches.putIfAbsent(path, watchers);
    +                    if (existingWatchers != null) {
    +                        watchers = existingWatchers;
    +                    }
    +                }
    +                result = watchers.add(bit);
    +            }
    +        } finally {
    +            addRemovePathRWLock.readLock().unlock();
    +        }
    +        return result;
    +    }
    +
    +    @Override
    +    public boolean containsWatcher(String path, Watcher watcher) {
    +        BitHashSet watchers = pathWatches.get(path);
    +        if (watchers == null || !watchers.contains(watcherBitIdMap.getBit(watcher))) {
    +            return false;
    +        }
    +        return true;
    +    }
    +
    +    @Override
    +    public boolean removeWatcher(String path, Watcher watcher) {
    +        addRemovePathRWLock.writeLock().lock();
    +        try {
    +            BitHashSet list = pathWatches.get(path);
    +            if (list == null || !list.remove(watcherBitIdMap.getBit(watcher))) {
    +                return false;
    +            }
    +            if (list.isEmpty()) {
    +                pathWatches.remove(path);
    +            }
    +            return true;
    +        } finally {
    +            addRemovePathRWLock.writeLock().unlock();
    +        }
    +    }
    +
    +    @Override
    +    public void removeWatcher(Watcher watcher) {
    +        Integer watcherBit;
    +        addRemovePathRWLock.writeLock().lock();
    +        try {
    +            // do nothing if the watcher is not tracked
    +            watcherBit = watcherBitIdMap.getBit(watcher);
    +            if (watcherBit == null) {
    +                return;
    +            }
    +        } finally {
    +            addRemovePathRWLock.writeLock().unlock();
    +        }
    +        watcherCleaner.addDeadWatcher(watcherBit);
    +    }
    +
    +    /**
    +     * Entry for WatcherCleaner to remove dead watchers
    +     *
    +     * @param deadWatchers the watchers need to be removed
    +     */
    +    @Override
    +    public void processDeadWatchers(Set<Integer> deadWatchers) {
    +        BitSet bits = new BitSet();
    +        for (int dw: deadWatchers) {
    +            bits.set(dw);
    +        }
    +        // The value iterator will reflect the state when it was
    +        // created, don't need to synchronize.
    +        for (BitHashSet watchers: pathWatches.values()) {
    +            watchers.remove(deadWatchers, bits);
    +        }
    +        // Better to remove the empty path from pathWatches, but it will add
    +        // lot of lock contention and affect the throughput of addWatch,
    +        // let's rely on the triggerWatch to delete it.
    +        for (Integer wbit: deadWatchers) {
    +            watcherBitIdMap.remove(wbit);
    +        }
    +    }
    +
    +    @Override
    +    public WatcherOrBitSet triggerWatch(String path, EventType type) {
    +        return triggerWatch(path, type, null);
    +    }
    +
    +    @Override
    +    public WatcherOrBitSet triggerWatch(
    +            String path, EventType type, WatcherOrBitSet supress) {
    +        WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
    +
    +        BitHashSet watchers = remove(path);
    +        if (watchers == null) {
    +            return null;
    +        }
    +
    +        int triggeredWatches = 0;
    +
    +        // Avoid race condition between dead watcher cleaner in
    --- End diff --
    
    I'm not sure this comment is right. I can't see `WatcherCleaner` synchronizing on the path's BitHashSet anywhere. Please correct me if I'm wrong.


---

Posted by anmolnar <gi...@git.apache.org>.
Github user anmolnar commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r214915822
  
    Sorry, I get it now.
    BitHashSet's remove method is synchronized, so the cleaner implicitly acquires the lock on that BitHashSet.
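The point resolved here is that a `synchronized` instance method locks the object's own monitor. A `synchronized (watchers) { ... }` block elsewhere, which the truncated triggerWatch comment presumably refers to, therefore excludes the cleaner's `remove(...)` call on the same set. A minimal sketch with hypothetical class names (`SyncOnSetDemo`, `WatchBits`):

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

public class SyncOnSetDemo {
    /** Hypothetical, trimmed-down stand-in for BitHashSet. */
    static class WatchBits {
        private final BitSet bits = new BitSet();

        // synchronized instance methods lock the monitor of `this`
        public synchronized void add(int bit) {
            bits.set(bit);
        }

        // What the cleaner calls: the same monitor as add(...) and snapshot(...)
        public synchronized int remove(BitSet dead) {
            int before = bits.cardinality();
            bits.andNot(dead);
            return before - bits.cardinality();
        }

        // Not thread-safe on its own: callers must hold this object's monitor.
        BitSet unsafeBits() {
            return bits;
        }
    }

    /** Like a trigger path: holds the set's monitor while iterating the raw bits. */
    static List<Integer> snapshot(WatchBits watchers) {
        List<Integer> out = new ArrayList<>();
        synchronized (watchers) { // the same lock that remove(...) acquires
            BitSet b = watchers.unsafeBits();
            for (int i = b.nextSetBit(0); i >= 0; i = b.nextSetBit(i + 1)) {
                out.add(i);
            }
        }
        return out;
    }

    public static void main(String[] args) throws InterruptedException {
        WatchBits w = new WatchBits();
        for (int i = 0; i < 1000; i++) {
            w.add(i);
        }
        BitSet dead = new BitSet();
        dead.set(0, 1000); // mark every watcher dead
        Thread cleaner = new Thread(() -> w.remove(dead));
        cleaner.start();
        // Sees all 1000 bits or none, never a half-removed state.
        List<Integer> seen = snapshot(w);
        cleaner.join();
        System.out.println(seen.isEmpty() || seen.size() == 1000); // true
    }
}
```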


---

Posted by hanm <gi...@git.apache.org>.
Github user hanm commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r219659934
  
    --- Diff: src/java/main/org/apache/zookeeper/server/watch/BitHashSet.java ---
    @@ -0,0 +1,156 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.zookeeper.server.watch;
    +
    +import java.util.BitSet;
    +import java.util.Set;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.lang.Iterable;
    +
    +import org.apache.zookeeper.server.util.BitMap;
    +
    +/**
    + * Using BitSet to store all the elements, and use HashSet to cache limited
    + * number of elements to find a balance between memory and time complexity.
    + *
    + * Without HashSet, we need to to use O(N) time to get the elements, N is
    + * the bit numbers in elementBits. But we need to keep the size small to make
    + * sure it doesn't cost too much in memory, there is a tradeoff between
    --- End diff --
    
    nit `trade off`


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by anmolnar <gi...@git.apache.org>.
Github user anmolnar commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r214917025
  
    --- Diff: src/java/main/org/apache/zookeeper/server/util/BitMap.java ---
    @@ -0,0 +1,121 @@
    +
    +package org.apache.zookeeper.server.util;
    +
    +import java.util.Map;
    +import java.util.HashMap;
    +import java.util.BitSet;
    +import java.util.concurrent.locks.ReentrantReadWriteLock;
    +
    +public class BitMap<T> {
    --- End diff --
    
    I think a short javadoc similar to BitHashSet's would be useful here.
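The class body is not quoted here, so the following is only a guess at what such a javadoc would describe. It is inferred from the imports (Map, HashMap, BitSet, ReentrantReadWriteLock) and from how WatchManagerOptimized calls `add`, `getBit` and `remove`. `BitMapSketch` is a hypothetical name, and recycling freed ids through a BitSet is an assumption, not the patch's confirmed behaviour.

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * Hypothetical sketch: assigns each distinct value a small integer id ("bit")
 * and recycles ids of removed values, so per-path BitSets stay dense.
 */
class BitMapSketch<T> {
    private final Map<T, Integer> value2Bit = new HashMap<>();
    private final Map<Integer, T> bit2Value = new HashMap<>();
    private final BitSet freedBits = new BitSet();
    private int nextBit = 0;
    private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();

    /** Returns the id for value, allocating or recycling one if needed. */
    Integer add(T value) {
        rwLock.readLock().lock();
        try {
            Integer bit = value2Bit.get(value);
            if (bit != null) {
                return bit; // fast path: already mapped
            }
        } finally {
            rwLock.readLock().unlock();
        }
        rwLock.writeLock().lock();
        try {
            // Re-check: another thread may have added it between the locks.
            Integer bit = value2Bit.get(value);
            if (bit != null) {
                return bit;
            }
            int free = freedBits.nextSetBit(0);
            if (free >= 0) {
                freedBits.clear(free); // reuse the lowest freed id
                bit = free;
            } else {
                bit = nextBit++;
            }
            value2Bit.put(value, bit);
            bit2Value.put(bit, value);
            return bit;
        } finally {
            rwLock.writeLock().unlock();
        }
    }

    /** Returns the id of value, or null if it is not tracked. */
    Integer getBit(T value) {
        rwLock.readLock().lock();
        try {
            return value2Bit.get(value);
        } finally {
            rwLock.readLock().unlock();
        }
    }

    /** Releases the id so a later add() can reuse it. */
    T remove(int bit) {
        rwLock.writeLock().lock();
        try {
            T value = bit2Value.remove(bit);
            if (value != null) {
                value2Bit.remove(value);
                freedBits.set(bit);
            }
            return value;
        } finally {
            rwLock.writeLock().unlock();
        }
    }
}
```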


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by hanm <gi...@git.apache.org>.
Github user hanm commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r219661330
  
    --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatcherCleaner.java ---
    @@ -0,0 +1,176 @@
    +
    +package org.apache.zookeeper.server.watch;
    +
    +import java.util.Set;
    +import java.util.HashSet;
    +import java.util.Random;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import org.apache.zookeeper.common.Time;
    +import org.apache.zookeeper.server.RateLogger;
    +import org.apache.zookeeper.server.WorkerService;
    +import org.apache.zookeeper.server.WorkerService.WorkRequest;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Thread used to lazily clean up the closed watcher, it will trigger the
    + * clean up when the dead watchers get certain number or some number of
    + * seconds has elapsed since last clean up.
    + *
    + * Cost of running it:
    + *
    + * - need to go through all the paths even if the watcher may only
    + *   watching a single path
    + * - block in the path BitHashSet when we try to check the dead watcher
    + *   which won't block other stuff
    + */
    +public class WatcherCleaner extends Thread {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(WatcherCleaner.class);
    +    private final RateLogger RATE_LOGGER = new RateLogger(LOG);
    +
    +    private volatile boolean stopped = false;
    +    private final Object cleanEvent = new Object();
    +    private final Random r = new Random(System.nanoTime());
    +    private final WorkerService cleaners;
    +
    +    private final Set<Integer> deadWatchers;
    +    private final DeadWatcherListener listener;
    +    private final int watcherCleanThreshold;
    +    private final int watcherCleanIntervalInSeconds;
    +    private final int maxInProcessingDeadWatchers;
    +    private final AtomicInteger totalDeadWatchers = new AtomicInteger();
    +
    +    public WatcherCleaner(DeadWatcherListener listener) {
    +        this(listener,
    +            Integer.getInteger("zookeeper.watcherCleanThreshold", 1000),
    +            Integer.getInteger("zookeeper.watcherCleanIntervalInSeconds", 600),
    +            Integer.getInteger("zookeeper.watcherCleanThreadsNum", 2),
    +            Integer.getInteger("zookeeper.maxInProcessingDeadWatchers", -1));
    +    }
    +
    +    public WatcherCleaner(DeadWatcherListener listener,
    +            int watcherCleanThreshold, int watcherCleanIntervalInSeconds,
    +            int watcherCleanThreadsNum, int maxInProcessingDeadWatchers) {
    +        this.listener = listener;
    +        this.watcherCleanThreshold = watcherCleanThreshold;
    +        this.watcherCleanIntervalInSeconds = watcherCleanIntervalInSeconds;
    +        int suggestedMaxInProcessingThreshold =
    +                watcherCleanThreshold * watcherCleanThreadsNum;
    +        if (maxInProcessingDeadWatchers > 0 &&
    +                maxInProcessingDeadWatchers < suggestedMaxInProcessingThreshold) {
    +            maxInProcessingDeadWatchers = suggestedMaxInProcessingThreshold;
    +            LOG.info("The maxInProcessingDeadWatchers config is smaller " +
    +                    "than the suggested one, change it to use {}",
    +                    maxInProcessingDeadWatchers);
    +        }
    +        this.maxInProcessingDeadWatchers = maxInProcessingDeadWatchers;
    +        this.deadWatchers = new HashSet<Integer>();
    +        this.cleaners = new WorkerService("DeadWatcherCleanner",
    +                watcherCleanThreadsNum, false);
    +
    +        LOG.info("watcherCleanThreshold={}, watcherCleanIntervalInSeconds={}" +
    +                ", watcherCleanThreadsNum={}, maxInProcessingDeadWatchers={}",
    +                watcherCleanThreshold, watcherCleanIntervalInSeconds,
    +                watcherCleanThreadsNum, maxInProcessingDeadWatchers);
    +    }
    +
    +    public void addDeadWatcher(int watcherBit) {
    +        // Wait if there are too many watchers waiting to be closed,
    +        // this is will slow down the socket packet processing and
    +        // the adding watches in the ZK pipeline.
    +        while (maxInProcessingDeadWatchers > 0 && !stopped &&
    +                totalDeadWatchers.get() >= maxInProcessingDeadWatchers) {
    +            try {
    +                RATE_LOGGER.rateLimitLog("Waiting for dead watchers cleaning");
    +                synchronized(totalDeadWatchers) {
    +                    totalDeadWatchers.wait(100);
    +                }
    +            } catch (InterruptedException e) {
    +                LOG.info("Got interrupted while waiting for dead watches " +
    +                        "queue size");
    +            }
    +        }
    +        synchronized (this) {
    +            if (deadWatchers.add(watcherBit)) {
    +                totalDeadWatchers.incrementAndGet();
    +                if (deadWatchers.size() >= watcherCleanThreshold) {
    +                    synchronized (cleanEvent) {
    +                        cleanEvent.notifyAll();
    +                    }
    +                }
    +            }
    +        }
    +    }
    +
    +    @Override
    +    public void run() {
    +        while (!stopped) {
    +            synchronized (cleanEvent) {
    +                try {
    +                    if (deadWatchers.size() < watcherCleanThreshold) {
    +                        int maxWaitMs = (watcherCleanIntervalInSeconds +
    +                            r.nextInt(watcherCleanIntervalInSeconds / 2 + 1)) * 1000;
    +                        cleanEvent.wait(maxWaitMs);
    +                    }
    +                } catch (InterruptedException e) {
    +                    LOG.info("Received InterruptedException while " +
    +                            "waiting for cleanEvent");
    +                    break;
    +                }
    +            }
    +
    +            if (deadWatchers.isEmpty()) {
    +                continue;
    +            }
    +
    +            synchronized (this) {
    +                // Snapshot of the current dead watchers
    +                final Set<Integer> snapshot = new HashSet<Integer>(deadWatchers);
    +                deadWatchers.clear();
    --- End diff --
    
    Is there a particular reason to copy `deadWatchers`, clear it immediately, and then do the work, instead of operating on `deadWatchers` directly and clearing it only after the work is done? I assume the motivation was to free `deadWatchers` earlier so the work can be pipelined, i.e. more dead watchers can be added while the previous round of cleaning is in progress. But it looks like new dead watchers will block on `totalDeadWatchers`, which is only reset after the previous dead watchers have been cleaned up.
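For reference, the "snapshot, clear, then process" pattern being questioned can be reduced to the sketch below. `SnapshotQueue` is a hypothetical class, not the actual WatcherCleaner code. The copy and clear happen under the lock, and the possibly slow processing runs outside it, so an `add` issued during processing lands in the next batch instead of waiting.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical illustration of the snapshot-and-clear pattern; not the
// WatcherCleaner implementation.
class SnapshotQueue {
    private final Set<Integer> pending = new HashSet<>();

    synchronized void add(int id) {
        pending.add(id);
    }

    synchronized int pendingCount() {
        return pending.size();
    }

    /**
     * Copies and clears the shared set while holding the lock, then runs the
     * processor without the lock so add() callers are not blocked meanwhile.
     */
    void drain(Consumer<Set<Integer>> processor) {
        final Set<Integer> snapshot;
        synchronized (this) {
            snapshot = new HashSet<>(pending);
            pending.clear();
        }
        processor.accept(snapshot);
    }
}
```

As the comment notes, this only buys pipelining if producers are not throttled elsewhere. In the diff, `addDeadWatcher` still waits on `totalDeadWatchers` whenever `maxInProcessingDeadWatchers > 0`.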


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by lvfangmin <gi...@git.apache.org>.
Github user lvfangmin commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r214965941
  
    --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatchManagerOptimized.java ---
    @@ -0,0 +1,355 @@
    +
    +package org.apache.zookeeper.server.watch;
    +
    +import java.io.PrintWriter;
    +import java.util.HashMap;
    +import java.util.BitSet;
    +import java.util.HashSet;
    +import java.util.ArrayList;
    +import java.util.LinkedHashMap;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Random;
    +import java.util.Set;
    +import java.util.Iterator;
    +import java.lang.Iterable;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.locks.ReadWriteLock;
    +import java.util.concurrent.locks.ReentrantReadWriteLock;
    +
    +import org.apache.zookeeper.WatchedEvent;
    +import org.apache.zookeeper.Watcher;
    +import org.apache.zookeeper.Watcher.Event.EventType;
    +import org.apache.zookeeper.Watcher.Event.KeeperState;
    +import org.apache.zookeeper.server.ServerCnxn;
    +import org.apache.zookeeper.server.util.BitMap;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Optimized in memory and time complexity, compared to WatchManager, both the
    + * memory consumption and time complexity improved a lot, but it cannot
    + * efficiently remove the watcher when the session or socket is closed, for
    + * majority usecase this is not a problem.
    + *
    + * Changed made compared to WatchManager:
    + *
    + * - Use HashSet and BitSet to store the watchers to find a balance between
    + *   memory usage and time complexity
    + * - Use ReadWriteLock instead of synchronized to reduce lock retention
    + * - Lazily clean up the closed watchers
    + */
    +public class WatchManagerOptimized
    +        implements IWatchManager, DeadWatcherListener {
    +
    +    private static final Logger LOG =
    +            LoggerFactory.getLogger(WatchManagerOptimized.class);
    +
    +    private final ConcurrentHashMap<String, BitHashSet> pathWatches =
    +            new ConcurrentHashMap<String, BitHashSet>();
    +
    +    // watcher to bit id mapping
    +    private final BitMap<Watcher> watcherBitIdMap = new BitMap<Watcher>();
    +
    +    // used to lazily remove the dead watchers
    +    private final WatcherCleaner watcherCleaner;
    +
    +    private final ReentrantReadWriteLock addRemovePathRWLock = new ReentrantReadWriteLock();
    +
    +    public WatchManagerOptimized() {
    +        watcherCleaner = new WatcherCleaner(this);
    +        watcherCleaner.start();
    +    }
    +
    +    @Override
    +    public boolean addWatch(String path, Watcher watcher) {
    +        boolean result = false;
    +        addRemovePathRWLock.readLock().lock();
    +        try {
    +            // avoid race condition of adding a on flying dead watcher
    +            if (isDeadWatcher(watcher)) {
    +                LOG.debug("Ignoring addWatch with closed cnxn");
    +            } else {
    +                Integer bit = watcherBitIdMap.add(watcher);
    +                BitHashSet watchers = pathWatches.get(path);
    +                if (watchers == null) {
    +                    watchers = new BitHashSet();
    +                    BitHashSet existingWatchers = pathWatches.putIfAbsent(path, watchers);
    +                    if (existingWatchers != null) {
    +                        watchers = existingWatchers;
    +                    }
    +                }
    +                result = watchers.add(bit);
    +            }
    +        } finally {
    +            addRemovePathRWLock.readLock().unlock();
    +        }
    +        return result;
    +    }
    +
    +    @Override
    +    public boolean containsWatcher(String path, Watcher watcher) {
    +        BitHashSet watchers = pathWatches.get(path);
    +        if (watchers == null || !watchers.contains(watcherBitIdMap.getBit(watcher))) {
    +            return false;
    +        }
    +        return true;
    +    }
    +
    +    @Override
    +    public boolean removeWatcher(String path, Watcher watcher) {
    +        addRemovePathRWLock.writeLock().lock();
    +        try {
    +            BitHashSet list = pathWatches.get(path);
    +            if (list == null || !list.remove(watcherBitIdMap.getBit(watcher))) {
    +                return false;
    +            }
    +            if (list.isEmpty()) {
    +                pathWatches.remove(path);
    +            }
    +            return true;
    +        } finally {
    +            addRemovePathRWLock.writeLock().unlock();
    +        }
    +    }
    +
    +    @Override
    +    public void removeWatcher(Watcher watcher) {
    +        Integer watcherBit;
    +        addRemovePathRWLock.writeLock().lock();
    --- End diff --
    
    I need this lock to be exclusive with addWatch; otherwise addWatch may still add a dead watcher that the WatcherCleaner will miss if it has already started cleaning up.
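The argument can be modelled with a stripped-down registry. This is a sketch under stated assumptions: `WatchRegistrySketch` is hypothetical, and "dead" is tracked in a plain set, whereas the diff calls `isDeadWatcher(watcher)` without showing its body. Many `addWatch` calls share the read lock. Taking the write lock in `removeWatcher` waits for every in-flight `addWatch` to finish, so once the watcher is marked dead, no later `addWatch` can register it again behind the cleaner's back.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical model of the addWatch / removeWatcher(Watcher) locking.
class WatchRegistrySketch {
    private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
    // Concurrent sets: several addWatch calls may run under the read lock.
    private final Set<String> watchers = ConcurrentHashMap.newKeySet();
    private final Set<String> dead = ConcurrentHashMap.newKeySet();

    boolean addWatch(String watcher) {
        rwLock.readLock().lock();
        try {
            if (dead.contains(watcher)) {
                return false; // connection already closed: ignore
            }
            return watchers.add(watcher);
        } finally {
            rwLock.readLock().unlock();
        }
    }

    void removeWatcher(String watcher) {
        rwLock.writeLock().lock();
        try {
            // Acquiring the write lock waited out all in-flight addWatch calls.
            dead.add(watcher);
        } finally {
            rwLock.writeLock().unlock();
        }
        // Cleanup can now happen lazily, outside the lock: any later
        // addWatch for this watcher sees it as dead.
        watchers.remove(watcher);
    }

    boolean isRegistered(String watcher) {
        return watchers.contains(watcher);
    }
}
```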


---

[GitHub] zookeeper issue #590: [ZOOKEEPER-1177] Add the memory optimized watch manage...

Posted by lvfangmin <gi...@git.apache.org>.
Github user lvfangmin commented on the issue:

    https://github.com/apache/zookeeper/pull/590
  
    Added a JMH micro benchmark for the watch manager:
    
    * It shows a **big win** for watch-heavy cases: the current implementation uses more than 50MB of memory to store 1M watches, while WatchManagerOptimized uses only around 0.2MB.
    * It also makes adding and triggering watches more efficient, since WatchManagerOptimized doesn't maintain the reverse map.
    * In the sparse-watch use case, WatchManagerOptimized is expected to use a bit more memory because of the extra work of maintaining the bit sets; in the test it uses around 10% more memory.
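A quick sanity check on why the concentrated case can be so cheap: if each (path, watcher) pair costs roughly one bit, 1M watches need about 1,000,000 / 8 = 125,000 bytes of raw bit storage. That is in the same ballpark as the ~0.2MB figure above. The sketch assumes watcher ids are dense small integers. It is not the benchmark code, and it ignores object headers, the HashSet cache and the watcher-to-id mapping, so real usage is higher. `BitSetFootprint` is a hypothetical name.

```java
import java.util.BitSet;

// Rough estimate of raw bit storage for N watches, assuming dense watcher ids
// 0..N-1. Excludes object overhead, caches and id mappings.
class BitSetFootprint {
    static long rawBytes(int watches) {
        BitSet bits = new BitSet(watches); // pre-sized so it never has to grow
        for (int i = 0; i < watches; i++) {
            bits.set(i);
        }
        // size() reports the bits of space actually allocated (a multiple of 64).
        return bits.size() / 8L;
    }

    public static void main(String[] args) {
        System.out.println(rawBytes(1_000_000) + " bytes for 1M watches");
    }
}
```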
    
    Here are more results on throughput/latency compared with WatchManager:
    
    ```
    Benchmark                               (pathCount)    (watchManagerClass)  (watcherCount)  Mode  Cnt   Score    Error  Units
    WatchBench.testAddConcentrateWatch            10000           WatchManager             N/A  avgt    9   5.382 ±  0.968  ms/op
    WatchBench.testAddConcentrateWatch            10000  WatchManagerOptimized             N/A  avgt    9   0.696 ±  0.133  ms/op
    WatchBench.testAddSparseWatch                 10000           WatchManager           10000  avgt    9   4.889 ±  1.585  ms/op
    WatchBench.testAddSparseWatch                 10000  WatchManagerOptimized           10000  avgt    9   4.794 ±  1.068  ms/op
    WatchBench.testTriggerConcentrateWatch            1           WatchManager               1  avgt    9  ≈ 10⁻⁴           ms/op
    WatchBench.testTriggerConcentrateWatch            1           WatchManager            1000  avgt    9   0.037 ±  0.002  ms/op
    WatchBench.testTriggerConcentrateWatch            1  WatchManagerOptimized               1  avgt    9  ≈ 10⁻⁴           ms/op
    WatchBench.testTriggerConcentrateWatch            1  WatchManagerOptimized            1000  avgt    9   0.025 ±  0.001  ms/op
    WatchBench.testTriggerConcentrateWatch         1000           WatchManager               1  avgt    9   0.048 ±  0.003  ms/op
    WatchBench.testTriggerConcentrateWatch         1000           WatchManager            1000  avgt    9  71.838 ±  4.043  ms/op
    WatchBench.testTriggerConcentrateWatch         1000  WatchManagerOptimized               1  avgt    9   0.079 ±  0.002  ms/op
    WatchBench.testTriggerConcentrateWatch         1000  WatchManagerOptimized            1000  avgt    9  26.135 ±  0.223  ms/op
    WatchBench.testTriggerSparseWatch             10000           WatchManager           10000  avgt    9   1.207 ±  0.035  ms/op
    WatchBench.testTriggerSparseWatch             10000  WatchManagerOptimized           10000  avgt    9   1.321 ±  0.019  ms/op
    ```
    
    You can try the following commands to run the micro benchmark:
    ```
    $ ant clean package
    $ ant clean package -buildfile zookeeper-contrib/zookeeper-contrib-fatjar/build.xml
    $ java -jar build/contrib/fatjar/zookeeper-dev-fatjar.jar jmh
    ```
    
    @maoling @anmolnar I hope this gives you a clearer comparison between the old and new watch manager implementations.


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by hanm <gi...@git.apache.org>.
Github user hanm commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r219934138
  
    --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatchManagerOptimized.java ---
    @@ -0,0 +1,355 @@
    +
    +package org.apache.zookeeper.server.watch;
    +
    +import java.io.PrintWriter;
    +import java.util.HashMap;
    +import java.util.BitSet;
    +import java.util.HashSet;
    +import java.util.ArrayList;
    +import java.util.LinkedHashMap;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Random;
    +import java.util.Set;
    +import java.util.Iterator;
    +import java.lang.Iterable;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.locks.ReadWriteLock;
    +import java.util.concurrent.locks.ReentrantReadWriteLock;
    +
    +import org.apache.zookeeper.WatchedEvent;
    +import org.apache.zookeeper.Watcher;
    +import org.apache.zookeeper.Watcher.Event.EventType;
    +import org.apache.zookeeper.Watcher.Event.KeeperState;
    +import org.apache.zookeeper.server.ServerCnxn;
    +import org.apache.zookeeper.server.util.BitMap;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Optimized in memory and time complexity, compared to WatchManager, both the
    + * memory consumption and time complexity improved a lot, but it cannot
    + * efficiently remove the watcher when the session or socket is closed, for
    + * majority usecase this is not a problem.
    + *
    + * Changed made compared to WatchManager:
    + *
    + * - Use HashSet and BitSet to store the watchers to find a balance between
    + *   memory usage and time complexity
    + * - Use ReadWriteLock instead of synchronized to reduce lock retention
    + * - Lazily clean up the closed watchers
    + */
    +public class WatchManagerOptimized
    +        implements IWatchManager, DeadWatcherListener {
    +
    +    private static final Logger LOG =
    +            LoggerFactory.getLogger(WatchManagerOptimized.class);
    +
    +    private final ConcurrentHashMap<String, BitHashSet> pathWatches =
    +            new ConcurrentHashMap<String, BitHashSet>();
    +
    +    // watcher to bit id mapping
    +    private final BitMap<Watcher> watcherBitIdMap = new BitMap<Watcher>();
    +
    +    // used to lazily remove the dead watchers
    +    private final WatcherCleaner watcherCleaner;
    +
    +    private final ReentrantReadWriteLock addRemovePathRWLock = new ReentrantReadWriteLock();
    +
    +    public WatchManagerOptimized() {
    +        watcherCleaner = new WatcherCleaner(this);
    +        watcherCleaner.start();
    +    }
    +
    +    @Override
    +    public boolean addWatch(String path, Watcher watcher) {
    +        boolean result = false;
    +        addRemovePathRWLock.readLock().lock();
    +        try {
    +            // avoid race condition of adding a on flying dead watcher
    +            if (isDeadWatcher(watcher)) {
    +                LOG.debug("Ignoring addWatch with closed cnxn");
    +            } else {
    +                Integer bit = watcherBitIdMap.add(watcher);
    +                BitHashSet watchers = pathWatches.get(path);
    +                if (watchers == null) {
    +                    watchers = new BitHashSet();
    +                    BitHashSet existingWatchers = pathWatches.putIfAbsent(path, watchers);
    +                    if (existingWatchers != null) {
    +                        watchers = existingWatchers;
    +                    }
    +                }
    +                result = watchers.add(bit);
    +            }
    +        } finally {
    +            addRemovePathRWLock.readLock().unlock();
    +        }
    +        return result;
    +    }
    +
    +    @Override
    +    public boolean containsWatcher(String path, Watcher watcher) {
    +        BitHashSet watchers = pathWatches.get(path);
    +        if (watchers == null || !watchers.contains(watcherBitIdMap.getBit(watcher))) {
    +            return false;
    +        }
    +        return true;
    +    }
    +
    +    @Override
    +    public boolean removeWatcher(String path, Watcher watcher) {
    +        addRemovePathRWLock.writeLock().lock();
    +        try {
    +            BitHashSet list = pathWatches.get(path);
    +            if (list == null || !list.remove(watcherBitIdMap.getBit(watcher))) {
    +                return false;
    +            }
    +            if (list.isEmpty()) {
    +                pathWatches.remove(path);
    +            }
    +            return true;
    +        } finally {
    +            addRemovePathRWLock.writeLock().unlock();
    +        }
    +    }
    +
    +    @Override
    +    public void removeWatcher(Watcher watcher) {
    +        Integer watcherBit;
    +        addRemovePathRWLock.writeLock().lock();
    +        try {
    +            // do nothing if the watcher is not tracked
    +            watcherBit = watcherBitIdMap.getBit(watcher);
    +            if (watcherBit == null) {
    +                return;
    +            }
    +        } finally {
    +            addRemovePathRWLock.writeLock().unlock();
    +        }
    +        watcherCleaner.addDeadWatcher(watcherBit);
    +    }
    +
    +    /**
    +     * Entry for WatcherCleaner to remove dead watchers
    +     *
    +     * @param deadWatchers the watchers need to be removed
    +     */
    +    @Override
    +    public void processDeadWatchers(Set<Integer> deadWatchers) {
    +        BitSet bits = new BitSet();
    +        for (int dw: deadWatchers) {
    +            bits.set(dw);
    +        }
    +        // The value iterator will reflect the state when it was
    +        // created, don't need to synchronize.
    +        for (BitHashSet watchers: pathWatches.values()) {
    +            watchers.remove(deadWatchers, bits);
    +        }
    +        // Better to remove the empty path from pathWatches, but it will add
    +        // lot of lock contention and affect the throughput of addWatch,
    +        // let's rely on the triggerWatch to delete it.
    +        for (Integer wbit: deadWatchers) {
    +            watcherBitIdMap.remove(wbit);
    +        }
    +    }
    +
    +    @Override
    +    public WatcherOrBitSet triggerWatch(String path, EventType type) {
    +        return triggerWatch(path, type, null);
    +    }
    +
    +    @Override
    +    public WatcherOrBitSet triggerWatch(
    +            String path, EventType type, WatcherOrBitSet supress) {
    --- End diff --
    
    nit - `suppress`


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by hanm <gi...@git.apache.org>.
Github user hanm commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r219660592
  
    --- Diff: src/java/main/org/apache/zookeeper/server/watch/BitHashSet.java ---
    @@ -0,0 +1,156 @@
    +
    +package org.apache.zookeeper.server.watch;
    +
    +import java.util.BitSet;
    +import java.util.Set;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.lang.Iterable;
    +
    +import org.apache.zookeeper.server.util.BitMap;
    +
    +/**
    + * Using BitSet to store all the elements, and use HashSet to cache limited
    + * number of elements to find a balance between memory and time complexity.
    + *
    + * Without HashSet, we need to to use O(N) time to get the elements, N is
    + * the bit numbers in elementBits. But we need to keep the size small to make
    + * sure it doesn't cost too much in memory, there is a tradeoff between
    + * memory and time complexity.
    + *
    + * Previously, was deciding to dynamically switch between SparseBitSet and
    + * HashSet based on the memory consumption, but it will take time to copy
    + * data over and may have some herd effect of keep copying data from one
    + * data structure to anther. The current solution can do a very good job
    + * given most of the paths have limited number of elements.
    + */
    +public class BitHashSet implements Iterable<Integer> {
    +
    +    static final long serialVersionUID = 6382565447128283568L;
    +
    +    /**
    +     * Change to SparseBitSet if we we want to optimize more, the number of
    +     * elements on a single server is usually limited, so BitSet should be
    +     * fine.
    +     */
    +    private final BitSet elementBits = new BitSet();
    +    private final Set<Integer> cache = new HashSet<Integer>();
    +
    +    private final int cacheSize;
    +
    +    // To record how many elements in this set.
    +    private int elementCount = 0;
    +
    +    public BitHashSet() {
    +        this(Integer.getInteger("zookeeper.bitHashCacheSize", 10));
    +    }
    +
    +    public BitHashSet(int cacheSize) {
    +        this.cacheSize = cacheSize;
    +    }
    +
    +    public synchronized boolean add(Integer elementBit) {
    +        if (elementBit == null || elementBits.get(elementBit)) {
    +            return false;
    +        }
    +        if (cache.size() < cacheSize) {
    +            cache.add(elementBit);
    +        }
    +        elementBits.set(elementBit);
    +        elementCount++;
    +        return true;
    +    }
    +
    +    /**
    +     * Remove the watches, and return the number of watches being removed.
    +     */
    +    public synchronized int remove(Set<Integer> bitSet, BitSet bits) {
    +        cache.removeAll(bitSet);
    +        elementBits.andNot(bits);
    +        int elementCountBefore = elementCount;
    +        elementCount = elementBits.cardinality();
    +        return elementCountBefore - elementCount;
    +    }
    +
    +    public synchronized boolean remove(Integer elementBit) {
    +        if (elementBit == null || !elementBits.get(elementBit)) {
    +            return false;
    +        }
    +
    +        cache.remove(elementBit);
    +        elementBits.clear(elementBit);
    +        elementCount--;
    +        return true;
    +    }
    +
    +    public synchronized boolean contains(Integer elementBit) {
    +        if (elementBit == null) {
    +            return false;
    +        }
    +        return elementBits.get(elementBit);
    --- End diff --
    
    Should we look up `cache` first here? If not, what's the purpose of adding `cache`?
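For context on the `cache` question: going by the class Javadoc quoted above ("Without HashSet, we need to use O(N) time to get the elements"), the cache appears aimed at retrieving elements rather than at `contains`, since `BitSet.get` is already constant time. If every element fits in the cache, iteration touches only those elements instead of scanning the whole BitSet. Below is a minimal sketch under that assumption. `CachedBitSet` is a hypothetical, simplified class and not the code in this PR.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified stand-in for the BitHashSet under review (not the PR's code).
// The BitSet is the source of truth; the HashSet caches up to cacheSize elements.
class CachedBitSet implements Iterable<Integer> {
    private final BitSet bits = new BitSet();
    private final Set<Integer> cache = new HashSet<>();
    private final int cacheSize;
    private int count = 0;

    CachedBitSet() {
        // Same property name and default as the diff's no-arg constructor.
        this(Integer.getInteger("zookeeper.bitHashCacheSize", 10));
    }

    CachedBitSet(int cacheSize) {
        this.cacheSize = cacheSize;
    }

    synchronized boolean add(int bit) {
        if (bits.get(bit)) {
            return false;
        }
        if (cache.size() < cacheSize) {
            cache.add(bit);
        }
        bits.set(bit);
        count++;
        return true;
    }

    synchronized boolean remove(int bit) {
        if (!bits.get(bit)) {
            return false;
        }
        cache.remove(bit);
        bits.clear(bit);
        count--;
        return true;
    }

    // BitSet.get is already O(1), so checking the cache first would not speed this up.
    synchronized boolean contains(int bit) {
        return bits.get(bit);
    }

    synchronized int size() {
        return count;
    }

    // The cache pays off here. It is always a subset of the bits, so when its size
    // equals count it holds every element and the O(N) BitSet scan can be skipped.
    @Override
    public synchronized Iterator<Integer> iterator() {
        if (cache.size() == count) {
            return new ArrayList<>(cache).iterator();
        }
        List<Integer> all = new ArrayList<>(count);
        for (int i = bits.nextSetBit(0); i >= 0; i = bits.nextSetBit(i + 1)) {
            all.add(i);
        }
        return all.iterator();
    }
}
```

The trade-off is the one the Javadoc describes: a small, bounded cache costs little memory, and most paths are expected to have few watchers, so they take the cheap iteration path.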


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by hanm <gi...@git.apache.org>.
Github user hanm commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r219660120
  
    --- Diff: src/java/main/org/apache/zookeeper/server/watch/BitHashSet.java ---
    @@ -0,0 +1,156 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.zookeeper.server.watch;
    +
    +import java.util.BitSet;
    +import java.util.Set;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.lang.Iterable;
    +
    +import org.apache.zookeeper.server.util.BitMap;
    +
    +/**
    + * Using BitSet to store all the elements, and use HashSet to cache limited
    + * number of elements to find a balance between memory and time complexity.
    + *
    + * Without HashSet, we need to to use O(N) time to get the elements, N is
    --- End diff --
    
    nit `need to to` 


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by hanm <gi...@git.apache.org>.
Github user hanm commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r219641593
  
    --- Diff: src/java/main/org/apache/zookeeper/server/watch/IWatchManager.java ---
    @@ -0,0 +1,144 @@
    +
    +package org.apache.zookeeper.server.watch;
    +
    +import java.io.PrintWriter;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Set;
    +import java.util.concurrent.ConcurrentHashMap;
    +
    +import org.apache.zookeeper.Watcher;
    +import org.apache.zookeeper.Watcher.Event.EventType;
    +import org.apache.zookeeper.server.ServerCnxn;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    --- End diff --
    
    Remove all imports here except these three, since the rest are unused (my guess is that this file was copy-pasted?):
    `
    import java.io.PrintWriter;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.Watcher.Event.EventType;
    `


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by anmolnar <gi...@git.apache.org>.
Github user anmolnar commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r214866254
  
    --- Diff: build.xml ---
    @@ -119,6 +119,7 @@ xmlns:cs="antlib:com.puppycrawl.tools.checkstyle.ant">
         <property name="test.java.classes" value="${test.java.build.dir}/classes"/>
         <property name="test.src.dir" value="${src.dir}/java/test"/>
         <property name="systest.src.dir" value="${src.dir}/java/systest"/>
    +    <property name="bench.src.dir" value="${src.dir}/java/bench"/>
    --- End diff --
    
    I think this new dir should be added to the classpath of the `eclipse` task too.


---

[GitHub] zookeeper issue #590: [ZOOKEEPER-1177] Add the memory optimized watch manage...

Posted by nkalmar <gi...@git.apache.org>.
Github user nkalmar commented on the issue:

    https://github.com/apache/zookeeper/pull/590
  
    systest will go to src/test/java. From my side, you can put the bench in org.apache.zookeeper.test.system; thinking about it, that's a pretty good place.
    
    No need to create a main directory or anything; I will move all the files anyway. Just put the files alongside the others in systest (hopefully there is no package-level dependency; I didn't check).
    
    But there's a chance you will have to rebase if this PR cannot be merged before the remaining files are moved in the last step of the directory refactor. Sorry about that in advance... I'm not going to be the most popular with that PR :(


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by hanm <gi...@git.apache.org>.
Github user hanm commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r219659918
  
    --- Diff: src/java/main/org/apache/zookeeper/server/watch/BitHashSet.java ---
    @@ -0,0 +1,156 @@
    +
    +package org.apache.zookeeper.server.watch;
    --- End diff --
    
    Is there a reason that this `BitHashSet` is part of the `server.watch` package rather than the `server.util` package, where a similar helper class, `BitMap`, sits?
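For readers unfamiliar with the `BitMap` helper mentioned here: the diff uses it as `watcherBitIdMap` to map each watcher to an integer bit id, so per-path sets can store ids as bits instead of object references. Below is a minimal sketch of such an id allocator. It assumes freed ids are reused so the BitSets stay compact. `IdAllocator` and its methods are hypothetical and are not the API of `org.apache.zookeeper.server.util.BitMap`.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical value <-> int id mapping (not ZooKeeper's BitMap). Ids are handed out
// densely and recycled, so a BitSet indexed by these ids stays small.
class IdAllocator<T> {
    private final Map<T, Integer> valueToId = new HashMap<>();
    private final Map<Integer, T> idToValue = new HashMap<>();
    private final Deque<Integer> freeIds = new ArrayDeque<>();
    private int nextId = 0;

    // Returns the existing id for value, or assigns one, preferring previously freed ids.
    synchronized int add(T value) {
        Integer id = valueToId.get(value);
        if (id != null) {
            return id;
        }
        id = freeIds.isEmpty() ? nextId++ : freeIds.pop();
        valueToId.put(value, id);
        idToValue.put(id, value);
        return id;
    }

    // Returns null if the value has no id.
    synchronized Integer getId(T value) {
        return valueToId.get(value);
    }

    synchronized T get(int id) {
        return idToValue.get(id);
    }

    // Releases the value's id so a later add can reuse it; returns the freed id or null.
    synchronized Integer remove(T value) {
        Integer id = valueToId.remove(value);
        if (id != null) {
            idToValue.remove(id);
            freeIds.push(id);
        }
        return id;
    }
}
```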


---

[GitHub] zookeeper issue #590: [ZOOKEEPER-1177] Add the memory optimized watch manage...

Posted by maoling <gi...@git.apache.org>.
Github user maoling commented on the issue:

    https://github.com/apache/zookeeper/pull/590
  
    @lvfangmin Interesting. Where can we find a benchmark that shows how this PR improves memory usage?


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by hanm <gi...@git.apache.org>.
Github user hanm commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r219660689
  
    --- Diff: src/java/main/org/apache/zookeeper/server/watch/DeadWatcherListener.java ---
    @@ -0,0 +1,34 @@
    +
    +package org.apache.zookeeper.server.watch;
    +
    +import java.util.Set;
    +
    +/**
    + * Interface used to process the dead watchers related to closed cnxns.
    + */
    +public interface DeadWatcherListener {
    --- End diff --
    
    It would be good to rename this to `IDeadWatcherListener`, which makes it obvious that this is an interface. We already do this for `IWatchManager`.
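The listener receives dead watchers as a set, which allows batch cleanup: one pass over all paths per batch rather than one pass per closed connection. This mirrors the diff's `BitHashSet.remove(Set<Integer>, BitSet)`, which clears bits with `andNot`. Below is a minimal sketch that assumes watcher ids are stored in a `BitSet` per path. `DeadWatcherCallback`, `PathWatchTable`, and `processDeadWatchers` are hypothetical names for illustration.

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical callback modeled on the interface under review; name and signature are assumptions.
interface DeadWatcherCallback {
    void processDeadWatchers(Set<Integer> deadWatcherIds);
}

// Hypothetical path -> watcher-id table that clears dead ids in batches.
class PathWatchTable implements DeadWatcherCallback {
    final Map<String, BitSet> pathWatches = new HashMap<>();

    synchronized void addWatch(String path, int watcherId) {
        pathWatches.computeIfAbsent(path, p -> new BitSet()).set(watcherId);
    }

    // One pass per batch: build a mask of the dead ids once, clear them from every
    // path with BitSet.andNot, then drop paths that have no watchers left.
    @Override
    public synchronized void processDeadWatchers(Set<Integer> deadWatcherIds) {
        BitSet mask = new BitSet();
        for (int id : deadWatcherIds) {
            mask.set(id);
        }
        for (BitSet watchers : pathWatches.values()) {
            watchers.andNot(mask);
        }
        pathWatches.values().removeIf(BitSet::isEmpty);
    }
}
```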


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by anmolnar <gi...@git.apache.org>.
Github user anmolnar commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r214922678
  
    --- Diff: src/java/test/org/apache/zookeeper/server/DumbWatcher.java ---
    @@ -0,0 +1,96 @@
    +
    +package org.apache.zookeeper.server;
    +
    +import java.io.IOException;
    +import java.net.InetSocketAddress;
    +import java.nio.ByteBuffer;
    +import java.security.cert.Certificate;
    +
    +import org.apache.jute.Record;
    +import org.apache.zookeeper.Watcher;
    +import org.apache.zookeeper.WatchedEvent;
    +import org.apache.zookeeper.proto.ReplyHeader;
    +
    +public class DumbWatcher extends ServerCnxn {
    --- End diff --
    
    Please consider using Mockito.


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by anmolnar <gi...@git.apache.org>.
Github user anmolnar commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r214901441
  
    --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatchManagerOptimized.java ---
    @@ -0,0 +1,355 @@
    +
    +package org.apache.zookeeper.server.watch;
    +
    +import java.io.PrintWriter;
    +import java.util.HashMap;
    +import java.util.BitSet;
    +import java.util.HashSet;
    +import java.util.ArrayList;
    +import java.util.LinkedHashMap;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Random;
    +import java.util.Set;
    +import java.util.Iterator;
    +import java.lang.Iterable;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.locks.ReadWriteLock;
    +import java.util.concurrent.locks.ReentrantReadWriteLock;
    +
    +import org.apache.zookeeper.WatchedEvent;
    +import org.apache.zookeeper.Watcher;
    +import org.apache.zookeeper.Watcher.Event.EventType;
    +import org.apache.zookeeper.Watcher.Event.KeeperState;
    +import org.apache.zookeeper.server.ServerCnxn;
    +import org.apache.zookeeper.server.util.BitMap;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Optimized in memory and time complexity, compared to WatchManager, both the
    + * memory consumption and time complexity improved a lot, but it cannot
    + * efficiently remove the watcher when the session or socket is closed, for
    + * majority usecase this is not a problem.
    + *
    + * Changed made compared to WatchManager:
    + *
    + * - Use HashSet and BitSet to store the watchers to find a balance between
    + *   memory usage and time complexity
    + * - Use ReadWriteLock instead of synchronized to reduce lock retention
    + * - Lazily clean up the closed watchers
    + */
    +public class WatchManagerOptimized
    +        implements IWatchManager, DeadWatcherListener {
    +
    +    private static final Logger LOG =
    +            LoggerFactory.getLogger(WatchManagerOptimized.class);
    +
    +    private final ConcurrentHashMap<String, BitHashSet> pathWatches =
    +            new ConcurrentHashMap<String, BitHashSet>();
    +
    +    // watcher to bit id mapping
    +    private final BitMap<Watcher> watcherBitIdMap = new BitMap<Watcher>();
    +
    +    // used to lazily remove the dead watchers
    +    private final WatcherCleaner watcherCleaner;
    +
    +    private final ReentrantReadWriteLock addRemovePathRWLock = new ReentrantReadWriteLock();
    +
    +    public WatchManagerOptimized() {
    +        watcherCleaner = new WatcherCleaner(this);
    +        watcherCleaner.start();
    +    }
    +
    +    @Override
    +    public boolean addWatch(String path, Watcher watcher) {
    +        boolean result = false;
    +        addRemovePathRWLock.readLock().lock();
    +        try {
    +            // avoid race condition of adding a on flying dead watcher
    +            if (isDeadWatcher(watcher)) {
    +                LOG.debug("Ignoring addWatch with closed cnxn");
    +            } else {
    +                Integer bit = watcherBitIdMap.add(watcher);
    +                BitHashSet watchers = pathWatches.get(path);
    +                if (watchers == null) {
    +                    watchers = new BitHashSet();
    +                    BitHashSet existingWatchers = pathWatches.putIfAbsent(path, watchers);
    +                    if (existingWatchers != null) {
    +                        watchers = existingWatchers;
    +                    }
    +                }
    +                result = watchers.add(bit);
    +            }
    +        } finally {
    +            addRemovePathRWLock.readLock().unlock();
    +        }
    +        return result;
    +    }
    +
    +    @Override
    +    public boolean containsWatcher(String path, Watcher watcher) {
    +        BitHashSet watchers = pathWatches.get(path);
    +        if (watchers == null || !watchers.contains(watcherBitIdMap.getBit(watcher))) {
    +            return false;
    +        }
    +        return true;
    +    }
    +
    +    @Override
    +    public boolean removeWatcher(String path, Watcher watcher) {
    +        addRemovePathRWLock.writeLock().lock();
    +        try {
    +            BitHashSet list = pathWatches.get(path);
    +            if (list == null || !list.remove(watcherBitIdMap.getBit(watcher))) {
    +                return false;
    +            }
    +            if (list.isEmpty()) {
    +                pathWatches.remove(path);
    +            }
    +            return true;
    +        } finally {
    +            addRemovePathRWLock.writeLock().unlock();
    +        }
    +    }
    +
    +    @Override
    +    public void removeWatcher(Watcher watcher) {
    +        Integer watcherBit;
    +        addRemovePathRWLock.writeLock().lock();
    --- End diff --
    
    Do you need to acquire the write lock here?


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by hanm <gi...@git.apache.org>.
Github user hanm commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r219660054
  
    --- Diff: zookeeper-docs/src/documentation/content/xdocs/zookeeperAdmin.xml ---
    @@ -1025,6 +1025,102 @@ server.3=zoo3:2888:3888</programlisting>
                 </listitem>
               </varlistentry>
     
    +
    +          <varlistentry>
    +            <term>watchManaggerName</term>
    +
    +            <listitem>
    +              <para>(Java system property only: <emphasis
    +                    role="bold">zookeeper.watchManaggerName</emphasis>)</para>
    +
    +              <para><emphasis role="bold">New in 3.6.0:</emphasis> Added in
    +                <ulink url="https://issues.apache.org/jira/browse/ZOOKEEPER-1179">ZOOKEEPER-1179</ulink> New watcher
    +                manager WatchManagerOptimized is added to optimize the memory overhead in heavy watch use cases. This
    +                config is used define which watch manager to be used. Currently, we only support WatchManager and
    +                WatchManagerOptimized.</para>
    +            </listitem>
    +          </varlistentry>
    +
    +          <varlistentry>
    +            <term>watcherCleanThreadsNum</term>
    +
    +            <listitem>
    +              <para>(Java system property only: <emphasis
    +                    role="bold">zookeeper.watcherCleanThreadsNum</emphasis>)</para>
    +
    +              <para><emphasis role="bold">New in 3.6.0:</emphasis> Added in
    +                <ulink url="https://issues.apache.org/jira/browse/ZOOKEEPER-1179">ZOOKEEPER-1179</ulink> The new watcher
    +                manager WatchManagerOptimized will clean up the dead watchers lazily, this config is used to decide how
    +                many thread is used in the WatcherCleaner. More thread usually means larger clean up throughput. The
    +                default value is 2, which is good enough even for heavy and continuous session closing/receating cases.</para>
    +            </listitem>
    +          </varlistentry>
    +
    +          <varlistentry>
    +            <term>watcherCleanThreshold</term>
    +
    +            <listitem>
    +              <para>(Java system property only: <emphasis
    +                    role="bold">zookeeper.watcherCleanThreshold</emphasis>)</para>
    +
    +              <para><emphasis role="bold">New in 3.6.0:</emphasis> Added in
    +                <ulink url="https://issues.apache.org/jira/browse/ZOOKEEPER-1179">ZOOKEEPER-1179</ulink> The new watcher
    +                manager WatchManagerOptimized will clean up the dead watchers lazily, the clean up process is relatively
    +                heavy, batch processing will reduce the cost and improve the performance. This setting is used to decide
    +                the batch size. The default one is 1000, we don't need to change it if there is no memory or clean up
    +                speed issue.</para>
    +            </listitem>
    +          </varlistentry>
    +
    +          <varlistentry>
    +            <term>watcherCleanIntervalInSeconds</term>
    +
    +            <listitem>
    +              <para>(Java system property only: <emphasis
    +                    role="bold">zookeeper.watcherCleanIntervalInSeconds</emphasis>)</para>
    +
    +              <para><emphasis role="bold">New in 3.6.0:</emphasis> Added in
    +                <ulink url="https://issues.apache.org/jira/browse/ZOOKEEPER-1179">ZOOKEEPER-1179</ulink> The new watcher
    +                manager WatchManagerOptimized will clean up the dead watchers lazily, the clean up process is relatively
    +                heavy, batch processing will reduce the cost and improve the performance. Besides watcherCleanThreshold,
    +                this setting is used to clean up the dead watchers after certain time even the dead watchers are not larger
    +                than watcherCleanThreshold, so that we won't leave the dead watchers there for too long. The default setting
    +                is 10 minutes, which usually don't need to be changed.</para>
    +            </listitem>
    +          </varlistentry>
    +
    +          <varlistentry>
    +            <term>maxInProcessingDeadWatchers</term>
    +
    +            <listitem>
    +              <para>(Java system property only: <emphasis
    +                    role="bold">zookeeper.maxInProcessingDeadWatchers</emphasis>)</para>
    +
    +              <para><emphasis role="bold">New in 3.6.0:</emphasis> Added in
    +                <ulink url="https://issues.apache.org/jira/browse/ZOOKEEPER-1179">ZOOKEEPER-1179</ulink> This is used
    +                to control how many backlog can we have in the WatcherCleaner, when it reaches this number, it will
    +                slow down adding the dead watcher to WatcherCleaner, which will in turn slow down adding and closing
    +                watchers, so that we can avoid OOM issue. By default there is no limit, you can set it to values like
    +                watcherCleanThreshold * 1000.</para>
    +            </listitem>
    +          </varlistentry>
    +
    +          <varlistentry>
    +            <term>bitHashCacheSize</term>
    +
    +            <listitem>
    +              <para>(Java system property only: <emphasis
    +                    role="bold">zookeeper.bitHashCacheSize</emphasis>)</para>
    +
    +              <para><emphasis role="bold">New 3.6.0:</emphasis> Added in
    +                <ulink url="https://issues.apache.org/jira/browse/ZOOKEEPER-1179">ZOOKEEPER-1179</ulink> This is the
    +                settin used to decide the HashSet cache size in the BitHashSet implementation. Without HashSet, we
    --- End diff --
    
    spell check: `setting used to`
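Taken together, `watcherCleanThreshold` and `watcherCleanIntervalInSeconds` describe a size-or-time batching policy: flush once enough dead watchers accumulate, or once the interval has elapsed with anything pending. Below is a minimal sketch of that policy. It assumes a single-threaded flush and an injected clock. `DeadWatcherBatcher` is hypothetical and is not the PR's `WatcherCleaner`. The property names and defaults come from the documentation quoted above.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical sketch of the documented batching policy; not the PR's WatcherCleaner.
class DeadWatcherBatcher {
    private final int threshold;
    private final long intervalMillis;
    private final Consumer<Set<Integer>> cleaner;
    private Set<Integer> pending = new HashSet<>();
    private long lastCleanMillis;

    DeadWatcherBatcher(int threshold, long intervalMillis, long nowMillis,
                       Consumer<Set<Integer>> cleaner) {
        this.threshold = threshold;
        this.intervalMillis = intervalMillis;
        this.lastCleanMillis = nowMillis;
        this.cleaner = cleaner;
    }

    // Reads the documented system properties, falling back to the documented
    // defaults (1000 dead watchers, 10 minutes).
    static DeadWatcherBatcher fromSystemProperties(long nowMillis, Consumer<Set<Integer>> cleaner) {
        int threshold = Integer.getInteger("zookeeper.watcherCleanThreshold", 1000);
        long intervalSeconds = Long.getLong("zookeeper.watcherCleanIntervalInSeconds", 600L);
        return new DeadWatcherBatcher(threshold, intervalSeconds * 1000L, nowMillis, cleaner);
    }

    // Time is passed in explicitly to keep the sketch deterministic.
    synchronized void addDeadWatcher(int watcherId, long nowMillis) {
        pending.add(watcherId);
        maybeClean(nowMillis);
    }

    // Flush when the batch reaches the threshold, or when a full interval has
    // passed since the last flush and anything is pending.
    synchronized void maybeClean(long nowMillis) {
        boolean full = pending.size() >= threshold;
        boolean stale = !pending.isEmpty() && nowMillis - lastCleanMillis >= intervalMillis;
        if (full || stale) {
            Set<Integer> batch = pending;
            pending = new HashSet<>();
            lastCleanMillis = nowMillis;
            // A real cleaner would hand the batch to worker threads instead of running it inline.
            cleaner.accept(batch);
        }
    }
}
```

Both settings are documented as Java system properties only, so they would be passed to the server JVM, for example as `-Dzookeeper.watcherCleanThreshold=1000`.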


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by lvfangmin <gi...@git.apache.org>.
Github user lvfangmin commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r220050127
  
    --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatchManagerOptimized.java ---
    @@ -0,0 +1,355 @@
    +    @Override
    +    public boolean addWatch(String path, Watcher watcher) {
    +        boolean result = false;
    +        addRemovePathRWLock.readLock().lock();
    --- End diff --
    
    Yes, it's used to improve read throughput. Creating a new watcher bit and adding it to the BitHashSet each have their own lock, which minimizes the lock scope. I'll add some comments here.
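The lock split described in this reply can be illustrated with a simplified table. `addWatch` only needs the shared read lock because the per-path set is itself thread-safe. One plausible reason removal of a now-empty path takes the exclusive write lock is that it prevents a concurrent adder from putting a watch into a set that is being dropped from the map. Below is a minimal sketch that assumes integer watcher ids and a concurrent set per path. `RwLockedWatchTable` is hypothetical.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical, simplified table illustrating the read/write lock split.
class RwLockedWatchTable {
    private final ConcurrentHashMap<String, Set<Integer>> pathWatches = new ConcurrentHashMap<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    // Many adders can hold the read lock at once; the per-path set is thread-safe,
    // so concurrent adds to the same path remain correct.
    boolean addWatch(String path, int watcherId) {
        lock.readLock().lock();
        try {
            return pathWatches
                    .computeIfAbsent(path, p -> ConcurrentHashMap.<Integer>newKeySet())
                    .add(watcherId);
        } finally {
            lock.readLock().unlock();
        }
    }

    // The write lock excludes all adders, so no add can land in a set between the
    // isEmpty check and its removal from the map (which would silently lose the watch).
    boolean removeWatcher(String path, int watcherId) {
        lock.writeLock().lock();
        try {
            Set<Integer> watchers = pathWatches.get(path);
            if (watchers == null || !watchers.remove(watcherId)) {
                return false;
            }
            if (watchers.isEmpty()) {
                pathWatches.remove(path);
            }
            return true;
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Lookups take no lock, similar to containsWatcher in the diff.
    boolean contains(String path, int watcherId) {
        Set<Integer> watchers = pathWatches.get(path);
        return watchers != null && watchers.contains(watcherId);
    }
}
```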


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by anmolnar <gi...@git.apache.org>.
Github user anmolnar commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r215180602
  
    --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatchManager.java ---
    @@ -46,15 +48,26 @@
         private final Map<Watcher, Set<String>> watch2Paths =
             new HashMap<Watcher, Set<String>>();
     
    -    synchronized int size(){
    +    @Override
    +    public synchronized int size(){
             int result = 0;
             for(Set<Watcher> watches : watchTable.values()) {
                 result += watches.size();
             }
             return result;
         }
     
    -    synchronized void addWatch(String path, Watcher watcher) {
    +    boolean isDeadWatcher(Watcher watcher) {
    --- End diff --
    
    Taking that and the Jira fix version into account, this patch will definitely go into 3.5 as well.


---

[GitHub] zookeeper issue #590: [ZOOKEEPER-1177] Add the memory optimized watch manage...

Posted by hanm <gi...@git.apache.org>.
Github user hanm commented on the issue:

    https://github.com/apache/zookeeper/pull/590
  
    Got a ["green" build](https://builds.apache.org/job/PreCommit-ZOOKEEPER-github-pr-build/2285/testReport/) (minus the known failing test `testReconfigRemoveClientFromStatic` discussed in [ZOOKEEPER-2847](https://issues.apache.org/jira/browse/ZOOKEEPER-2847)).



---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by lvfangmin <gi...@git.apache.org>.
Github user lvfangmin commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r217112114
  
    --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatchManagerOptimized.java ---
    @@ -0,0 +1,355 @@
    +    @Override
    +    public boolean addWatch(String path, Watcher watcher) {
    +        boolean result = false;
    +        addRemovePathRWLock.readLock().lock();
    +        try {
    +            // avoid race condition of adding a on flying dead watcher
    +            if (isDeadWatcher(watcher)) {
    +                LOG.debug("Ignoring addWatch with closed cnxn");
    +            } else {
    +                Integer bit = watcherBitIdMap.add(watcher);
    +                BitHashSet watchers = pathWatches.get(path);
    +                if (watchers == null) {
    +                    watchers = new BitHashSet();
    +                    BitHashSet existingWatchers = pathWatches.putIfAbsent(path, watchers);
    +                    if (existingWatchers != null) {
    +                        watchers = existingWatchers;
    +                    }
    +                }
    +                result = watchers.add(bit);
    +            }
    +        } finally {
    +            addRemovePathRWLock.readLock().unlock();
    +        }
    +        return result;
    +    }
    +
    +    @Override
    +    public boolean containsWatcher(String path, Watcher watcher) {
    +        BitHashSet watchers = pathWatches.get(path);
    +        if (watchers == null || !watchers.contains(watcherBitIdMap.getBit(watcher))) {
    +            return false;
    +        }
    +        return true;
    +    }
    +
    +    @Override
    +    public boolean removeWatcher(String path, Watcher watcher) {
    +        addRemovePathRWLock.writeLock().lock();
    +        try {
    +            BitHashSet list = pathWatches.get(path);
    +            if (list == null || !list.remove(watcherBitIdMap.getBit(watcher))) {
    +                return false;
    +            }
    +            if (list.isEmpty()) {
    +                pathWatches.remove(path);
    +            }
    +            return true;
    +        } finally {
    +            addRemovePathRWLock.writeLock().unlock();
    +        }
    +    }
    +
    +    @Override
    +    public void removeWatcher(Watcher watcher) {
    +        Integer watcherBit;
    +        addRemovePathRWLock.writeLock().lock();
    --- End diff --
    
    The callers of removeWatcher are NIOServerCnxn.close and NettyServerCnxn.close.
    
    When the cnxn is closed, it is marked as stale before removeCnxn is called on zkServer, which then calls this method. So once we have grabbed this lock, the cnxn has already been marked as stale.
    
    We could explicitly call setStale here as well, but I don't think that's necessary.
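A minimal sketch of the ordering argument above, using simplified stand-ins. `Cnxn` is a hypothetical substitute for ServerCnxn, and a plain concurrent set replaces the BitHashSet/BitMap pair. `close()` marks the connection stale before calling `removeWatcher()`. `addWatch()` re-checks staleness while holding the read lock, and `removeWatcher()` takes the write lock. Together these mean a watch cannot be added after its connection's watchers have been removed.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch: Cnxn stands in for ServerCnxn, and a concurrent set
// replaces the BitHashSet/BitMap pair used by WatchManagerOptimized.
public class StaleOrderingSketch {

    public static final class Cnxn {
        private volatile boolean stale = false;
        public void setStale() { stale = true; }
        public boolean isStale() { return stale; }
    }

    private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
    private final ConcurrentHashMap<String, Set<Cnxn>> pathWatches = new ConcurrentHashMap<>();

    public boolean addWatch(String path, Cnxn cnxn) {
        rwLock.readLock().lock();
        try {
            // removeWatcher(cnxn) cannot be running while we hold the read lock.
            // If stale is still false here, removeWatcher has not run yet and
            // will remove whatever we add; if it is true, we bail out.
            if (cnxn.isStale()) {
                return false;
            }
            return pathWatches
                    .computeIfAbsent(path, p -> ConcurrentHashMap.<Cnxn>newKeySet())
                    .add(cnxn);
        } finally {
            rwLock.readLock().unlock();
        }
    }

    public void removeWatcher(Cnxn cnxn) {
        rwLock.writeLock().lock();
        try {
            // Every addWatch that passed the stale check has finished by now.
            for (Set<Cnxn> watchers : pathWatches.values()) {
                watchers.remove(cnxn);
            }
        } finally {
            rwLock.writeLock().unlock();
        }
    }

    // Mirrors the close path described above: mark stale first, then remove.
    public void close(Cnxn cnxn) {
        cnxn.setStale();
        removeWatcher(cnxn);
    }

    public boolean containsWatcher(String path, Cnxn cnxn) {
        Set<Cnxn> watchers = pathWatches.get(path);
        return watchers != null && watchers.contains(cnxn);
    }
}
```

The key design point is that staleness is published before the write lock is requested, so the read-locked check in `addWatch` is sufficient without also calling `setStale` inside `removeWatcher`.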


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by hanm <gi...@git.apache.org>.
Github user hanm commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r219912709
  
    --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatchManagerOptimized.java ---
    @@ -0,0 +1,355 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.zookeeper.server.watch;
    +
    +import java.io.PrintWriter;
    +import java.util.HashMap;
    +import java.util.BitSet;
    +import java.util.HashSet;
    +import java.util.ArrayList;
    +import java.util.LinkedHashMap;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Random;
    +import java.util.Set;
    +import java.util.Iterator;
    +import java.lang.Iterable;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.locks.ReadWriteLock;
    +import java.util.concurrent.locks.ReentrantReadWriteLock;
    +
    +import org.apache.zookeeper.WatchedEvent;
    +import org.apache.zookeeper.Watcher;
    +import org.apache.zookeeper.Watcher.Event.EventType;
    +import org.apache.zookeeper.Watcher.Event.KeeperState;
    +import org.apache.zookeeper.server.ServerCnxn;
    +import org.apache.zookeeper.server.util.BitMap;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Optimized in memory and time complexity, compared to WatchManager, both the
    + * memory consumption and time complexity improved a lot, but it cannot
    + * efficiently remove the watcher when the session or socket is closed, for
    + * majority usecase this is not a problem.
    --- End diff --
    
    nit: use case


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by lvfangmin <gi...@git.apache.org>.
Github user lvfangmin commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r219688243
  
    --- Diff: src/java/main/org/apache/zookeeper/server/watch/BitHashSet.java ---
    @@ -0,0 +1,156 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.zookeeper.server.watch;
    +
    +import java.util.BitSet;
    +import java.util.Set;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.lang.Iterable;
    +
    +import org.apache.zookeeper.server.util.BitMap;
    +
    +/**
    + * Using BitSet to store all the elements, and use HashSet to cache limited
    + * number of elements to find a balance between memory and time complexity.
    + *
    + * Without HashSet, we need to to use O(N) time to get the elements, N is
    + * the bit numbers in elementBits. But we need to keep the size small to make
    + * sure it doesn't cost too much in memory, there is a tradeoff between
    + * memory and time complexity.
    + *
    + * Previously, was deciding to dynamically switch between SparseBitSet and
    + * HashSet based on the memory consumption, but it will take time to copy
    + * data over and may have some herd effect of keep copying data from one
    + * data structure to anther. The current solution can do a very good job
    + * given most of the paths have limited number of elements.
    + */
    +public class BitHashSet implements Iterable<Integer> {
    +
    +    static final long serialVersionUID = 6382565447128283568L;
    --- End diff --
    
    Previously it extended HashSet (inheritance rather than composition), and this serialVersionUID was added at that time. We didn't remove it after switching to composition; will remove it.


---

[GitHub] zookeeper issue #590: [ZOOKEEPER-1177] Add the memory optimized watch manage...

Posted by lvfangmin <gi...@git.apache.org>.
Github user lvfangmin commented on the issue:

    https://github.com/apache/zookeeper/pull/590
  
    Added more comments about the locking where @anmolnar and @hanm asked about it during review; this should also make future reference easier. Also fixed the typo and removed the unused import.


---

[GitHub] zookeeper issue #590: [ZOOKEEPER-1177] Add the memory optimized watch manage...

Posted by hanm <gi...@git.apache.org>.
Github user hanm commented on the issue:

    https://github.com/apache/zookeeper/pull/590
  
    Thanks @lvfangmin for the detailed reply.
    I just had another review pass over some files I forgot to look at last time. Overall it looks good; I will sign this off once all review comments are addressed. Thanks.


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by hanm <gi...@git.apache.org>.
Github user hanm commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r219642002
  
    --- Diff: src/java/main/org/apache/zookeeper/server/watch/IWatchManager.java ---
    @@ -0,0 +1,144 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.zookeeper.server.watch;
    +
    +import java.io.PrintWriter;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Set;
    +import java.util.concurrent.ConcurrentHashMap;
    +
    +import org.apache.zookeeper.Watcher;
    +import org.apache.zookeeper.Watcher.Event.EventType;
    +import org.apache.zookeeper.server.ServerCnxn;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public interface IWatchManager {
    +
    +    /**
    +     * Add watch to specific path.
    +     *
    +     * @param path znode path
    +     * @param watcher watcher object reference
    +     *
    +     * @return true if the watcher added is not already present
    +     */
    +    public boolean addWatch(String path, Watcher watcher);
    +
    +    /**
    +     * Checks the specified watcher exists for the given path
    +     *
    +     * @param path znode path
    +     * @param watcher watcher object reference
    +     *
    +     * @return true if the watcher exists, false otherwise
    +     */
    +    public boolean containsWatcher(String path, Watcher watcher);
    +
    +    /**
    +     * Removes the specified watcher for the given path
    --- End diff --
    
    nit: missing full stop at end of sentence.


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by lvfangmin <gi...@git.apache.org>.
Github user lvfangmin commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r219688164
  
    --- Diff: src/java/main/org/apache/zookeeper/server/util/BitMap.java ---
    @@ -0,0 +1,125 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.zookeeper.server.util;
    +
    +import java.util.Map;
    +import java.util.HashMap;
    +import java.util.BitSet;
    +import java.util.concurrent.locks.ReentrantReadWriteLock;
    +
    +/**
    + * This is a helper class to maintain the bit to specific value and the
    + * reversed value to bit mapping.
    + */
    +public class BitMap<T> {
    +
    +    private final Map<T, Integer> value2Bit = new HashMap<T, Integer>();
    +    private final Map<Integer, T> bit2Value = new HashMap<Integer, T>();
    +
    +    private final BitSet freedBitSet = new BitSet();
    +    private Integer nextBit = Integer.valueOf(0);
    +
    +    private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
    +
    +    public Integer add(T value) {
    +        Integer bit = getBit(value);
    +        if (bit != null) {
    +            return bit;
    +        }
    --- End diff --
    
    This BitMap is used by WatchManagerOptimized.watcherBitIdMap, which stores the watcher-to-bit mapping.
    
    add may be called very often if the same client connection watches thousands or even millions of nodes, while remove is only called once, when the session is closed. That's why add first checks under the read lock, while remove takes the write lock directly.
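The read-first, write-on-miss pattern described above can be sketched as follows. This is a hypothetical illustration, not the PR's BitMap: the class name `BitMapSketch` and the reuse of freed bits via `BitSet.nextSetBit` are assumptions. Note the re-check after taking the write lock, since another thread may have added the same value between the read-locked lookup and acquiring the write lock.

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch of a value <-> bit mapping; not the PR's actual code.
public class BitMapSketch<T> {
    private final Map<T, Integer> value2Bit = new HashMap<>();
    private final Map<Integer, T> bit2Value = new HashMap<>();
    private final BitSet freedBits = new BitSet();   // bits released by remove()
    private int nextBit = 0;
    private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();

    public Integer add(T value) {
        // Fast path (common case): a shared read lock lets callers run concurrently.
        Integer bit = getBit(value);
        if (bit != null) {
            return bit;
        }
        rwLock.writeLock().lock();
        try {
            // Re-check: another thread may have added the value in the meantime.
            bit = value2Bit.get(value);
            if (bit != null) {
                return bit;
            }
            int free = freedBits.nextSetBit(0);
            if (free >= 0) {
                freedBits.clear(free);   // reuse the lowest freed bit
                bit = free;
            } else {
                bit = nextBit++;
            }
            value2Bit.put(value, bit);
            bit2Value.put(bit, value);
            return bit;
        } finally {
            rwLock.writeLock().unlock();
        }
    }

    public Integer getBit(T value) {
        rwLock.readLock().lock();
        try {
            return value2Bit.get(value);
        } finally {
            rwLock.readLock().unlock();
        }
    }

    public T get(int bit) {
        rwLock.readLock().lock();
        try {
            return bit2Value.get(bit);
        } finally {
            rwLock.readLock().unlock();
        }
    }

    // Rare (once per closed session), so the write lock is taken directly.
    public Integer remove(T value) {
        rwLock.writeLock().lock();
        try {
            Integer bit = value2Bit.remove(value);
            if (bit != null) {
                bit2Value.remove(bit);
                freedBits.set(bit);
            }
            return bit;
        } finally {
            rwLock.writeLock().unlock();
        }
    }
}
```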


---

[GitHub] zookeeper issue #590: [ZOOKEEPER-1177] Add the memory optimized watch manage...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit commented on the issue:

    https://github.com/apache/zookeeper/pull/590
  
    
    Refer to this link for build results (access rights to CI server needed): 
    https://builds.apache.org/job/PreCommit-ZOOKEEPER-github-pr-build/2278/



---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by lvfangmin <gi...@git.apache.org>.
Github user lvfangmin commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r219687878
  
    --- Diff: src/java/main/org/apache/zookeeper/server/watch/BitHashSet.java ---
    @@ -0,0 +1,156 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.zookeeper.server.watch;
    +
    +import java.util.BitSet;
    +import java.util.Set;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.lang.Iterable;
    +
    +import org.apache.zookeeper.server.util.BitMap;
    +
    +/**
    + * Using BitSet to store all the elements, and use HashSet to cache limited
    + * number of elements to find a balance between memory and time complexity.
    + *
    + * Without HashSet, we need to to use O(N) time to get the elements, N is
    + * the bit numbers in elementBits. But we need to keep the size small to make
    + * sure it doesn't cost too much in memory, there is a tradeoff between
    + * memory and time complexity.
    + *
    + * Previously, was deciding to dynamically switch between SparseBitSet and
    + * HashSet based on the memory consumption, but it will take time to copy
    + * data over and may have some herd effect of keep copying data from one
    + * data structure to anther. The current solution can do a very good job
    + * given most of the paths have limited number of elements.
    + */
    +public class BitHashSet implements Iterable<Integer> {
    +
    +    static final long serialVersionUID = 6382565447128283568L;
    +
    +    /**
    +     * Change to SparseBitSet if we we want to optimize more, the number of
    +     * elements on a single server is usually limited, so BitSet should be
    +     * fine.
    +     */
    +    private final BitSet elementBits = new BitSet();
    +    private final Set<Integer> cache = new HashSet<Integer>();
    +
    +    private final int cacheSize;
    +
    +    // To record how many elements in this set.
    +    private int elementCount = 0;
    +
    +    public BitHashSet() {
    +        this(Integer.getInteger("zookeeper.bitHashCacheSize", 10));
    +    }
    +
    +    public BitHashSet(int cacheSize) {
    +        this.cacheSize = cacheSize;
    +    }
    +
    +    public synchronized boolean add(Integer elementBit) {
    +        if (elementBit == null || elementBits.get(elementBit)) {
    +            return false;
    +        }
    +        if (cache.size() < cacheSize) {
    +            cache.add(elementBit);
    +        }
    +        elementBits.set(elementBit);
    +        elementCount++;
    +        return true;
    +    }
    +
    +    /**
    +     * Remove the watches, and return the number of watches being removed.
    +     */
    +    public synchronized int remove(Set<Integer> bitSet, BitSet bits) {
    +        cache.removeAll(bitSet);
    +        elementBits.andNot(bits);
    +        int elementCountBefore = elementCount;
    +        elementCount = elementBits.cardinality();
    +        return elementCountBefore - elementCount;
    +    }
    +
    +    public synchronized boolean remove(Integer elementBit) {
    +        if (elementBit == null || !elementBits.get(elementBit)) {
    +            return false;
    +        }
    +
    +        cache.remove(elementBit);
    +        elementBits.clear(elementBit);
    +        elementCount--;
    +        return true;
    +    }
    +
    +    public synchronized boolean contains(Integer elementBit) {
    +        if (elementBit == null) {
    +            return false;
    +        }
    +        return elementBits.get(elementBit);
    --- End diff --
    
    BitSet.get is O(1); checking the cache first may actually be more expensive.
    
    The HashSet is used to optimize iteration. For example, if this BitHashSet holds a single element but its bit is very large, without the HashSet we would need to scan all the preceding words before returning that element, which is not efficient.
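To make the iteration cost concrete, here is a hypothetical sketch (`BitHashSetSketch` is not the PR's class, and its iterator returns a snapshot, which is an assumption). Membership uses `BitSet.get`, which is O(1). Walking a `BitSet` with `nextSetBit`, however, scans 64-bit words from the start, so a set whose only element is bit 10,000,000 touches about 156,250 words. Because the cache is always a subset of the bits, whenever its size equals the element count it holds every element and can be iterated instead.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of a BitSet with a small HashSet cache for iteration.
public class BitHashSetSketch implements Iterable<Integer> {
    private final BitSet elementBits = new BitSet();
    private final Set<Integer> cache = new HashSet<>();
    private final int cacheSize;
    private int elementCount = 0;

    public BitHashSetSketch(int cacheSize) { this.cacheSize = cacheSize; }

    public synchronized boolean add(int bit) {
        if (elementBits.get(bit)) return false;
        if (cache.size() < cacheSize) cache.add(bit);   // cache only the first few
        elementBits.set(bit);
        elementCount++;
        return true;
    }

    public synchronized boolean remove(int bit) {
        if (!elementBits.get(bit)) return false;
        cache.remove(bit);   // keeps cache a subset of elementBits
        elementBits.clear(bit);
        elementCount--;
        return true;
    }

    // O(1) membership test; the cache is not consulted.
    public synchronized boolean contains(int bit) {
        return elementBits.get(bit);
    }

    // cache is a subset of elementBits, so equal sizes mean it holds everything.
    public synchronized boolean cacheIsComplete() {
        return cache.size() == elementCount;
    }

    @Override
    public synchronized Iterator<Integer> iterator() {
        List<Integer> snapshot;
        if (cacheIsComplete()) {
            snapshot = new ArrayList<>(cache);   // no word scan needed
        } else {
            snapshot = new ArrayList<>(elementCount);
            for (int i = elementBits.nextSetBit(0); i >= 0; i = elementBits.nextSetBit(i + 1)) {
                snapshot.add(i);
            }
        }
        return snapshot.iterator();
    }
}
```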


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by anmolnar <gi...@git.apache.org>.
Github user anmolnar commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r214890207
  
    --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatchManager.java ---
    @@ -46,15 +48,26 @@
         private final Map<Watcher, Set<String>> watch2Paths =
             new HashMap<Watcher, Set<String>>();
     
    -    synchronized int size(){
    +    @Override
    +    public synchronized int size(){
             int result = 0;
             for(Set<Watcher> watches : watchTable.values()) {
                 result += watches.size();
             }
             return result;
         }
     
    -    synchronized void addWatch(String path, Watcher watcher) {
    +    boolean isDeadWatcher(Watcher watcher) {
    --- End diff --
    
    Looks like this patch is not just an improvement; it also fixes the edge case of adding dead watchers.
    Previously, stale client connections weren't checked when registering watchers.
    Is that correct?


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by hanm <gi...@git.apache.org>.
Github user hanm commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r219660094
  
    --- Diff: zookeeper-docs/src/documentation/content/xdocs/zookeeperAdmin.xml ---
    @@ -1025,6 +1025,102 @@ server.3=zoo3:2888:3888</programlisting>
                 </listitem>
               </varlistentry>
     
    +
    +          <varlistentry>
    +            <term>watchManaggerName</term>
    +
    +            <listitem>
    +              <para>(Java system property only: <emphasis
    +                    role="bold">zookeeper.watchManaggerName</emphasis>)</para>
    +
    +              <para><emphasis role="bold">New in 3.6.0:</emphasis> Added in
    +                <ulink url="https://issues.apache.org/jira/browse/ZOOKEEPER-1179">ZOOKEEPER-1179</ulink> New watcher
    +                manager WatchManagerOptimized is added to optimize the memory overhead in heavy watch use cases. This
    +                config is used define which watch manager to be used. Currently, we only support WatchManager and
    +                WatchManagerOptimized.</para>
    +            </listitem>
    +          </varlistentry>
    +
    +          <varlistentry>
    +            <term>watcherCleanThreadsNum</term>
    +
    +            <listitem>
    +              <para>(Java system property only: <emphasis
    +                    role="bold">zookeeper.watcherCleanThreadsNum</emphasis>)</para>
    +
    +              <para><emphasis role="bold">New in 3.6.0:</emphasis> Added in
    +                <ulink url="https://issues.apache.org/jira/browse/ZOOKEEPER-1179">ZOOKEEPER-1179</ulink> The new watcher
    +                manager WatchManagerOptimized will clean up the dead watchers lazily, this config is used to decide how
    +                many thread is used in the WatcherCleaner. More thread usually means larger clean up throughput. The
    +                default value is 2, which is good enough even for heavy and continuous session closing/receating cases.</para>
    +            </listitem>
    +          </varlistentry>
    +
    +          <varlistentry>
    +            <term>watcherCleanThreshold</term>
    +
    +            <listitem>
    +              <para>(Java system property only: <emphasis
    +                    role="bold">zookeeper.watcherCleanThreshold</emphasis>)</para>
    +
    +              <para><emphasis role="bold">New in 3.6.0:</emphasis> Added in
    +                <ulink url="https://issues.apache.org/jira/browse/ZOOKEEPER-1179">ZOOKEEPER-1179</ulink> The new watcher
    +                manager WatchManagerOptimized will clean up the dead watchers lazily, the clean up process is relatively
    +                heavy, batch processing will reduce the cost and improve the performance. This setting is used to decide
    +                the batch size. The default one is 1000, we don't need to change it if there is no memory or clean up
    +                speed issue.</para>
    +            </listitem>
    +          </varlistentry>
    +
    +          <varlistentry>
    +            <term>watcherCleanIntervalInSeconds</term>
    +
    +            <listitem>
    +              <para>(Java system property only: <emphasis
    +                    role="bold">zookeeper.watcherCleanIntervalInSeconds</emphasis>)</para>
    +
    +              <para><emphasis role="bold">New in 3.6.0:</emphasis> Added in
    +                <ulink url="https://issues.apache.org/jira/browse/ZOOKEEPER-1179">ZOOKEEPER-1179</ulink> The new watcher
    +                manager WatchManagerOptimized will clean up the dead watchers lazily, the clean up process is relatively
    +                heavy, batch processing will reduce the cost and improve the performance. Besides watcherCleanThreshold,
    +                this setting is used to clean up the dead watchers after certain time even the dead watchers are not larger
    +                than watcherCleanThreshold, so that we won't leave the dead watchers there for too long. The default setting
    +                is 10 minutes, which usually don't need to be changed.</para>
    +            </listitem>
    +          </varlistentry>
    +
    +          <varlistentry>
    +            <term>maxInProcessingDeadWatchers</term>
    +
    +            <listitem>
    +              <para>(Java system property only: <emphasis
    +                    role="bold">zookeeper.maxInProcessingDeadWatchers</emphasis>)</para>
    +
    +              <para><emphasis role="bold">New in 3.6.0:</emphasis> Added in
    +                <ulink url="https://issues.apache.org/jira/browse/ZOOKEEPER-1179">ZOOKEEPER-1179</ulink> This is used
    +                to control how many backlog can we have in the WatcherCleaner, when it reaches this number, it will
    +                slow down adding the dead watcher to WatcherCleaner, which will in turn slow down adding and closing
    +                watchers, so that we can avoid OOM issue. By default there is no limit, you can set it to values like
    +                watcherCleanThreshold * 1000.</para>
    +            </listitem>
    +          </varlistentry>
    +
    +          <varlistentry>
    +            <term>bitHashCacheSize</term>
    +
    +            <listitem>
    +              <para>(Java system property only: <emphasis
    +                    role="bold">zookeeper.bitHashCacheSize</emphasis>)</para>
    +
    +              <para><emphasis role="bold">New 3.6.0:</emphasis> Added in
    +                <ulink url="https://issues.apache.org/jira/browse/ZOOKEEPER-1179">ZOOKEEPER-1179</ulink> This is the
    +                settin used to decide the HashSet cache size in the BitHashSet implementation. Without HashSet, we
    +                need to to use O(N) time to get the elements, N is the bit numbers in elementBits. But we need to
    --- End diff --
    
    `need to` instead of `need to to`
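The cleanup settings quoted in the diff above combine a size trigger (watcherCleanThreshold) with a time trigger (watcherCleanIntervalInSeconds). A minimal, hypothetical sketch of that batch-or-timeout policy follows. It is not the PR's WatcherCleaner: the cleanup runs inline on the caller's thread, the current time is passed in so the behavior is deterministic, and the back-pressure from maxInProcessingDeadWatchers is left out.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical sketch of batched, lazy dead-watcher cleanup.
public class DeadWatcherBatcher {
    private final int threshold;                   // cf. zookeeper.watcherCleanThreshold
    private final long intervalMillis;             // cf. zookeeper.watcherCleanIntervalInSeconds * 1000
    private final Consumer<Set<Integer>> cleanup;  // e.g. clear these bits from every path
    private Set<Integer> pending = new HashSet<>();
    private long lastCleanMillis;

    public DeadWatcherBatcher(int threshold, long intervalMillis, long nowMillis,
                              Consumer<Set<Integer>> cleanup) {
        this.threshold = threshold;
        this.intervalMillis = intervalMillis;
        this.lastCleanMillis = nowMillis;
        this.cleanup = cleanup;
    }

    // Record a closed watcher; clean up only once a full batch is queued.
    public synchronized void addDeadWatcher(int watcherBit, long nowMillis) {
        pending.add(watcherBit);
        if (pending.size() >= threshold) {
            flush(nowMillis);
        }
    }

    // Called periodically so a small batch does not linger forever.
    public synchronized void onTimer(long nowMillis) {
        if (!pending.isEmpty() && nowMillis - lastCleanMillis >= intervalMillis) {
            flush(nowMillis);
        }
    }

    private void flush(long nowMillis) {
        Set<Integer> batch = pending;
        pending = new HashSet<>();
        lastCleanMillis = nowMillis;
        cleanup.accept(batch);   // one pass over all paths for the whole batch
    }

    public synchronized int pendingCount() {
        return pending.size();
    }
}
```

For example, with a threshold of 3, the third dead watcher triggers one cleanup pass covering all three, while a lone fourth one waits until `onTimer` sees that the interval has elapsed since the last cleanup.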


---

[GitHub] zookeeper issue #590: [ZOOKEEPER-1177] Add the memory optimized watch manage...

Posted by lvfangmin <gi...@git.apache.org>.
Github user lvfangmin commented on the issue:

    https://github.com/apache/zookeeper/pull/590
  
    Rebased to resolve conflicts. @anmolnar @hanm @maoling @nkalmar please revisit this PR when you have time.


---

[GitHub] zookeeper issue #590: [ZOOKEEPER-1177] Add the memory optimized watch manage...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit commented on the issue:

    https://github.com/apache/zookeeper/pull/590
  
    
    Refer to this link for build results (access rights to CI server needed): 
    https://builds.apache.org/job/PreCommit-ZOOKEEPER-github-pr-build/2223/



---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by hanm <gi...@git.apache.org>.
Github user hanm commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r220055378
  
    --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatchManagerOptimized.java ---
    @@ -0,0 +1,355 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.zookeeper.server.watch;
    +
    +import java.io.PrintWriter;
    +import java.util.HashMap;
    +import java.util.BitSet;
    +import java.util.HashSet;
    +import java.util.ArrayList;
    +import java.util.LinkedHashMap;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Random;
    +import java.util.Set;
    +import java.util.Iterator;
    +import java.lang.Iterable;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.locks.ReadWriteLock;
    +import java.util.concurrent.locks.ReentrantReadWriteLock;
    +
    +import org.apache.zookeeper.WatchedEvent;
    +import org.apache.zookeeper.Watcher;
    +import org.apache.zookeeper.Watcher.Event.EventType;
    +import org.apache.zookeeper.Watcher.Event.KeeperState;
    +import org.apache.zookeeper.server.ServerCnxn;
    +import org.apache.zookeeper.server.util.BitMap;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Optimized in memory and time complexity, compared to WatchManager, both the
    + * memory consumption and time complexity improved a lot, but it cannot
    + * efficiently remove the watcher when the session or socket is closed, for
    + * majority usecase this is not a problem.
    + *
    + * Changed made compared to WatchManager:
    + *
    + * - Use HashSet and BitSet to store the watchers to find a balance between
    + *   memory usage and time complexity
    + * - Use ReadWriteLock instead of synchronized to reduce lock retention
    + * - Lazily clean up the closed watchers
    + */
    +public class WatchManagerOptimized
    +        implements IWatchManager, DeadWatcherListener {
    +
    +    private static final Logger LOG =
    +            LoggerFactory.getLogger(WatchManagerOptimized.class);
    +
    +    private final ConcurrentHashMap<String, BitHashSet> pathWatches =
    +            new ConcurrentHashMap<String, BitHashSet>();
    +
    +    // watcher to bit id mapping
    +    private final BitMap<Watcher> watcherBitIdMap = new BitMap<Watcher>();
    +
    +    // used to lazily remove the dead watchers
    +    private final WatcherCleaner watcherCleaner;
    +
    +    private final ReentrantReadWriteLock addRemovePathRWLock = new ReentrantReadWriteLock();
    +
    +    public WatchManagerOptimized() {
    +        watcherCleaner = new WatcherCleaner(this);
    +        watcherCleaner.start();
    +    }
    +
    +    @Override
    +    public boolean addWatch(String path, Watcher watcher) {
    +        boolean result = false;
    +        addRemovePathRWLock.readLock().lock();
    +        try {
    +            // avoid the race condition of adding an in-flight dead watcher
    +            if (isDeadWatcher(watcher)) {
    +                LOG.debug("Ignoring addWatch with closed cnxn");
    +            } else {
    +                Integer bit = watcherBitIdMap.add(watcher);
    +                BitHashSet watchers = pathWatches.get(path);
    +                if (watchers == null) {
    +                    watchers = new BitHashSet();
    +                    BitHashSet existingWatchers = pathWatches.putIfAbsent(path, watchers);
    +                    if (existingWatchers != null) {
    +                        watchers = existingWatchers;
    +                    }
    +                }
    +                result = watchers.add(bit);
    +            }
    +        } finally {
    +            addRemovePathRWLock.readLock().unlock();
    +        }
    +        return result;
    +    }
    +
    +    @Override
    +    public boolean containsWatcher(String path, Watcher watcher) {
    +        BitHashSet watchers = pathWatches.get(path);
    +        if (watchers == null || !watchers.contains(watcherBitIdMap.getBit(watcher))) {
    +            return false;
    +        }
    +        return true;
    +    }
    +
    +    @Override
    +    public boolean removeWatcher(String path, Watcher watcher) {
    +        addRemovePathRWLock.writeLock().lock();
    +        try {
    +            BitHashSet list = pathWatches.get(path);
    +            if (list == null || !list.remove(watcherBitIdMap.getBit(watcher))) {
    +                return false;
    +            }
    +            if (list.isEmpty()) {
    +                pathWatches.remove(path);
    +            }
    +            return true;
    +        } finally {
    +            addRemovePathRWLock.writeLock().unlock();
    +        }
    +    }
    +
    +    @Override
    +    public void removeWatcher(Watcher watcher) {
    +        Integer watcherBit;
    +        addRemovePathRWLock.writeLock().lock();
    +        try {
    +            // do nothing if the watcher is not tracked
    +            watcherBit = watcherBitIdMap.getBit(watcher);
    +            if (watcherBit == null) {
    +                return;
    +            }
    +        } finally {
    +            addRemovePathRWLock.writeLock().unlock();
    +        }
    +        watcherCleaner.addDeadWatcher(watcherBit);
    +    }
    +
    +    /**
    +     * Entry for WatcherCleaner to remove dead watchers
    +     *
    +     * @param deadWatchers the watchers need to be removed
    +     */
    +    @Override
    +    public void processDeadWatchers(Set<Integer> deadWatchers) {
    +        BitSet bits = new BitSet();
    +        for (int dw: deadWatchers) {
    +            bits.set(dw);
    +        }
    +        // The value iterator will reflect the state when it was
    +        // created, don't need to synchronize.
    +        for (BitHashSet watchers: pathWatches.values()) {
    +            watchers.remove(deadWatchers, bits);
    +        }
    +        // It would be better to remove the empty path from pathWatches, but
    +        // that would add a lot of lock contention and hurt the throughput of
    +        // addWatch, so let's rely on triggerWatch to delete it.
    +        for (Integer wbit: deadWatchers) {
    +            watcherBitIdMap.remove(wbit);
    +        }
    +    }
    +
    +    @Override
    +    public WatcherOrBitSet triggerWatch(String path, EventType type) {
    +        return triggerWatch(path, type, null);
    +    }
    +
    +    @Override
    +    public WatcherOrBitSet triggerWatch(
    +            String path, EventType type, WatcherOrBitSet supress) {
    +        WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
    +
    +        BitHashSet watchers = remove(path);
    +        if (watchers == null) {
    +            return null;
    +        }
    +
    +        int triggeredWatches = 0;
    --- End diff --
    
    I see, it's for the metrics. It's fine to leave this variable; we can add the metrics in a separate patch, since this patch is already big enough and almost ready to land.


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by hanm <gi...@git.apache.org>.
Github user hanm commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r219659808
  
    --- Diff: src/java/main/org/apache/zookeeper/server/util/BitMap.java ---
    @@ -0,0 +1,125 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.zookeeper.server.util;
    +
    +import java.util.Map;
    +import java.util.HashMap;
    +import java.util.BitSet;
    +import java.util.concurrent.locks.ReentrantReadWriteLock;
    +
    +/**
    + * This is a helper class to maintain the bit to specific value and the
    + * reversed value to bit mapping.
    + */
    +public class BitMap<T> {
    +
    +    private final Map<T, Integer> value2Bit = new HashMap<T, Integer>();
    +    private final Map<Integer, T> bit2Value = new HashMap<Integer, T>();
    +
    +    private final BitSet freedBitSet = new BitSet();
    +    private Integer nextBit = Integer.valueOf(0);
    +
    +    private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
    +
    +    public Integer add(T value) {
    +        Integer bit = getBit(value);
    +        if (bit != null) {
    +            return bit;
    +        }
    --- End diff --
    
    Is it common for this `add` method to be called with the same value over and over? If so, this optimization is good; if not, it adds the unnecessary cost of acquiring and releasing the read lock plus the write lock on every `add`, as opposed to acquiring and releasing the write lock just once per `add`.
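To make the trade-off concrete, here is a minimal sketch of a BitMap-like class whose `add` uses the read-then-write (double-checked) pattern, assuming `getBit` takes the read lock and the slow path takes the write lock and re-checks. `BitMapSketch` and its freed-bit reuse are hypothetical illustrations, not the PR's code. A hit costs one read-lock round trip; a miss costs a read-lock round trip plus a write-lock round trip, which is the extra cost raised above.

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch: maps each value to a stable int "bit" and back.
class BitMapSketch<T> {
    private final Map<T, Integer> value2Bit = new HashMap<>();
    private final Map<Integer, T> bit2Value = new HashMap<>();
    private final BitSet freedBits = new BitSet(); // bits released by remove()
    private int nextBit = 0;
    private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();

    Integer getBit(T value) {
        rwLock.readLock().lock();
        try {
            return value2Bit.get(value);
        } finally {
            rwLock.readLock().unlock();
        }
    }

    Integer add(T value) {
        // Fast path: a single read-lock round trip if the value is mapped.
        Integer bit = getBit(value);
        if (bit != null) {
            return bit;
        }
        // Slow path: take the write lock and re-check, because another
        // thread may have added the same value after our read.
        rwLock.writeLock().lock();
        try {
            bit = value2Bit.get(value);
            if (bit != null) {
                return bit;
            }
            int free = freedBits.nextSetBit(0);
            if (free >= 0) {
                freedBits.clear(free); // reuse a released bit first
                bit = free;
            } else {
                bit = nextBit++;
            }
            value2Bit.put(value, bit);
            bit2Value.put(bit, value);
            return bit;
        } finally {
            rwLock.writeLock().unlock();
        }
    }

    T remove(int bit) {
        rwLock.writeLock().lock();
        try {
            T value = bit2Value.remove(bit);
            if (value != null) {
                value2Bit.remove(value);
                freedBits.set(bit); // make the bit available for reuse
            }
            return value;
        } finally {
            rwLock.writeLock().unlock();
        }
    }
}
```

If repeated `add` calls for the same watcher are rare, dropping the fast path and taking only the write lock would be the cheaper choice, as suggested above.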


---

[GitHub] zookeeper issue #590: [ZOOKEEPER-1177] Add the memory optimized watch manage...

Posted by lvfangmin <gi...@git.apache.org>.
Github user lvfangmin commented on the issue:

    https://github.com/apache/zookeeper/pull/590
  
    Based on an internal benchmark, this may save more than 95% of memory usage in the concentrated-watches scenario. I'm working on adding some micro benchmarks.
    
    In the original Jira, @phunt also added some basic tests, which give an idea of how much memory it's going to save.
    
    The current HashMap-based watch manager uses a lot of memory because of the per-entry overhead (32 bytes for each entry), the object references, and the duplicated path strings.
    
    Using a BitSet without the reverse index reduces that overhead and makes it more memory efficient.
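To make the memory argument concrete, here is a minimal sketch of the layout described above, assuming each watcher is assigned a dense int id once and each path keeps only a `java.util.BitSet` of ids, with no reverse watcher-to-paths index. `PathBitSetIndex` and its methods are hypothetical; the sketch is not thread safe and does not reproduce the 95% figure, which comes from the author's internal benchmark.

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: path -> BitSet of watcher ids, watcher -> id.
class PathBitSetIndex<W> {
    private final Map<W, Integer> watcherIds = new HashMap<>();
    private final Map<String, BitSet> pathWatches = new HashMap<>();

    private int idOf(W watcher) {
        Integer id = watcherIds.get(watcher);
        if (id == null) {
            id = watcherIds.size(); // dense ids keep the BitSets compact
            watcherIds.put(watcher, id);
        }
        return id;
    }

    // Returns true if the watch was newly added.
    boolean addWatch(String path, W watcher) {
        BitSet bits = pathWatches.computeIfAbsent(path, p -> new BitSet());
        int id = idOf(watcher);
        boolean added = !bits.get(id);
        bits.set(id);
        return added;
    }

    boolean containsWatcher(String path, W watcher) {
        Integer id = watcherIds.get(watcher);
        BitSet bits = pathWatches.get(path);
        return id != null && bits != null && bits.get(id);
    }

    int watcherCount(String path) {
        BitSet bits = pathWatches.get(path);
        return bits == null ? 0 : bits.cardinality();
    }
}
```

Here a (path, watcher) pair costs one bit in that path's BitSet (allocated in 64-bit words) instead of a hash entry plus an object reference. The price is that finding every path a given watcher is on means scanning all paths, which is why closed watchers are cleaned up lazily in this patch.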
    



---

[GitHub] zookeeper issue #590: [ZOOKEEPER-1177] Add the memory optimized watch manage...

Posted by anmolnar <gi...@git.apache.org>.
Github user anmolnar commented on the issue:

    https://github.com/apache/zookeeper/pull/590
  
    @lvfangmin Just to sum up the difference between this and the original 6-year-old patch on Jira: you've added the `deadWatchers` collection and the lazy `WatcherCleaner` to avoid the performance penalty in `removeWatches()`.
    
    Is that correct? 
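The lazy approach can be sketched as follows, assuming that closing a watcher only records its bit id and that a later batch pass clears all recorded ids from every path's BitSet with one `BitSet.andNot` per path. `LazyWatcherCleanup` is hypothetical: unlike the PR's `WatcherCleaner`, it runs the batch inline once a threshold is reached rather than on a background thread or timer, and it leaves empty paths in place.

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of batched dead-watcher removal (not thread safe).
class LazyWatcherCleanup {
    final Map<String, BitSet> pathWatches = new HashMap<>();
    private final Set<Integer> deadWatchers = new HashSet<>();
    private final int threshold;

    LazyWatcherCleanup(int threshold) {
        this.threshold = threshold;
    }

    // Cheap on the close path; returns true when a batch clean-up ran.
    boolean markDead(int watcherId) {
        deadWatchers.add(watcherId);
        if (deadWatchers.size() >= threshold) {
            cleanUp();
            return true;
        }
        return false;
    }

    // Walks every path once per batch instead of once per closed watcher.
    void cleanUp() {
        BitSet dead = new BitSet();
        for (int id : deadWatchers) {
            dead.set(id);
        }
        for (BitSet watchers : pathWatches.values()) {
            watchers.andNot(dead); // clear all dead ids in one pass
        }
        deadWatchers.clear();
    }
}
```

Batching amortizes the full scan over many closed watchers, which is what avoids the per-call penalty of removing a watcher without a reverse index.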


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by anmolnar <gi...@git.apache.org>.
Github user anmolnar commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r214901032
  
    --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatchManagerOptimized.java ---
    @@ -0,0 +1,355 @@
    +
    +package org.apache.zookeeper.server.watch;
    +
    +import java.io.PrintWriter;
    +import java.util.HashMap;
    +import java.util.BitSet;
    +import java.util.HashSet;
    +import java.util.ArrayList;
    +import java.util.LinkedHashMap;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Random;
    +import java.util.Set;
    +import java.util.Iterator;
    +import java.lang.Iterable;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.locks.ReadWriteLock;
    +import java.util.concurrent.locks.ReentrantReadWriteLock;
    +
    +import org.apache.zookeeper.WatchedEvent;
    +import org.apache.zookeeper.Watcher;
    +import org.apache.zookeeper.Watcher.Event.EventType;
    +import org.apache.zookeeper.Watcher.Event.KeeperState;
    +import org.apache.zookeeper.server.ServerCnxn;
    +import org.apache.zookeeper.server.util.BitMap;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Optimized for memory and time complexity: compared to WatchManager, both
    + * memory consumption and time complexity are improved a lot, but it cannot
    + * efficiently remove the watcher when the session or socket is closed; for
    + * the majority of use cases this is not a problem.
    + *
    + * Changes made compared to WatchManager:
    + *
    + * - Use HashSet and BitSet to store the watchers to find a balance between
    + *   memory usage and time complexity
    + * - Use ReadWriteLock instead of synchronized to reduce lock retention
    + * - Lazily clean up the closed watchers
    + */
    +public class WatchManagerOptimized
    +        implements IWatchManager, DeadWatcherListener {
    +
    +    private static final Logger LOG =
    +            LoggerFactory.getLogger(WatchManagerOptimized.class);
    +
    +    private final ConcurrentHashMap<String, BitHashSet> pathWatches =
    +            new ConcurrentHashMap<String, BitHashSet>();
    +
    +    // watcher to bit id mapping
    +    private final BitMap<Watcher> watcherBitIdMap = new BitMap<Watcher>();
    +
    +    // used to lazily remove the dead watchers
    +    private final WatcherCleaner watcherCleaner;
    +
    +    private final ReentrantReadWriteLock addRemovePathRWLock = new ReentrantReadWriteLock();
    +
    +    public WatchManagerOptimized() {
    +        watcherCleaner = new WatcherCleaner(this);
    +        watcherCleaner.start();
    +    }
    +
    +    @Override
    +    public boolean addWatch(String path, Watcher watcher) {
    +        boolean result = false;
    +        addRemovePathRWLock.readLock().lock();
    +        try {
    +            // avoid the race condition of adding an in-flight dead watcher
    +            if (isDeadWatcher(watcher)) {
    +                LOG.debug("Ignoring addWatch with closed cnxn");
    +            } else {
    +                Integer bit = watcherBitIdMap.add(watcher);
    +                BitHashSet watchers = pathWatches.get(path);
    +                if (watchers == null) {
    +                    watchers = new BitHashSet();
    +                    BitHashSet existingWatchers = pathWatches.putIfAbsent(path, watchers);
    +                    if (existingWatchers != null) {
    +                        watchers = existingWatchers;
    +                    }
    +                }
    +                result = watchers.add(bit);
    +            }
    +        } finally {
    +            addRemovePathRWLock.readLock().unlock();
    +        }
    +        return result;
    +    }
    +
    +    @Override
    +    public boolean containsWatcher(String path, Watcher watcher) {
    +        BitHashSet watchers = pathWatches.get(path);
    +        if (watchers == null || !watchers.contains(watcherBitIdMap.getBit(watcher))) {
    +            return false;
    +        }
    +        return true;
    +    }
    +
    +    @Override
    +    public boolean removeWatcher(String path, Watcher watcher) {
    +        addRemovePathRWLock.writeLock().lock();
    --- End diff --
    
    This looks consistent with the `BitMap` implementation, so it's probably intentional that you don't apply the same double-checked locking (DCL) optimisation as in the `add` method.
    
    Why not check whether the watcher is on the path before acquiring the write lock?
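The suggestion could look roughly like this minimal sketch, assuming path BitSets are guarded by their own monitor (as `java.util.BitSet` is not thread safe) and that adds elsewhere hold the read lock. `PreCheckedRemove` is hypothetical and takes a watcher bit id directly instead of a `Watcher`. The unlocked pre-check lets the common "not watching this path" case skip the exclusive lock, and the re-check under the lock handles a concurrent removal in between.

```java
import java.util.BitSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch: cheap pre-check, then write lock plus re-check.
class PreCheckedRemove {
    final ConcurrentHashMap<String, BitSet> pathWatches = new ConcurrentHashMap<>();
    private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();

    private static boolean contains(BitSet bits, int id) {
        synchronized (bits) { // BitSet itself is not thread safe
            return bits.get(id);
        }
    }

    boolean removeWatcher(String path, int watcherId) {
        // Pre-check without the write lock: absent watchers return early.
        BitSet bits = pathWatches.get(path);
        if (bits == null || !contains(bits, watcherId)) {
            return false;
        }
        rwLock.writeLock().lock();
        try {
            // Re-check: another thread may have removed it meanwhile.
            bits = pathWatches.get(path);
            if (bits == null) {
                return false;
            }
            synchronized (bits) {
                if (!bits.get(watcherId)) {
                    return false;
                }
                bits.clear(watcherId);
                if (bits.isEmpty()) {
                    pathWatches.remove(path, bits); // drop the empty path
                }
            }
            return true;
        } finally {
            rwLock.writeLock().unlock();
        }
    }
}
```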


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by hanm <gi...@git.apache.org>.
Github user hanm commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r219661253
  
    --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatcherCleaner.java ---
    @@ -0,0 +1,176 @@
    +
    +package org.apache.zookeeper.server.watch;
    +
    +import java.util.Set;
    +import java.util.HashSet;
    +import java.util.Random;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import org.apache.zookeeper.common.Time;
    +import org.apache.zookeeper.server.RateLogger;
    +import org.apache.zookeeper.server.WorkerService;
    +import org.apache.zookeeper.server.WorkerService.WorkRequest;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Thread used to lazily clean up closed watchers. It triggers the clean up
    + * when the number of dead watchers reaches a threshold or when a certain
    + * number of seconds has elapsed since the last clean up.
    + *
    + * Cost of running it:
    + *
    + * - it needs to go through all the paths even if the watcher may only be
    + *   watching a single path
    + * - it blocks on each path's BitHashSet while checking for dead watchers,
    + *   which won't block other operations
    +public class WatcherCleaner extends Thread {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(WatcherCleaner.class);
    +    private final RateLogger RATE_LOGGER = new RateLogger(LOG);
    +
    +    private volatile boolean stopped = false;
    +    private final Object cleanEvent = new Object();
    +    private final Random r = new Random(System.nanoTime());
    +    private final WorkerService cleaners;
    +
    +    private final Set<Integer> deadWatchers;
    +    private final DeadWatcherListener listener;
    +    private final int watcherCleanThreshold;
    +    private final int watcherCleanIntervalInSeconds;
    +    private final int maxInProcessingDeadWatchers;
    +    private final AtomicInteger totalDeadWatchers = new AtomicInteger();
    +
    +    public WatcherCleaner(DeadWatcherListener listener) {
    +        this(listener,
    +            Integer.getInteger("zookeeper.watcherCleanThreshold", 1000),
    +            Integer.getInteger("zookeeper.watcherCleanIntervalInSeconds", 600),
    +            Integer.getInteger("zookeeper.watcherCleanThreadsNum", 2),
    +            Integer.getInteger("zookeeper.maxInProcessingDeadWatchers", -1));
    +    }
    +
    +    public WatcherCleaner(DeadWatcherListener listener,
    +            int watcherCleanThreshold, int watcherCleanIntervalInSeconds,
    +            int watcherCleanThreadsNum, int maxInProcessingDeadWatchers) {
    +        this.listener = listener;
    +        this.watcherCleanThreshold = watcherCleanThreshold;
    +        this.watcherCleanIntervalInSeconds = watcherCleanIntervalInSeconds;
    +        int suggestedMaxInProcessingThreshold =
    +                watcherCleanThreshold * watcherCleanThreadsNum;
    +        if (maxInProcessingDeadWatchers > 0 &&
    +                maxInProcessingDeadWatchers < suggestedMaxInProcessingThreshold) {
    +            maxInProcessingDeadWatchers = suggestedMaxInProcessingThreshold;
    +            LOG.info("The maxInProcessingDeadWatchers config is smaller " +
    +                    "than the suggested one, change it to use {}",
    +                    maxInProcessingDeadWatchers);
    +        }
    +        this.maxInProcessingDeadWatchers = maxInProcessingDeadWatchers;
    +        this.deadWatchers = new HashSet<Integer>();
    +        this.cleaners = new WorkerService("DeadWatcherCleanner",
    +                watcherCleanThreadsNum, false);
    +
    +        LOG.info("watcherCleanThreshold={}, watcherCleanIntervalInSeconds={}" +
    +                ", watcherCleanThreadsNum={}, maxInProcessingDeadWatchers={}",
    +                watcherCleanThreshold, watcherCleanIntervalInSeconds,
    +                watcherCleanThreadsNum, maxInProcessingDeadWatchers);
    +    }
    +
    +    public void addDeadWatcher(int watcherBit) {
    +        // Wait if there are too many watchers waiting to be closed;
    +        // this will slow down the socket packet processing and
    +        // the adding of watches in the ZK pipeline.
    +        while (maxInProcessingDeadWatchers > 0 && !stopped &&
    +                totalDeadWatchers.get() >= maxInProcessingDeadWatchers) {
    +            try {
    +                RATE_LOGGER.rateLimitLog("Waiting for dead watchers cleaning");
    +                synchronized(totalDeadWatchers) {
    +                    totalDeadWatchers.wait(100);
    +                }
    +            } catch (InterruptedException e) {
    +                LOG.info("Got interrupted while waiting for dead watches " +
    +                        "queue size");
    +            }
    +        }
    +        synchronized (this) {
    +            if (deadWatchers.add(watcherBit)) {
    +                totalDeadWatchers.incrementAndGet();
    +                if (deadWatchers.size() >= watcherCleanThreshold) {
    +                    synchronized (cleanEvent) {
    +                        cleanEvent.notifyAll();
    +                    }
    +                }
    +            }
    +        }
    +    }
    +
    +    @Override
    +    public void run() {
    +        while (!stopped) {
    +            synchronized (cleanEvent) {
    +                try {
    +                    if (deadWatchers.size() < watcherCleanThreshold) {
    +                        int maxWaitMs = (watcherCleanIntervalInSeconds +
    +                            r.nextInt(watcherCleanIntervalInSeconds / 2 + 1)) * 1000;
    --- End diff --
    
    Is there a particular reason for choosing this over, say, exponential backoff?
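For reference, the quoted wait computation yields a fixed base interval plus a uniformly random extra of up to half of it. The helper below isolates that arithmetic; `CleanerInterval.maxWaitMs` is a hypothetical name, and the idea that the jitter keeps servers from running clean-ups in lockstep is an interpretation, not something stated in the PR.

```java
import java.util.Random;

// Hypothetical helper reproducing the wait arithmetic from WatcherCleaner.run().
final class CleanerInterval {
    private CleanerInterval() {}

    static int maxWaitMs(int intervalSeconds, Random r) {
        // Random.nextInt(bound) returns 0..bound-1, so the result lies in
        // [intervalSeconds, intervalSeconds + intervalSeconds / 2] seconds,
        // converted to milliseconds (int overflow is not guarded against).
        return (intervalSeconds + r.nextInt(intervalSeconds / 2 + 1)) * 1000;
    }
}
```

With the default `zookeeper.watcherCleanIntervalInSeconds` of 600, the wait falls between 600,000 and 900,000 ms. Exponential backoff would instead grow the wait over successive idle rounds, which changes how quickly a small backlog of dead watchers gets cleaned.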


---

[GitHub] zookeeper issue #590: [ZOOKEEPER-1177] Add the memory optimized watch manage...

Posted by lvfangmin <gi...@git.apache.org>.
Github user lvfangmin commented on the issue:

    https://github.com/apache/zookeeper/pull/590
  
    Resolved conflicts with the latest code on master.


---

[GitHub] zookeeper issue #590: [ZOOKEEPER-1177] Add the memory optimized watch manage...

Posted by lvfangmin <gi...@git.apache.org>.
Github user lvfangmin commented on the issue:

    https://github.com/apache/zookeeper/pull/590
  
    @hanm the code FindBugs complained about is all correct and expected:
    
    1. System.exit if creating the watch manager fails in DataTree
    DataTree uses the factory to create the watch manager from its class name; if the class cannot be initialized because the class name is invalid, we have to exit.
    
    2. BitHashSet.elementCount not synchronized in iterator()
    Thread safety cannot be guaranteed during iteration inside this method, so its comment marks it as not thread safe; callers must synchronize on the set while iterating, and they do.
    
    3. Synchronize on AtomicInteger
    We synchronize on it only to do wait/notify and do not rely on that synchronization to control updates to the AtomicInteger value, so it is used correctly.
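Point 3 can be illustrated with a minimal sketch, assuming a bounded `wait(100)` loop like the one in `WatcherCleaner.addDeadWatcher()`. The class `DeadWatcherBackPressure` and its methods are hypothetical. The counter is changed only through atomic operations; the monitor of the same object is used purely for wait/notify, and the bounded wait re-checks the condition periodically, so a missed notify only delays a producer.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: AtomicInteger as the counter, its monitor only for wait/notify.
class DeadWatcherBackPressure {
    private final AtomicInteger inProcessing = new AtomicInteger();
    private final int max;

    DeadWatcherBackPressure(int max) {
        this.max = max;
    }

    // Soft limit: check-then-increment is not atomic, so concurrent
    // producers may overshoot max slightly.
    void beforeEnqueue() throws InterruptedException {
        while (inProcessing.get() >= max) {
            synchronized (inProcessing) {
                inProcessing.wait(100); // bounded wait, then re-check
            }
        }
        inProcessing.incrementAndGet();
    }

    void afterCleaned(int n) {
        inProcessing.addAndGet(-n); // atomic update, no lock needed
        synchronized (inProcessing) {
            inProcessing.notifyAll(); // wake producers waiting for room
        }
    }

    int inProcessing() {
        return inProcessing.get();
    }
}
```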


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by anmolnar <gi...@git.apache.org>.
Github user anmolnar commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r215183702
  
    --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatchManagerOptimized.java ---
    @@ -0,0 +1,355 @@
    +
    +package org.apache.zookeeper.server.watch;
    +
    +import java.io.PrintWriter;
    +import java.util.HashMap;
    +import java.util.BitSet;
    +import java.util.HashSet;
    +import java.util.ArrayList;
    +import java.util.LinkedHashMap;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Random;
    +import java.util.Set;
    +import java.util.Iterator;
    +import java.lang.Iterable;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.locks.ReadWriteLock;
    +import java.util.concurrent.locks.ReentrantReadWriteLock;
    +
    +import org.apache.zookeeper.WatchedEvent;
    +import org.apache.zookeeper.Watcher;
    +import org.apache.zookeeper.Watcher.Event.EventType;
    +import org.apache.zookeeper.Watcher.Event.KeeperState;
    +import org.apache.zookeeper.server.ServerCnxn;
    +import org.apache.zookeeper.server.util.BitMap;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Optimized for memory and time complexity: compared to WatchManager, both
    + * memory consumption and time complexity are improved a lot, but it cannot
    + * efficiently remove the watcher when the session or socket is closed; for
    + * the majority of use cases this is not a problem.
    + *
    + * Changes made compared to WatchManager:
    + *
    + * - Use HashSet and BitSet to store the watchers to find a balance between
    + *   memory usage and time complexity
    + * - Use ReadWriteLock instead of synchronized to reduce lock retention
    + * - Lazily clean up the closed watchers
    + */
    +public class WatchManagerOptimized
    +        implements IWatchManager, DeadWatcherListener {
    +
    +    private static final Logger LOG =
    +            LoggerFactory.getLogger(WatchManagerOptimized.class);
    +
    +    private final ConcurrentHashMap<String, BitHashSet> pathWatches =
    +            new ConcurrentHashMap<String, BitHashSet>();
    +
    +    // watcher to bit id mapping
    +    private final BitMap<Watcher> watcherBitIdMap = new BitMap<Watcher>();
    +
    +    // used to lazily remove the dead watchers
    +    private final WatcherCleaner watcherCleaner;
    +
    +    private final ReentrantReadWriteLock addRemovePathRWLock = new ReentrantReadWriteLock();
    +
    +    public WatchManagerOptimized() {
    +        watcherCleaner = new WatcherCleaner(this);
    +        watcherCleaner.start();
    +    }
    +
    +    @Override
    +    public boolean addWatch(String path, Watcher watcher) {
    +        boolean result = false;
    +        addRemovePathRWLock.readLock().lock();
    +        try {
    +            // avoid the race condition of adding an in-flight dead watcher
    +            if (isDeadWatcher(watcher)) {
    +                LOG.debug("Ignoring addWatch with closed cnxn");
    +            } else {
    +                Integer bit = watcherBitIdMap.add(watcher);
    +                BitHashSet watchers = pathWatches.get(path);
    +                if (watchers == null) {
    +                    watchers = new BitHashSet();
    +                    BitHashSet existingWatchers = pathWatches.putIfAbsent(path, watchers);
    +                    if (existingWatchers != null) {
    +                        watchers = existingWatchers;
    +                    }
    +                }
    +                result = watchers.add(bit);
    +            }
    +        } finally {
    +            addRemovePathRWLock.readLock().unlock();
    +        }
    +        return result;
    +    }
    +
    +    @Override
    +    public boolean containsWatcher(String path, Watcher watcher) {
    +        BitHashSet watchers = pathWatches.get(path);
    +        if (watchers == null || !watchers.contains(watcherBitIdMap.getBit(watcher))) {
    +            return false;
    +        }
    +        return true;
    +    }
    +
    +    @Override
    +    public boolean removeWatcher(String path, Watcher watcher) {
    +        addRemovePathRWLock.writeLock().lock();
    +        try {
    +            BitHashSet list = pathWatches.get(path);
    +            if (list == null || !list.remove(watcherBitIdMap.getBit(watcher))) {
    +                return false;
    +            }
    +            if (list.isEmpty()) {
    +                pathWatches.remove(path);
    +            }
    +            return true;
    +        } finally {
    +            addRemovePathRWLock.writeLock().unlock();
    +        }
    +    }
    +
    +    @Override
    +    public void removeWatcher(Watcher watcher) {
    +        Integer watcherBit;
    +        addRemovePathRWLock.writeLock().lock();
    --- End diff --
    
    In which case you need to move the `addDeadWatcher()` call inside the critical block.
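A minimal sketch of that change, assuming the lookup and the enqueue should happen under the same write lock so that no `addWatch` (which holds the read lock) can interleave between them. `RemoveUnderLock` and its `addDeadWatcher` stand-in are hypothetical. One caveat grounded in the diff: the real `WatcherCleaner.addDeadWatcher()` may wait while `maxInProcessingDeadWatchers` is exceeded, and waiting while holding the write lock would also stall `addWatch` callers, so that trade-off needs weighing.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch: lookup and enqueue in one critical section.
class RemoveUnderLock<W> {
    private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
    final Map<W, Integer> watcherBits = new HashMap<>();
    final Set<Integer> deadWatchers = new HashSet<>();

    void removeWatcher(W watcher) {
        rwLock.writeLock().lock();
        try {
            Integer bit = watcherBits.get(watcher);
            if (bit == null) {
                return; // watcher is not tracked
            }
            addDeadWatcher(bit); // enqueue while still holding the write lock
        } finally {
            rwLock.writeLock().unlock();
        }
    }

    // Stand-in for WatcherCleaner.addDeadWatcher(); assumed non-blocking here.
    private void addDeadWatcher(int bit) {
        deadWatchers.add(bit);
    }
}
```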


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by lvfangmin <gi...@git.apache.org>.
Github user lvfangmin commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r219688028
  
    --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatcherCleaner.java ---
    @@ -0,0 +1,176 @@
    +
    +package org.apache.zookeeper.server.watch;
    +
    +import java.util.Set;
    +import java.util.HashSet;
    +import java.util.Random;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import org.apache.zookeeper.common.Time;
    +import org.apache.zookeeper.server.RateLogger;
    +import org.apache.zookeeper.server.WorkerService;
    +import org.apache.zookeeper.server.WorkerService.WorkRequest;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Thread used to lazily clean up closed watchers. It triggers the clean up
    + * when the number of dead watchers reaches a threshold or when a certain
    + * number of seconds has elapsed since the last clean up.
    + *
    + * Cost of running it:
    + *
    + * - it needs to go through all the paths even if the watcher may only be
    + *   watching a single path
    + * - it blocks on each path's BitHashSet while checking for dead watchers,
    + *   which won't block other operations
    +public class WatcherCleaner extends Thread {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(WatcherCleaner.class);
    +    private final RateLogger RATE_LOGGER = new RateLogger(LOG);
    +
    +    private volatile boolean stopped = false;
    +    private final Object cleanEvent = new Object();
    +    private final Random r = new Random(System.nanoTime());
    +    private final WorkerService cleaners;
    +
    +    private final Set<Integer> deadWatchers;
    +    private final DeadWatcherListener listener;
    +    private final int watcherCleanThreshold;
    +    private final int watcherCleanIntervalInSeconds;
    +    private final int maxInProcessingDeadWatchers;
    +    private final AtomicInteger totalDeadWatchers = new AtomicInteger();
    +
    +    public WatcherCleaner(DeadWatcherListener listener) {
    +        this(listener,
    +            Integer.getInteger("zookeeper.watcherCleanThreshold", 1000),
    +            Integer.getInteger("zookeeper.watcherCleanIntervalInSeconds", 600),
    +            Integer.getInteger("zookeeper.watcherCleanThreadsNum", 2),
    +            Integer.getInteger("zookeeper.maxInProcessingDeadWatchers", -1));
    +    }
    +
    +    public WatcherCleaner(DeadWatcherListener listener,
    +            int watcherCleanThreshold, int watcherCleanIntervalInSeconds,
    +            int watcherCleanThreadsNum, int maxInProcessingDeadWatchers) {
    +        this.listener = listener;
    +        this.watcherCleanThreshold = watcherCleanThreshold;
    +        this.watcherCleanIntervalInSeconds = watcherCleanIntervalInSeconds;
    +        int suggestedMaxInProcessingThreshold =
    +                watcherCleanThreshold * watcherCleanThreadsNum;
    +        if (maxInProcessingDeadWatchers > 0 &&
    +                maxInProcessingDeadWatchers < suggestedMaxInProcessingThreshold) {
    +            maxInProcessingDeadWatchers = suggestedMaxInProcessingThreshold;
    +            LOG.info("The maxInProcessingDeadWatchers config is smaller " +
    +                    "than the suggested one, change it to use {}",
    +                    maxInProcessingDeadWatchers);
    +        }
    +        this.maxInProcessingDeadWatchers = maxInProcessingDeadWatchers;
    +        this.deadWatchers = new HashSet<Integer>();
    +        this.cleaners = new WorkerService("DeadWatcherCleanner",
    +                watcherCleanThreadsNum, false);
    +
    +        LOG.info("watcherCleanThreshold={}, watcherCleanIntervalInSeconds={}" +
    +                ", watcherCleanThreadsNum={}, maxInProcessingDeadWatchers={}",
    +                watcherCleanThreshold, watcherCleanIntervalInSeconds,
    +                watcherCleanThreadsNum, maxInProcessingDeadWatchers);
    +    }
    +
    +    public void addDeadWatcher(int watcherBit) {
    +        // Wait if there are too many watchers waiting to be closed,
    +        // this will slow down the socket packet processing and
    +        // the adding of watches in the ZK pipeline.
    +        while (maxInProcessingDeadWatchers > 0 && !stopped &&
    +                totalDeadWatchers.get() >= maxInProcessingDeadWatchers) {
    +            try {
    +                RATE_LOGGER.rateLimitLog("Waiting for dead watchers cleaning");
    +                synchronized(totalDeadWatchers) {
    +                    totalDeadWatchers.wait(100);
    +                }
    +            } catch (InterruptedException e) {
    +                LOG.info("Got interrupted while waiting for dead watches " +
    +                        "queue size");
    +            }
    +        }
    +        synchronized (this) {
    +            if (deadWatchers.add(watcherBit)) {
    +                totalDeadWatchers.incrementAndGet();
    +                if (deadWatchers.size() >= watcherCleanThreshold) {
    +                    synchronized (cleanEvent) {
    +                        cleanEvent.notifyAll();
    +                    }
    +                }
    +            }
    +        }
    +    }
    +
    +    @Override
    +    public void run() {
    +        while (!stopped) {
    +            synchronized (cleanEvent) {
    +                try {
    +                    if (deadWatchers.size() < watcherCleanThreshold) {
    +                        int maxWaitMs = (watcherCleanIntervalInSeconds +
    +                            r.nextInt(watcherCleanIntervalInSeconds / 2 + 1)) * 1000;
    --- End diff --
    
    Cleaning up dead watches on an ensemble with a large number of watches is heavy work that can affect performance, so jitter is added to make sure the lazy cleanup doesn't run at the same time on all the servers in the ensemble.
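The jittered wait in the quoted `run()` loop can be isolated as below. This is a minimal sketch; the class and method names are hypothetical. It computes the base interval plus a random extra of up to half the interval, so each server waits somewhere between 1x and 1.5x the configured interval.

```java
import java.util.Random;

/**
 * Hypothetical helper mirroring the jittered wait in WatcherCleaner.run():
 * base interval plus a random extra of up to half the interval, so that
 * servers in an ensemble do not all start cleaning at the same moment.
 */
final class CleanupJitter {
    private CleanupJitter() {}

    /** Returns a wait in milliseconds within [interval, 1.5 * interval] seconds. */
    static long maxWaitMs(int intervalSeconds, Random random) {
        // nextInt(bound) yields [0, bound); the +1 makes half the interval reachable
        int extraSeconds = random.nextInt(intervalSeconds / 2 + 1);
        // long arithmetic so very large intervals cannot overflow an int
        return (intervalSeconds + (long) extraSeconds) * 1000L;
    }
}
```

With the default of 600 seconds, each server would wait between 600,000 ms and 900,000 ms before an interval-triggered cleanup.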


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by hanm <gi...@git.apache.org>.
Github user hanm commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r219659161
  
    --- Diff: src/java/main/org/apache/zookeeper/server/DumbWatcher.java ---
    @@ -0,0 +1,101 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.zookeeper.server;
    +
    +import java.io.IOException;
    +import java.net.InetSocketAddress;
    +import java.nio.ByteBuffer;
    +import java.security.cert.Certificate;
    +
    +import org.apache.jute.Record;
    +import org.apache.zookeeper.Watcher;
    --- End diff --
    
    unused import.


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by hanm <gi...@git.apache.org>.
Github user hanm commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r219911008
  
    --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatcherCleaner.java ---
    @@ -0,0 +1,176 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.zookeeper.server.watch;
    +
    +import java.util.Set;
    +import java.util.HashSet;
    +import java.util.Random;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import org.apache.zookeeper.common.Time;
    +import org.apache.zookeeper.server.RateLogger;
    +import org.apache.zookeeper.server.WorkerService;
    +import org.apache.zookeeper.server.WorkerService.WorkRequest;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Thread used to lazily clean up the closed watcher, it will trigger the
    + * clean up when the dead watchers get certain number or some number of
    + * seconds has elapsed since last clean up.
    + *
    + * Cost of running it:
    + *
    + * - need to go through all the paths even if the watcher may only
    + *   watching a single path
    + * - block in the path BitHashSet when we try to check the dead watcher
    + *   which won't block other stuff
    + */
    +public class WatcherCleaner extends Thread {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(WatcherCleaner.class);
    +    private final RateLogger RATE_LOGGER = new RateLogger(LOG);
    +
    +    private volatile boolean stopped = false;
    +    private final Object cleanEvent = new Object();
    +    private final Random r = new Random(System.nanoTime());
    +    private final WorkerService cleaners;
    +
    +    private final Set<Integer> deadWatchers;
    +    private final DeadWatcherListener listener;
    +    private final int watcherCleanThreshold;
    +    private final int watcherCleanIntervalInSeconds;
    +    private final int maxInProcessingDeadWatchers;
    +    private final AtomicInteger totalDeadWatchers = new AtomicInteger();
    +
    +    public WatcherCleaner(DeadWatcherListener listener) {
    +        this(listener,
    +            Integer.getInteger("zookeeper.watcherCleanThreshold", 1000),
    +            Integer.getInteger("zookeeper.watcherCleanIntervalInSeconds", 600),
    +            Integer.getInteger("zookeeper.watcherCleanThreadsNum", 2),
    +            Integer.getInteger("zookeeper.maxInProcessingDeadWatchers", -1));
    +    }
    +
    +    public WatcherCleaner(DeadWatcherListener listener,
    +            int watcherCleanThreshold, int watcherCleanIntervalInSeconds,
    +            int watcherCleanThreadsNum, int maxInProcessingDeadWatchers) {
    +        this.listener = listener;
    +        this.watcherCleanThreshold = watcherCleanThreshold;
    +        this.watcherCleanIntervalInSeconds = watcherCleanIntervalInSeconds;
    +        int suggestedMaxInProcessingThreshold =
    +                watcherCleanThreshold * watcherCleanThreadsNum;
    +        if (maxInProcessingDeadWatchers > 0 &&
    +                maxInProcessingDeadWatchers < suggestedMaxInProcessingThreshold) {
    +            maxInProcessingDeadWatchers = suggestedMaxInProcessingThreshold;
    +            LOG.info("The maxInProcessingDeadWatchers config is smaller " +
    +                    "than the suggested one, change it to use {}",
    +                    maxInProcessingDeadWatchers);
    +        }
    +        this.maxInProcessingDeadWatchers = maxInProcessingDeadWatchers;
    +        this.deadWatchers = new HashSet<Integer>();
    +        this.cleaners = new WorkerService("DeadWatcherCleanner",
    +                watcherCleanThreadsNum, false);
    +
    +        LOG.info("watcherCleanThreshold={}, watcherCleanIntervalInSeconds={}" +
    +                ", watcherCleanThreadsNum={}, maxInProcessingDeadWatchers={}",
    +                watcherCleanThreshold, watcherCleanIntervalInSeconds,
    +                watcherCleanThreadsNum, maxInProcessingDeadWatchers);
    +    }
    +
    +    public void addDeadWatcher(int watcherBit) {
    +        // Wait if there are too many watchers waiting to be closed,
    +        // this will slow down the socket packet processing and
    +        // the adding of watches in the ZK pipeline.
    +        while (maxInProcessingDeadWatchers > 0 && !stopped &&
    +                totalDeadWatchers.get() >= maxInProcessingDeadWatchers) {
    --- End diff --
    
    I think this should be `maxInProcessingDeadWatchers != -1 && totalDeadWatchers.get() >= maxInProcessingDeadWatchers`. Otherwise we'll always wait on `totalDeadWatchers` if the user keeps the default configuration value of `maxInProcessingDeadWatchers`.
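To make the intended semantics of the throttle concrete, here is a minimal sketch of the admission check in `addDeadWatcher()`. The class and method names are hypothetical. It treats any non-positive limit, including the default `-1`, as "unlimited", which is how the `> 0` guard in the quoted loop behaves. Like the quoted code, the limit is soft, because the check and the increment are not one atomic step.

```java
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical sketch of the dead-watcher admission check: producers block
 * while too many dead watchers await cleanup; a limit <= 0 disables it.
 */
final class DeadWatcherThrottle {
    private final int maxInProcessing;                  // <= 0 means "no limit"
    private final AtomicInteger inProcessing = new AtomicInteger();

    DeadWatcherThrottle(int maxInProcessing) {
        this.maxInProcessing = maxInProcessing;
    }

    /** True when a producer should wait before queuing another dead watcher. */
    boolean shouldWait() {
        return maxInProcessing > 0 && inProcessing.get() >= maxInProcessing;
    }

    /** Waits in 100 ms slices until there is room, then records the addition. */
    void add() throws InterruptedException {
        synchronized (inProcessing) {
            while (shouldWait()) {
                inProcessing.wait(100);
            }
        }
        inProcessing.incrementAndGet();                 // soft limit: not atomic with the check
    }

    /** Called after a cleanup batch finishes; wakes any blocked producers. */
    void cleaned(int count) {
        inProcessing.addAndGet(-count);
        synchronized (inProcessing) {
            inProcessing.notifyAll();
        }
    }

    int pending() {
        return inProcessing.get();
    }
}
```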


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by hanm <gi...@git.apache.org>.
Github user hanm commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r219959421
  
    --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatchManagerOptimized.java ---
    @@ -0,0 +1,355 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.zookeeper.server.watch;
    +
    +import java.io.PrintWriter;
    +import java.util.HashMap;
    +import java.util.BitSet;
    +import java.util.HashSet;
    +import java.util.ArrayList;
    +import java.util.LinkedHashMap;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Random;
    +import java.util.Set;
    +import java.util.Iterator;
    +import java.lang.Iterable;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.locks.ReadWriteLock;
    +import java.util.concurrent.locks.ReentrantReadWriteLock;
    +
    +import org.apache.zookeeper.WatchedEvent;
    +import org.apache.zookeeper.Watcher;
    +import org.apache.zookeeper.Watcher.Event.EventType;
    +import org.apache.zookeeper.Watcher.Event.KeeperState;
    +import org.apache.zookeeper.server.ServerCnxn;
    +import org.apache.zookeeper.server.util.BitMap;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Optimized in memory and time complexity, compared to WatchManager, both the
    + * memory consumption and time complexity improved a lot, but it cannot
    + * efficiently remove the watcher when the session or socket is closed, for
    + * majority usecase this is not a problem.
    + *
    + * Changes made compared to WatchManager:
    + *
    + * - Use HashSet and BitSet to store the watchers to find a balance between
    + *   memory usage and time complexity
    + * - Use ReadWriteLock instead of synchronized to reduce lock retention
    + * - Lazily clean up the closed watchers
    + */
    +public class WatchManagerOptimized
    +        implements IWatchManager, DeadWatcherListener {
    +
    +    private static final Logger LOG =
    +            LoggerFactory.getLogger(WatchManagerOptimized.class);
    +
    +    private final ConcurrentHashMap<String, BitHashSet> pathWatches =
    +            new ConcurrentHashMap<String, BitHashSet>();
    +
    +    // watcher to bit id mapping
    +    private final BitMap<Watcher> watcherBitIdMap = new BitMap<Watcher>();
    +
    +    // used to lazily remove the dead watchers
    +    private final WatcherCleaner watcherCleaner;
    +
    +    private final ReentrantReadWriteLock addRemovePathRWLock = new ReentrantReadWriteLock();
    +
    +    public WatchManagerOptimized() {
    +        watcherCleaner = new WatcherCleaner(this);
    +        watcherCleaner.start();
    +    }
    +
    +    @Override
    +    public boolean addWatch(String path, Watcher watcher) {
    +        boolean result = false;
    +        addRemovePathRWLock.readLock().lock();
    +        try {
    +            // avoid race condition of adding an in-flight dead watcher
    +            if (isDeadWatcher(watcher)) {
    +                LOG.debug("Ignoring addWatch with closed cnxn");
    +            } else {
    +                Integer bit = watcherBitIdMap.add(watcher);
    +                BitHashSet watchers = pathWatches.get(path);
    +                if (watchers == null) {
    +                    watchers = new BitHashSet();
    +                    BitHashSet existingWatchers = pathWatches.putIfAbsent(path, watchers);
    +                    if (existingWatchers != null) {
    +                        watchers = existingWatchers;
    +                    }
    +                }
    +                result = watchers.add(bit);
    +            }
    +        } finally {
    +            addRemovePathRWLock.readLock().unlock();
    +        }
    +        return result;
    +    }
    +
    +    @Override
    +    public boolean containsWatcher(String path, Watcher watcher) {
    +        BitHashSet watchers = pathWatches.get(path);
    --- End diff --
    
    It would be good to add a comment here explaining why no synchronization is required.
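The kind of reasoning such a comment might capture can be sketched with a simplified model of the read path. This is a hypothetical stand-in, not the PR's code: `WatcherBits` plays the role of `BitHashSet`, with every method synchronized on the set itself. `ConcurrentHashMap.get()` needs no external lock, and the per-path set guards its own state, so a membership check needs no manager-level lock.

```java
import java.util.BitSet;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical, simplified model of the lock-free read path in containsWatcher(). */
final class PathWatchIndex {
    /** Minimal stand-in for BitHashSet: each method synchronizes on the set. */
    static final class WatcherBits {
        private final BitSet bits = new BitSet();

        synchronized boolean add(int bit) {
            if (bits.get(bit)) {
                return false;
            }
            bits.set(bit);
            return true;
        }

        synchronized boolean contains(int bit) {
            return bits.get(bit);
        }
    }

    private final ConcurrentHashMap<String, WatcherBits> pathWatches = new ConcurrentHashMap<>();

    void add(String path, int bit) {
        pathWatches.computeIfAbsent(path, p -> new WatcherBits()).add(bit);
    }

    /**
     * No manager-level lock: get() is thread-safe and returns a fully
     * constructed set, and contains() synchronizes on that set. A concurrent
     * add or remove may or may not be observed, which is acceptable for a
     * point-in-time membership check.
     */
    boolean contains(String path, int bit) {
        WatcherBits watchers = pathWatches.get(path);
        return watchers != null && watchers.contains(bit);
    }
}
```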


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by lvfangmin <gi...@git.apache.org>.
Github user lvfangmin commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r220050492
  
    --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatchManagerOptimized.java ---
    @@ -0,0 +1,355 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.zookeeper.server.watch;
    +
    +import java.io.PrintWriter;
    +import java.util.HashMap;
    +import java.util.BitSet;
    +import java.util.HashSet;
    +import java.util.ArrayList;
    +import java.util.LinkedHashMap;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Random;
    +import java.util.Set;
    +import java.util.Iterator;
    +import java.lang.Iterable;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.locks.ReadWriteLock;
    +import java.util.concurrent.locks.ReentrantReadWriteLock;
    +
    +import org.apache.zookeeper.WatchedEvent;
    +import org.apache.zookeeper.Watcher;
    +import org.apache.zookeeper.Watcher.Event.EventType;
    +import org.apache.zookeeper.Watcher.Event.KeeperState;
    +import org.apache.zookeeper.server.ServerCnxn;
    +import org.apache.zookeeper.server.util.BitMap;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Optimized in memory and time complexity, compared to WatchManager, both the
    + * memory consumption and time complexity improved a lot, but it cannot
    + * efficiently remove the watcher when the session or socket is closed, for
    + * majority usecase this is not a problem.
    + *
    + * Changes made compared to WatchManager:
    + *
    + * - Use HashSet and BitSet to store the watchers to find a balance between
    + *   memory usage and time complexity
    + * - Use ReadWriteLock instead of synchronized to reduce lock retention
    + * - Lazily clean up the closed watchers
    + */
    +public class WatchManagerOptimized
    +        implements IWatchManager, DeadWatcherListener {
    +
    +    private static final Logger LOG =
    +            LoggerFactory.getLogger(WatchManagerOptimized.class);
    +
    +    private final ConcurrentHashMap<String, BitHashSet> pathWatches =
    +            new ConcurrentHashMap<String, BitHashSet>();
    +
    +    // watcher to bit id mapping
    +    private final BitMap<Watcher> watcherBitIdMap = new BitMap<Watcher>();
    +
    +    // used to lazily remove the dead watchers
    +    private final WatcherCleaner watcherCleaner;
    +
    +    private final ReentrantReadWriteLock addRemovePathRWLock = new ReentrantReadWriteLock();
    +
    +    public WatchManagerOptimized() {
    +        watcherCleaner = new WatcherCleaner(this);
    +        watcherCleaner.start();
    +    }
    +
    +    @Override
    +    public boolean addWatch(String path, Watcher watcher) {
    +        boolean result = false;
    +        addRemovePathRWLock.readLock().lock();
    +        try {
    +            // avoid race condition of adding an in-flight dead watcher
    +            if (isDeadWatcher(watcher)) {
    +                LOG.debug("Ignoring addWatch with closed cnxn");
    +            } else {
    +                Integer bit = watcherBitIdMap.add(watcher);
    +                BitHashSet watchers = pathWatches.get(path);
    +                if (watchers == null) {
    +                    watchers = new BitHashSet();
    +                    BitHashSet existingWatchers = pathWatches.putIfAbsent(path, watchers);
    +                    if (existingWatchers != null) {
    --- End diff --
    
    That's correct. Read requests are processed concurrently by the CommitProcessor worker service, so multiple threads may add to `pathWatches` while we're holding the read lock; that's why this check is needed.
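The get-then-`putIfAbsent` pattern discussed above can be shown in isolation. This is a minimal sketch with a hypothetical class name, using a plain concurrent `Set<Integer>` in place of `BitHashSet`. Several threads may pass the first `get()` and each build a candidate, but only one `putIfAbsent()` wins; the losers must adopt the winner's instance, or the watches they add would land in a set that is never stored in the map.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical sketch of the race-safe get-or-create used in addWatch(). */
final class GetOrCreate {
    private GetOrCreate() {}

    static Set<Integer> watchersFor(ConcurrentMap<String, Set<Integer>> pathWatches, String path) {
        Set<Integer> watchers = pathWatches.get(path);
        if (watchers == null) {
            Set<Integer> candidate = ConcurrentHashMap.newKeySet();
            // Returns null if our candidate was stored, else the instance another thread stored first.
            Set<Integer> existing = pathWatches.putIfAbsent(path, candidate);
            watchers = (existing != null) ? existing : candidate;
        }
        return watchers;
    }
}
```

On Java 8+, `pathWatches.computeIfAbsent(path, p -> ConcurrentHashMap.newKeySet())` expresses the same intent in one call; the explicit form above avoids allocating a candidate on the common path where the entry already exists.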


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by hanm <gi...@git.apache.org>.
Github user hanm commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r219896267
  
    --- Diff: src/java/main/org/apache/zookeeper/server/watch/BitHashSet.java ---
    @@ -0,0 +1,156 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.zookeeper.server.watch;
    +
    +import java.util.BitSet;
    +import java.util.Set;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.lang.Iterable;
    +
    +import org.apache.zookeeper.server.util.BitMap;
    +
    +/**
    + * Using BitSet to store all the elements, and use HashSet to cache limited
    + * number of elements to find a balance between memory and time complexity.
    + *
    + * Without HashSet, we need to use O(N) time to get the elements, N is
    + * the bit numbers in elementBits. But we need to keep the size small to make
    + * sure it doesn't cost too much in memory, there is a tradeoff between
    + * memory and time complexity.
    + *
    + * Previously, was deciding to dynamically switch between SparseBitSet and
    + * HashSet based on the memory consumption, but it will take time to copy
    + * data over and may have some herd effect of keep copying data from one
    + * data structure to another. The current solution can do a very good job
    + * given most of the paths have limited number of elements.
    + */
    +public class BitHashSet implements Iterable<Integer> {
    +
    +    static final long serialVersionUID = 6382565447128283568L;
    +
    +    /**
    +     * Change to SparseBitSet if we want to optimize more, the number of
    +     * elements on a single server is usually limited, so BitSet should be
    +     * fine.
    +     */
    +    private final BitSet elementBits = new BitSet();
    +    private final Set<Integer> cache = new HashSet<Integer>();
    +
    +    private final int cacheSize;
    +
    +    // To record how many elements in this set.
    +    private int elementCount = 0;
    +
    +    public BitHashSet() {
    +        this(Integer.getInteger("zookeeper.bitHashCacheSize", 10));
    +    }
    +
    +    public BitHashSet(int cacheSize) {
    +        this.cacheSize = cacheSize;
    +    }
    +
    +    public synchronized boolean add(Integer elementBit) {
    +        if (elementBit == null || elementBits.get(elementBit)) {
    +            return false;
    +        }
    +        if (cache.size() < cacheSize) {
    +            cache.add(elementBit);
    +        }
    +        elementBits.set(elementBit);
    +        elementCount++;
    +        return true;
    +    }
    +
    +    /**
    +     * Remove the watches, and return the number of watches being removed.
    +     */
    +    public synchronized int remove(Set<Integer> bitSet, BitSet bits) {
    +        cache.removeAll(bitSet);
    +        elementBits.andNot(bits);
    +        int elementCountBefore = elementCount;
    +        elementCount = elementBits.cardinality();
    +        return elementCountBefore - elementCount;
    +    }
    +
    +    public synchronized boolean remove(Integer elementBit) {
    +        if (elementBit == null || !elementBits.get(elementBit)) {
    +            return false;
    +        }
    +
    +        cache.remove(elementBit);
    +        elementBits.clear(elementBit);
    +        elementCount--;
    +        return true;
    +    }
    +
    +    public synchronized boolean contains(Integer elementBit) {
    +        if (elementBit == null) {
    +            return false;
    +        }
    +        return elementBits.get(elementBit);
    --- End diff --
    
    It would be good to add a comment at the declaration of the `cache` variable stating that its purpose is to optimize iteration.
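To illustrate why the `cache` speeds up iteration, here is a minimal sketch. The class name and `elements()` method are hypothetical, and the fallback policy is an assumption, since the quoted hunk does not show `BitHashSet`'s iterator. Because the cache is always a subset of the elements, equal sizes mean it holds every element and can be returned directly; otherwise the whole bitmap is scanned with `BitSet.nextSetBit`.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch of a BitSet-backed set with a small iteration cache. */
final class CachedBitSet {
    private final BitSet elementBits = new BitSet();
    // Only purpose: fast iteration for small sets. Holds at most cacheSize
    // elements and is always a subset of elementBits.
    private final Set<Integer> cache = new HashSet<>();
    private final int cacheSize;
    private int elementCount;

    CachedBitSet(int cacheSize) {
        this.cacheSize = cacheSize;
    }

    synchronized boolean add(int bit) {
        if (elementBits.get(bit)) {
            return false;
        }
        if (cache.size() < cacheSize) {
            cache.add(bit);
        }
        elementBits.set(bit);
        elementCount++;
        return true;
    }

    synchronized boolean remove(int bit) {
        if (!elementBits.get(bit)) {
            return false;
        }
        cache.remove(bit);                     // autoboxed: Set.remove(Object)
        elementBits.clear(bit);
        elementCount--;
        return true;
    }

    /** Snapshot of all elements, taking the cheap path when the cache is complete. */
    synchronized List<Integer> elements() {
        if (cache.size() == elementCount) {
            return new ArrayList<>(cache);     // cost proportional to elementCount
        }
        List<Integer> out = new ArrayList<>(elementCount);
        // Falls back to scanning every word of the bitmap up to the highest set bit.
        for (int i = elementBits.nextSetBit(0); i >= 0; i = elementBits.nextSetBit(i + 1)) {
            out.add(i);
        }
        return out;
    }
}
```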


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by hanm <gi...@git.apache.org>.
Github user hanm commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r219640916
  
    --- Diff: src/java/main/org/apache/zookeeper/server/DataTree.java ---
    @@ -253,6 +259,14 @@ public DataTree() {
             addConfigNode();
     
             nodeDataSize.set(approximateDataSize());
    +        try {
    +            dataWatches = WatchManagerFactory.createWatchManager();
    +            childWatches = WatchManagerFactory.createWatchManager();
    +        } catch (Exception e) {
    +            LOG.error("Unexpected exception when creating WatchManager, " +
    +                    "exiting abnormally", e);
    --- End diff --
    
    nit: use parameterized logging here.


---

[GitHub] zookeeper issue #590: [ZOOKEEPER-1177] Add the memory optimized watch manage...

Posted by anmolnar <gi...@git.apache.org>.
Github user anmolnar commented on the issue:

    https://github.com/apache/zookeeper/pull/590
  
    @lvfangmin Docs for the new caching parameter `zookeeper.bitHashCacheSize`? 
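For reference when writing those docs, the quoted `BitHashSet()` constructor reads the cache size from a JVM system property with a default of 10. The sketch below isolates that lookup; the class and constant names are hypothetical. One behavior worth documenting is that `Integer.getInteger` silently falls back to the default when the value is missing or not a valid integer.

```java
/** Hypothetical illustration of how BitHashSet() resolves its cache size. */
final class BitHashCacheSizeConfig {
    static final String PROPERTY = "zookeeper.bitHashCacheSize";
    static final int DEFAULT_CACHE_SIZE = 10;

    private BitHashCacheSizeConfig() {}

    /** Number of watcher ids per path kept in the HashSet cache to speed up iteration. */
    static int cacheSize() {
        // Returns the default if the property is unset or cannot be parsed as an int.
        return Integer.getInteger(PROPERTY, DEFAULT_CACHE_SIZE);
    }
}
```

An operator would set it on the server JVM command line, for example `-Dzookeeper.bitHashCacheSize=32`.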


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by hanm <gi...@git.apache.org>.
Github user hanm commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r219958886
  
    --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatchManagerOptimized.java ---
    @@ -0,0 +1,355 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.zookeeper.server.watch;
    +
    +import java.io.PrintWriter;
    +import java.util.HashMap;
    +import java.util.BitSet;
    +import java.util.HashSet;
    +import java.util.ArrayList;
    +import java.util.LinkedHashMap;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Random;
    +import java.util.Set;
    +import java.util.Iterator;
    +import java.lang.Iterable;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.locks.ReadWriteLock;
    +import java.util.concurrent.locks.ReentrantReadWriteLock;
    +
    +import org.apache.zookeeper.WatchedEvent;
    +import org.apache.zookeeper.Watcher;
    +import org.apache.zookeeper.Watcher.Event.EventType;
    +import org.apache.zookeeper.Watcher.Event.KeeperState;
    +import org.apache.zookeeper.server.ServerCnxn;
    +import org.apache.zookeeper.server.util.BitMap;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Optimized in memory and time complexity, compared to WatchManager, both the
    + * memory consumption and time complexity improved a lot, but it cannot
    + * efficiently remove the watcher when the session or socket is closed, for
    + * majority usecase this is not a problem.
    + *
    + * Changes made compared to WatchManager:
    + *
    + * - Use HashSet and BitSet to store the watchers to find a balance between
    + *   memory usage and time complexity
    + * - Use ReadWriteLock instead of synchronized to reduce lock retention
    + * - Lazily clean up the closed watchers
    + */
    +public class WatchManagerOptimized
    +        implements IWatchManager, DeadWatcherListener {
    +
    +    private static final Logger LOG =
    +            LoggerFactory.getLogger(WatchManagerOptimized.class);
    +
    +    private final ConcurrentHashMap<String, BitHashSet> pathWatches =
    +            new ConcurrentHashMap<String, BitHashSet>();
    +
    +    // watcher to bit id mapping
    +    private final BitMap<Watcher> watcherBitIdMap = new BitMap<Watcher>();
    +
    +    // used to lazily remove the dead watchers
    +    private final WatcherCleaner watcherCleaner;
    +
    +    private final ReentrantReadWriteLock addRemovePathRWLock = new ReentrantReadWriteLock();
    +
    +    public WatchManagerOptimized() {
    +        watcherCleaner = new WatcherCleaner(this);
    +        watcherCleaner.start();
    +    }
    +
    +    @Override
    +    public boolean addWatch(String path, Watcher watcher) {
    +        boolean result = false;
    +        addRemovePathRWLock.readLock().lock();
    +        try {
    +            // avoid race condition of adding an in-flight dead watcher
    +            if (isDeadWatcher(watcher)) {
    +                LOG.debug("Ignoring addWatch with closed cnxn");
    +            } else {
    +                Integer bit = watcherBitIdMap.add(watcher);
    +                BitHashSet watchers = pathWatches.get(path);
    +                if (watchers == null) {
    +                    watchers = new BitHashSet();
    +                    BitHashSet existingWatchers = pathWatches.putIfAbsent(path, watchers);
    +                    if (existingWatchers != null) {
    --- End diff --
    
    Is this check necessary because we are only holding a read lock here, so it's possible for another thread to modify `pathWatches` at the same time?


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by lvfangmin <gi...@git.apache.org>.
Github user lvfangmin commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r216910219
  
    --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatchManagerOptimized.java ---
    @@ -0,0 +1,355 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.zookeeper.server.watch;
    +
    +import java.io.PrintWriter;
    +import java.util.HashMap;
    +import java.util.BitSet;
    +import java.util.HashSet;
    +import java.util.ArrayList;
    +import java.util.LinkedHashMap;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Random;
    +import java.util.Set;
    +import java.util.Iterator;
    +import java.lang.Iterable;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.locks.ReadWriteLock;
    +import java.util.concurrent.locks.ReentrantReadWriteLock;
    +
    +import org.apache.zookeeper.WatchedEvent;
    +import org.apache.zookeeper.Watcher;
    +import org.apache.zookeeper.Watcher.Event.EventType;
    +import org.apache.zookeeper.Watcher.Event.KeeperState;
    +import org.apache.zookeeper.server.ServerCnxn;
    +import org.apache.zookeeper.server.util.BitMap;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Optimized in memory and time complexity, compared to WatchManager, both the
    + * memory consumption and time complexity improved a lot, but it cannot
    + * efficiently remove the watcher when the session or socket is closed, for
    + * majority usecase this is not a problem.
    + *
    + * Changed made compared to WatchManager:
    + *
    + * - Use HashSet and BitSet to store the watchers to find a balance between
    + *   memory usage and time complexity
    + * - Use ReadWriteLock instead of synchronized to reduce lock retention
    + * - Lazily clean up the closed watchers
    + */
    +public class WatchManagerOptimized
    +        implements IWatchManager, DeadWatcherListener {
    +
    +    private static final Logger LOG =
    +            LoggerFactory.getLogger(WatchManagerOptimized.class);
    +
    +    private final ConcurrentHashMap<String, BitHashSet> pathWatches =
    +            new ConcurrentHashMap<String, BitHashSet>();
    +
    +    // watcher to bit id mapping
    +    private final BitMap<Watcher> watcherBitIdMap = new BitMap<Watcher>();
    +
    +    // used to lazily remove the dead watchers
    +    private final WatcherCleaner watcherCleaner;
    +
    +    private final ReentrantReadWriteLock addRemovePathRWLock = new ReentrantReadWriteLock();
    +
    +    public WatchManagerOptimized() {
    +        watcherCleaner = new WatcherCleaner(this);
    +        watcherCleaner.start();
    +    }
    +
    +    @Override
    +    public boolean addWatch(String path, Watcher watcher) {
    +        boolean result = false;
    +        addRemovePathRWLock.readLock().lock();
    +        try {
    +            // avoid race condition of adding a on flying dead watcher
    +            if (isDeadWatcher(watcher)) {
    +                LOG.debug("Ignoring addWatch with closed cnxn");
    +            } else {
    +                Integer bit = watcherBitIdMap.add(watcher);
    +                BitHashSet watchers = pathWatches.get(path);
    +                if (watchers == null) {
    +                    watchers = new BitHashSet();
    +                    BitHashSet existingWatchers = pathWatches.putIfAbsent(path, watchers);
    +                    if (existingWatchers != null) {
    +                        watchers = existingWatchers;
    +                    }
    +                }
    +                result = watchers.add(bit);
    +            }
    +        } finally {
    +            addRemovePathRWLock.readLock().unlock();
    +        }
    +        return result;
    +    }
    +
    +    @Override
    +    public boolean containsWatcher(String path, Watcher watcher) {
    +        BitHashSet watchers = pathWatches.get(path);
    +        if (watchers == null || !watchers.contains(watcherBitIdMap.getBit(watcher))) {
    +            return false;
    +        }
    +        return true;
    +    }
    +
    +    @Override
    +    public boolean removeWatcher(String path, Watcher watcher) {
    +        addRemovePathRWLock.writeLock().lock();
    +        try {
    +            BitHashSet list = pathWatches.get(path);
    +            if (list == null || !list.remove(watcherBitIdMap.getBit(watcher))) {
    +                return false;
    +            }
    +            if (list.isEmpty()) {
    +                pathWatches.remove(path);
    +            }
    +            return true;
    +        } finally {
    +            addRemovePathRWLock.writeLock().unlock();
    +        }
    +    }
    +
    +    @Override
    +    public void removeWatcher(Watcher watcher) {
    +        Integer watcherBit;
    +        addRemovePathRWLock.writeLock().lock();
    --- End diff --
    
    I missed this comment last time. What we need here is that once addDeadWatcher has been called, no more watches related to this dead watcher can be added. Once code runs past line 136, the watcher has been marked as stale. After we release this lock, any in-flight addWatch for this dead watcher will be rejected. That guarantees there is no race between removing and adding watches by the time we call addDeadWatcher.
    
    I also need to move addDeadWatcher out of the locking block. WatcherCleaner may block inside addDeadWatcher to avoid an OOM when the cleaner cannot keep up with cleaning the dead watchers.
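The handshake described above can be sketched in a few steps. The sketch assumes the watcher is marked stale before the write lock is taken; a concurrent set stands in for the connection's stale flag. Acquiring the write lock then waits out any `addWatch` that already holds the read lock. Every later `addWatch` sees the stale mark and is rejected.

The possibly blocking hand-off to the cleaner happens after the lock is released. A bounded `LinkedBlockingQueue.put` stands in for `WatcherCleaner.addDeadWatcher`. All class and field names are hypothetical; this is not the PR's code.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch of the add/remove handshake; not the PR's code.
class WatchRegistry {
    private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
    private final Set<String> staleWatchers = ConcurrentHashMap.newKeySet();
    private final Map<String, Integer> bitIds = new ConcurrentHashMap<>();
    private final AtomicInteger nextBit = new AtomicInteger();
    final Set<Integer> watchedBits = ConcurrentHashMap.newKeySet();
    // Bounded queue standing in for WatcherCleaner: put() blocks when it is full.
    final LinkedBlockingQueue<Integer> deadQueue = new LinkedBlockingQueue<>(1024);

    boolean addWatch(String watcher) {
        rwLock.readLock().lock();
        try {
            if (staleWatchers.contains(watcher)) {
                return false;                  // reject in-flight adds for a dead watcher
            }
            int bit = bitIds.computeIfAbsent(watcher, w -> nextBit.getAndIncrement());
            return watchedBits.add(bit);
        } finally {
            rwLock.readLock().unlock();
        }
    }

    void removeWatcher(String watcher) {
        staleWatchers.add(watcher);            // 1. later addWatch calls now fail fast
        Integer bit;
        rwLock.writeLock().lock();             // 2. waits for adds already under the read lock
        try {
            bit = bitIds.get(watcher);
        } finally {
            rwLock.writeLock().unlock();
        }
        if (bit == null) {
            return;                            // watcher was never tracked
        }
        try {
            deadQueue.put(bit);                // 3. may block, so it runs outside the lock
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Keeping step 3 outside the lock means a slow cleaner only stalls the thread closing the connection. It does not stall every `addWatch` waiting on the read lock.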


---

[GitHub] zookeeper issue #590: [ZOOKEEPER-1177] Add the memory optimized watch manage...

Posted by lvfangmin <gi...@git.apache.org>.
Github user lvfangmin commented on the issue:

    https://github.com/apache/zookeeper/pull/590
  
    A PR attached to the Jira a few years ago also used a BitMap, but it sacrificed performance when triggering watches. This patch improves on that, and uses lazy cleanup to remove watchers whose sessions have closed.
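The BitMap approach depends on mapping each watcher to a small integer id, so that a per-path set can be a bit set of ids rather than a set of object references. Reusing freed ids keeps the id space dense, which keeps those bit sets small.

Below is a hypothetical, minimal version of such a mapping. `BitIdMap` and its methods only loosely mirror the `add`/`getBit`/`remove` calls visible in the diff. The real `org.apache.zookeeper.server.util.BitMap` may be implemented differently.

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified analogue of a watcher-to-bit-id map.
class BitIdMap<T> {
    private final Map<T, Integer> idByValue = new HashMap<>();
    private final Map<Integer, T> valueById = new HashMap<>();
    // Ids released by remove(); reused first so the id space stays dense.
    private final BitSet freeIds = new BitSet();
    private int nextId = 0;

    synchronized int add(T value) {
        Integer id = idByValue.get(value);
        if (id != null) {
            return id;                  // already mapped: same id every time
        }
        int free = freeIds.nextSetBit(0);
        if (free >= 0) {
            freeIds.clear(free);
            id = free;
        } else {
            id = nextId++;
        }
        idByValue.put(value, id);
        valueById.put(id, value);
        return id;
    }

    synchronized Integer getBit(T value) {
        return idByValue.get(value);    // null when the value is not tracked
    }

    synchronized T get(int id) {
        return valueById.get(id);
    }

    synchronized T remove(int id) {
        T value = valueById.remove(id);
        if (value != null) {
            idByValue.remove(value);
            freeIds.set(id);            // make the id available for reuse
        }
        return value;
    }
}
```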


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by lvfangmin <gi...@git.apache.org>.
Github user lvfangmin commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r219687885
  
    --- Diff: src/java/main/org/apache/zookeeper/server/watch/BitHashSet.java ---
    @@ -0,0 +1,156 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.zookeeper.server.watch;
    +
    +import java.util.BitSet;
    +import java.util.Set;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.lang.Iterable;
    +
    +import org.apache.zookeeper.server.util.BitMap;
    +
    +/**
    + * Using BitSet to store all the elements, and use HashSet to cache limited
    + * number of elements to find a balance between memory and time complexity.
    + *
    + * Without HashSet, we need to to use O(N) time to get the elements, N is
    + * the bit numbers in elementBits. But we need to keep the size small to make
    + * sure it doesn't cost too much in memory, there is a tradeoff between
    + * memory and time complexity.
    + *
    + * Previously, was deciding to dynamically switch between SparseBitSet and
    + * HashSet based on the memory consumption, but it will take time to copy
    + * data over and may have some herd effect of keep copying data from one
    + * data structure to anther. The current solution can do a very good job
    + * given most of the paths have limited number of elements.
    + */
    +public class BitHashSet implements Iterable<Integer> {
    +
    +    static final long serialVersionUID = 6382565447128283568L;
    +
    +    /**
    +     * Change to SparseBitSet if we we want to optimize more, the number of
    +     * elements on a single server is usually limited, so BitSet should be
    +     * fine.
    +     */
    +    private final BitSet elementBits = new BitSet();
    +    private final Set<Integer> cache = new HashSet<Integer>();
    +
    +    private final int cacheSize;
    +
    +    // To record how many elements in this set.
    +    private int elementCount = 0;
    +
    +    public BitHashSet() {
    +        this(Integer.getInteger("zookeeper.bitHashCacheSize", 10));
    +    }
    +
    +    public BitHashSet(int cacheSize) {
    +        this.cacheSize = cacheSize;
    +    }
    +
    +    public synchronized boolean add(Integer elementBit) {
    +        if (elementBit == null || elementBits.get(elementBit)) {
    +            return false;
    +        }
    +        if (cache.size() < cacheSize) {
    +            cache.add(elementBit);
    +        }
    +        elementBits.set(elementBit);
    +        elementCount++;
    +        return true;
    +    }
    +
    +    /**
    +     * Remove the watches, and return the number of watches being removed.
    +     */
    +    public synchronized int remove(Set<Integer> bitSet, BitSet bits) {
    +        cache.removeAll(bitSet);
    +        elementBits.andNot(bits);
    +        int elementCountBefore = elementCount;
    +        elementCount = elementBits.cardinality();
    +        return elementCountBefore - elementCount;
    +    }
    +
    +    public synchronized boolean remove(Integer elementBit) {
    +        if (elementBit == null || !elementBits.get(elementBit)) {
    +            return false;
    +        }
    +
    +        cache.remove(elementBit);
    +        elementBits.clear(elementBit);
    +        elementCount--;
    +        return true;
    +    }
    +
    +    public synchronized boolean contains(Integer elementBit) {
    +        if (elementBit == null) {
    +            return false;
    +        }
    +        return elementBits.get(elementBit);
    +    }
    +
    +    public synchronized int size() {
    +        return elementCount;
    +    }
    +
    +    /**
    +     * This function is not thread-safe, need to synchronized when
    +     * iterate through this set.
    +     */
    +    @Override
    +    public Iterator<Integer> iterator() {
    --- End diff --
    
    It's used in triggerWatch, which iterates over the set with a for-each loop.
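The quoted diff stops before the body of `iterator()`, so the following is only one plausible shape, not the PR's implementation. Because `add` only caches elements while the cache has room, the cache holds every element exactly when its size equals the element count. In that case iteration can use the cache. Otherwise it falls back to scanning the `BitSet` with `nextSetBit`.

Since the iterator is not thread-safe, the caller holds the set's monitor for the whole loop, as the javadoc asks. `CachedBitSet` and `sumUnderLock` are hypothetical names.

```java
import java.util.BitSet;
import java.util.HashSet;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Set;

// Hypothetical sketch of a BitSet-backed set with a small HashSet cache.
class CachedBitSet implements Iterable<Integer> {
    private final BitSet bits = new BitSet();
    private final Set<Integer> cache = new HashSet<>();
    private final int cacheSize;
    private int count = 0;

    CachedBitSet(int cacheSize) {
        this.cacheSize = cacheSize;
    }

    synchronized boolean add(int bit) {
        if (bits.get(bit)) {
            return false;
        }
        if (cache.size() < cacheSize) {
            cache.add(bit);
        }
        bits.set(bit);
        count++;
        return true;
    }

    synchronized int size() {
        return count;
    }

    // Not thread-safe: callers must hold this object's monitor while iterating.
    @Override
    public Iterator<Integer> iterator() {
        if (cache.size() == count) {
            return cache.iterator();           // cache holds every element
        }
        return new Iterator<Integer>() {       // fall back to scanning the BitSet
            private int next = bits.nextSetBit(0);

            @Override
            public boolean hasNext() {
                return next >= 0;
            }

            @Override
            public Integer next() {
                if (next < 0) {
                    throw new NoSuchElementException();
                }
                int current = next;
                next = bits.nextSetBit(current + 1);
                return current;
            }
        };
    }

    // Caller-side pattern: synchronize around the whole for-each loop.
    static long sumUnderLock(CachedBitSet set) {
        long sum = 0;
        synchronized (set) {
            for (int bit : set) {
                sum += bit;
            }
        }
        return sum;
    }
}
```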


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by hanm <gi...@git.apache.org>.
Github user hanm commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r219659171
  
    --- Diff: src/java/main/org/apache/zookeeper/server/DumbWatcher.java ---
    @@ -0,0 +1,101 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.zookeeper.server;
    +
    +import java.io.IOException;
    +import java.net.InetSocketAddress;
    +import java.nio.ByteBuffer;
    +import java.security.cert.Certificate;
    +
    +import org.apache.jute.Record;
    +import org.apache.zookeeper.Watcher;
    +import org.apache.zookeeper.WatchedEvent;
    +import org.apache.zookeeper.proto.ReplyHeader;
    +import org.apache.zookeeper.server.ServerCnxn;
    +import org.apache.zookeeper.server.ServerStats;
    +
    +/**
    + * A empthy watcher implementation used in bench and unit test.
    --- End diff --
    
    spelling: `empty`


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by hanm <gi...@git.apache.org>.
Github user hanm commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r219931441
  
    --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatchManagerOptimized.java ---
    @@ -0,0 +1,355 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.zookeeper.server.watch;
    +
    +import java.io.PrintWriter;
    +import java.util.HashMap;
    +import java.util.BitSet;
    +import java.util.HashSet;
    +import java.util.ArrayList;
    +import java.util.LinkedHashMap;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Random;
    +import java.util.Set;
    +import java.util.Iterator;
    +import java.lang.Iterable;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.locks.ReadWriteLock;
    +import java.util.concurrent.locks.ReentrantReadWriteLock;
    +
    +import org.apache.zookeeper.WatchedEvent;
    +import org.apache.zookeeper.Watcher;
    +import org.apache.zookeeper.Watcher.Event.EventType;
    +import org.apache.zookeeper.Watcher.Event.KeeperState;
    +import org.apache.zookeeper.server.ServerCnxn;
    +import org.apache.zookeeper.server.util.BitMap;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Optimized in memory and time complexity, compared to WatchManager, both the
    + * memory consumption and time complexity improved a lot, but it cannot
    + * efficiently remove the watcher when the session or socket is closed, for
    + * majority usecase this is not a problem.
    + *
    + * Changed made compared to WatchManager:
    + *
    + * - Use HashSet and BitSet to store the watchers to find a balance between
    + *   memory usage and time complexity
    + * - Use ReadWriteLock instead of synchronized to reduce lock retention
    + * - Lazily clean up the closed watchers
    + */
    +public class WatchManagerOptimized
    +        implements IWatchManager, DeadWatcherListener {
    +
    +    private static final Logger LOG =
    +            LoggerFactory.getLogger(WatchManagerOptimized.class);
    +
    +    private final ConcurrentHashMap<String, BitHashSet> pathWatches =
    +            new ConcurrentHashMap<String, BitHashSet>();
    +
    +    // watcher to bit id mapping
    +    private final BitMap<Watcher> watcherBitIdMap = new BitMap<Watcher>();
    +
    +    // used to lazily remove the dead watchers
    +    private final WatcherCleaner watcherCleaner;
    +
    +    private final ReentrantReadWriteLock addRemovePathRWLock = new ReentrantReadWriteLock();
    +
    +    public WatchManagerOptimized() {
    +        watcherCleaner = new WatcherCleaner(this);
    +        watcherCleaner.start();
    +    }
    +
    +    @Override
    +    public boolean addWatch(String path, Watcher watcher) {
    +        boolean result = false;
    +        addRemovePathRWLock.readLock().lock();
    +        try {
    +            // avoid race condition of adding a on flying dead watcher
    +            if (isDeadWatcher(watcher)) {
    +                LOG.debug("Ignoring addWatch with closed cnxn");
    +            } else {
    +                Integer bit = watcherBitIdMap.add(watcher);
    +                BitHashSet watchers = pathWatches.get(path);
    +                if (watchers == null) {
    +                    watchers = new BitHashSet();
    +                    BitHashSet existingWatchers = pathWatches.putIfAbsent(path, watchers);
    +                    if (existingWatchers != null) {
    +                        watchers = existingWatchers;
    +                    }
    +                }
    +                result = watchers.add(bit);
    +            }
    +        } finally {
    +            addRemovePathRWLock.readLock().unlock();
    +        }
    +        return result;
    +    }
    +
    +    @Override
    +    public boolean containsWatcher(String path, Watcher watcher) {
    +        BitHashSet watchers = pathWatches.get(path);
    +        if (watchers == null || !watchers.contains(watcherBitIdMap.getBit(watcher))) {
    +            return false;
    +        }
    +        return true;
    +    }
    +
    +    @Override
    +    public boolean removeWatcher(String path, Watcher watcher) {
    +        addRemovePathRWLock.writeLock().lock();
    +        try {
    +            BitHashSet list = pathWatches.get(path);
    +            if (list == null || !list.remove(watcherBitIdMap.getBit(watcher))) {
    +                return false;
    +            }
    +            if (list.isEmpty()) {
    +                pathWatches.remove(path);
    +            }
    +            return true;
    +        } finally {
    +            addRemovePathRWLock.writeLock().unlock();
    +        }
    +    }
    +
    +    @Override
    +    public void removeWatcher(Watcher watcher) {
    +        Integer watcherBit;
    +        addRemovePathRWLock.writeLock().lock();
    +        try {
    +            // do nothing if the watcher is not tracked
    +            watcherBit = watcherBitIdMap.getBit(watcher);
    +            if (watcherBit == null) {
    +                return;
    +            }
    +        } finally {
    +            addRemovePathRWLock.writeLock().unlock();
    +        }
    +        watcherCleaner.addDeadWatcher(watcherBit);
    +    }
    +
    +    /**
    +     * Entry for WatcherCleaner to remove dead watchers
    +     *
    +     * @param deadWatchers the watchers need to be removed
    +     */
    +    @Override
    +    public void processDeadWatchers(Set<Integer> deadWatchers) {
    +        BitSet bits = new BitSet();
    +        for (int dw: deadWatchers) {
    +            bits.set(dw);
    +        }
    +        // The value iterator will reflect the state when it was
    +        // created, don't need to synchronize.
    +        for (BitHashSet watchers: pathWatches.values()) {
    +            watchers.remove(deadWatchers, bits);
    +        }
    +        // Better to remove the empty path from pathWatches, but it will add
    +        // lot of lock contention and affect the throughput of addWatch,
    +        // let's rely on the triggerWatch to delete it.
    +        for (Integer wbit: deadWatchers) {
    +            watcherBitIdMap.remove(wbit);
    +        }
    +    }
    +
    +    @Override
    +    public WatcherOrBitSet triggerWatch(String path, EventType type) {
    +        return triggerWatch(path, type, null);
    +    }
    +
    +    @Override
    +    public WatcherOrBitSet triggerWatch(
    +            String path, EventType type, WatcherOrBitSet supress) {
    +        WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
    +
    +        BitHashSet watchers = remove(path);
    +        if (watchers == null) {
    +            return null;
    +        }
    +
    +        int triggeredWatches = 0;
    --- End diff --
    
    we can remove this - it's assigned later but never used.
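The `processDeadWatchers` method in the diff above removes a whole batch of dead watchers from every path in one pass. It builds a `BitSet` of the dead ids and clears them from each path's set. `BitHashSet.remove(Set, BitSet)` does this with `BitSet.andNot` and counts removals by comparing cardinality before and after.

A standalone sketch of that sweep over plain `BitSet` values follows. `DeadWatcherSweep` and `sweep` are hypothetical names.

```java
import java.util.BitSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: batch removal of dead watcher ids across all paths.
class DeadWatcherSweep {
    // Returns how many (path, watcher) pairs were removed in total.
    static int sweep(Map<String, BitSet> pathWatches, Set<Integer> deadWatchers) {
        BitSet dead = new BitSet();
        for (int bit : deadWatchers) {
            dead.set(bit);
        }
        int removed = 0;
        for (BitSet watchers : pathWatches.values()) {
            int before = watchers.cardinality();
            watchers.andNot(dead);       // clear every dead bit in one word-wise pass
            removed += before - watchers.cardinality();
        }
        // As in the quoted code, paths left empty stay in the map for later cleanup.
        return removed;
    }
}
```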


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by anmolnar <gi...@git.apache.org>.
Github user anmolnar commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r214916758
  
    --- Diff: src/java/main/org/apache/zookeeper/server/watch/DeadWatcherListener.java ---
    @@ -0,0 +1,31 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.zookeeper.server.watch;
    +
    +import java.util.Set;
    +
    +public interface DeadWatcherListener {
    --- End diff --
    
    Please add a few words of javadoc here.
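One possible wording for that javadoc is sketched below. The single method signature is taken from the `@Override processDeadWatchers(Set<Integer>)` in `WatchManagerOptimized`. The description of when it is called is inferred from `WatcherCleaner` in this PR, so treat the wording as a suggestion.

`RecordingListener` is a hypothetical implementation that only illustrates the contract.

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Callback used by WatcherCleaner to hand over a batch of dead watchers
 * (watchers whose connection or session has been closed) so that their
 * watches can be purged lazily instead of on the close path.
 */
interface DeadWatcherListener {
    /**
     * Remove all watches registered by the given dead watchers.
     *
     * @param deadWatchers bit ids of the watchers to remove
     */
    void processDeadWatchers(Set<Integer> deadWatchers);
}

// Hypothetical listener used only to illustrate the contract.
class RecordingListener implements DeadWatcherListener {
    final Set<Integer> removed = new HashSet<>();

    @Override
    public void processDeadWatchers(Set<Integer> deadWatchers) {
        removed.addAll(deadWatchers);
    }
}
```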


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by hanm <gi...@git.apache.org>.
Github user hanm commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r219661278
  
    --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatcherCleaner.java ---
    @@ -0,0 +1,176 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.zookeeper.server.watch;
    +
    +import java.util.Set;
    +import java.util.HashSet;
    +import java.util.Random;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import org.apache.zookeeper.common.Time;
    +import org.apache.zookeeper.server.RateLogger;
    +import org.apache.zookeeper.server.WorkerService;
    +import org.apache.zookeeper.server.WorkerService.WorkRequest;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Thread used to lazily clean up the closed watcher, it will trigger the
    + * clean up when the dead watchers get certain number or some number of
    + * seconds has elapsed since last clean up.
    + *
    + * Cost of running it:
    + *
    + * - need to go through all the paths even if the watcher may only
    + *   watching a single path
    + * - block in the path BitHashSet when we try to check the dead watcher
    + *   which won't block other stuff
    + */
    +public class WatcherCleaner extends Thread {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(WatcherCleaner.class);
    +    private final RateLogger RATE_LOGGER = new RateLogger(LOG);
    +
    +    private volatile boolean stopped = false;
    +    private final Object cleanEvent = new Object();
    +    private final Random r = new Random(System.nanoTime());
    +    private final WorkerService cleaners;
    +
    +    private final Set<Integer> deadWatchers;
    +    private final DeadWatcherListener listener;
    +    private final int watcherCleanThreshold;
    +    private final int watcherCleanIntervalInSeconds;
    +    private final int maxInProcessingDeadWatchers;
    +    private final AtomicInteger totalDeadWatchers = new AtomicInteger();
    +
    +    public WatcherCleaner(DeadWatcherListener listener) {
    +        this(listener,
    +            Integer.getInteger("zookeeper.watcherCleanThreshold", 1000),
    +            Integer.getInteger("zookeeper.watcherCleanIntervalInSeconds", 600),
    +            Integer.getInteger("zookeeper.watcherCleanThreadsNum", 2),
    +            Integer.getInteger("zookeeper.maxInProcessingDeadWatchers", -1));
    +    }
    +
    +    public WatcherCleaner(DeadWatcherListener listener,
    +            int watcherCleanThreshold, int watcherCleanIntervalInSeconds,
    +            int watcherCleanThreadsNum, int maxInProcessingDeadWatchers) {
    +        this.listener = listener;
    +        this.watcherCleanThreshold = watcherCleanThreshold;
    +        this.watcherCleanIntervalInSeconds = watcherCleanIntervalInSeconds;
    +        int suggestedMaxInProcessingThreshold =
    +                watcherCleanThreshold * watcherCleanThreadsNum;
    +        if (maxInProcessingDeadWatchers > 0 &&
    +                maxInProcessingDeadWatchers < suggestedMaxInProcessingThreshold) {
    +            maxInProcessingDeadWatchers = suggestedMaxInProcessingThreshold;
    +            LOG.info("The maxInProcessingDeadWatchers config is smaller " +
    +                    "than the suggested one, change it to use {}",
    +                    maxInProcessingDeadWatchers);
    +        }
    +        this.maxInProcessingDeadWatchers = maxInProcessingDeadWatchers;
    +        this.deadWatchers = new HashSet<Integer>();
    +        this.cleaners = new WorkerService("DeadWatcherCleanner",
    +                watcherCleanThreadsNum, false);
    +
    +        LOG.info("watcherCleanThreshold={}, watcherCleanIntervalInSeconds={}" +
    +                ", watcherCleanThreadsNum={}, maxInProcessingDeadWatchers={}",
    +                watcherCleanThreshold, watcherCleanIntervalInSeconds,
    +                watcherCleanThreadsNum, maxInProcessingDeadWatchers);
    +    }
    +
    +    public void addDeadWatcher(int watcherBit) {
    +        // Wait if there are too many watchers waiting to be closed,
    +        // this is will slow down the socket packet processing and
    +        // the adding watches in the ZK pipeline.
    +        while (maxInProcessingDeadWatchers > 0 && !stopped &&
    +                totalDeadWatchers.get() >= maxInProcessingDeadWatchers) {
    +            try {
    +                RATE_LOGGER.rateLimitLog("Waiting for dead watchers cleaning");
    +                synchronized(totalDeadWatchers) {
    +                    totalDeadWatchers.wait(100);
    +                }
    +            } catch (InterruptedException e) {
    +                LOG.info("Got interrupted while waiting for dead watches " +
    +                        "queue size");
    +            }
    +        }
    +        synchronized (this) {
    +            if (deadWatchers.add(watcherBit)) {
    +                totalDeadWatchers.incrementAndGet();
    +                if (deadWatchers.size() >= watcherCleanThreshold) {
    +                    synchronized (cleanEvent) {
    +                        cleanEvent.notifyAll();
    +                    }
    +                }
    +            }
    +        }
    +    }
    +
    +    @Override
    +    public void run() {
    +        while (!stopped) {
    +            synchronized (cleanEvent) {
    +                try {
    +                    if (deadWatchers.size() < watcherCleanThreshold) {
    +                        int maxWaitMs = (watcherCleanIntervalInSeconds +
    +                            r.nextInt(watcherCleanIntervalInSeconds / 2 + 1)) * 1000;
    +                        cleanEvent.wait(maxWaitMs);
    +                    }
    +                } catch (InterruptedException e) {
    +                    LOG.info("Received InterruptedException while " +
    +                            "waiting for cleanEvent");
    +                    break;
    +                }
    +            }
    +
    +            if (deadWatchers.isEmpty()) {
    +                continue;
    +            }
    +
    +            synchronized (this) {
    +                // Snapshot of the current dead watchers
    +                final Set<Integer> snapshot = new HashSet<Integer>(deadWatchers);
    +                deadWatchers.clear();
    +                int total = snapshot.size();
    +                LOG.info("Processing {} dead watchers", total);
    +						    cleaners.schedule(new WorkRequest() {
    --- End diff --
    
    indentation issue on this line...
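For reference, the snapshot-and-schedule step around that line, plus the back-pressure in `addDeadWatcher`, can be sketched with consistent indentation. The sketch makes these assumptions:

- A plain `ExecutorService` replaces ZooKeeper's `WorkerService`.
- A `Consumer<Set<Integer>>` replaces `DeadWatcherListener`.
- The decrement-and-notify after processing is inferred, because the quoted diff is cut off before it.

`BatchingCleaner` and `cleanOnce` are hypothetical names.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical, simplified cleaner: ExecutorService stands in for WorkerService.
class BatchingCleaner {
    private final Set<Integer> deadWatchers = new HashSet<>();
    private final AtomicInteger inProcessing = new AtomicInteger();
    private final ExecutorService cleaners = Executors.newFixedThreadPool(2);
    private final Consumer<Set<Integer>> listener;
    private final int maxInProcessing;

    BatchingCleaner(Consumer<Set<Integer>> listener, int maxInProcessing) {
        this.listener = listener;
        this.maxInProcessing = maxInProcessing;
    }

    // Back-pressure: block the caller while too many dead ids are pending
    // (a soft limit, since the check and the increment are not atomic).
    void addDeadWatcher(int bit) {
        synchronized (inProcessing) {
            while (maxInProcessing > 0 && inProcessing.get() >= maxInProcessing) {
                try {
                    inProcessing.wait(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        synchronized (this) {
            if (deadWatchers.add(bit)) {
                inProcessing.incrementAndGet();
            }
        }
    }

    // One cleaning round: snapshot and clear under the lock, process outside it.
    void cleanOnce() {
        final Set<Integer> snapshot;
        synchronized (this) {
            if (deadWatchers.isEmpty()) {
                return;
            }
            snapshot = new HashSet<>(deadWatchers);
            deadWatchers.clear();
        }
        cleaners.execute(() -> {
            listener.accept(snapshot);
            synchronized (inProcessing) {
                inProcessing.addAndGet(-snapshot.size());
                inProcessing.notifyAll();      // wake callers blocked in addDeadWatcher
            }
        });
    }

    void shutdown() {
        cleaners.shutdown();
        try {
            cleaners.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```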


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by anmolnar <gi...@git.apache.org>.
Github user anmolnar commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r216973199
  
    --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatchManagerOptimized.java ---
    @@ -0,0 +1,355 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.zookeeper.server.watch;
    +
    +import java.io.PrintWriter;
    +import java.util.HashMap;
    +import java.util.BitSet;
    +import java.util.HashSet;
    +import java.util.ArrayList;
    +import java.util.LinkedHashMap;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Random;
    +import java.util.Set;
    +import java.util.Iterator;
    +import java.lang.Iterable;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.locks.ReadWriteLock;
    +import java.util.concurrent.locks.ReentrantReadWriteLock;
    +
    +import org.apache.zookeeper.WatchedEvent;
    +import org.apache.zookeeper.Watcher;
    +import org.apache.zookeeper.Watcher.Event.EventType;
    +import org.apache.zookeeper.Watcher.Event.KeeperState;
    +import org.apache.zookeeper.server.ServerCnxn;
    +import org.apache.zookeeper.server.util.BitMap;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Optimized in memory and time complexity, compared to WatchManager, both the
    + * memory consumption and time complexity improved a lot, but it cannot
    + * efficiently remove the watcher when the session or socket is closed, for
    + * majority usecase this is not a problem.
    + *
    + * Changed made compared to WatchManager:
    + *
    + * - Use HashSet and BitSet to store the watchers to find a balance between
    + *   memory usage and time complexity
    + * - Use ReadWriteLock instead of synchronized to reduce lock retention
    + * - Lazily clean up the closed watchers
    + */
    +public class WatchManagerOptimized
    +        implements IWatchManager, DeadWatcherListener {
    +
    +    private static final Logger LOG =
    +            LoggerFactory.getLogger(WatchManagerOptimized.class);
    +
    +    private final ConcurrentHashMap<String, BitHashSet> pathWatches =
    +            new ConcurrentHashMap<String, BitHashSet>();
    +
    +    // watcher to bit id mapping
    +    private final BitMap<Watcher> watcherBitIdMap = new BitMap<Watcher>();
    +
    +    // used to lazily remove the dead watchers
    +    private final WatcherCleaner watcherCleaner;
    +
    +    private final ReentrantReadWriteLock addRemovePathRWLock = new ReentrantReadWriteLock();
    +
    +    public WatchManagerOptimized() {
    +        watcherCleaner = new WatcherCleaner(this);
    +        watcherCleaner.start();
    +    }
    +
    +    @Override
    +    public boolean addWatch(String path, Watcher watcher) {
    +        boolean result = false;
    +        addRemovePathRWLock.readLock().lock();
    +        try {
    +            // avoid race condition of adding a on flying dead watcher
    +            if (isDeadWatcher(watcher)) {
    +                LOG.debug("Ignoring addWatch with closed cnxn");
    +            } else {
    +                Integer bit = watcherBitIdMap.add(watcher);
    +                BitHashSet watchers = pathWatches.get(path);
    +                if (watchers == null) {
    +                    watchers = new BitHashSet();
    +                    BitHashSet existingWatchers = pathWatches.putIfAbsent(path, watchers);
    +                    if (existingWatchers != null) {
    +                        watchers = existingWatchers;
    +                    }
    +                }
    +                result = watchers.add(bit);
    +            }
    +        } finally {
    +            addRemovePathRWLock.readLock().unlock();
    +        }
    +        return result;
    +    }
    +
    +    @Override
    +    public boolean containsWatcher(String path, Watcher watcher) {
    +        BitHashSet watchers = pathWatches.get(path);
    +        if (watchers == null || !watchers.contains(watcherBitIdMap.getBit(watcher))) {
    +            return false;
    +        }
    +        return true;
    +    }
    +
    +    @Override
    +    public boolean removeWatcher(String path, Watcher watcher) {
    +        addRemovePathRWLock.writeLock().lock();
    +        try {
    +            BitHashSet list = pathWatches.get(path);
    +            if (list == null || !list.remove(watcherBitIdMap.getBit(watcher))) {
    +                return false;
    +            }
    +            if (list.isEmpty()) {
    +                pathWatches.remove(path);
    +            }
    +            return true;
    +        } finally {
    +            addRemovePathRWLock.writeLock().unlock();
    +        }
    +    }
    +
    +    @Override
    +    public void removeWatcher(Watcher watcher) {
    +        Integer watcherBit;
    +        addRemovePathRWLock.writeLock().lock();
    --- End diff --
    
    Where do you mark the watcher as stale inside the critical block?
    It only calls a getter on the `BitIdMap`, right?
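One plausible reading of the design, which this thread does not confirm: the connection is marked stale *before* `removeWatcher` is called, and taking the write lock acts as a barrier. It waits for any in-flight `addWatch` that passed the stale check just before the mark. Once the write lock has been released, every later `addWatch` sees the mark and is ignored. The sketch below models that ordering. `FakeWatcher`, its `stale` flag, `pendingCleanup` and `closeConnection` are hypothetical stand-ins, not the PR's `BitMap`/`WatcherCleaner` API.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical stand-in for a connection-backed watcher with a stale flag.
class FakeWatcher {
    final int id;
    final AtomicBoolean stale = new AtomicBoolean(false);
    FakeWatcher(int id) { this.id = id; }
}

class StaleBarrierSketch {
    private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
    private final Set<Integer> watched = ConcurrentHashMap.newKeySet();
    private final Set<Integer> pendingCleanup = ConcurrentHashMap.newKeySet();

    boolean addWatch(FakeWatcher w) {
        rwLock.readLock().lock();
        try {
            if (w.stale.get()) {
                return false; // closed connection: ignore the new watch
            }
            return watched.add(w.id);
        } finally {
            rwLock.readLock().unlock();
        }
    }

    void removeWatcher(FakeWatcher w) {
        // Acquiring the write lock waits for every addWatch still holding the
        // read lock, i.e. any call that checked the flag before it was set.
        rwLock.writeLock().lock();
        try {
            if (!watched.contains(w.id)) {
                return; // not tracked: nothing to clean up
            }
        } finally {
            rwLock.writeLock().unlock();
        }
        // No addWatch for w can succeed from here on, so queuing it for lazy
        // cleanup cannot miss a late-added entry.
        pendingCleanup.add(w.id);
    }

    void closeConnection(FakeWatcher w) {
        w.stale.set(true); // 1. mark stale outside the watch manager
        removeWatcher(w);  // 2. barrier, then schedule lazy cleanup
    }

    boolean isPendingCleanup(int id) { return pendingCleanup.contains(id); }
}
```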


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by hanm <gi...@git.apache.org>.
Github user hanm commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r219912160
  
    --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatchManagerOptimized.java ---
    @@ -0,0 +1,355 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.zookeeper.server.watch;
    +
    +import java.io.PrintWriter;
    +import java.util.HashMap;
    +import java.util.BitSet;
    +import java.util.HashSet;
    +import java.util.ArrayList;
    +import java.util.LinkedHashMap;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Random;
    +import java.util.Set;
    +import java.util.Iterator;
    +import java.lang.Iterable;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.locks.ReadWriteLock;
    +import java.util.concurrent.locks.ReentrantReadWriteLock;
    +
    +import org.apache.zookeeper.WatchedEvent;
    +import org.apache.zookeeper.Watcher;
    +import org.apache.zookeeper.Watcher.Event.EventType;
    +import org.apache.zookeeper.Watcher.Event.KeeperState;
    +import org.apache.zookeeper.server.ServerCnxn;
    +import org.apache.zookeeper.server.util.BitMap;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    --- End diff --
    
    There are a couple of unused imports here.


---

[GitHub] zookeeper issue #590: [ZOOKEEPER-1177] Add the memory optimized watch manage...

Posted by lvfangmin <gi...@git.apache.org>.
Github user lvfangmin commented on the issue:

    https://github.com/apache/zookeeper/pull/590
  
    @anmolnar #612 was opened because a user reported that issue, and it's better to fix it early than to wait for this diff to be reviewed and merged. I'll rebase to drop that change from this patch.


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by hanm <gi...@git.apache.org>.
Github user hanm commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r219957465
  
    --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatchManagerOptimized.java ---
    @@ -0,0 +1,355 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.zookeeper.server.watch;
    +
    +import java.io.PrintWriter;
    +import java.util.HashMap;
    +import java.util.BitSet;
    +import java.util.HashSet;
    +import java.util.ArrayList;
    +import java.util.LinkedHashMap;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Random;
    +import java.util.Set;
    +import java.util.Iterator;
    +import java.lang.Iterable;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.locks.ReadWriteLock;
    +import java.util.concurrent.locks.ReentrantReadWriteLock;
    +
    +import org.apache.zookeeper.WatchedEvent;
    +import org.apache.zookeeper.Watcher;
    +import org.apache.zookeeper.Watcher.Event.EventType;
    +import org.apache.zookeeper.Watcher.Event.KeeperState;
    +import org.apache.zookeeper.server.ServerCnxn;
    +import org.apache.zookeeper.server.util.BitMap;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Optimized in memory and time complexity, compared to WatchManager, both the
    + * memory consumption and time complexity improved a lot, but it cannot
    + * efficiently remove the watcher when the session or socket is closed, for
    + * majority usecase this is not a problem.
    + *
    + * Changed made compared to WatchManager:
    + *
    + * - Use HashSet and BitSet to store the watchers to find a balance between
    + *   memory usage and time complexity
    + * - Use ReadWriteLock instead of synchronized to reduce lock retention
    + * - Lazily clean up the closed watchers
    + */
    +public class WatchManagerOptimized
    +        implements IWatchManager, DeadWatcherListener {
    +
    +    private static final Logger LOG =
    +            LoggerFactory.getLogger(WatchManagerOptimized.class);
    +
    +    private final ConcurrentHashMap<String, BitHashSet> pathWatches =
    +            new ConcurrentHashMap<String, BitHashSet>();
    +
    +    // watcher to bit id mapping
    +    private final BitMap<Watcher> watcherBitIdMap = new BitMap<Watcher>();
    +
    +    // used to lazily remove the dead watchers
    +    private final WatcherCleaner watcherCleaner;
    +
    +    private final ReentrantReadWriteLock addRemovePathRWLock = new ReentrantReadWriteLock();
    +
    +    public WatchManagerOptimized() {
    +        watcherCleaner = new WatcherCleaner(this);
    +        watcherCleaner.start();
    +    }
    +
    +    @Override
    +    public boolean addWatch(String path, Watcher watcher) {
    +        boolean result = false;
    +        addRemovePathRWLock.readLock().lock();
    --- End diff --
    
    Is the purpose of using a read lock here to optimize for `addWatch`-heavy workloads? It would be good to add a comment explaining why a read lock is used here instead of a write lock.
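A simplified sketch of the reasoning behind this question. It assumes, without confirmation from the author, that the per-path sets are thread-safe on their own. Concurrent `addWatch` calls cannot corrupt each other, because the map and the sets already handle that. The lock therefore only has to stop a removal that empties and drops a path from interleaving with an add that has just fetched that path's set. So adders share the read lock and removal takes the write lock. `PathWatchSketch` uses plain `Integer` watcher ids in place of `BitHashSet`, and `computeIfAbsent` in place of the diff's `get`/`putIfAbsent` pattern.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical simplified manager: String paths, Integer watcher ids.
class PathWatchSketch {
    private final ConcurrentHashMap<String, Set<Integer>> pathWatches = new ConcurrentHashMap<>();
    private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();

    // Shared lock: adders are safe with each other because computeIfAbsent
    // and the concurrent key set are thread-safe. The lock only keeps
    // removeWatcher from running in the middle of an add.
    boolean addWatch(String path, int watcherId) {
        rwLock.readLock().lock();
        try {
            return pathWatches
                    .computeIfAbsent(path, p -> ConcurrentHashMap.newKeySet())
                    .add(watcherId);
        } finally {
            rwLock.readLock().unlock();
        }
    }

    // Exclusive lock: "remove the id, then drop the path if empty" must not
    // interleave with an add that already holds a reference to this set,
    // or that add would land in a set no longer reachable from the map.
    boolean removeWatcher(String path, int watcherId) {
        rwLock.writeLock().lock();
        try {
            Set<Integer> set = pathWatches.get(path);
            if (set == null || !set.remove(watcherId)) {
                return false;
            }
            if (set.isEmpty()) {
                pathWatches.remove(path);
            }
            return true;
        } finally {
            rwLock.writeLock().unlock();
        }
    }

    int watchCount(String path) {
        Set<Integer> set = pathWatches.get(path);
        return set == null ? 0 : set.size();
    }

    public static void main(String[] args) throws InterruptedException {
        PathWatchSketch m = new PathWatchSketch();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 1000; i++) {
            final int id = i;
            pool.execute(() -> m.addWatch("/node", id)); // adds run in parallel
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println(m.watchCount("/node")); // prints 1000
    }
}
```

The trade-off is that a burst of removals serializes against all adders. That is presumably acceptable if adds dominate the workload.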


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by anmolnar <gi...@git.apache.org>.
Github user anmolnar commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r214918425
  
    --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatchManagerOptimized.java ---
    @@ -0,0 +1,355 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.zookeeper.server.watch;
    +
    +import java.io.PrintWriter;
    +import java.util.HashMap;
    +import java.util.BitSet;
    +import java.util.HashSet;
    +import java.util.ArrayList;
    +import java.util.LinkedHashMap;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Random;
    +import java.util.Set;
    +import java.util.Iterator;
    +import java.lang.Iterable;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.locks.ReadWriteLock;
    +import java.util.concurrent.locks.ReentrantReadWriteLock;
    +
    +import org.apache.zookeeper.WatchedEvent;
    +import org.apache.zookeeper.Watcher;
    +import org.apache.zookeeper.Watcher.Event.EventType;
    +import org.apache.zookeeper.Watcher.Event.KeeperState;
    +import org.apache.zookeeper.server.ServerCnxn;
    +import org.apache.zookeeper.server.util.BitMap;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Optimized in memory and time complexity, compared to WatchManager, both the
    + * memory consumption and time complexity improved a lot, but it cannot
    + * efficiently remove the watcher when the session or socket is closed, for
    + * majority usecase this is not a problem.
    + *
    + * Changed made compared to WatchManager:
    + *
    + * - Use HashSet and BitSet to store the watchers to find a balance between
    + *   memory usage and time complexity
    + * - Use ReadWriteLock instead of synchronized to reduce lock retention
    + * - Lazily clean up the closed watchers
    + */
    +public class WatchManagerOptimized
    +        implements IWatchManager, DeadWatcherListener {
    +
    +    private static final Logger LOG =
    +            LoggerFactory.getLogger(WatchManagerOptimized.class);
    +
    +    private final ConcurrentHashMap<String, BitHashSet> pathWatches =
    +            new ConcurrentHashMap<String, BitHashSet>();
    +
    +    // watcher to bit id mapping
    +    private final BitMap<Watcher> watcherBitIdMap = new BitMap<Watcher>();
    +
    +    // used to lazily remove the dead watchers
    +    private final WatcherCleaner watcherCleaner;
    +
    +    private final ReentrantReadWriteLock addRemovePathRWLock = new ReentrantReadWriteLock();
    +
    +    public WatchManagerOptimized() {
    +        watcherCleaner = new WatcherCleaner(this);
    +        watcherCleaner.start();
    +    }
    +
    +    @Override
    +    public boolean addWatch(String path, Watcher watcher) {
    +        boolean result = false;
    +        addRemovePathRWLock.readLock().lock();
    +        try {
    +            // avoid race condition of adding a on flying dead watcher
    +            if (isDeadWatcher(watcher)) {
    +                LOG.debug("Ignoring addWatch with closed cnxn");
    +            } else {
    +                Integer bit = watcherBitIdMap.add(watcher);
    +                BitHashSet watchers = pathWatches.get(path);
    +                if (watchers == null) {
    +                    watchers = new BitHashSet();
    +                    BitHashSet existingWatchers = pathWatches.putIfAbsent(path, watchers);
    +                    if (existingWatchers != null) {
    +                        watchers = existingWatchers;
    +                    }
    +                }
    +                result = watchers.add(bit);
    +            }
    +        } finally {
    +            addRemovePathRWLock.readLock().unlock();
    +        }
    +        return result;
    +    }
    +
    +    @Override
    +    public boolean containsWatcher(String path, Watcher watcher) {
    +        BitHashSet watchers = pathWatches.get(path);
    +        if (watchers == null || !watchers.contains(watcherBitIdMap.getBit(watcher))) {
    +            return false;
    +        }
    +        return true;
    +    }
    +
    +    @Override
    +    public boolean removeWatcher(String path, Watcher watcher) {
    +        addRemovePathRWLock.writeLock().lock();
    +        try {
    +            BitHashSet list = pathWatches.get(path);
    +            if (list == null || !list.remove(watcherBitIdMap.getBit(watcher))) {
    +                return false;
    +            }
    +            if (list.isEmpty()) {
    +                pathWatches.remove(path);
    +            }
    +            return true;
    +        } finally {
    +            addRemovePathRWLock.writeLock().unlock();
    +        }
    +    }
    +
    +    @Override
    +    public void removeWatcher(Watcher watcher) {
    +        Integer watcherBit;
    +        addRemovePathRWLock.writeLock().lock();
    +        try {
    +            // do nothing if the watcher is not tracked
    +            watcherBit = watcherBitIdMap.getBit(watcher);
    +            if (watcherBit == null) {
    +                return;
    +            }
    +        } finally {
    +            addRemovePathRWLock.writeLock().unlock();
    +        }
    +        watcherCleaner.addDeadWatcher(watcherBit);
    +    }
    +
    +    /**
    +     * Entry for WatcherCleaner to remove dead watchers
    +     *
    +     * @param deadWatchers the watchers need to be removed
    +     */
    +    @Override
    +    public void processDeadWatchers(Set<Integer> deadWatchers) {
    +        BitSet bits = new BitSet();
    +        for (int dw: deadWatchers) {
    +            bits.set(dw);
    +        }
    +        // The value iterator will reflect the state when it was
    +        // created, don't need to synchronize.
    +        for (BitHashSet watchers: pathWatches.values()) {
    +            watchers.remove(deadWatchers, bits);
    +        }
    +        // Better to remove the empty path from pathWatches, but it will add
    +        // lot of lock contention and affect the throughput of addWatch,
    +        // let's rely on the triggerWatch to delete it.
    +        for (Integer wbit: deadWatchers) {
    +            watcherBitIdMap.remove(wbit);
    +        }
    +    }
    +
    +    @Override
    +    public WatcherOrBitSet triggerWatch(String path, EventType type) {
    +        return triggerWatch(path, type, null);
    +    }
    +
    +    @Override
    +    public WatcherOrBitSet triggerWatch(
    +            String path, EventType type, WatcherOrBitSet supress) {
    +        WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
    +
    +        BitHashSet watchers = remove(path);
    +        if (watchers == null) {
    +            return null;
    +        }
    +
    +        int triggeredWatches = 0;
    +
    +        // Avoid race condition between dead watcher cleaner in
    +        // WatcherCleaner and iterating here
    +        synchronized (watchers) {
    +            for (Integer wBit : watchers) {
    +                if (supress != null && supress.contains(wBit)) {
    +                    continue;
    +                }
    +
    +                Watcher w = watcherBitIdMap.get(wBit);
    +
    +                // skip dead watcher
    +                if (w == null || isDeadWatcher(w)) {
    +                    continue;
    +                }
    +
    +                w.process(e);
    --- End diff --
    
    Not in the scope of this PR, but I was surprised to find that `process()` is not guarded by a try-catch. Couldn't a single watcher prevent the others from being triggered by throwing an exception?
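A minimal sketch of the guard this comment suggests. Watchers are modeled as `java.util.function.Consumer<String>` rather than ZooKeeper's `Watcher` interface, and failures are simply printed to stderr; both choices are assumptions made for illustration. Each callback runs inside its own try-catch, so an exception from one watcher no longer stops the remaining watchers from being notified. Only unchecked exceptions are caught, since a `Consumer` cannot throw checked ones.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical: each watcher receives the event path as a String.
class SafeTriggerSketch {
    // Returns how many watchers processed the event without throwing.
    static int triggerAll(List<Consumer<String>> watchers, String path) {
        int delivered = 0;
        for (Consumer<String> w : watchers) {
            try {
                w.accept(path);
                delivered++;
            } catch (RuntimeException ex) {
                // Log and continue so one faulty watcher does not starve the rest.
                System.err.println("watcher failed for " + path + ": " + ex);
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        List<Consumer<String>> watchers = new ArrayList<>();
        watchers.add(p -> seen.add("first " + p));
        watchers.add(p -> { throw new IllegalStateException("boom"); });
        watchers.add(p -> seen.add("third " + p));
        int delivered = triggerAll(watchers, "/node");
        System.out.println(delivered + " " + seen); // prints: 2 [first /node, third /node]
    }
}
```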


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by hanm <gi...@git.apache.org>.
Github user hanm commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r219660892
  
    --- Diff: zookeeper-docs/src/documentation/content/xdocs/zookeeperAdmin.xml ---
    @@ -1025,6 +1025,102 @@ server.3=zoo3:2888:3888</programlisting>
                 </listitem>
               </varlistentry>
     
    +
    +          <varlistentry>
    +            <term>watchManaggerName</term>
    +
    +            <listitem>
    +              <para>(Java system property only: <emphasis
    +                    role="bold">zookeeper.watchManaggerName</emphasis>)</para>
    +
    +              <para><emphasis role="bold">New in 3.6.0:</emphasis> Added in
    +                <ulink url="https://issues.apache.org/jira/browse/ZOOKEEPER-1179">ZOOKEEPER-1179</ulink> New watcher
    +                manager WatchManagerOptimized is added to optimize the memory overhead in heavy watch use cases. This
    +                config is used define which watch manager to be used. Currently, we only support WatchManager and
    +                WatchManagerOptimized.</para>
    +            </listitem>
    +          </varlistentry>
    +
    +          <varlistentry>
    +            <term>watcherCleanThreadsNum</term>
    +
    +            <listitem>
    +              <para>(Java system property only: <emphasis
    +                    role="bold">zookeeper.watcherCleanThreadsNum</emphasis>)</para>
    +
    +              <para><emphasis role="bold">New in 3.6.0:</emphasis> Added in
    +                <ulink url="https://issues.apache.org/jira/browse/ZOOKEEPER-1179">ZOOKEEPER-1179</ulink> The new watcher
    +                manager WatchManagerOptimized will clean up the dead watchers lazily, this config is used to decide how
    +                many thread is used in the WatcherCleaner. More thread usually means larger clean up throughput. The
    +                default value is 2, which is good enough even for heavy and continuous session closing/receating cases.</para>
    --- End diff --
    
    `closing/recreating`


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by hanm <gi...@git.apache.org>.
Github user hanm commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r220055291
  
    --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatcherCleaner.java ---
    @@ -0,0 +1,176 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.zookeeper.server.watch;
    +
    +import java.util.Set;
    +import java.util.HashSet;
    +import java.util.Random;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import org.apache.zookeeper.common.Time;
    +import org.apache.zookeeper.server.RateLogger;
    +import org.apache.zookeeper.server.WorkerService;
    +import org.apache.zookeeper.server.WorkerService.WorkRequest;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Thread used to lazily clean up the closed watcher, it will trigger the
    + * clean up when the dead watchers get certain number or some number of
    + * seconds has elapsed since last clean up.
    + *
    + * Cost of running it:
    + *
    + * - need to go through all the paths even if the watcher may only
    + *   watching a single path
    + * - block in the path BitHashSet when we try to check the dead watcher
    + *   which won't block other stuff
    + */
    +public class WatcherCleaner extends Thread {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(WatcherCleaner.class);
    +    private final RateLogger RATE_LOGGER = new RateLogger(LOG);
    +
    +    private volatile boolean stopped = false;
    +    private final Object cleanEvent = new Object();
    +    private final Random r = new Random(System.nanoTime());
    +    private final WorkerService cleaners;
    +
    +    private final Set<Integer> deadWatchers;
    +    private final DeadWatcherListener listener;
    +    private final int watcherCleanThreshold;
    +    private final int watcherCleanIntervalInSeconds;
    +    private final int maxInProcessingDeadWatchers;
    +    private final AtomicInteger totalDeadWatchers = new AtomicInteger();
    +
    +    public WatcherCleaner(DeadWatcherListener listener) {
    +        this(listener,
    +            Integer.getInteger("zookeeper.watcherCleanThreshold", 1000),
    +            Integer.getInteger("zookeeper.watcherCleanIntervalInSeconds", 600),
    +            Integer.getInteger("zookeeper.watcherCleanThreadsNum", 2),
    +            Integer.getInteger("zookeeper.maxInProcessingDeadWatchers", -1));
    +    }
    +
    +    public WatcherCleaner(DeadWatcherListener listener,
    +            int watcherCleanThreshold, int watcherCleanIntervalInSeconds,
    +            int watcherCleanThreadsNum, int maxInProcessingDeadWatchers) {
    +        this.listener = listener;
    +        this.watcherCleanThreshold = watcherCleanThreshold;
    +        this.watcherCleanIntervalInSeconds = watcherCleanIntervalInSeconds;
    +        int suggestedMaxInProcessingThreshold =
    +                watcherCleanThreshold * watcherCleanThreadsNum;
    +        if (maxInProcessingDeadWatchers > 0 &&
    +                maxInProcessingDeadWatchers < suggestedMaxInProcessingThreshold) {
    +            maxInProcessingDeadWatchers = suggestedMaxInProcessingThreshold;
    +            LOG.info("The maxInProcessingDeadWatchers config is smaller " +
    +                    "than the suggested one, change it to use {}",
    +                    maxInProcessingDeadWatchers);
    +        }
    +        this.maxInProcessingDeadWatchers = maxInProcessingDeadWatchers;
    +        this.deadWatchers = new HashSet<Integer>();
    +        this.cleaners = new WorkerService("DeadWatcherCleanner",
    +                watcherCleanThreadsNum, false);
    +
    +        LOG.info("watcherCleanThreshold={}, watcherCleanIntervalInSeconds={}" +
    +                ", watcherCleanThreadsNum={}, maxInProcessingDeadWatchers={}",
    +                watcherCleanThreshold, watcherCleanIntervalInSeconds,
    +                watcherCleanThreadsNum, maxInProcessingDeadWatchers);
    +    }
    +
    +    public void addDeadWatcher(int watcherBit) {
    +        // Wait if there are too many watchers waiting to be closed,
    +        // this is will slow down the socket packet processing and
    +        // the adding watches in the ZK pipeline.
    +        while (maxInProcessingDeadWatchers > 0 && !stopped &&
    +                totalDeadWatchers.get() >= maxInProcessingDeadWatchers) {
    --- End diff --
    
    Oops, I missed the `maxInProcessingDeadWatchers > 0` check. We are good here.
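A sketch of why that check matters. Because `maxInProcessing > 0` is evaluated first in the loop condition, a non-positive limit (the diff's default of `-1`) short-circuits the whole wait and callers are never throttled. `DeadWatcherThrottle`, its `drain()` method and its monitor-based locking are hypothetical. The diff counts in-flight dead watchers with an `AtomicInteger` named `totalDeadWatchers`, while this sketch uses the size of a pending set instead.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical bounded queue of dead watcher bits; a limit <= 0 means "no limit".
class DeadWatcherThrottle {
    private final int maxInProcessing;
    private final Object lock = new Object();
    private final Set<Integer> pending = new HashSet<>();
    private volatile boolean stopped = false;

    DeadWatcherThrottle(int maxInProcessing) {
        this.maxInProcessing = maxInProcessing;
    }

    void addDeadWatcher(int watcherBit) {
        synchronized (lock) {
            // "> 0" is checked first, so a non-positive limit never blocks.
            while (maxInProcessing > 0 && !stopped && pending.size() >= maxInProcessing) {
                try {
                    lock.wait(100); // woken by drain()/shutdown(), or re-check periodically
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return; // give up on interrupt
                }
            }
            pending.add(watcherBit);
        }
    }

    // Takes a snapshot for a cleaner thread and wakes throttled callers.
    Set<Integer> drain() {
        synchronized (lock) {
            Set<Integer> snapshot = new HashSet<>(pending);
            pending.clear();
            lock.notifyAll();
            return snapshot;
        }
    }

    void shutdown() {
        stopped = true;
        synchronized (lock) {
            lock.notifyAll();
        }
    }

    int pendingCount() {
        synchronized (lock) {
            return pending.size();
        }
    }
}
```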


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by lvfangmin <gi...@git.apache.org>.
Github user lvfangmin commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r214966630
  
    --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatchManagerOptimized.java ---
    @@ -0,0 +1,355 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.zookeeper.server.watch;
    +
    +import java.io.PrintWriter;
    +import java.util.HashMap;
    +import java.util.BitSet;
    +import java.util.HashSet;
    +import java.util.ArrayList;
    +import java.util.LinkedHashMap;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Random;
    +import java.util.Set;
    +import java.util.Iterator;
    +import java.lang.Iterable;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.locks.ReadWriteLock;
    +import java.util.concurrent.locks.ReentrantReadWriteLock;
    +
    +import org.apache.zookeeper.WatchedEvent;
    +import org.apache.zookeeper.Watcher;
    +import org.apache.zookeeper.Watcher.Event.EventType;
    +import org.apache.zookeeper.Watcher.Event.KeeperState;
    +import org.apache.zookeeper.server.ServerCnxn;
    +import org.apache.zookeeper.server.util.BitMap;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Optimized in memory and time complexity, compared to WatchManager, both the
    + * memory consumption and time complexity improved a lot, but it cannot
    + * efficiently remove the watcher when the session or socket is closed, for
    + * majority usecase this is not a problem.
    + *
    + * Changed made compared to WatchManager:
    + *
    + * - Use HashSet and BitSet to store the watchers to find a balance between
    + *   memory usage and time complexity
    + * - Use ReadWriteLock instead of synchronized to reduce lock retention
    + * - Lazily clean up the closed watchers
    + */
    +public class WatchManagerOptimized
    +        implements IWatchManager, DeadWatcherListener {
    +
    +    private static final Logger LOG =
    +            LoggerFactory.getLogger(WatchManagerOptimized.class);
    +
    +    private final ConcurrentHashMap<String, BitHashSet> pathWatches =
    +            new ConcurrentHashMap<String, BitHashSet>();
    +
    +    // watcher to bit id mapping
    +    private final BitMap<Watcher> watcherBitIdMap = new BitMap<Watcher>();
    +
    +    // used to lazily remove the dead watchers
    +    private final WatcherCleaner watcherCleaner;
    +
    +    private final ReentrantReadWriteLock addRemovePathRWLock = new ReentrantReadWriteLock();
    +
    +    public WatchManagerOptimized() {
    +        watcherCleaner = new WatcherCleaner(this);
    +        watcherCleaner.start();
    +    }
    +
    +    @Override
    +    public boolean addWatch(String path, Watcher watcher) {
    +        boolean result = false;
    +        addRemovePathRWLock.readLock().lock();
    +        try {
    +            // avoid race condition of adding a on flying dead watcher
    +            if (isDeadWatcher(watcher)) {
    +                LOG.debug("Ignoring addWatch with closed cnxn");
    +            } else {
    +                Integer bit = watcherBitIdMap.add(watcher);
    +                BitHashSet watchers = pathWatches.get(path);
    +                if (watchers == null) {
    +                    watchers = new BitHashSet();
    +                    BitHashSet existingWatchers = pathWatches.putIfAbsent(path, watchers);
    +                    if (existingWatchers != null) {
    +                        watchers = existingWatchers;
    +                    }
    +                }
    +                result = watchers.add(bit);
    +            }
    +        } finally {
    +            addRemovePathRWLock.readLock().unlock();
    +        }
    +        return result;
    +    }
    +
    +    @Override
    +    public boolean containsWatcher(String path, Watcher watcher) {
    +        BitHashSet watchers = pathWatches.get(path);
    +        if (watchers == null || !watchers.contains(watcherBitIdMap.getBit(watcher))) {
    +            return false;
    +        }
    +        return true;
    +    }
    +
    +    @Override
    +    public boolean removeWatcher(String path, Watcher watcher) {
    +        addRemovePathRWLock.writeLock().lock();
    +        try {
    +            BitHashSet list = pathWatches.get(path);
    +            if (list == null || !list.remove(watcherBitIdMap.getBit(watcher))) {
    +                return false;
    +            }
    +            if (list.isEmpty()) {
    +                pathWatches.remove(path);
    +            }
    +            return true;
    +        } finally {
    +            addRemovePathRWLock.writeLock().unlock();
    +        }
    +    }
    +
    +    @Override
    +    public void removeWatcher(Watcher watcher) {
    +        Integer watcherBit;
    +        addRemovePathRWLock.writeLock().lock();
    +        try {
    +            // do nothing if the watcher is not tracked
    +            watcherBit = watcherBitIdMap.getBit(watcher);
    +            if (watcherBit == null) {
    +                return;
    +            }
    +        } finally {
    +            addRemovePathRWLock.writeLock().unlock();
    +        }
    +        watcherCleaner.addDeadWatcher(watcherBit);
    +    }
    +
    +    /**
    +     * Entry for WatcherCleaner to remove dead watchers
    +     *
    +     * @param deadWatchers the watchers need to be removed
    +     */
    +    @Override
    +    public void processDeadWatchers(Set<Integer> deadWatchers) {
    --- End diff --
    
    All the watchers processed here are guaranteed to be dead. addWatch returns early for a dead watcher, before it touches pathWatches or watcherBitIdMap, which is why I don't need the lock here.
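A minimal sketch of the invariant behind this reply, under a simplified model. Watchers are stand-in `Conn` objects whose `stale` flag stays set once it is set, like a closed connection. The per-path sets are `ConcurrentHashMap` key sets rather than `BitHashSet`. `DeadWatcherGuardSketch`, `closeConnection` and `contains` are hypothetical names, not ZooKeeper APIs. `addWatch` checks the flag under the read lock, and the flag is only set under the write lock. So once a watcher is handed to the cleaner, nothing can add it again, and the cleanup pass can skip that lock.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical, simplified model of the dead-watcher invariant; not ZooKeeper code.
class DeadWatcherGuardSketch {
    // Stand-in for a connection-backed watcher; once stale, it stays stale.
    static final class Conn {
        volatile boolean stale;
    }

    private final Map<String, Set<Conn>> pathWatches = new ConcurrentHashMap<>();
    private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();

    boolean addWatch(String path, Conn watcher) {
        rwLock.readLock().lock();
        try {
            // Reject watchers whose connection is already closed.
            if (watcher.stale) {
                return false;
            }
            return pathWatches
                    .computeIfAbsent(path, p -> ConcurrentHashMap.newKeySet())
                    .add(watcher);
        } finally {
            rwLock.readLock().unlock();
        }
    }

    // Hypothetical close hook: the write lock waits for in-flight addWatch calls,
    // so once it is released no thread can still be adding this watcher.
    void closeConnection(Conn watcher) {
        rwLock.writeLock().lock();
        try {
            watcher.stale = true;
        } finally {
            rwLock.writeLock().unlock();
        }
    }

    // Cleaner pass, run later without rwLock: every watcher in `dead` is stale,
    // so concurrent addWatch calls can never re-insert it.
    void processDeadWatchers(Set<Conn> dead) {
        for (Set<Conn> watchers : pathWatches.values()) {
            watchers.removeAll(dead);
        }
    }

    boolean contains(String path, Conn watcher) {
        Set<Conn> watchers = pathWatches.get(path);
        return watchers != null && watchers.contains(watcher);
    }
}
```

As in the PR, empty per-path sets are left behind by the cleaner. Removing them is left to whoever next touches the path.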


---

[GitHub] zookeeper issue #590: [ZOOKEEPER-1177] Add the memory optimized watch manage...

Posted by lvfangmin <gi...@git.apache.org>.
Github user lvfangmin commented on the issue:

    https://github.com/apache/zookeeper/pull/590
  
    @anmolnar Thanks for reviewing, take your time. 
    
    Here are the main differences between this version and the old one on the Jira:
    
    1. Use a path-to-watchers map instead of a watcher-to-paths map to improve triggerWatch performance; this is why the lazy dead watcher cleanup is needed (main change)
    2. A better and cleaner implementation; for example, added WatchManagerFactory to easily switch between different watch manager implementations
    3. Some performance improvements from using HashSet and BitSet together to balance memory usage and time complexity
    4. Fix the watcher leak caused by adding a dead watcher (we can separate this out if we want to)
    5. Fix NettyServerCnxn not de-registering itself from the watch manager when the cnxn is closed (this was actually fixed recently in #612, so it's no longer necessary here)
    6. Add a JMH micro benchmark
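The HashSet/BitSet balance mentioned in item 3 can be illustrated with a hedged sketch of a hybrid integer set. It assumes watchers have already been mapped to small non-negative integer ids, which is the role `BitMap` plays in the PR. `HybridIdSetSketch` and its `cacheLimit` are hypothetical, not the PR's `BitHashSet`. A `BitSet` stores every id compactly, while a bounded `HashSet` lets small sets iterate without scanning the whole bit array.

```java
import java.util.BitSet;
import java.util.HashSet;
import java.util.Set;
import java.util.function.IntConsumer;

// Hypothetical hybrid set of non-negative int ids; not the PR's BitHashSet.
class HybridIdSetSketch {
    private final BitSet bits = new BitSet();           // holds every element
    private final Set<Integer> cache = new HashSet<>(); // bounded subset of bits
    private final int cacheLimit;
    private int size;

    HybridIdSetSketch(int cacheLimit) {
        this.cacheLimit = cacheLimit;
    }

    boolean add(int id) {
        if (bits.get(id)) {
            return false;
        }
        bits.set(id);
        size++;
        if (cache.size() < cacheLimit) {
            cache.add(id);
        }
        return true;
    }

    boolean remove(int id) {
        if (!bits.get(id)) {
            return false;
        }
        bits.clear(id);
        size--;
        cache.remove(id);
        return true;
    }

    boolean contains(int id) {
        return bits.get(id);
    }

    int size() {
        return size;
    }

    // If the cache holds every element, iterate it (cost ~ size); otherwise
    // scan the BitSet (cost grows with the highest id set).
    void forEach(IntConsumer action) {
        if (cache.size() == size) {
            for (int id : cache) {
                action.accept(id);
            }
        } else {
            for (int i = bits.nextSetBit(0); i >= 0; i = bits.nextSetBit(i + 1)) {
                action.accept(i);
            }
        }
    }
}
```

The iteration shortcut is safe because the cache is always a subset of the bits. When the two sizes match, they hold exactly the same elements.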
    



---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by lvfangmin <gi...@git.apache.org>.
Github user lvfangmin commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r219688196
  
    --- Diff: src/java/main/org/apache/zookeeper/server/watch/BitHashSet.java ---
    @@ -0,0 +1,156 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.zookeeper.server.watch;
    --- End diff --
    
    When we first added this class it was bound to Watcher, but after the refactoring it no longer is, so we can move it to server.util. I'll do that.


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by hanm <gi...@git.apache.org>.
Github user hanm commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r219643998
  
    --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatchManager.java ---
    @@ -83,18 +97,21 @@ synchronized void removeWatcher(Watcher watcher) {
                 Set<Watcher> list = watchTable.get(p);
                 if (list != null) {
                     list.remove(watcher);
    -                if (list.size() == 0) {
    +                if (list.isEmpty()) {
                         watchTable.remove(p);
                     }
                 }
             }
         }
     
    -    Set<Watcher> triggerWatch(String path, EventType type) {
    +    @Override
    +    public WatcherOrBitSet triggerWatch(String path, EventType type) {
             return triggerWatch(path, type, null);
         }
     
    -    Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
    +    @Override
    +    public WatcherOrBitSet triggerWatch(
    +            String path, EventType type, WatcherOrBitSet supress) {
    --- End diff --
    
    `suppress` instead of `supress`


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by hanm <gi...@git.apache.org>.
Github user hanm commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r219659886
  
    --- Diff: src/java/main/org/apache/zookeeper/server/watch/BitHashSet.java ---
    @@ -0,0 +1,156 @@
    +
    +package org.apache.zookeeper.server.watch;
    +
    +import java.util.BitSet;
    +import java.util.Set;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.lang.Iterable;
    +
    +import org.apache.zookeeper.server.util.BitMap;
    --- End diff --
    
    I don't think this is used anywhere in this file. 


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by hanm <gi...@git.apache.org>.
Github user hanm commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r219642525
  
    --- Diff: src/java/main/org/apache/zookeeper/server/watch/IWatchManager.java ---
    @@ -0,0 +1,144 @@
    +
    +package org.apache.zookeeper.server.watch;
    +
    +import java.io.PrintWriter;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Set;
    +import java.util.concurrent.ConcurrentHashMap;
    +
    +import org.apache.zookeeper.Watcher;
    +import org.apache.zookeeper.Watcher.Event.EventType;
    +import org.apache.zookeeper.server.ServerCnxn;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public interface IWatchManager {
    +
    +    /**
    +     * Add watch to specific path.
    +     *
    +     * @param path znode path
    +     * @param watcher watcher object reference
    +     *
    +     * @return true if the watcher added is not already present
    +     */
    +    public boolean addWatch(String path, Watcher watcher);
    +
    +    /**
    +     * Checks the specified watcher exists for the given path
    +     *
    +     * @param path znode path
    +     * @param watcher watcher object reference
    +     *
    +     * @return true if the watcher exists, false otherwise
    +     */
    +    public boolean containsWatcher(String path, Watcher watcher);
    +
    +    /**
    +     * Removes the specified watcher for the given path
    +     *
    +     * @param path znode path
    +     * @param watcher watcher object reference
    +     *
    +     * @return true if the watcher successfully removed, false otherwise
    +     */
    +    public boolean removeWatcher(String path, Watcher watcher);
    +
    +    /**
    +     * The entry to remove the watcher when the cnxn is closed.
    +     *
    +     * @param watcher watcher object reference
    +     */
    +    public void removeWatcher(Watcher watcher);
    +
    +    /**
    +     * Distribute the watch event for the given path.
    +     *
    +     * @param path znode path
    +     * @param type the watch event type
    +     *
    +     * @return the watchers have been notified
    +     */
    +    public WatcherOrBitSet triggerWatch(String path, EventType type);
    +
    +    /**
    +     * Distribute the watch event for the given path, but ignore those
    +     * supressed ones.
    --- End diff --
    
    Spell check: `suppressed` instead of `supressed`


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by lvfangmin <gi...@git.apache.org>.
Github user lvfangmin commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r214987142
  
    --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatchManagerOptimized.java ---
    @@ -0,0 +1,355 @@
    +
    +package org.apache.zookeeper.server.watch;
    +
    +import java.io.PrintWriter;
    +import java.util.HashMap;
    +import java.util.BitSet;
    +import java.util.HashSet;
    +import java.util.ArrayList;
    +import java.util.LinkedHashMap;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Random;
    +import java.util.Set;
    +import java.util.Iterator;
    +import java.lang.Iterable;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.locks.ReadWriteLock;
    +import java.util.concurrent.locks.ReentrantReadWriteLock;
    +
    +import org.apache.zookeeper.WatchedEvent;
    +import org.apache.zookeeper.Watcher;
    +import org.apache.zookeeper.Watcher.Event.EventType;
    +import org.apache.zookeeper.Watcher.Event.KeeperState;
    +import org.apache.zookeeper.server.ServerCnxn;
    +import org.apache.zookeeper.server.util.BitMap;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Optimized in memory and time complexity, compared to WatchManager, both the
    + * memory consumption and time complexity improved a lot, but it cannot
    + * efficiently remove the watcher when the session or socket is closed, for
    + * majority usecase this is not a problem.
    + *
    + * Changed made compared to WatchManager:
    + *
    + * - Use HashSet and BitSet to store the watchers to find a balance between
    + *   memory usage and time complexity
    + * - Use ReadWriteLock instead of synchronized to reduce lock retention
    + * - Lazily clean up the closed watchers
    + */
    +public class WatchManagerOptimized
    +        implements IWatchManager, DeadWatcherListener {
    +
    +    private static final Logger LOG =
    +            LoggerFactory.getLogger(WatchManagerOptimized.class);
    +
    +    private final ConcurrentHashMap<String, BitHashSet> pathWatches =
    +            new ConcurrentHashMap<String, BitHashSet>();
    +
    +    // watcher to bit id mapping
    +    private final BitMap<Watcher> watcherBitIdMap = new BitMap<Watcher>();
    +
    +    // used to lazily remove the dead watchers
    +    private final WatcherCleaner watcherCleaner;
    +
    +    private final ReentrantReadWriteLock addRemovePathRWLock = new ReentrantReadWriteLock();
    +
    +    public WatchManagerOptimized() {
    +        watcherCleaner = new WatcherCleaner(this);
    +        watcherCleaner.start();
    +    }
    +
    +    @Override
    +    public boolean addWatch(String path, Watcher watcher) {
    +        boolean result = false;
    +        addRemovePathRWLock.readLock().lock();
    +        try {
    +            // avoid race condition of adding a on flying dead watcher
    +            if (isDeadWatcher(watcher)) {
    +                LOG.debug("Ignoring addWatch with closed cnxn");
    +            } else {
    +                Integer bit = watcherBitIdMap.add(watcher);
    +                BitHashSet watchers = pathWatches.get(path);
    +                if (watchers == null) {
    +                    watchers = new BitHashSet();
    +                    BitHashSet existingWatchers = pathWatches.putIfAbsent(path, watchers);
    +                    if (existingWatchers != null) {
    +                        watchers = existingWatchers;
    +                    }
    +                }
    +                result = watchers.add(bit);
    +            }
    +        } finally {
    +            addRemovePathRWLock.readLock().unlock();
    +        }
    +        return result;
    +    }
    +
    +    @Override
    +    public boolean containsWatcher(String path, Watcher watcher) {
    +        BitHashSet watchers = pathWatches.get(path);
    +        if (watchers == null || !watchers.contains(watcherBitIdMap.getBit(watcher))) {
    +            return false;
    +        }
    +        return true;
    +    }
    +
    +    @Override
    +    public boolean removeWatcher(String path, Watcher watcher) {
    +        addRemovePathRWLock.writeLock().lock();
    --- End diff --
    
    Actually, this is an optimization for the expected scenario, similar to a cache hit. When a user sends a removeWatcher request, the watcher most likely still exists and hasn't fired yet. So instead of taking the read lock and making several calls to check whether it exists before switching to the write lock, it's cheaper to take the write lock just once.
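A sketch of the trade-off described above. It assumes a plain `HashMap` guarded entirely by a `ReentrantReadWriteLock`, so `add` here also takes the write lock, unlike the PR's `addWatch`. `RemoveLockingSketch` and its methods are hypothetical. `ReentrantReadWriteLock` cannot upgrade a read lock to a write lock, so a check-first variant must release the read lock and repeat the lookup under the write lock. That costs two acquisitions whenever the watcher does exist.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch; watchers are plain String ids for brevity.
class RemoveLockingSketch {
    private final Map<String, Set<String>> pathWatches = new HashMap<>();
    private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();

    void add(String path, String watcher) {
        rwLock.writeLock().lock();
        try {
            pathWatches.computeIfAbsent(path, p -> new HashSet<>()).add(watcher);
        } finally {
            rwLock.writeLock().unlock();
        }
    }

    // Check first under the read lock, then take the write lock. The read lock
    // must be released first (no upgrade), so the common "watcher exists" path
    // acquires two locks and repeats the lookup.
    boolean removeCheckFirst(String path, String watcher) {
        rwLock.readLock().lock();
        try {
            Set<String> watchers = pathWatches.get(path);
            if (watchers == null || !watchers.contains(watcher)) {
                return false;
            }
        } finally {
            rwLock.readLock().unlock();
        }
        return removeWriteOnce(path, watcher); // re-checks under the write lock
    }

    // The approach argued for above: take the write lock once and do the
    // lookup and removal together.
    boolean removeWriteOnce(String path, String watcher) {
        rwLock.writeLock().lock();
        try {
            Set<String> watchers = pathWatches.get(path);
            if (watchers == null || !watchers.remove(watcher)) {
                return false;
            }
            if (watchers.isEmpty()) {
                pathWatches.remove(path);
            }
            return true;
        } finally {
            rwLock.writeLock().unlock();
        }
    }
}
```

The check-first variant only pays off when most removal requests target watchers that are already gone. That is the opposite of the scenario described in the comment.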


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by lvfangmin <gi...@git.apache.org>.
Github user lvfangmin commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r214961550
  
    --- Diff: build.xml ---
    @@ -119,6 +119,7 @@ xmlns:cs="antlib:com.puppycrawl.tools.checkstyle.ant">
         <property name="test.java.classes" value="${test.java.build.dir}/classes"/>
         <property name="test.src.dir" value="${src.dir}/java/test"/>
         <property name="systest.src.dir" value="${src.dir}/java/systest"/>
    +    <property name="bench.src.dir" value="${src.dir}/java/bench"/>
    --- End diff --
    
    Will do.


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by lvfangmin <gi...@git.apache.org>.
Github user lvfangmin commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r219688507
  
    --- Diff: src/test/java/bench/org/apache/zookeeper/BenchMain.java ---
    @@ -0,0 +1,12 @@
    +package org.apache.zookeeper;
    --- End diff --
    
    Will add it.


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by lvfangmin <gi...@git.apache.org>.
Github user lvfangmin commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r214968090
  
    --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatchManagerOptimized.java ---
    @@ -0,0 +1,355 @@
    +
    +package org.apache.zookeeper.server.watch;
    +
    +import java.io.PrintWriter;
    +import java.util.HashMap;
    +import java.util.BitSet;
    +import java.util.HashSet;
    +import java.util.ArrayList;
    +import java.util.LinkedHashMap;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Random;
    +import java.util.Set;
    +import java.util.Iterator;
    +import java.lang.Iterable;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.locks.ReadWriteLock;
    +import java.util.concurrent.locks.ReentrantReadWriteLock;
    +
    +import org.apache.zookeeper.WatchedEvent;
    +import org.apache.zookeeper.Watcher;
    +import org.apache.zookeeper.Watcher.Event.EventType;
    +import org.apache.zookeeper.Watcher.Event.KeeperState;
    +import org.apache.zookeeper.server.ServerCnxn;
    +import org.apache.zookeeper.server.util.BitMap;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Optimized in memory and time complexity, compared to WatchManager, both the
    + * memory consumption and time complexity improved a lot, but it cannot
    + * efficiently remove the watcher when the session or socket is closed, for
    + * majority usecase this is not a problem.
    + *
    + * Changed made compared to WatchManager:
    + *
    + * - Use HashSet and BitSet to store the watchers to find a balance between
    + *   memory usage and time complexity
    + * - Use ReadWriteLock instead of synchronized to reduce lock retention
    + * - Lazily clean up the closed watchers
    + */
    +public class WatchManagerOptimized
    +        implements IWatchManager, DeadWatcherListener {
    +
    +    private static final Logger LOG =
    +            LoggerFactory.getLogger(WatchManagerOptimized.class);
    +
    +    private final ConcurrentHashMap<String, BitHashSet> pathWatches =
    +            new ConcurrentHashMap<String, BitHashSet>();
    +
    +    // watcher to bit id mapping
    +    private final BitMap<Watcher> watcherBitIdMap = new BitMap<Watcher>();
    +
    +    // used to lazily remove the dead watchers
    +    private final WatcherCleaner watcherCleaner;
    +
    +    private final ReentrantReadWriteLock addRemovePathRWLock = new ReentrantReadWriteLock();
    +
    +    public WatchManagerOptimized() {
    +        watcherCleaner = new WatcherCleaner(this);
    +        watcherCleaner.start();
    +    }
    +
    +    @Override
    +    public boolean addWatch(String path, Watcher watcher) {
    +        boolean result = false;
    +        addRemovePathRWLock.readLock().lock();
    +        try {
    +            // avoid race condition of adding a on flying dead watcher
    +            if (isDeadWatcher(watcher)) {
    +                LOG.debug("Ignoring addWatch with closed cnxn");
    +            } else {
    +                Integer bit = watcherBitIdMap.add(watcher);
    +                BitHashSet watchers = pathWatches.get(path);
    +                if (watchers == null) {
    +                    watchers = new BitHashSet();
    +                    BitHashSet existingWatchers = pathWatches.putIfAbsent(path, watchers);
    +                    if (existingWatchers != null) {
    +                        watchers = existingWatchers;
    +                    }
    +                }
    +                result = watchers.add(bit);
    +            }
    +        } finally {
    +            addRemovePathRWLock.readLock().unlock();
    +        }
    +        return result;
    +    }
    +
    +    @Override
    +    public boolean containsWatcher(String path, Watcher watcher) {
    +        BitHashSet watchers = pathWatches.get(path);
    +        if (watchers == null || !watchers.contains(watcherBitIdMap.getBit(watcher))) {
    +            return false;
    +        }
    +        return true;
    +    }
    +
    +    @Override
    +    public boolean removeWatcher(String path, Watcher watcher) {
    +        addRemovePathRWLock.writeLock().lock();
    +        try {
    +            BitHashSet list = pathWatches.get(path);
    +            if (list == null || !list.remove(watcherBitIdMap.getBit(watcher))) {
    +                return false;
    +            }
    +            if (list.isEmpty()) {
    +                pathWatches.remove(path);
    +            }
    +            return true;
    +        } finally {
    +            addRemovePathRWLock.writeLock().unlock();
    +        }
    +    }
    +
    +    @Override
    +    public void removeWatcher(Watcher watcher) {
    +        Integer watcherBit;
    +        addRemovePathRWLock.writeLock().lock();
    +        try {
    +            // do nothing if the watcher is not tracked
    +            watcherBit = watcherBitIdMap.getBit(watcher);
    +            if (watcherBit == null) {
    +                return;
    +            }
    +        } finally {
    +            addRemovePathRWLock.writeLock().unlock();
    +        }
    +        watcherCleaner.addDeadWatcher(watcherBit);
    +    }
    +
    +    /**
    +     * Entry for WatcherCleaner to remove dead watchers
    +     *
    +     * @param deadWatchers the watchers need to be removed
    +     */
    +    @Override
    +    public void processDeadWatchers(Set<Integer> deadWatchers) {
    +        BitSet bits = new BitSet();
    +        for (int dw: deadWatchers) {
    +            bits.set(dw);
    +        }
    +        // The value iterator will reflect the state when it was
    +        // created, don't need to synchronize.
    +        for (BitHashSet watchers: pathWatches.values()) {
    +            watchers.remove(deadWatchers, bits);
    +        }
    +        // Better to remove the empty path from pathWatches, but it will add
    +        // lot of lock contention and affect the throughput of addWatch,
    +        // let's rely on the triggerWatch to delete it.
    +        for (Integer wbit: deadWatchers) {
    +            watcherBitIdMap.remove(wbit);
    +        }
    +    }
    +
    +    @Override
    +    public WatcherOrBitSet triggerWatch(String path, EventType type) {
    +        return triggerWatch(path, type, null);
    +    }
    +
    +    @Override
    +    public WatcherOrBitSet triggerWatch(
    +            String path, EventType type, WatcherOrBitSet supress) {
    +        WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
    +
    +        BitHashSet watchers = remove(path);
    +        if (watchers == null) {
    +            return null;
    +        }
    +
    +        int triggeredWatches = 0;
    +
    +        // Avoid race condition between dead watcher cleaner in
    +        // WatcherCleaner and iterating here
    +        synchronized (watchers) {
    +            for (Integer wBit : watchers) {
    +                if (supress != null && supress.contains(wBit)) {
    +                    continue;
    +                }
    +
    +                Watcher w = watcherBitIdMap.get(wBit);
    +
    +                // skip dead watcher
    +                if (w == null || isDeadWatcher(w)) {
    +                    continue;
    +                }
    +
    +                w.process(e);
    --- End diff --
    
    NIOServerCnxn.process catches all Exceptions, but the Netty implementation only catches IOException, so I agree it's safer to catch Exception here.
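A minimal sketch of catching exceptions per watcher. It uses a hypothetical `SimpleWatcher` interface and a `String` event in place of ZooKeeper's `Watcher` and `WatchedEvent`. An exception from one watcher is recorded and delivery continues to the rest, instead of aborting the trigger loop. In server code the failure would be logged rather than collected in a list.

```java
import java.util.List;

// Hypothetical sketch; SimpleWatcher stands in for ZooKeeper's Watcher.
final class SafeTriggerSketch {
    interface SimpleWatcher {
        void process(String event);
    }

    private SafeTriggerSketch() {}

    // Delivers the event to every watcher; a watcher that throws is recorded in
    // `failures` and skipped, so the remaining watchers are still notified.
    // Returns how many watchers processed the event without throwing.
    static int triggerAll(List<SimpleWatcher> watchers, String event, List<Exception> failures) {
        int delivered = 0;
        for (SimpleWatcher w : watchers) {
            try {
                w.process(event);
                delivered++;
            } catch (Exception ex) {
                failures.add(ex); // a server would log this instead
            }
        }
        return delivered;
    }
}
```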


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by hanm <gi...@git.apache.org>.
Github user hanm commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r219642730
  
    --- Diff: src/java/main/org/apache/zookeeper/server/watch/IWatchManager.java ---
    @@ -0,0 +1,144 @@
    +
    +package org.apache.zookeeper.server.watch;
    +
    +import java.io.PrintWriter;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Set;
    +import java.util.concurrent.ConcurrentHashMap;
    +
    +import org.apache.zookeeper.Watcher;
    +import org.apache.zookeeper.Watcher.Event.EventType;
    +import org.apache.zookeeper.server.ServerCnxn;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public interface IWatchManager {
    +
    +    /**
    +     * Add watch to specific path.
    +     *
    +     * @param path znode path
    +     * @param watcher watcher object reference
    +     *
    +     * @return true if the watcher added is not already present
    +     */
    +    public boolean addWatch(String path, Watcher watcher);
    +
    +    /**
    +     * Checks the specified watcher exists for the given path
    +     *
    +     * @param path znode path
    +     * @param watcher watcher object reference
    +     *
    +     * @return true if the watcher exists, false otherwise
    +     */
    +    public boolean containsWatcher(String path, Watcher watcher);
    +
    +    /**
    +     * Removes the specified watcher for the given path
    +     *
    +     * @param path znode path
    +     * @param watcher watcher object reference
    +     *
    +     * @return true if the watcher successfully removed, false otherwise
    +     */
    +    public boolean removeWatcher(String path, Watcher watcher);
    +
    +    /**
    +     * The entry to remove the watcher when the cnxn is closed.
    +     *
    +     * @param watcher watcher object reference
    +     */
    +    public void removeWatcher(Watcher watcher);
    +
    +    /**
    +     * Distribute the watch event for the given path.
    +     *
    +     * @param path znode path
    +     * @param type the watch event type
    +     *
    +     * @return the watchers have been notified
    +     */
    +    public WatcherOrBitSet triggerWatch(String path, EventType type);
    +
    +    /**
    +     * Distribute the watch event for the given path, but ignore those
    +     * supressed ones.
    +     *
    +     * @param path znode path
    +     * @param type the watch event type
    +     * @param supress the supressed watcher set
    +     *
    +     * @return the watchers have been notified
    +     */
    +    public WatcherOrBitSet triggerWatch(
    +            String path, EventType type, WatcherOrBitSet supress);
    --- End diff --
    
    similar spelling issue for `supress`


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by lvfangmin <gi...@git.apache.org>.
Github user lvfangmin commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r219688430
  
    --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatcherCleaner.java ---
    @@ -0,0 +1,176 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.zookeeper.server.watch;
    +
    +import java.util.Set;
    +import java.util.HashSet;
    +import java.util.Random;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import org.apache.zookeeper.common.Time;
    +import org.apache.zookeeper.server.RateLogger;
    +import org.apache.zookeeper.server.WorkerService;
    +import org.apache.zookeeper.server.WorkerService.WorkRequest;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Thread used to lazily clean up the closed watcher, it will trigger the
    + * clean up when the dead watchers get certain number or some number of
    + * seconds has elapsed since last clean up.
    + *
    + * Cost of running it:
    + *
    + * - need to go through all the paths even if the watcher may only
    + *   watching a single path
    + * - block in the path BitHashSet when we try to check the dead watcher
    + *   which won't block other stuff
    + */
    +public class WatcherCleaner extends Thread {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(WatcherCleaner.class);
    +    private final RateLogger RATE_LOGGER = new RateLogger(LOG);
    +
    +    private volatile boolean stopped = false;
    +    private final Object cleanEvent = new Object();
    +    private final Random r = new Random(System.nanoTime());
    +    private final WorkerService cleaners;
    +
    +    private final Set<Integer> deadWatchers;
    +    private final DeadWatcherListener listener;
    +    private final int watcherCleanThreshold;
    +    private final int watcherCleanIntervalInSeconds;
    +    private final int maxInProcessingDeadWatchers;
    +    private final AtomicInteger totalDeadWatchers = new AtomicInteger();
    +
    +    public WatcherCleaner(DeadWatcherListener listener) {
    +        this(listener,
    +            Integer.getInteger("zookeeper.watcherCleanThreshold", 1000),
    +            Integer.getInteger("zookeeper.watcherCleanIntervalInSeconds", 600),
    +            Integer.getInteger("zookeeper.watcherCleanThreadsNum", 2),
    +            Integer.getInteger("zookeeper.maxInProcessingDeadWatchers", -1));
    +    }
    +
    +    public WatcherCleaner(DeadWatcherListener listener,
    +            int watcherCleanThreshold, int watcherCleanIntervalInSeconds,
    +            int watcherCleanThreadsNum, int maxInProcessingDeadWatchers) {
    +        this.listener = listener;
    +        this.watcherCleanThreshold = watcherCleanThreshold;
    +        this.watcherCleanIntervalInSeconds = watcherCleanIntervalInSeconds;
    +        int suggestedMaxInProcessingThreshold =
    +                watcherCleanThreshold * watcherCleanThreadsNum;
    +        if (maxInProcessingDeadWatchers > 0 &&
    +                maxInProcessingDeadWatchers < suggestedMaxInProcessingThreshold) {
    +            maxInProcessingDeadWatchers = suggestedMaxInProcessingThreshold;
    +            LOG.info("The maxInProcessingDeadWatchers config is smaller " +
    +                    "than the suggested one, change it to use {}",
    +                    maxInProcessingDeadWatchers);
    +        }
    +        this.maxInProcessingDeadWatchers = maxInProcessingDeadWatchers;
    +        this.deadWatchers = new HashSet<Integer>();
    +        this.cleaners = new WorkerService("DeadWatcherCleanner",
    +                watcherCleanThreadsNum, false);
    +
    +        LOG.info("watcherCleanThreshold={}, watcherCleanIntervalInSeconds={}" +
    +                ", watcherCleanThreadsNum={}, maxInProcessingDeadWatchers={}",
    +                watcherCleanThreshold, watcherCleanIntervalInSeconds,
    +                watcherCleanThreadsNum, maxInProcessingDeadWatchers);
    +    }
    +
    +    public void addDeadWatcher(int watcherBit) {
    +        // Wait if there are too many watchers waiting to be closed,
    +        // this is will slow down the socket packet processing and
    +        // the adding watches in the ZK pipeline.
    +        while (maxInProcessingDeadWatchers > 0 && !stopped &&
    +                totalDeadWatchers.get() >= maxInProcessingDeadWatchers) {
    +            try {
    +                RATE_LOGGER.rateLimitLog("Waiting for dead watchers cleaning");
    +                synchronized(totalDeadWatchers) {
    +                    totalDeadWatchers.wait(100);
    +                }
    +            } catch (InterruptedException e) {
    +                LOG.info("Got interrupted while waiting for dead watches " +
    +                        "queue size");
    +            }
    +        }
    +        synchronized (this) {
    +            if (deadWatchers.add(watcherBit)) {
    +                totalDeadWatchers.incrementAndGet();
    +                if (deadWatchers.size() >= watcherCleanThreshold) {
    +                    synchronized (cleanEvent) {
    +                        cleanEvent.notifyAll();
    +                    }
    +                }
    +            }
    +        }
    +    }
    +
    +    @Override
    +    public void run() {
    +        while (!stopped) {
    +            synchronized (cleanEvent) {
    +                try {
    +                    if (deadWatchers.size() < watcherCleanThreshold) {
    +                        int maxWaitMs = (watcherCleanIntervalInSeconds +
    +                            r.nextInt(watcherCleanIntervalInSeconds / 2 + 1)) * 1000;
    +                        cleanEvent.wait(maxWaitMs);
    +                    }
    +                } catch (InterruptedException e) {
    +                    LOG.info("Received InterruptedException while " +
    +                            "waiting for cleanEvent");
    +                    break;
    +                }
    +            }
    +
    +            if (deadWatchers.isEmpty()) {
    +                continue;
    +            }
    +
    +            synchronized (this) {
    +                // Snapshot of the current dead watchers
    +                final Set<Integer> snapshot = new HashSet<Integer>(deadWatchers);
    +                deadWatchers.clear();
    --- End diff --
    
    Cleaning up the dead watchers requires going through all the current watches. That is pretty heavy and may take a second if there are millions of watches. That's why we do a lazy batch cleanup here: we don't want to block addDeadWatcher, which is called from Cnxn.close, while the cleanup work is in progress.
    
    The totalDeadWatchers counter, checked against maxInProcessingDeadWatchers, is used to avoid OOM when the watcher cleaner cannot catch up. We haven't seen this problem even in heavy reconnecting scenarios. maxInProcessingDeadWatchers is suggested to be set to something like 1000 * watcherCleanThreshold. The watcherCleanThreshold controls the batch size of each cleanup. There is a trade-off between reclaiming the dead watchers' memory promptly and the total cost of the cleanup scans, so we cannot set it too large.
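    The trade-off described above has two parts: recording a dead watcher on close is cheap, and the expensive full scan is batched by `watcherCleanThreshold`. The sketch below shows both parts. It is a single-threaded simplification with several assumptions. Watchers are int bit ids, the watch table is a plain `Map<String, BitSet>`, and `LazyDeadWatcherCleaner` is a hypothetical name. The patch's `WatcherCleaner` also waits on a timer and hands batches to a `WorkerService`, and this sketch omits both.

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, single-threaded sketch of lazy batch cleanup of dead watchers.
class LazyDeadWatcherCleaner {
    private final int cleanThreshold;               // batch size trigger
    private final Set<Integer> deadWatchers = new HashSet<>();
    private final Map<String, BitSet> pathWatches;  // shared watch table

    LazyDeadWatcherCleaner(Map<String, BitSet> pathWatches, int cleanThreshold) {
        this.pathWatches = pathWatches;
        this.cleanThreshold = cleanThreshold;
    }

    // Called on connection close: cheap, only records the bit id.
    // Returns true once enough dead watchers have accumulated to clean.
    synchronized boolean addDeadWatcher(int watcherBit) {
        deadWatchers.add(watcherBit);
        return deadWatchers.size() >= cleanThreshold;
    }

    // Snapshot and clear the pending set, then scan every path once.
    // The cost is proportional to the number of paths, regardless of how
    // many paths each dead watcher actually watched.
    int cleanBatch() {
        Set<Integer> snapshot;
        synchronized (this) {
            snapshot = new HashSet<>(deadWatchers);
            deadWatchers.clear();
        }
        BitSet dead = new BitSet();
        for (int bit : snapshot) {
            dead.set(bit);
        }
        for (BitSet watchers : pathWatches.values()) {
            watchers.andNot(dead); // clear all dead bits in one pass per path
        }
        return snapshot.size();
    }
}
```

    Like `processDeadWatchers` in the patch, this sketch leaves emptied path entries in the map instead of removing them during the scan.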


---

[GitHub] zookeeper issue #590: [ZOOKEEPER-1177] Add the memory optimized watch manage...

Posted by hanm <gi...@git.apache.org>.
Github user hanm commented on the issue:

    https://github.com/apache/zookeeper/pull/590
  
    @anmolnar I'll sign this off by the end of next Monday if there are no other issues. 
    @lvfangmin great work and thanks for your patience!


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by lvfangmin <gi...@git.apache.org>.
Github user lvfangmin commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r216912996
  
    --- Diff: src/java/test/org/apache/zookeeper/server/DumbWatcher.java ---
    @@ -0,0 +1,96 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.zookeeper.server;
    +
    +import java.io.IOException;
    +import java.net.InetSocketAddress;
    +import java.nio.ByteBuffer;
    +import java.security.cert.Certificate;
    +
    +import org.apache.jute.Record;
    +import org.apache.zookeeper.Watcher;
    +import org.apache.zookeeper.WatchedEvent;
    +import org.apache.zookeeper.proto.ReplyHeader;
    +
    +public class DumbWatcher extends ServerCnxn {
    --- End diff --
    
    I agree that for unit tests a mock object is easier to maintain than a stub. However, I also need this DumbWatcher in the micro benchmark. I'll move this class into the main code so the micro benchmark and the unit tests can share it.


---

[GitHub] zookeeper issue #590: [ZOOKEEPER-1177] Add the memory optimized watch manage...

Posted by hanm <gi...@git.apache.org>.
Github user hanm commented on the issue:

    https://github.com/apache/zookeeper/pull/590
  
    Merged to master. Great work @lvfangmin!


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by hanm <gi...@git.apache.org>.
Github user hanm commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r219562472
  
    --- Diff: src/test/java/bench/org/apache/zookeeper/BenchMain.java ---
    @@ -0,0 +1,12 @@
    +package org.apache.zookeeper;
    --- End diff --
    
    This file is missing the Apache license header, which triggered a -1 in the last Jenkins build.


---

[GitHub] zookeeper issue #590: [ZOOKEEPER-1177] Add the memory optimized watch manage...

Posted by lvfangmin <gi...@git.apache.org>.
Github user lvfangmin commented on the issue:

    https://github.com/apache/zookeeper/pull/590
  
    Update based on the comments:
    
    1. add findbugs exclusion
    2. add comments for the newly added classes
    3. add admin doc for the new JVM options introduced in this diff
    4. move the bench project to src/test/java/bench
    5. move DumbWatcher to src/java/main so it can be shared between the unit tests and the bench
    6. change to use ExitCode
    
    @nkalmar I moved the bench to src/test/java/bench, which seems more reasonable to me. Let me know if you think that's not a good location given your plan.


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by hanm <gi...@git.apache.org>.
Github user hanm commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r219641957
  
    --- Diff: src/java/main/org/apache/zookeeper/server/watch/IWatchManager.java ---
    @@ -0,0 +1,144 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.zookeeper.server.watch;
    +
    +import java.io.PrintWriter;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Set;
    +import java.util.concurrent.ConcurrentHashMap;
    +
    +import org.apache.zookeeper.Watcher;
    +import org.apache.zookeeper.Watcher.Event.EventType;
    +import org.apache.zookeeper.server.ServerCnxn;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public interface IWatchManager {
    +
    +    /**
    +     * Add watch to specific path.
    +     *
    +     * @param path znode path
    +     * @param watcher watcher object reference
    +     *
    +     * @return true if the watcher added is not already present
    +     */
    +    public boolean addWatch(String path, Watcher watcher);
    +
    +    /**
    +     * Checks the specified watcher exists for the given path
    --- End diff --
    
    nit: missing full stop at end of sentence.


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/zookeeper/pull/590


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by lvfangmin <gi...@git.apache.org>.
Github user lvfangmin commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r220049442
  
    --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatchManagerOptimized.java ---
    @@ -0,0 +1,355 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.zookeeper.server.watch;
    +
    +import java.io.PrintWriter;
    +import java.util.HashMap;
    +import java.util.BitSet;
    +import java.util.HashSet;
    +import java.util.ArrayList;
    +import java.util.LinkedHashMap;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Random;
    +import java.util.Set;
    +import java.util.Iterator;
    +import java.lang.Iterable;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.locks.ReadWriteLock;
    +import java.util.concurrent.locks.ReentrantReadWriteLock;
    +
    +import org.apache.zookeeper.WatchedEvent;
    +import org.apache.zookeeper.Watcher;
    +import org.apache.zookeeper.Watcher.Event.EventType;
    +import org.apache.zookeeper.Watcher.Event.KeeperState;
    +import org.apache.zookeeper.server.ServerCnxn;
    +import org.apache.zookeeper.server.util.BitMap;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Optimized in memory and time complexity, compared to WatchManager, both the
    + * memory consumption and time complexity improved a lot, but it cannot
    + * efficiently remove the watcher when the session or socket is closed, for
    + * majority usecase this is not a problem.
    + *
    + * Changed made compared to WatchManager:
    + *
    + * - Use HashSet and BitSet to store the watchers to find a balance between
    + *   memory usage and time complexity
    + * - Use ReadWriteLock instead of synchronized to reduce lock retention
    + * - Lazily clean up the closed watchers
    + */
    +public class WatchManagerOptimized
    +        implements IWatchManager, DeadWatcherListener {
    +
    +    private static final Logger LOG =
    +            LoggerFactory.getLogger(WatchManagerOptimized.class);
    +
    +    private final ConcurrentHashMap<String, BitHashSet> pathWatches =
    +            new ConcurrentHashMap<String, BitHashSet>();
    +
    +    // watcher to bit id mapping
    +    private final BitMap<Watcher> watcherBitIdMap = new BitMap<Watcher>();
    +
    +    // used to lazily remove the dead watchers
    +    private final WatcherCleaner watcherCleaner;
    +
    +    private final ReentrantReadWriteLock addRemovePathRWLock = new ReentrantReadWriteLock();
    +
    +    public WatchManagerOptimized() {
    +        watcherCleaner = new WatcherCleaner(this);
    +        watcherCleaner.start();
    +    }
    +
    +    @Override
    +    public boolean addWatch(String path, Watcher watcher) {
    +        boolean result = false;
    +        addRemovePathRWLock.readLock().lock();
    +        try {
    +            // avoid race condition of adding a on flying dead watcher
    +            if (isDeadWatcher(watcher)) {
    +                LOG.debug("Ignoring addWatch with closed cnxn");
    +            } else {
    +                Integer bit = watcherBitIdMap.add(watcher);
    +                BitHashSet watchers = pathWatches.get(path);
    +                if (watchers == null) {
    +                    watchers = new BitHashSet();
    +                    BitHashSet existingWatchers = pathWatches.putIfAbsent(path, watchers);
    +                    if (existingWatchers != null) {
    +                        watchers = existingWatchers;
    +                    }
    +                }
    +                result = watchers.add(bit);
    +            }
    +        } finally {
    +            addRemovePathRWLock.readLock().unlock();
    +        }
    +        return result;
    +    }
    +
    +    @Override
    +    public boolean containsWatcher(String path, Watcher watcher) {
    +        BitHashSet watchers = pathWatches.get(path);
    +        if (watchers == null || !watchers.contains(watcherBitIdMap.getBit(watcher))) {
    +            return false;
    +        }
    +        return true;
    +    }
    +
    +    @Override
    +    public boolean removeWatcher(String path, Watcher watcher) {
    +        addRemovePathRWLock.writeLock().lock();
    +        try {
    +            BitHashSet list = pathWatches.get(path);
    +            if (list == null || !list.remove(watcherBitIdMap.getBit(watcher))) {
    +                return false;
    +            }
    +            if (list.isEmpty()) {
    +                pathWatches.remove(path);
    +            }
    +            return true;
    +        } finally {
    +            addRemovePathRWLock.writeLock().unlock();
    +        }
    +    }
    +
    +    @Override
    +    public void removeWatcher(Watcher watcher) {
    +        Integer watcherBit;
    +        addRemovePathRWLock.writeLock().lock();
    +        try {
    +            // do nothing if the watcher is not tracked
    +            watcherBit = watcherBitIdMap.getBit(watcher);
    +            if (watcherBit == null) {
    +                return;
    +            }
    +        } finally {
    +            addRemovePathRWLock.writeLock().unlock();
    +        }
    +        watcherCleaner.addDeadWatcher(watcherBit);
    +    }
    +
    +    /**
    +     * Entry for WatcherCleaner to remove dead watchers
    +     *
    +     * @param deadWatchers the watchers need to be removed
    +     */
    +    @Override
    +    public void processDeadWatchers(Set<Integer> deadWatchers) {
    +        BitSet bits = new BitSet();
    +        for (int dw: deadWatchers) {
    +            bits.set(dw);
    +        }
    +        // The value iterator will reflect the state when it was
    +        // created, don't need to synchronize.
    +        for (BitHashSet watchers: pathWatches.values()) {
    +            watchers.remove(deadWatchers, bits);
    +        }
    +        // Better to remove the empty path from pathWatches, but it will add
    +        // lot of lock contention and affect the throughput of addWatch,
    +        // let's rely on the triggerWatch to delete it.
    +        for (Integer wbit: deadWatchers) {
    +            watcherBitIdMap.remove(wbit);
    +        }
    +    }
    +
    +    @Override
    +    public WatcherOrBitSet triggerWatch(String path, EventType type) {
    +        return triggerWatch(path, type, null);
    +    }
    +
    +    @Override
    +    public WatcherOrBitSet triggerWatch(
    +            String path, EventType type, WatcherOrBitSet supress) {
    +        WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
    +
    +        BitHashSet watchers = remove(path);
    +        if (watchers == null) {
    +            return null;
    +        }
    +
    +        int triggeredWatches = 0;
    --- End diff --
    
    We had metrics for this. I removed them because #580 was still in review. I'll add them back now that that patch has been merged. 
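    The class javadoc in this diff describes storing watchers as bit ids (`watcherBitIdMap`) inside per-path `BitHashSet`s instead of as object references. Below is a minimal sketch of such a two-way id map. It assumes ids should stay small and dense, and should be reused after removal, so that per-path bit sets stay compact. `BitIdMap` is a hypothetical name, and this is not the patch's `BitMap` implementation.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical object <-> int id map that reuses freed ids.
class BitIdMap<T> {
    private final Map<T, Integer> valueToBit = new HashMap<>();
    private final List<T> bitToValue = new ArrayList<>();
    private final BitSet freedBits = new BitSet();

    // Returns the existing id for value, or assigns the lowest freed id,
    // falling back to a brand new id at the end.
    synchronized int add(T value) {
        Integer bit = valueToBit.get(value);
        if (bit != null) {
            return bit;
        }
        int next = freedBits.nextSetBit(0);
        if (next >= 0) {
            freedBits.clear(next);
            bitToValue.set(next, value);
        } else {
            next = bitToValue.size();
            bitToValue.add(value);
        }
        valueToBit.put(value, next);
        return next;
    }

    synchronized Integer getBit(T value) {
        return valueToBit.get(value);
    }

    synchronized T get(int bit) {
        return bit < bitToValue.size() ? bitToValue.get(bit) : null;
    }

    // Frees the id so a later add() can hand it out again.
    synchronized void remove(int bit) {
        if (bit >= bitToValue.size()) {
            return;
        }
        T value = bitToValue.get(bit);
        if (value == null) {
            return;
        }
        valueToBit.remove(value);
        bitToValue.set(bit, null);
        freedBits.set(bit);
    }
}
```

    Reusing the lowest freed id keeps the highest set bit low. That matters because a `java.util.BitSet` sizes its backing array by the highest bit that has been set.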


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by anmolnar <gi...@git.apache.org>.
Github user anmolnar commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r214916521
  
    --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatchManagerFactory.java ---
    @@ -0,0 +1,48 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.zookeeper.server.watch;
    +
    +import java.io.IOException;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class WatchManagerFactory {
    --- End diff --
    
    A quick javadoc would be awesome here.


---

[GitHub] zookeeper issue #590: [ZOOKEEPER-1177] Add the memory optimized watch manage...

Posted by anmolnar <gi...@git.apache.org>.
Github user anmolnar commented on the issue:

    https://github.com/apache/zookeeper/pull/590
  
    @lvfangmin In which case you need to add exceptions to `config/findbugsExcludeFile.xml`.


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by lvfangmin <gi...@git.apache.org>.
Github user lvfangmin commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r214962201
  
    --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatchManager.java ---
    @@ -46,15 +48,26 @@
         private final Map<Watcher, Set<String>> watch2Paths =
             new HashMap<Watcher, Set<String>>();
     
    -    synchronized int size(){
    +    @Override
    +    public synchronized int size(){
             int result = 0;
             for(Set<Watcher> watches : watchTable.values()) {
                 result += watches.size();
             }
             return result;
         }
     
    -    synchronized void addWatch(String path, Watcher watcher) {
    +    boolean isDeadWatcher(Watcher watcher) {
    --- End diff --
    
    Yes, this also fixes the dead watcher leak in WatchManager, which I found while building the new optimized watch manager.
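    The leak presumably comes from a race. A request that was still in flight when its connection closed can reach `addWatch` after the watcher has already been removed, and that watch would then never be cleaned up. The sketch below shows the guard. `ClosableWatcher` and `GuardedWatchTable` are hypothetical. In the patch the watcher is a `ServerCnxn`, and the sketch does not reproduce how the real code decides a connection is dead.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical watcher abstraction exposing whether its connection is closed.
interface ClosableWatcher {
    boolean isClosed();
}

// Sketch: addWatch rejects watchers whose connection is already closed, so a
// late, in-flight add cannot re-insert a watcher that was already removed.
class GuardedWatchTable {
    private final Map<String, Set<ClosableWatcher>> pathWatches = new HashMap<>();

    synchronized boolean addWatch(String path, ClosableWatcher watcher) {
        if (watcher.isClosed()) {
            return false; // ignore watches from already-closed connections
        }
        return pathWatches.computeIfAbsent(path, p -> new HashSet<>()).add(watcher);
    }

    // Called when the connection closes.
    synchronized void removeWatcher(ClosableWatcher watcher) {
        for (Set<ClosableWatcher> watchers : pathWatches.values()) {
            watchers.remove(watcher);
        }
        pathWatches.values().removeIf(Set::isEmpty);
    }

    synchronized int size() {
        int n = 0;
        for (Set<ClosableWatcher> watchers : pathWatches.values()) {
            n += watchers.size();
        }
        return n;
    }
}
```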


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by hanm <gi...@git.apache.org>.
Github user hanm commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r219644285
  
    --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatchManager.java ---
    @@ -180,7 +192,8 @@ synchronized void dumpWatches(PrintWriter pwriter, boolean byPath) {
          *            watcher object reference
          * @return true if the watcher exists, false otherwise
          */
    --- End diff --
    
    We should remove this comment, since it is already present in `IWatchManager`. Removing it also keeps this method consistent with the other overrides in `WatchManager`.


---

[GitHub] zookeeper issue #590: [ZOOKEEPER-1177] Add the memory optimized watch manage...

Posted by lvfangmin <gi...@git.apache.org>.
Github user lvfangmin commented on the issue:

    https://github.com/apache/zookeeper/pull/590
  
    Thanks @nkalmar for the suggestion on where to put the bench project. For now I followed the same layout as src/java/systest. Do you think we can move them together later? I'm not against moving it now.
    
    If we want to move it now, just to confirm: the directory would be src/java/org/apache/zookeeper/bench/, without the main folder, right?


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by lvfangmin <gi...@git.apache.org>.
Github user lvfangmin commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r220049264
  
    --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatcherCleaner.java ---
    @@ -0,0 +1,176 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.zookeeper.server.watch;
    +
    +import java.util.Set;
    +import java.util.HashSet;
    +import java.util.Random;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import org.apache.zookeeper.common.Time;
    +import org.apache.zookeeper.server.RateLogger;
    +import org.apache.zookeeper.server.WorkerService;
    +import org.apache.zookeeper.server.WorkerService.WorkRequest;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Thread used to lazily clean up the closed watcher, it will trigger the
    + * clean up when the dead watchers get certain number or some number of
    + * seconds has elapsed since last clean up.
    + *
    + * Cost of running it:
    + *
    + * - need to go through all the paths even if the watcher may only
    + *   watching a single path
    + * - block in the path BitHashSet when we try to check the dead watcher
    + *   which won't block other stuff
    + */
    +public class WatcherCleaner extends Thread {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(WatcherCleaner.class);
    +    private final RateLogger RATE_LOGGER = new RateLogger(LOG);
    +
    +    private volatile boolean stopped = false;
    +    private final Object cleanEvent = new Object();
    +    private final Random r = new Random(System.nanoTime());
    +    private final WorkerService cleaners;
    +
    +    private final Set<Integer> deadWatchers;
    +    private final DeadWatcherListener listener;
    +    private final int watcherCleanThreshold;
    +    private final int watcherCleanIntervalInSeconds;
    +    private final int maxInProcessingDeadWatchers;
    +    private final AtomicInteger totalDeadWatchers = new AtomicInteger();
    +
    +    public WatcherCleaner(DeadWatcherListener listener) {
    +        this(listener,
    +            Integer.getInteger("zookeeper.watcherCleanThreshold", 1000),
    +            Integer.getInteger("zookeeper.watcherCleanIntervalInSeconds", 600),
    +            Integer.getInteger("zookeeper.watcherCleanThreadsNum", 2),
    +            Integer.getInteger("zookeeper.maxInProcessingDeadWatchers", -1));
    +    }
    +
    +    public WatcherCleaner(DeadWatcherListener listener,
    +            int watcherCleanThreshold, int watcherCleanIntervalInSeconds,
    +            int watcherCleanThreadsNum, int maxInProcessingDeadWatchers) {
    +        this.listener = listener;
    +        this.watcherCleanThreshold = watcherCleanThreshold;
    +        this.watcherCleanIntervalInSeconds = watcherCleanIntervalInSeconds;
    +        int suggestedMaxInProcessingThreshold =
    +                watcherCleanThreshold * watcherCleanThreadsNum;
    +        if (maxInProcessingDeadWatchers > 0 &&
    +                maxInProcessingDeadWatchers < suggestedMaxInProcessingThreshold) {
    +            maxInProcessingDeadWatchers = suggestedMaxInProcessingThreshold;
    +            LOG.info("The maxInProcessingDeadWatchers config is smaller " +
    +                    "than the suggested one, change it to use {}",
    +                    maxInProcessingDeadWatchers);
    +        }
    +        this.maxInProcessingDeadWatchers = maxInProcessingDeadWatchers;
    +        this.deadWatchers = new HashSet<Integer>();
    +        this.cleaners = new WorkerService("DeadWatcherCleanner",
    +                watcherCleanThreadsNum, false);
    +
    +        LOG.info("watcherCleanThreshold={}, watcherCleanIntervalInSeconds={}" +
    +                ", watcherCleanThreadsNum={}, maxInProcessingDeadWatchers={}",
    +                watcherCleanThreshold, watcherCleanIntervalInSeconds,
    +                watcherCleanThreadsNum, maxInProcessingDeadWatchers);
    +    }
    +
    +    public void addDeadWatcher(int watcherBit) {
    +        // Wait if there are too many watchers waiting to be closed,
    +        // this will slow down the socket packet processing and
    +        // the adding of watches in the ZK pipeline.
    +        while (maxInProcessingDeadWatchers > 0 && !stopped &&
    +                totalDeadWatchers.get() >= maxInProcessingDeadWatchers) {
    --- End diff --
    
    @hanm I'm not sure I understand this correctly. The default value of maxInProcessingDeadWatchers is -1, in which case the loop skips checking totalDeadWatchers entirely. Am I missing anything?
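A minimal sketch of the check being discussed, assuming it reduces to the loop condition quoted above plus the constructor's clamping logic. `DeadWatcherBackPressure`, `shouldWait()` and `recordPending()` are hypothetical names, not ZooKeeper APIs. With the default of -1 the `maxInProcessingDeadWatchers > 0` test short-circuits, so `totalDeadWatchers` is never consulted; a positive value below `watcherCleanThreshold * watcherCleanThreadsNum` is raised to that product.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical, stripped-down model of the WatcherCleaner back-pressure settings. */
public class DeadWatcherBackPressure {
    private final int maxInProcessingDeadWatchers;
    private final AtomicInteger totalDeadWatchers = new AtomicInteger();

    public DeadWatcherBackPressure(int watcherCleanThreshold, int watcherCleanThreadsNum,
                                   int maxInProcessingDeadWatchers) {
        int suggested = watcherCleanThreshold * watcherCleanThreadsNum;
        // Mirrors the quoted constructor: a positive value below the suggested
        // floor is raised to it; a non-positive value (default -1) is kept as-is.
        if (maxInProcessingDeadWatchers > 0 && maxInProcessingDeadWatchers < suggested) {
            maxInProcessingDeadWatchers = suggested;
        }
        this.maxInProcessingDeadWatchers = maxInProcessingDeadWatchers;
    }

    public int effectiveLimit() {
        return maxInProcessingDeadWatchers;
    }

    /** True when addDeadWatcher() would have to wait before queuing another watcher. */
    public boolean shouldWait() {
        // A non-positive limit short-circuits, so the counter is never read.
        return maxInProcessingDeadWatchers > 0
                && totalDeadWatchers.get() >= maxInProcessingDeadWatchers;
    }

    /** Simulates n dead watchers waiting to be cleaned. */
    public void recordPending(int n) {
        totalDeadWatchers.addAndGet(n);
    }

    public static void main(String[] args) {
        DeadWatcherBackPressure disabled = new DeadWatcherBackPressure(1000, 2, -1);
        disabled.recordPending(1_000_000);
        System.out.println(disabled.shouldWait());     // false: the check is skipped

        DeadWatcherBackPressure clamped = new DeadWatcherBackPressure(1000, 2, 500);
        System.out.println(clamped.effectiveLimit());  // 2000
        clamped.recordPending(2000);
        System.out.println(clamped.shouldWait());      // true
    }
}
```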


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by hanm <gi...@git.apache.org>.
Github user hanm commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r219659984
  
    --- Diff: src/java/main/org/apache/zookeeper/server/watch/BitHashSet.java ---
    @@ -0,0 +1,156 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.zookeeper.server.watch;
    +
    +import java.util.BitSet;
    +import java.util.Set;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.lang.Iterable;
    +
    +import org.apache.zookeeper.server.util.BitMap;
    +
    +/**
    + * Using BitSet to store all the elements, and use HashSet to cache limited
    + * number of elements to find a balance between memory and time complexity.
    + *
    + * Without HashSet, we need to use O(N) time to get the elements, N is
    + * the bit numbers in elementBits. But we need to keep the size small to make
    + * sure it doesn't cost too much in memory, there is a tradeoff between
    + * memory and time complexity.
    + *
    + * Previously, we considered dynamically switching between SparseBitSet and
    + * HashSet based on memory consumption, but that takes time to copy data
    + * over and may cause a herd effect of repeatedly copying data from one
    + * data structure to another. The current solution can do a very good job
    + * given most of the paths have limited number of elements.
    + */
    +public class BitHashSet implements Iterable<Integer> {
    +
    +    static final long serialVersionUID = 6382565447128283568L;
    --- End diff --
    
    Why do we need this? `BitHashSet` doesn't implement `Serializable`.
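For reference, a small standalone check (the class names are made up for illustration): Java serialization reads `serialVersionUID` only from classes that implement `Serializable`, and `java.io.ObjectStreamClass.lookup` returns `null` for any other class, so a field like the one above on a plain `Iterable` is never used.

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.util.Collections;
import java.util.Iterator;

public class SerialVersionUidCheck {
    // Mirrors BitHashSet as quoted: declares serialVersionUID but is not Serializable.
    static class PlainIterable implements Iterable<Integer> {
        static final long serialVersionUID = 6382565447128283568L;

        @Override
        public Iterator<Integer> iterator() {
            return Collections.emptyIterator();
        }
    }

    // The same field on a class that does implement Serializable.
    static class SerializableHolder implements Serializable {
        private static final long serialVersionUID = 6382565447128283568L;
    }

    public static void main(String[] args) {
        // Prints null: there is no serialization descriptor for a
        // non-Serializable class, so its serialVersionUID is never consulted.
        System.out.println(ObjectStreamClass.lookup(PlainIterable.class));
        // Once the class is Serializable, the declared value is picked up.
        System.out.println(ObjectStreamClass.lookup(SerializableHolder.class).getSerialVersionUID());
    }
}
```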


---

[GitHub] zookeeper issue #590: [ZOOKEEPER-1177] Add the memory optimized watch manage...

Posted by anmolnar <gi...@git.apache.org>.
Github user anmolnar commented on the issue:

    https://github.com/apache/zookeeper/pull/590
  
    This is going to be a huge improvement for heavy users of watchers such as Accumulo. I'm going to set aside some time to review these new patches.


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by hanm <gi...@git.apache.org>.
Github user hanm commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r219645514
  
    --- Diff: zookeeper-docs/src/documentation/content/xdocs/zookeeperAdmin.xml ---
    @@ -1025,6 +1025,102 @@ server.3=zoo3:2888:3888</programlisting>
                 </listitem>
               </varlistentry>
     
    +
    +          <varlistentry>
    +            <term>watchManaggerName</term>
    +
    +            <listitem>
    +              <para>(Java system property only: <emphasis
    +                    role="bold">zookeeper.watchManaggerName</emphasis>)</para>
    +
    +              <para><emphasis role="bold">New in 3.6.0:</emphasis> Added in
    +                <ulink url="https://issues.apache.org/jira/browse/ZOOKEEPER-1179">ZOOKEEPER-1179</ulink> New watcher
    +                manager WatchManagerOptimized is added to optimize the memory overhead in heavy watch use cases. This
    +                config is used define which watch manager to be used. Currently, we only support WatchManager and
    --- End diff --
    
    Missing "to": this should read "is used to define which watcher ...".


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by hanm <gi...@git.apache.org>.
Github user hanm commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r219645409
  
    --- Diff: zookeeper-docs/src/documentation/content/xdocs/zookeeperAdmin.xml ---
    @@ -1025,6 +1025,102 @@ server.3=zoo3:2888:3888</programlisting>
                 </listitem>
               </varlistentry>
     
    +
    +          <varlistentry>
    +            <term>watchManaggerName</term>
    +
    +            <listitem>
    +              <para>(Java system property only: <emphasis
    +                    role="bold">zookeeper.watchManaggerName</emphasis>)</para>
    --- End diff --
    
    this should be `zookeeper.watchManagerName`.
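A sketch of how a property like `zookeeper.watchManagerName` is commonly consumed, assuming it holds a fully qualified class name with a no-argument constructor and falls back to a default implementation when unset. `WatchManagerLike`, `DefaultManager`, `OptimizedManager` and `WatchManagerSelector` are hypothetical stand-ins, not the ZooKeeper classes. In a scheme like this, a misspelled property name is silently ignored and the default is used, which is why the spelling matters.

```java
public class WatchManagerSelector {
    static final String PROPERTY = "zookeeper.watchManagerName";

    // Hypothetical stand-ins for IWatchManager and its two implementations.
    interface WatchManagerLike {
        String name();
    }

    static final class DefaultManager implements WatchManagerLike {
        @Override
        public String name() { return "WatchManager"; }
    }

    static final class OptimizedManager implements WatchManagerLike {
        @Override
        public String name() { return "WatchManagerOptimized"; }
    }

    /** Instantiates the class named by the property, or DefaultManager if it is unset. */
    static WatchManagerLike create() {
        String className = System.getProperty(PROPERTY, DefaultManager.class.getName());
        try {
            return Class.forName(className)
                    .asSubclass(WatchManagerLike.class)   // reject unrelated classes
                    .getDeclaredConstructor()
                    .newInstance();
        } catch (ReflectiveOperationException | ClassCastException e) {
            throw new IllegalArgumentException("Cannot create watch manager " + className, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(create().name()); // "WatchManager" unless the property is set
        System.setProperty(PROPERTY, OptimizedManager.class.getName());
        System.out.println(create().name()); // "WatchManagerOptimized"
    }
}
```

With this kind of lookup, the optimized manager would be selected with something like `-Dzookeeper.watchManagerName=org.apache.zookeeper.server.watch.WatchManagerOptimized`, assuming the real property expects the fully qualified name.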


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by anmolnar <gi...@git.apache.org>.
Github user anmolnar commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r214912044
  
    --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatchManagerOptimized.java ---
    @@ -0,0 +1,355 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.zookeeper.server.watch;
    +
    +import java.io.PrintWriter;
    +import java.util.HashMap;
    +import java.util.BitSet;
    +import java.util.HashSet;
    +import java.util.ArrayList;
    +import java.util.LinkedHashMap;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Random;
    +import java.util.Set;
    +import java.util.Iterator;
    +import java.lang.Iterable;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.locks.ReadWriteLock;
    +import java.util.concurrent.locks.ReentrantReadWriteLock;
    +
    +import org.apache.zookeeper.WatchedEvent;
    +import org.apache.zookeeper.Watcher;
    +import org.apache.zookeeper.Watcher.Event.EventType;
    +import org.apache.zookeeper.Watcher.Event.KeeperState;
    +import org.apache.zookeeper.server.ServerCnxn;
    +import org.apache.zookeeper.server.util.BitMap;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Optimized in memory and time complexity, compared to WatchManager, both the
    + * memory consumption and time complexity improved a lot, but it cannot
    + * efficiently remove the watcher when the session or socket is closed; for
    + * the majority of use cases this is not a problem.
    + *
    + * Changes made compared to WatchManager:
    + *
    + * - Use HashSet and BitSet to store the watchers to find a balance between
    + *   memory usage and time complexity
    + * - Use ReadWriteLock instead of synchronized to reduce lock retention
    + * - Lazily clean up the closed watchers
    + */
    +public class WatchManagerOptimized
    +        implements IWatchManager, DeadWatcherListener {
    +
    +    private static final Logger LOG =
    +            LoggerFactory.getLogger(WatchManagerOptimized.class);
    +
    +    private final ConcurrentHashMap<String, BitHashSet> pathWatches =
    +            new ConcurrentHashMap<String, BitHashSet>();
    +
    +    // watcher to bit id mapping
    +    private final BitMap<Watcher> watcherBitIdMap = new BitMap<Watcher>();
    +
    +    // used to lazily remove the dead watchers
    +    private final WatcherCleaner watcherCleaner;
    +
    +    private final ReentrantReadWriteLock addRemovePathRWLock = new ReentrantReadWriteLock();
    +
    +    public WatchManagerOptimized() {
    +        watcherCleaner = new WatcherCleaner(this);
    +        watcherCleaner.start();
    +    }
    +
    +    @Override
    +    public boolean addWatch(String path, Watcher watcher) {
    +        boolean result = false;
    +        addRemovePathRWLock.readLock().lock();
    +        try {
    +            // avoid the race condition of adding an in-flight dead watcher
    +            if (isDeadWatcher(watcher)) {
    +                LOG.debug("Ignoring addWatch with closed cnxn");
    +            } else {
    +                Integer bit = watcherBitIdMap.add(watcher);
    +                BitHashSet watchers = pathWatches.get(path);
    +                if (watchers == null) {
    +                    watchers = new BitHashSet();
    +                    BitHashSet existingWatchers = pathWatches.putIfAbsent(path, watchers);
    +                    if (existingWatchers != null) {
    +                        watchers = existingWatchers;
    +                    }
    +                }
    +                result = watchers.add(bit);
    +            }
    +        } finally {
    +            addRemovePathRWLock.readLock().unlock();
    +        }
    +        return result;
    +    }
    +
    +    @Override
    +    public boolean containsWatcher(String path, Watcher watcher) {
    +        BitHashSet watchers = pathWatches.get(path);
    +        if (watchers == null || !watchers.contains(watcherBitIdMap.getBit(watcher))) {
    +            return false;
    +        }
    +        return true;
    +    }
    +
    +    @Override
    +    public boolean removeWatcher(String path, Watcher watcher) {
    +        addRemovePathRWLock.writeLock().lock();
    +        try {
    +            BitHashSet list = pathWatches.get(path);
    +            if (list == null || !list.remove(watcherBitIdMap.getBit(watcher))) {
    +                return false;
    +            }
    +            if (list.isEmpty()) {
    +                pathWatches.remove(path);
    +            }
    +            return true;
    +        } finally {
    +            addRemovePathRWLock.writeLock().unlock();
    +        }
    +    }
    +
    +    @Override
    +    public void removeWatcher(Watcher watcher) {
    +        Integer watcherBit;
    +        addRemovePathRWLock.writeLock().lock();
    +        try {
    +            // do nothing if the watcher is not tracked
    +            watcherBit = watcherBitIdMap.getBit(watcher);
    +            if (watcherBit == null) {
    +                return;
    +            }
    +        } finally {
    +            addRemovePathRWLock.writeLock().unlock();
    +        }
    +        watcherCleaner.addDeadWatcher(watcherBit);
    +    }
    +
    +    /**
    +     * Entry for WatcherCleaner to remove dead watchers
    +     *
    +     * @param deadWatchers the watchers that need to be removed
    +     */
    +    @Override
    +    public void processDeadWatchers(Set<Integer> deadWatchers) {
    --- End diff --
    
    Don't you need to acquire the read lock from `addRemovePathRWLock` here?
    Both maps are modified here, and I think there's a possible race with the `addWatch()` method.
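Whichever lock mode ends up being used, it may help to recall the `ReentrantReadWriteLock` semantics in play: `addWatch()` above holds the read lock, and read locks are shared, so another thread taking the read lock is not blocked by it, whereas the write lock cannot be acquired while any reader holds the lock. The standalone demo below (class and method names are illustrative) checks both cases with `tryLock()` from a second thread.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadWriteLockExclusion {
    /**
     * While the calling thread holds the read lock (standing in for an
     * in-progress addWatch()), a second thread calls tryLock() on either the
     * read or the write lock. Returns whether that attempt succeeded.
     */
    static boolean otherThreadCanLock(boolean writeLock) {
        ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
        AtomicBoolean acquired = new AtomicBoolean(false);
        rwLock.readLock().lock();
        try {
            Thread other = new Thread(() -> {
                Lock lock = writeLock ? rwLock.writeLock() : rwLock.readLock();
                if (lock.tryLock()) {
                    acquired.set(true);
                    lock.unlock();
                }
            });
            other.start();
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            rwLock.readLock().unlock();
        }
        return acquired.get();
    }

    public static void main(String[] args) {
        System.out.println(otherThreadCanLock(false)); // true: read locks are shared
        System.out.println(otherThreadCanLock(true));  // false: the writer is excluded by the reader
    }
}
```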


---

[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...

Posted by hanm <gi...@git.apache.org>.
Github user hanm commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r219659865
  
    --- Diff: src/java/main/org/apache/zookeeper/server/util/BitMap.java ---
    @@ -0,0 +1,125 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.zookeeper.server.util;
    +
    +import java.util.Map;
    +import java.util.HashMap;
    +import java.util.BitSet;
    +import java.util.concurrent.locks.ReentrantReadWriteLock;
    +
    +/**
    + * This is a helper class to maintain the bit-to-value mapping and the
    + * reverse value-to-bit mapping.
    + */
    +public class BitMap<T> {
    +
    +    private final Map<T, Integer> value2Bit = new HashMap<T, Integer>();
    +    private final Map<Integer, T> bit2Value = new HashMap<Integer, T>();
    +
    +    private final BitSet freedBitSet = new BitSet();
    +    private Integer nextBit = Integer.valueOf(0);
    +
    +    private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
    +
    +    public Integer add(T value) {
    +        Integer bit = getBit(value);
    +        if (bit != null) {
    +            return bit;
    +        }
    --- End diff --
    
    I'm also wondering: if this optimization is indeed useful, why not do the same in the `remove` methods, i.e., check and return early under the read lock before trying to acquire the write lock?
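A sketch of what the suggestion could look like, assuming a map shaped like the `BitMap` quoted above (`value2Bit`, `bit2Value`, a `BitSet` of freed bits and a `ReentrantReadWriteLock`). `CheckedBitMap` and its method bodies are hypothetical, not the patch's code. Both `add()` and `remove()` first do a cheap lookup under the read lock and return early; otherwise they take the write lock and re-check, because another thread may have changed the map between releasing the read lock and acquiring the write lock.

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical value<->bit map with read-locked fast paths in add() and remove(). */
public class CheckedBitMap<T> {
    private final Map<T, Integer> value2Bit = new HashMap<>();
    private final Map<Integer, T> bit2Value = new HashMap<>();
    private final BitSet freedBits = new BitSet();
    private int nextBit = 0;
    private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();

    public Integer getBit(T value) {
        rwLock.readLock().lock();
        try {
            return value2Bit.get(value);
        } finally {
            rwLock.readLock().unlock();
        }
    }

    public Integer add(T value) {
        // Fast path: already mapped, only the read lock was needed.
        Integer bit = getBit(value);
        if (bit != null) {
            return bit;
        }
        rwLock.writeLock().lock();
        try {
            // Re-check: another thread may have added it between the two locks.
            bit = value2Bit.get(value);
            if (bit != null) {
                return bit;
            }
            int free = freedBits.nextSetBit(0);   // reuse a freed bit if any
            if (free >= 0) {
                freedBits.clear(free);
                bit = free;
            } else {
                bit = nextBit++;
            }
            value2Bit.put(value, bit);
            bit2Value.put(bit, value);
            return bit;
        } finally {
            rwLock.writeLock().unlock();
        }
    }

    /** Removes the value; returns true if it was mapped. */
    public boolean remove(T value) {
        // Same fast path as add(): nothing to do if the value is unknown.
        if (getBit(value) == null) {
            return false;
        }
        rwLock.writeLock().lock();
        try {
            Integer bit = value2Bit.remove(value);   // re-check under the write lock
            if (bit == null) {
                return false;
            }
            bit2Value.remove(bit);
            freedBits.set(bit);
            return true;
        } finally {
            rwLock.writeLock().unlock();
        }
    }
}
```

The re-check is what makes the early return safe: without it, two threads that both miss on the fast path could each allocate a bit for the same value, or both try to free the same bit.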


---