You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2020/06/24 22:08:31 UTC

[helix] 01/02: Fix leaking Zk path watch and Callbackhandler issue (#1035)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 842bf1f9290cf30e946ff2bc8377b3e2f6014554
Author: kaisun2000 <52...@users.noreply.github.com>
AuthorDate: Fri Jun 5 19:55:06 2020 -0700

    Fix leaking Zk path watch and Callbackhandler issue (#1035)
    
    Short term fix #1034. Get rid of dangling CallbackHandlers and its
    related current state parent path in Zookeeper. Get rid of leaking
    of current state znode path due to async nature of deletion of
    current state znode path to installatio of watcher in various
    thread in Helix.
---
 .../apache/helix/manager/zk/CallbackHandler.java   |  29 +++-
 .../integration/TestZkCallbackHandlerLeak.java     | 150 ++++++++++++++++++
 .../manager/ClusterControllerManager.java          |  65 +-------
 .../manager/ClusterDistributedController.java      |  38 +----
 ...rControllerManager.java => ClusterManager.java} |  28 +---
 .../manager/ClusterSpectatorManager.java           |  49 ++++++
 .../manager/MockParticipantManager.java            |  30 +---
 .../api/client/ChildrenSubscribeResult.java        |  53 +++++++
 .../zookeeper/api/client/RealmAwareZkClient.java   |  39 +++++
 .../zookeeper/impl/client/DedicatedZkClient.java   |  13 ++
 .../zookeeper/impl/client/FederatedZkClient.java   |  13 ++
 .../zookeeper/impl/client/SharedZkClient.java      |  13 ++
 .../apache/helix/zookeeper/zkclient/ZkClient.java  | 167 ++++++++++++++++++---
 13 files changed, 516 insertions(+), 171 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
index 2192ca0..8266aba 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
@@ -73,6 +73,8 @@ import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.monitoring.mbeans.HelixCallbackMonitor;
+import org.apache.helix.zookeeper.api.client.ChildrenSubscribeResult;
+import org.apache.helix.zookeeper.api.client.HelixZkClient;
 import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.zookeeper.zkclient.IZkChildListener;
@@ -539,7 +541,20 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
         logger.debug("{} subscribes child-change. path: {} , listener: {}",
             _manager.getInstanceName(), path, _listener );
       }
-      _zkClient.subscribeChildChanges(path, this);
+      // In the lifecycle of CallbackHandler, INIT is the first stage of registration of watch.
+      // For some usage case such as current state, the path can be created later. Thus we would
+      // install watch anyway event the path is not yet created.
+      // Later, CALLBACK type, the CallbackHandler already registered the watch and knows the
+      // path was created. Here, to avoid leaking path in ZooKeeper server, we would not let
+      // CallbackHandler to install exists watch, namely watch for path not existing.
+      // Note when path is removed, the CallbackHanler would remove itself from ZkHelixManager too
+      // to avoid leaking a CallbackHandler.
+      ChildrenSubscribeResult childrenSubscribeResult = _zkClient.subscribeChildChanges(path, this, callbackType != Type.INIT);
+      logger.debug("CallbackHandler {} subscribe data path {} result {}", this, path,
+          childrenSubscribeResult.isInstalled());
+      if (!childrenSubscribeResult.isInstalled()) {
+        logger.info("CallbackHandler {} subscribe data path {} failed!", this, path);
+      }
     } else if (callbackType == NotificationContext.Type.FINALIZE) {
       logger.info("{} unsubscribe child-change. path: {}, listener: {}", _manager.getInstanceName(),
           path, _listener);
@@ -555,7 +570,11 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
         logger.debug("{} subscribe data-change. path: {}, listener: {}", _manager.getInstanceName(),
             path, _listener);
       }
-      _zkClient.subscribeDataChanges(path, this);
+      boolean subStatus = _zkClient.subscribeDataChanges(path, this, callbackType != Type.INIT);
+      logger.debug("CallbackHandler {} subscribe data path {} result {}", this, path, subStatus);
+      if (!subStatus) {
+        logger.info("CallbackHandler {} subscribe data path {} failed!", this, path);
+      }
     } else if (callbackType == NotificationContext.Type.FINALIZE) {
       logger.info("{} unsubscribe data-change. path: {}, listener: {}",
           _manager.getInstanceName(), path, _listener);
@@ -756,6 +775,12 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
           // removeListener will call handler.reset(), which in turn call invoke() on FINALIZE type
           _manager.removeListener(_propertyKey, _listener);
         } else {
+          if (!isReady()) {
+            // avoid leaking CallbackHandler
+            logger.info("Callbackhandler {} with path {} is in reset state. Stop subscription to ZK client to avoid leaking",
+                this, parentPath);
+            return;
+          }
           NotificationContext changeContext = new NotificationContext(_manager);
           changeContext.setType(NotificationContext.Type.CALLBACK);
           changeContext.setPathChanged(parentPath);
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java b/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
index 7eb74a7..0bcee6d 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
@@ -25,15 +25,22 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.helix.CurrentStateChangeListener;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyType;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZkTestHelper;
 import org.apache.helix.ZkUnitTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.ClusterSpectatorManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.integration.manager.ZkTestManager;
 import org.apache.helix.manager.zk.CallbackHandler;
+import org.apache.helix.spectator.RoutingTableProvider;
 import org.apache.helix.zookeeper.api.client.HelixZkClient;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.tools.ClusterStateVerifier;
@@ -319,6 +326,149 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
   }
 
   @Test
+  public void testDanglingCallbackHanlderFix() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    final int n = 3;
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, "localhost", "TestDB", 1, // resource
+        32, // partitions
+        n, // nodes
+        2, // replicas
+        "MasterSlave", true);
+
+    final ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+    controller.syncStart();
+
+    MockParticipantManager[] participants = new MockParticipantManager[n];
+    for (int i = 0; i < n; i++) {
+      String instanceName = "localhost_" + (12918 + i);
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+      participants[i].syncStart();
+    }
+
+    boolean result = ClusterStateVerifier.verifyByZkCallback(
+        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, clusterName));
+    Assert.assertTrue(result);
+
+    // Routing provider is a spectator in Helix. Currentstate based RP listens on all the
+    // currentstate changes of all the clusters. They are a source of leaking of watch in
+    // Zookeeper server.
+    ClusterSpectatorManager rpManager = new ClusterSpectatorManager(ZK_ADDR, clusterName, "router");
+    rpManager.syncStart();
+    RoutingTableProvider rp = new RoutingTableProvider(rpManager, PropertyType.CURRENTSTATES);
+
+    //TODO: The following three sleep() is not the best practice. On the other hand, we don't have the testing
+    // facilities to avoid them yet. We will enhance later.
+    Thread.sleep(5000);
+
+    // expire RoutingProvider would create dangling CB
+    LOG.info("expire rp manager session:", rpManager.getSessionId());
+    ZkTestHelper.expireSession(rpManager.getZkClient());
+    LOG.info("rp manager new session:", rpManager.getSessionId());
+
+    Thread.sleep(5000);
+
+    MockParticipantManager participantToExpire = participants[0];
+    String oldSessionId = participantToExpire.getSessionId();
+    PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName);
+
+    // expire participant session; leaked callback handler used to be not reset() and be removed from ZkClient
+    LOG.info("Expire participant: " + participantToExpire.getInstanceName() + ", session: "
+        + participantToExpire.getSessionId());
+    ZkTestHelper.expireSession(participantToExpire.getZkClient());
+    String newSessionId = participantToExpire.getSessionId();
+    LOG.info(participantToExpire.getInstanceName() + " oldSessionId: " + oldSessionId
+        + ", newSessionId: " + newSessionId);
+
+    Thread.sleep(5000);
+    Map<String, Set<IZkChildListener>> childListeners =
+        ZkTestHelper.getZkChildListener(rpManager.getZkClient());
+    for (String path : childListeners.keySet()) {
+      Assert.assertTrue(childListeners.get(path).size() <= 1);
+    }
+
+    Map<String, List<String>> rpWatchPaths = ZkTestHelper.getZkWatch(rpManager.getZkClient());
+    List<String> existWatches = rpWatchPaths.get("existWatches");
+    Assert.assertTrue(existWatches.isEmpty());
+  }
+
+  @Test
+  public void testCurrentStatePathLeakingByAsycRemoval() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    final int n = 3;
+    final String zkAddr = ZK_ADDR;
+    final int mJobUpdateCnt = 500;
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, zkAddr, 12918, "localhost", "TestDB", 1, // resource
+        32, // partitions
+        n, // nodes
+        2, // replicas
+        "MasterSlave", true);
+
+    final ClusterControllerManager controller =
+        new ClusterControllerManager(zkAddr, clusterName, "controller_0");
+    controller.syncStart();
+
+    MockParticipantManager[] participants = new MockParticipantManager[n];
+    for (int i = 0; i < n; i++) {
+      String instanceName = "localhost_" + (12918 + i);
+      participants[i] = new MockParticipantManager(zkAddr, clusterName, instanceName);
+      participants[i].syncStart();
+    }
+
+    Boolean result = ClusterStateVerifier.verifyByZkCallback(
+        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(zkAddr, clusterName));
+    Assert.assertTrue(result);
+
+    ClusterSpectatorManager rpManager = new ClusterSpectatorManager(ZK_ADDR, clusterName, "router");
+    rpManager.syncStart();
+    RoutingTableProvider rp = new RoutingTableProvider(rpManager, PropertyType.CURRENTSTATES);
+
+    LOG.info("add job");
+    MockParticipantManager jobParticipant = participants[0];
+    String jobSessionId = jobParticipant.getSessionId();
+    HelixDataAccessor jobAccesor = jobParticipant.getHelixDataAccessor();
+    PropertyKey.Builder jobKeyBuilder = new PropertyKey.Builder(clusterName);
+    PropertyKey db0key =
+        jobKeyBuilder.currentState(jobParticipant.getInstanceName(), jobSessionId, "TestDB0");
+    CurrentState db0 = jobAccesor.getProperty(db0key);
+    PropertyKey jobKey =
+        jobKeyBuilder.currentState(jobParticipant.getInstanceName(), jobSessionId, "BackupQueue");
+    CurrentState cs = new CurrentState("BackupQueue");
+    cs.setSessionId(jobSessionId);
+    cs.setStateModelDefRef(db0.getStateModelDefRef());
+
+    LOG.info("add job");
+    boolean rtJob = false;
+    for (int i = 0; i < mJobUpdateCnt; i++) {
+      rtJob = jobAccesor.setProperty(jobKey, cs);
+    }
+
+    LOG.info("remove job");
+    rtJob = jobParticipant.getZkClient().delete(jobKey.getPath());
+
+    // validate the job watch is not leaked.
+    Thread.sleep(5000);
+
+    Map<String, Set<String>> listenersByZkPath = ZkTestHelper.getListenersByZkPath(ZK_ADDR);
+    boolean jobKeyExists = listenersByZkPath.keySet().contains(jobKey.getPath());
+    Assert.assertFalse(jobKeyExists);
+
+    Map<String, List<String>> rpWatchPaths = ZkTestHelper.getZkWatch(rpManager.getZkClient());
+    List<String> existWatches = rpWatchPaths.get("existWatches");
+    Assert.assertTrue(existWatches.isEmpty());
+  }
+
+  @Test
   public void testRemoveUserCbHandlerOnPathRemoval() throws Exception {
     String className = TestHelper.getTestClassName();
     String methodName = TestHelper.getTestMethodName();
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java
index 9281e2d..f7bae1f 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java
@@ -35,76 +35,15 @@ import org.slf4j.LoggerFactory;
 /**
  * The standalone cluster controller class
  */
-public class ClusterControllerManager extends ZKHelixManager implements Runnable, ZkTestManager {
+public class ClusterControllerManager extends ClusterManager {
   private static Logger LOG = LoggerFactory.getLogger(ClusterControllerManager.class);
 
-  private final CountDownLatch _startCountDown = new CountDownLatch(1);
-  private final CountDownLatch _stopCountDown = new CountDownLatch(1);
-  private final CountDownLatch _waitStopFinishCountDown = new CountDownLatch(1);
-
-  private boolean _started = false;
-
   public ClusterControllerManager(String zkAddr, String clusterName) {
     this(zkAddr, clusterName, "controller");
   }
 
   public ClusterControllerManager(String zkAddr, String clusterName, String controllerName) {
-    super(clusterName, controllerName, InstanceType.CONTROLLER, zkAddr);
-  }
-
-  public void syncStop() {
-    _stopCountDown.countDown();
-    try {
-      _waitStopFinishCountDown.await();
-      _started = false;
-    } catch (InterruptedException e) {
-      LOG.error("Interrupted waiting for finish", e);
-    }
-  }
-
-  // This should not be called more than once because HelixManager.connect() should not be called more than once.
-  public void syncStart() {
-    if (_started) {
-      throw new RuntimeException(
-          "Helix Controller already started. Do not call syncStart() more than once.");
-    } else {
-      _started = true;
-    }
-
-    new Thread(this).start();
-    try {
-      _startCountDown.await();
-    } catch (InterruptedException e) {
-      LOG.error("Interrupted waiting for start", e);
-    }
+    super(zkAddr, clusterName, controllerName, InstanceType.CONTROLLER);
   }
 
-  @Override
-  public void run() {
-    try {
-      connect();
-      _startCountDown.countDown();
-      _stopCountDown.await();
-    } catch (Exception e) {
-      LOG.error("exception running controller-manager", e);
-    } finally {
-      _startCountDown.countDown();
-      disconnect();
-      _waitStopFinishCountDown.countDown();
-    }
-  }
-
-  @Override
-  public RealmAwareZkClient getZkClient() {
-    return _zkclient;
-  }
-
-  @Override
-  public List<CallbackHandler> getHandlers() {
-    return _handlers;
-  }
-
-  public List<HelixTimerTask> getControllerTimerTasks() {
-    return _controllerTimerTasks;
-  }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java
index 397fae5..d781854 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java
@@ -32,35 +32,11 @@ import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ClusterDistributedController extends ZKHelixManager implements Runnable,
-    ZkTestManager {
+public class ClusterDistributedController extends ClusterManager {
   private static Logger LOG = LoggerFactory.getLogger(ClusterDistributedController.class);
 
-  private final CountDownLatch _startCountDown = new CountDownLatch(1);
-  private final CountDownLatch _stopCountDown = new CountDownLatch(1);
-  private final CountDownLatch _waitStopFinishCountDown = new CountDownLatch(1);
-
   public ClusterDistributedController(String zkAddr, String clusterName, String controllerName) {
-    super(clusterName, controllerName, InstanceType.CONTROLLER_PARTICIPANT, zkAddr);
-  }
-
-  public void syncStop() {
-    _stopCountDown.countDown();
-    try {
-      _waitStopFinishCountDown.await();
-    } catch (InterruptedException e) {
-      LOG.error("Interrupted waiting for finish", e);
-    }
-  }
-
-  public void syncStart() {
-    // TODO: prevent start multiple times
-    new Thread(this).start();
-    try {
-      _startCountDown.await();
-    } catch (InterruptedException e) {
-      LOG.error("Interrupted waiting for start", e);
-    }
+    super(zkAddr, clusterName, controllerName, InstanceType.CONTROLLER_PARTICIPANT);
   }
 
   @Override
@@ -82,14 +58,4 @@ public class ClusterDistributedController extends ZKHelixManager implements Runn
       _waitStopFinishCountDown.countDown();
     }
   }
-
-  @Override
-  public RealmAwareZkClient getZkClient() {
-    return _zkclient;
-  }
-
-  @Override
-  public List<CallbackHandler> getHandlers() {
-    return _handlers;
-  }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterManager.java
similarity index 73%
copy from helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java
copy to helix-core/src/test/java/org/apache/helix/integration/manager/ClusterManager.java
index 9281e2d..9e21ea8 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterManager.java
@@ -22,34 +22,25 @@ package org.apache.helix.integration.manager;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 
-import org.apache.helix.HelixTimerTask;
 import org.apache.helix.InstanceType;
 import org.apache.helix.manager.zk.CallbackHandler;
 import org.apache.helix.manager.zk.ZKHelixManager;
-import org.apache.helix.zookeeper.api.client.HelixZkClient;
 import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
-/**
- * The standalone cluster controller class
- */
-public class ClusterControllerManager extends ZKHelixManager implements Runnable, ZkTestManager {
+public class ClusterManager extends ZKHelixManager implements Runnable, ZkTestManager {
   private static Logger LOG = LoggerFactory.getLogger(ClusterControllerManager.class);
 
-  private final CountDownLatch _startCountDown = new CountDownLatch(1);
-  private final CountDownLatch _stopCountDown = new CountDownLatch(1);
-  private final CountDownLatch _waitStopFinishCountDown = new CountDownLatch(1);
-
-  private boolean _started = false;
+  protected CountDownLatch _startCountDown = new CountDownLatch(1);
+  protected CountDownLatch _stopCountDown = new CountDownLatch(1);
+  protected CountDownLatch _waitStopFinishCountDown = new CountDownLatch(1);
 
-  public ClusterControllerManager(String zkAddr, String clusterName) {
-    this(zkAddr, clusterName, "controller");
-  }
+  protected boolean _started = false;
 
-  public ClusterControllerManager(String zkAddr, String clusterName, String controllerName) {
-    super(clusterName, controllerName, InstanceType.CONTROLLER, zkAddr);
+  protected ClusterManager(String zkAddr, String clusterName, String instanceName, InstanceType type) {
+    super(clusterName, instanceName, type, zkAddr);
   }
 
   public void syncStop() {
@@ -103,8 +94,5 @@ public class ClusterControllerManager extends ZKHelixManager implements Runnable
   public List<CallbackHandler> getHandlers() {
     return _handlers;
   }
-
-  public List<HelixTimerTask> getControllerTimerTasks() {
-    return _controllerTimerTasks;
-  }
 }
+
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterSpectatorManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterSpectatorManager.java
new file mode 100644
index 0000000..2975f0a
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterSpectatorManager.java
@@ -0,0 +1,49 @@
+package org.apache.helix.integration.manager;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.helix.InstanceType;
+import org.apache.helix.manager.zk.CallbackHandler;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class ClusterSpectatorManager extends ClusterManager{
+  private static Logger LOG = LoggerFactory.getLogger(ClusterControllerManager.class);
+
+  private final CountDownLatch _startCountDown = new CountDownLatch(1);
+  private final CountDownLatch _stopCountDown = new CountDownLatch(1);
+  private final CountDownLatch _waitStopFinishCountDown = new CountDownLatch(1);
+
+  private boolean _started = false;
+
+  public ClusterSpectatorManager(String zkAddr, String clusterName) {
+    this(zkAddr, clusterName, "spectator");
+  }
+
+  public ClusterSpectatorManager(String zkAddr, String clusterName, String spectatorName) {
+    super(zkAddr, clusterName, spectatorName, InstanceType.SPECTATOR);
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
index 4a44502..0b1983a 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
@@ -38,13 +38,9 @@ import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class MockParticipantManager extends ZKHelixManager implements Runnable, ZkTestManager {
+public class MockParticipantManager extends ClusterManager {
   private static Logger LOG = LoggerFactory.getLogger(MockParticipantManager.class);
 
-  protected CountDownLatch _startCountDown = new CountDownLatch(1);
-  protected CountDownLatch _stopCountDown = new CountDownLatch(1);
-  protected CountDownLatch _waitStopCompleteCountDown = new CountDownLatch(1);
-
   protected int _transDelay = 10;
 
   protected MockMSModelFactory _msModelFactory;
@@ -63,7 +59,7 @@ public class MockParticipantManager extends ZKHelixManager implements Runnable,
 
   public MockParticipantManager(String zkAddr, String clusterName, String instanceName,
       int transDelay, HelixCloudProperty helixCloudProperty) {
-    super(clusterName, instanceName, InstanceType.PARTICIPANT, zkAddr);
+    super(zkAddr, clusterName, instanceName, InstanceType.PARTICIPANT);
     _transDelay = transDelay;
     _msModelFactory = new MockMSModelFactory(null);
     _lsModelFactory = new DummyLeaderStandbyStateModelFactory(_transDelay);
@@ -75,24 +71,6 @@ public class MockParticipantManager extends ZKHelixManager implements Runnable,
     _msModelFactory.setTrasition(transition);
   }
 
-  public void syncStop() {
-    _stopCountDown.countDown();
-    try {
-      _waitStopCompleteCountDown.await();
-    } catch (InterruptedException e) {
-      LOG.error("exception in syncStop participant-manager", e);
-    }
-  }
-
-  public void syncStart() {
-    try {
-      new Thread(this).start();
-      _startCountDown.await();
-    } catch (InterruptedException e) {
-      LOG.error("exception in syncStart participant-manager", e);
-    }
-  }
-
   /**
    * This method should be called before syncStart() called after syncStop()
    */
@@ -100,7 +78,7 @@ public class MockParticipantManager extends ZKHelixManager implements Runnable,
     syncStop();
     _startCountDown = new CountDownLatch(1);
     _stopCountDown = new CountDownLatch(1);
-    _waitStopCompleteCountDown = new CountDownLatch(1);
+    _waitStopFinishCountDown = new CountDownLatch(1);
   }
 
   @Override
@@ -132,7 +110,7 @@ public class MockParticipantManager extends ZKHelixManager implements Runnable,
       _startCountDown.countDown();
 
       disconnect();
-      _waitStopCompleteCountDown.countDown();
+      _waitStopFinishCountDown.countDown();
     }
   }
 
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/ChildrenSubscribeResult.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/ChildrenSubscribeResult.java
new file mode 100644
index 0000000..32f559b
--- /dev/null
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/ChildrenSubscribeResult.java
@@ -0,0 +1,53 @@
+package org.apache.helix.zookeeper.api.client;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.helix.zookeeper.zkclient.IZkChildListener;
+
+
+/** Represents return type of {@link org.apache.helix.zookeeper.api.client.RealmAwareZkClient#subscribeChildChanges(String, IZkChildListener, boolean)}
+ *  The returned value would signal if watch installation to ZooKeeper server succeeded
+ *  or not using field _isInstalled. The _children field would contains the list of child names
+ *  of the watched path. It would be null if the parent path does not exist at the time of watch
+ *  installation.
+ */
+public class ChildrenSubscribeResult {
+  private final List<String> _children;
+  private final boolean _isInstalled;
+
+  public ChildrenSubscribeResult(List<String> children, boolean isInstalled) {
+    if (children != null) {
+      _children = Collections.unmodifiableList(children);
+    } else {
+      _children = null;
+    }
+    _isInstalled = isInstalled;
+  }
+
+  public List<String> getChildren() {
+    return _children;
+  }
+
+  public boolean isInstalled() {
+    return _isInstalled;
+  }
+}
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java
index 22f3678..84f52b9 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java
@@ -69,12 +69,51 @@ public interface RealmAwareZkClient {
   int DEFAULT_SESSION_TIMEOUT = 30 * 1000;
 
   // listener subscription
+  /**
+   * Subscribe the path and the listener will handle child events of the path.
+   * Add exists watch to path if the path does not exist in ZooKeeper server.
+   * WARNING: if the path is created after deletion, users need to re-subscribe the path
+   * @param path The zookeeper path
+   * @param listener Instance of {@link IZkDataListener}
+   * The method return null if the path does not exists. Otherwise, return a list of children
+   * under the path. The list can be empty if there is no children.
+   */
+  @Deprecated
   List<String> subscribeChildChanges(String path, IZkChildListener listener);
 
+  /**
+   * Subscribe the path and the listener will handle child events of the path
+   * WARNING: if the path is created after deletion, users need to re-subscribe the path
+   * @param path The zookeeper path
+   * @param listener Instance of {@link IZkDataListener}
+   * @param skipWatchingNonExistNode True means not installing any watch if path does not exist.
+   * @return ChildrentSubsribeResult. If the path does not exists, the isInstalled field
+   * is false. Otherwise, it is true and list of children are returned.
+   */
+  ChildrenSubscribeResult subscribeChildChanges(String path, IZkChildListener listener, boolean skipWatchingNonExistNode);
+
   void unsubscribeChildChanges(String path, IZkChildListener listener);
 
+  /**
+   * Subscribe the path and the listener will handle data events of the path
+   * Add the exists watch to Zookeeper server even if the path does not exists in zookeeper server
+   * WARNING: if the path is created after deletion, users need to re-subscribe the path
+   * @param path The zookeeper path
+   * @param listener Instance of {@link IZkDataListener}
+   */
+  @Deprecated
   void subscribeDataChanges(String path, IZkDataListener listener);
 
+  /**
+   * Subscribe the path and the listener will handle data events of the path
+   * WARNING: if the path is created after deletion, users need to re-subscribe the path
+   * @param path The zookeeper path
+   * @param listener Instance of {@link IZkDataListener}
+   * @param skipWatchingNonExistNode True means not installing any watch if path does not exist.
+   * return True if installation of watch succeed. Otherwise, return false.
+   */
+  boolean subscribeDataChanges(String path, IZkDataListener listener, boolean skipWatchingNonExistNode);
+
   void unsubscribeDataChanges(String path, IZkDataListener listener);
 
   /*
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java
index 09b66fb..20795d7 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.helix.msdcommon.datamodel.MetadataStoreRoutingData;
 import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
+import org.apache.helix.zookeeper.api.client.ChildrenSubscribeResult;
 import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.apache.helix.zookeeper.util.HttpRoutingDataReader;
 import org.apache.helix.zookeeper.zkclient.DataUpdater;
@@ -122,6 +123,12 @@ public class DedicatedZkClient implements RealmAwareZkClient {
   }
 
   @Override
+  public ChildrenSubscribeResult subscribeChildChanges(String path, IZkChildListener listener,
+      boolean skipWatchingNodeNotExist) {
+    return _rawZkClient.subscribeChildChanges(path, listener, skipWatchingNodeNotExist);
+  }
+
+  @Override
   public void unsubscribeChildChanges(String path, IZkChildListener listener) {
     checkIfPathContainsShardingKey(path);
     _rawZkClient.unsubscribeChildChanges(path, listener);
@@ -134,6 +141,12 @@ public class DedicatedZkClient implements RealmAwareZkClient {
   }
 
   @Override
+  public boolean subscribeDataChanges(String path, IZkDataListener listener,
+      boolean skipWatchingNodeNotExist) {
+    return _rawZkClient.subscribeDataChanges(path, listener, skipWatchingNodeNotExist);
+  }
+
+  @Override
   public void unsubscribeDataChanges(String path, IZkDataListener listener) {
     checkIfPathContainsShardingKey(path);
     _rawZkClient.unsubscribeDataChanges(path, listener);
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java
index 1cebc90..9e6c2ec 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java
@@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.helix.msdcommon.datamodel.MetadataStoreRoutingData;
 import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
+import org.apache.helix.zookeeper.api.client.ChildrenSubscribeResult;
 import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
 import org.apache.helix.zookeeper.util.HttpRoutingDataReader;
@@ -115,6 +116,12 @@ public class FederatedZkClient implements RealmAwareZkClient {
   }
 
   @Override
+  public ChildrenSubscribeResult subscribeChildChanges(String path, IZkChildListener listener,
+      boolean skipWatchingNodeNotExist) {
+    return getZkClient(path).subscribeChildChanges(path, listener, skipWatchingNodeNotExist);
+  }
+
+  @Override
   public void unsubscribeChildChanges(String path, IZkChildListener listener) {
     getZkClient(path).unsubscribeChildChanges(path, listener);
   }
@@ -125,6 +132,12 @@ public class FederatedZkClient implements RealmAwareZkClient {
   }
 
   @Override
+  public boolean subscribeDataChanges(String path, IZkDataListener listener,
+      boolean skipWatchingNodeNotExist) {
+    return getZkClient(path).subscribeDataChanges(path, listener, skipWatchingNodeNotExist);
+  }
+
+  @Override
   public void unsubscribeDataChanges(String path, IZkDataListener listener) {
     getZkClient(path).unsubscribeDataChanges(path, listener);
   }
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/SharedZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/SharedZkClient.java
index a2b0d00..d5b1c6d 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/SharedZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/SharedZkClient.java
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.helix.msdcommon.datamodel.MetadataStoreRoutingData;
 import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
+import org.apache.helix.zookeeper.api.client.ChildrenSubscribeResult;
 import org.apache.helix.zookeeper.api.client.HelixZkClient;
 import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
@@ -125,6 +126,12 @@ public class SharedZkClient implements RealmAwareZkClient {
   }
 
   @Override
+  public ChildrenSubscribeResult subscribeChildChanges(String path, IZkChildListener listener,
+      boolean skipWatchingNodeNotExist) {
+    return _innerSharedZkClient.subscribeChildChanges(path, listener, skipWatchingNodeNotExist);
+  }
+
+  @Override
   public void unsubscribeChildChanges(String path, IZkChildListener listener) {
     checkIfPathContainsShardingKey(path);
     _innerSharedZkClient.unsubscribeChildChanges(path, listener);
@@ -137,6 +144,12 @@ public class SharedZkClient implements RealmAwareZkClient {
   }
 
   @Override
+  public boolean subscribeDataChanges(String path, IZkDataListener listener,
+      boolean skipWatchingNodeNotExist) {
+    return _innerSharedZkClient.subscribeDataChanges(path, listener, skipWatchingNodeNotExist);
+  }
+
+  @Override
   public void unsubscribeDataChanges(String path, IZkDataListener listener) {
     checkIfPathContainsShardingKey(path);
     _innerSharedZkClient.unsubscribeDataChanges(path, listener);
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
index 083d9f4..b798fbe 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
@@ -24,6 +24,8 @@ import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.TimeUnit;
 import javax.management.JMException;
 
+import org.apache.helix.zookeeper.api.client.ChildrenSubscribeResult;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.zookeeper.exception.ZkClientException;
 import org.apache.helix.zookeeper.zkclient.annotation.PreFetch;
@@ -212,6 +214,11 @@ public class ZkClient implements Watcher {
   }
 
   public List<String> subscribeChildChanges(String path, IZkChildListener listener) {
+    ChildrenSubscribeResult result = subscribeChildChanges(path, listener, false);
+    return result.getChildren();
+  }
+
+  public ChildrenSubscribeResult subscribeChildChanges(String path, IZkChildListener listener, boolean skipWatchingNonExistNode) {
     synchronized (_childListener) {
       Set<IZkChildListener> listeners = _childListener.get(path);
       if (listeners == null) {
@@ -220,7 +227,15 @@ public class ZkClient implements Watcher {
       }
       listeners.add(listener);
     }
-    return watchForChilds(path);
+
+    List<String> children = watchForChilds(path, skipWatchingNonExistNode);
+    if (children == null && skipWatchingNonExistNode) {
+      unsubscribeChildChanges(path, listener);
+      LOG.info("watchForChilds failed to install no-existing watch and add listener. Path: {}", path);
+      return new ChildrenSubscribeResult(children, false);
+    }
+
+    return new ChildrenSubscribeResult(children, true);
   }
 
   public void unsubscribeChildChanges(String path, IZkChildListener childListener) {
@@ -232,13 +247,7 @@ public class ZkClient implements Watcher {
     }
   }
 
-  /**
-   * Subscribe the path and the listener will handle data events of the path
-   * WARNING: if the path is created after deletion, users need to re-subscribe the path
-   * @param path The zookeeper path
-   * @param listener Instance of {@link IZkDataListener}
-   */
-  public void subscribeDataChanges(String path, IZkDataListener listener) {
+  public boolean subscribeDataChanges(String path, IZkDataListener listener, boolean skipWatchingNonExistNode) {
     Set<IZkDataListenerEntry> listenerEntries;
     synchronized (_dataListener) {
       listenerEntries = _dataListener.get(path);
@@ -257,10 +266,29 @@ public class ZkClient implements Watcher {
         }
       }
     }
-    watchForData(path);
+
+    boolean watchInstalled = watchForData(path, skipWatchingNonExistNode);
+    if (!watchInstalled) {
+      // Now let us remove this handler.
+      unsubscribeDataChanges(path, listener);
+      LOG.info("watchForData failed to install no-existing path and thus add listener. Path:" + path);
+      return false;
+    }
+
     if (LOG.isDebugEnabled()) {
       LOG.debug("Subscribed data changes for " + path);
     }
+    return true;
+  }
+
+   /**
+    * Subscribe the path and the listener will handle data events of the path
+    * WARNING: if the path is created after deletion, users need to re-subscribe the path
+    * @param path The zookeeper path
+    * @param listener Instance of {@link IZkDataListener}
+    */
+  public void subscribeDataChanges(String path, IZkDataListener listener) {
+    subscribeDataChanges(path, listener, false);
   }
 
   private boolean isPrefetchEnabled(IZkDataListener dataListener) {
@@ -1056,14 +1084,12 @@ public class ZkClient implements Watcher {
 
   private Stat getStat(final String path, final boolean watch) {
     long startT = System.currentTimeMillis();
+    final Stat stat;
     try {
-      Stat stat = retryUntilConnected(
+      stat = retryUntilConnected(
           () -> ((ZkConnection) getConnection()).getZookeeper().exists(path, watch));
       record(path, null, startT, ZkClientMonitor.AccessType.READ);
       return stat;
-    } catch (ZkNoNodeException e) {
-      record(path, null, startT, ZkClientMonitor.AccessType.READ);
-      throw e;
     } catch (Exception e) {
       recordFailure(path, ZkClientMonitor.AccessType.READ);
       throw e;
@@ -1075,6 +1101,35 @@ public class ZkClient implements Watcher {
     }
   }
 
+  /*
+   * This one installs watch only if path is there. Meant to avoid leaking watch in Zk server.
+   */
+  private Stat installWatchOnlyPathExist(final String path) {
+    long startT = System.currentTimeMillis();
+    final Stat stat;
+    try {
+        stat = new Stat();
+        try {
+          LOG.debug("installWatchOnlyPathExist with path: {} ", path);
+          retryUntilConnected(() -> ((ZkConnection) getConnection()).getZookeeper().getData(path, true, stat));
+        } catch (ZkNoNodeException e) {
+          LOG.debug("installWatchOnlyPathExist path not existing: {}", path);
+          record(path, null, startT, ZkClientMonitor.AccessType.READ);
+          return null;
+        }
+      record(path, null, startT, ZkClientMonitor.AccessType.READ);
+      return stat;
+    } catch (Exception e) {
+      recordFailure(path, ZkClientMonitor.AccessType.READ);
+      throw e;
+    } finally {
+      long endT = System.currentTimeMillis();
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("getData (installWatchOnlyPathExist), path: " + path + ", time: " + (endT - startT) + " ms");
+      }
+    }
+  }
+
   protected void processStateChanged(WatchedEvent event) {
     LOG.info("zookeeper state changed (" + event.getState() + ")");
     setCurrentState(event.getState());
@@ -1271,6 +1326,9 @@ public class ZkClient implements Watcher {
   private void processDataOrChildChange(WatchedEvent event, long notificationTime) {
     final String path = event.getPath();
     final boolean pathExists = event.getType() != EventType.NodeDeleted;
+    if (EventType.NodeDeleted == event.getType()) {
+      LOG.debug("Event NodeDeleted: {}", event.getPath());
+    }
 
     if (event.getType() == EventType.NodeChildrenChanged || event.getType() == EventType.NodeCreated
         || event.getType() == EventType.NodeDeleted) {
@@ -1304,8 +1362,20 @@ public class ZkClient implements Watcher {
           @Override
           public void run() throws Exception {
             if (!pathStatRecord.pathChecked()) {
-              // getStat will re-install watcher only when the path exists
-              pathStatRecord.recordPathStat(getStat(path, pathExists), notificationTime);
+              // getStat() wrapp two ways to install data watch by using exists() or getData().
+              // getData() aka useGetData (true) would not install the watch if the node not ]
+              // existing. Exists() aka useGetData (false) would install (leak) the watch if the
+              // node not existing.
+              // Here the goal is to avoid leaking watch. Thus, if we know path not exists, we use
+              // the exists() useGetData (false) route to check stat. Otherwise, we use getData()
+              // to install watch.
+              Stat stat = null;
+              if (!pathExists) {
+                stat = getStat(path, false);
+              } else {
+                stat = installWatchOnlyPathExist(path);
+              }
+              pathStatRecord.recordPathStat(stat, notificationTime);
             }
             if (!pathStatRecord.pathExists()) {
               listener.getDataListener().handleDataDeleted(path);
@@ -1342,8 +1412,15 @@ public class ZkClient implements Watcher {
           @Override
           public void run() throws Exception {
             if (!pathStatRecord.pathChecked()) {
-              pathStatRecord.recordPathStat(getStat(path, hasListeners(path) && pathExists),
-                  OptionalLong.empty());
+              Stat stat = null;
+              if (!pathExists || !hasListeners(path)) {
+                // will not install listener using exists call
+                stat = getStat(path, false);
+              } else {
+                // will install listener using getData() call; if node not there, install nothing
+                stat = installWatchOnlyPathExist(path);
+              }
+              pathStatRecord.recordPathStat(stat, OptionalLong.empty());
             }
             List<String> children = null;
             if (pathStatRecord.pathExists()) {
@@ -1850,13 +1927,22 @@ public class ZkClient implements Watcher {
   }
 
   public void watchForData(final String path) {
-    retryUntilConnected(new Callable<Object>() {
-      @Override
-      public Object call() throws Exception {
-        getConnection().exists(path, true);
-        return null;
+    watchForData(path, false);
+  }
+
+  private boolean watchForData(final String path, boolean skipWatchingNonExistNode) {
+    try {
+      if (skipWatchingNonExistNode) {
+        retryUntilConnected(() -> (((ZkConnection) getConnection()).getZookeeper().getData(path, true, new Stat())));
+      } else {
+        retryUntilConnected(() -> (((ZkConnection) getConnection()).getZookeeper().exists(path, true)));
       }
-    });
+    } catch (ZkNoNodeException e) {
+      // Do nothing, this is what we want as this is not going to leak watch in ZooKeeepr server.
+      LOG.info("watchForData path not existing: " + path);
+      return false;
+    }
+    return true;
   }
 
   /**
@@ -1866,17 +1952,50 @@ public class ZkClient implements Watcher {
    *         exist.
    */
   public List<String> watchForChilds(final String path) {
+    return watchForChilds(path, false);
+  }
+
+  /**
+   *  The following captures about how we reason Zookeeper watch leakage issue based on various
+   *  comments in review
+   *  1. Removal of a parent zk path (such as currentstate/sessionid) is async to all threads in
+   *  Helix router or controller which watches the path. Thus, if we install a watch to a path
+   *  expected to be created, we always have the risk of leaking if the path changed.
+   *
+   *  2. Current the CallbackHandler life cycle is like this:
+   *  CallbackHandler for currentstate and some others can be created before the parent path is
+   *  created. Thus, we still needs exists() call. This corresponds to INIT change type of
+   *  CallbackHanlder. This is the time eventually watchForChilds() with be called with
+   *  skipWatchingNonExistNode as false.
+   *  Aside from creation time, CallbackHandler normal cycle would see CALLBACK change type. This
+   *  time we should normally expected the parent path is created. Thus, the subscription from
+   *  CallbackHandler would use skipWatchingNonExistNode false. Avoid leaking path.
+   *  Note, if the path is removed, CallbackHandler would see children of parent path as null. THis
+   *  would end the CallbackHanlder' life.
+   *
+   *  From the above life cycle of Callbackhandler, we know the only place that can leak is that
+   *  INIT change type time, participant expires the session more than twice in a row before the
+   *  watchForChild(skipWatchingNonExistNode=false) issue exists() call.
+   *
+   *  THe chance of this sequence is slim though.
+   *
+   */
+  private List<String> watchForChilds(final String path, boolean skipWatchingNonExistNode) {
     if (_zookeeperEventThread != null && Thread.currentThread() == _zookeeperEventThread) {
       throw new IllegalArgumentException("Must not be done in the zookeeper event thread.");
     }
     return retryUntilConnected(new Callable<List<String>>() {
       @Override
       public List<String> call() throws Exception {
-        exists(path, true);
+        if (!skipWatchingNonExistNode) {
+          exists(path, true);
+        }
         try {
           return getChildren(path, true);
         } catch (ZkNoNodeException e) {
           // ignore, the "exists" watch will listen for the parent node to appear
+          LOG.info("watchForChilds path not existing:{} skipWatchingNodeNoteExist: {}",
+              path, skipWatchingNonExistNode);
         }
         return null;
       }