You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2013/02/04 02:50:14 UTC

git commit: HELIX-40: fix zkclient subscribe path leaking and zk callback-handler leaking in case of session expiry

Updated Branches:
  refs/heads/master c52baf615 -> 03423e624


HELIX-40: fix zkclient subscribe path leaking and zk callback-handler leaking in case of session expiry


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/03423e62
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/03423e62
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/03423e62

Branch: refs/heads/master
Commit: 03423e6248fb970251c4443ddba94fac79076d93
Parents: c52baf6
Author: zzhang <zz...@uci.edu>
Authored: Sun Feb 3 17:50:00 2013 -0800
Committer: zzhang <zz...@uci.edu>
Committed: Sun Feb 3 17:50:00 2013 -0800

----------------------------------------------------------------------
 .../main/java/org/apache/helix/HelixManager.java   |    2 +-
 .../helix/controller/GenericHelixController.java   |  135 ++++++-----
 .../helix/controller/HelixControllerMain.java      |    3 +-
 .../apache/helix/manager/zk/CallbackHandler.java   |   10 +-
 .../apache/helix/manager/zk/ZKHelixManager.java    |  127 +++++-----
 .../participant/GenericLeaderStandbyModel.java     |   27 ++-
 .../src/test/java/org/apache/helix/Mocks.java      |    2 +-
 .../test/java/org/apache/helix/ZkTestHelper.java   |    3 +-
 .../controller/stages/DummyClusterManager.java     |    3 +-
 .../integration/TestZkCallbackHandlerLeak.java     |  197 +++++++++++++++
 .../helix/manager/zk/TestLiveInstanceBounce.java   |   11 +-
 .../helix/manager/zk/TestZkClusterManager.java     |    4 +-
 .../helix/mock/controller/ClusterController.java   |   18 +-
 .../helix/mock/participant/MockParticipant.java    |    2 +-
 .../helix/participant/MockZKHelixManager.java      |    3 +-
 15 files changed, 389 insertions(+), 158 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/03423e62/helix-core/src/main/java/org/apache/helix/HelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixManager.java b/helix-core/src/main/java/org/apache/helix/HelixManager.java
index 4d3adc8..7c91cd6 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixManager.java
@@ -176,7 +176,7 @@ public interface HelixManager
    * @param listener
    * @return
    */
-  boolean removeListener(Object listener);
+  boolean removeListener(PropertyKey key, Object listener);
   /**
    * Return the client to perform read/write operations on the cluster data
    * store

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/03423e62/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index ba22b63..deb2bdb 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -21,11 +21,14 @@ package org.apache.helix.controller;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.ConfigChangeListener;
@@ -101,18 +104,9 @@ public class GenericHelixController implements
   volatile boolean               init   = false;
   private final PipelineRegistry _registry;
 
-  /**
-   * Since instance current state is per-session-id, we need to track the session-ids of
-   * the current states that the ClusterController is observing. this set contains all the
-   * session ids that we add currentState listener
-   */
-  private final Set<String>      _instanceCurrentStateChangeSubscriptionSessionIds;
-
-  /**
-   * this set contains all the instance names that we add message listener
-   */
-  private final Set<String>      _instanceSubscriptionNames;
-
+  final AtomicReference<Map<String, LiveInstance>>	_lastSeenInstances;
+  final AtomicReference<Map<String, LiveInstance>>	_lastSeenSessions;
+  
   ClusterStatusMonitor           _clusterStatusMonitor;
   
 
@@ -264,10 +258,8 @@ public class GenericHelixController implements
   {
     _paused = false;
     _registry = registry;
-    _instanceCurrentStateChangeSubscriptionSessionIds =
-        new ConcurrentSkipListSet<String>();
-    _instanceSubscriptionNames = new ConcurrentSkipListSet<String>();
-    // _externalViewGenerator = new ExternalViewGenerator();
+    _lastSeenInstances = new AtomicReference<Map<String, LiveInstance>>();
+    _lastSeenSessions = new AtomicReference<Map<String,LiveInstance>>();
   }
 
   /**
@@ -563,55 +555,72 @@ public class GenericHelixController implements
   protected void checkLiveInstancesObservation(List<LiveInstance> liveInstances,
                                                NotificationContext changeContext)
   {
-    for (LiveInstance instance : liveInstances)
-    {
-      String instanceName = instance.getId();
-      String clientSessionId = instance.getSessionId();
-      HelixManager manager = changeContext.getManager();
 
-      // _instanceCurrentStateChangeSubscriptionSessionIds contains all the sessionIds
-      // that we've added a currentState listener
-      if (!_instanceCurrentStateChangeSubscriptionSessionIds.contains(clientSessionId))
-      {
-        try
-        {
-          manager.addCurrentStateChangeListener(this, instanceName, clientSessionId);
-          _instanceCurrentStateChangeSubscriptionSessionIds.add(clientSessionId);
-          logger.info("Observing client session id: " + clientSessionId);
-        }
-        catch (Exception e)
-        {
-          logger.error("Exception adding current state and message listener for instance:"
-                           + instanceName,
-                       e);
-        }
-      }
-
-      // _instanceSubscriptionNames contains all the instanceNames that we've added a
-      // message listener
-      if (!_instanceSubscriptionNames.contains(instanceName))
-      {
-        try
-        {
-          logger.info("Adding message listener for " + instanceName);
-          manager.addMessageListener(this, instanceName);
-          _instanceSubscriptionNames.add(instanceName);
-        }
-        catch (Exception e)
-        {
-          logger.error("Exception adding message listener for instance:" + instanceName,
-                       e);
-        }
-      }
-
-      // TODO we need to remove currentState listeners and message listeners
-      // when a session or an instance no longer exists. This may happen
-      // in case of session expiry, participant rebound, participant goes and new
-      // participant comes
-
-      // TODO shi should call removeListener on the previous session id;
-      // but the removeListener with that functionality is not implemented yet
+	// construct maps for current live-instances
+	Map<String, LiveInstance> curInstances = new HashMap<String, LiveInstance>();
+	Map<String, LiveInstance> curSessions = new HashMap<String, LiveInstance>();
+	for(LiveInstance liveInstance : liveInstances) {
+		curInstances.put(liveInstance.getInstanceName(), liveInstance);
+		curSessions.put(liveInstance.getSessionId(), liveInstance);
+	}
+
+	Map<String, LiveInstance> lastInstances = _lastSeenInstances.get();
+	Map<String, LiveInstance> lastSessions = _lastSeenSessions.get();
+
+    HelixManager manager = changeContext.getManager();
+    Builder keyBuilder = new Builder(manager.getClusterName());
+    if (lastSessions != null) {
+    	for (String session : lastSessions.keySet()) {
+    		if (!curSessions.containsKey(session)) {
+    			// remove current-state listener for expired session
+    		    String instanceName = lastSessions.get(session).getInstanceName();
+    			manager.removeListener(keyBuilder.currentStates(instanceName, session), this); 
+    		}
+    	}
+    }
+    
+    if (lastInstances != null) {
+    	for (String instance : lastInstances.keySet()) {
+    		if (!curInstances.containsKey(instance)) {
+    			// remove message listener for disconnected instances
+    			manager.removeListener(keyBuilder.messages(instance), this);
+    		}
+    	}
     }
+    
+	for (String session : curSessions.keySet()) {
+		if (lastSessions == null || !lastSessions.containsKey(session)) {
+	      String instanceName = curSessions.get(session).getInstanceName();
+          try {
+            // add current-state listeners for new sessions
+	        manager.addCurrentStateChangeListener(this, instanceName, session);
+	        logger.info("Succeed in addling current state listener for instance: " + instanceName + " with session: " + session);
+
+          } catch (Exception e) {
+        	  logger.error("Fail to add current state listener for instance: "
+        		  + instanceName + " with session: " + session, e);
+          }
+		}
+	}
+
+	for (String instance : curInstances.keySet()) {
+		if (lastInstances == null || !lastInstances.containsKey(instance)) {
+	        try {
+	          // add message listeners for new sessions
+	          manager.addMessageListener(this, instance);
+	          logger.info("Succeed in adding message listener for " + instance);
+	        }
+	        catch (Exception e)
+	        {
+	          logger.error("Fail to add message listener for instance:" + instance, e);
+	        }
+		}
+	}
+
+	// update last-seen
+	_lastSeenInstances.set(curInstances);
+	_lastSeenSessions.set(curSessions);
+
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/03423e62/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java b/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java
index 338e03c..b595879 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java
@@ -151,7 +151,8 @@ public class HelixControllerMain
       manager.addConfigChangeListener(controller);
       manager.addLiveInstanceChangeListener(controller);
       manager.addIdealStateChangeListener(controller);
-      manager.addExternalViewChangeListener(controller);
+      // no need for controller to listen on external-view
+      // manager.addExternalViewChangeListener(controller);
       manager.addControllerListener(controller);
     } catch (ZkInterruptedException e)
     {

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/03423e62/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
index 4e08ab0..0e70a17 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
@@ -350,7 +350,8 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
       }
       catch (ZkNoNodeException e)
       {
-        logger.warn("fail to subscribe child/data change@" + path, e);
+        logger.warn("fail to subscribe child/data change@. path: " + path 
+        		+ ", listener: " + _listener, e);
       }
     }
 
@@ -396,6 +397,8 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
     }
     catch (Exception e)
     {
+      logger.error("exception in handling data-change. path: " + dataPath 
+    		  + ", listener: " + _listener);
       ZKExceptionHandler.getInstance().handle(e);
     }
   }
@@ -412,7 +415,8 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
         		  + dataPath + ", listener: " + _listener);
           _zkClient.unsubscribeDataChanges(dataPath, this);
 
-          // only needed for bucketized parent, but OK if we don't have child-change watch on the path
+          // only needed for bucketized parent, but OK if we don't have child-change 
+          //  watch on the bucketized parent path
           logger.info(_manager.getInstanceName() + " unsubscribe child-change. path: "
         		  + dataPath + ", listener: " + _listener);
           _zkClient.unsubscribeChildChanges(dataPath, this);
@@ -444,6 +448,8 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
     }
     catch (Exception e)
     {
+      logger.error("exception in handling child-change. parentPath: " 
+    		  + parentPath + ", listener: " + _listener);
       ZKExceptionHandler.getInstance().handle(e);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/03423e62/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index f542993..18c3bd5 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -102,7 +102,7 @@ public class ZKHelixManager implements HelixManager
   private ZKHelixDataAccessor                  _helixAccessor;
   private ConfigAccessor                       _configAccessor;
   protected ZkClient                           _zkClient;
-  protected final List<CallbackHandler>         _handlers;
+  protected List<CallbackHandler>         	   _handlers;
   private final ZkStateChangeListener          _zkStateChangeListener;
   private final InstanceType                   _instanceType;
   volatile String                              _sessionId;
@@ -201,8 +201,6 @@ public class ZKHelixManager implements HelixManager
     _zkStateChangeListener = new ZkStateChangeListener(this, _flappingTimeWindowMs, _maxDisconnectThreshold);
     _timer = null;
 
-    _handlers = new ArrayList<CallbackHandler>();
-
     _messagingService = new DefaultMessagingService(this);
 
     _version =
@@ -246,6 +244,35 @@ public class ZKHelixManager implements HelixManager
     return true;
   }
   
+  @Override
+  public boolean removeListener(PropertyKey key, Object listener)
+  {
+    logger.info("Removing listener: " + listener + " on path: " + key.getPath() 
+    		+ " from cluster: " + _clusterName + " by instance: " + _instanceName);
+
+    synchronized (this)
+    {
+      List<CallbackHandler> toRemove = new ArrayList<CallbackHandler>();
+      for (CallbackHandler handler : _handlers)
+      {
+        // compare property-key path and listener reference
+        if (handler.getPath().equals(key.getPath()) && handler.getListener().equals(listener))
+        {
+          toRemove.add(handler);
+        }
+      }
+      
+      _handlers.removeAll(toRemove);
+      
+      // handler.reset() may modify the handlers list, so do it outside the iteration
+      for (CallbackHandler handler : toRemove) {
+    	  handler.reset();
+      }
+    }
+
+    return true;
+  }
+
   private void addListener(Object listener, PropertyKey propertyKey, ChangeType changeType, EventType[] eventType)
   {
     checkConnected();
@@ -494,30 +521,6 @@ public class ZKHelixManager implements HelixManager
     return -1;
   }
 
-  @Override
-  public boolean removeListener(Object listener)
-  {
-	logger.info("remove listener: " + listener + " from cluster: " + _clusterName 
-			+ ", instance: " + _instanceName);
-
-    synchronized (this)
-    {
-      Iterator<CallbackHandler> iterator = _handlers.iterator();
-      while (iterator.hasNext())
-      {
-        CallbackHandler handler = iterator.next();
-        // simply compare reference
-        if (handler.getListener().equals(listener))
-        {
-          handler.reset();
-          iterator.remove();
-        }
-      }
-    }
-
-    return true;
-  }
-
   private void addLiveInstance()
   {
     LiveInstance liveInstance = new LiveInstance(_instanceName);
@@ -686,7 +689,10 @@ public class ZKHelixManager implements HelixManager
     }
     _baseDataAccessor.reset();
 
+    // reset all handlers so they have a chance to unsubscribe zk changes from zkclient
+    // abandon all callback-handlers added in expired session
     resetHandlers();
+    _handlers = new ArrayList<CallbackHandler>();
 
     logger.info("Handling new session, session id:" + _sessionId + ", instance:"
         + _instanceName + ", instanceTye: " + _instanceType + ", cluster: " + _clusterName);
@@ -730,22 +736,13 @@ public class ZKHelixManager implements HelixManager
                        .registerMessageHandlerFactory(defaultParticipantErrorMessageHandlerFactory.getMessageType(),
                                                       defaultParticipantErrorMessageHandlerFactory);
 
-      if (_leaderElectionHandler == null)
-      {
-        final String path =
-            PropertyPathConfig.getPath(PropertyType.CONTROLLER, _clusterName);
-
-        _leaderElectionHandler =
+      // create a new leader-election handler for a new session
+      _leaderElectionHandler =
             createCallBackHandler(new Builder(_clusterName).controller(),
                                   new DistClusterControllerElection(_zkConnectString),
                                   new EventType[] { EventType.NodeChildrenChanged,
                                       EventType.NodeDeleted, EventType.NodeCreated },
                                   ChangeType.CONTROLLER);
-      }
-      else
-      {
-        _leaderElectionHandler.init();
-      }
     }
 
     if (_instanceType == InstanceType.PARTICIPANT
@@ -830,42 +827,38 @@ public class ZKHelixManager implements HelixManager
   {
     synchronized (this)
     {
-      // get a copy of the list and iterate over the copy list
-      // in case handler.reset() will modify the original handler list
-      List<CallbackHandler> handlers = new ArrayList<CallbackHandler>();
-      handlers.addAll(_handlers);
-
-      for (CallbackHandler handler : handlers)
-      {
-        handler.reset();
-        logger.info("reset handler: " + handler.getPath() + " by "
-            + Thread.currentThread().getName());
-      }
+    	if (_handlers != null)
+        {
+            // get a copy of the list and iterate over the copy list
+            // in case handler.reset() will modify the original handler list
+            List<CallbackHandler> tmpHandlers = new ArrayList<CallbackHandler>();
+            tmpHandlers.addAll(_handlers);
+
+            for (CallbackHandler handler : tmpHandlers)
+            {
+              handler.reset();
+              logger.info("reset handler: " + handler.getPath() + ", " + handler.getListener());
+            }
+        }
     }
   }
 
   private void initHandlers()
   {
-    // may add new currentState and message listeners during init()
-    // so make a copy and iterate over the copy
     synchronized (this)
     {
-      List<CallbackHandler> handlers = new ArrayList<CallbackHandler>();
-      handlers.addAll(_handlers);
-      for (CallbackHandler handler : handlers)
-      {
-        handler.init();
-      }
-    }
-  }
-
-  private void addListener(CallbackHandler handler)
-  {
-    synchronized (this)
-    {
-      _handlers.add(handler);
-      logger.info("add handler: " + handler.getPath() + " by "
-          + Thread.currentThread().getName());
+    	if (_handlers != null)
+    	{
+    	  // may add new currentState and message listeners during init()
+    	  // so make a copy and iterate over the copy
+    	  List<CallbackHandler> tmpHandlers = new ArrayList<CallbackHandler>();
+    	  tmpHandlers.addAll(_handlers);
+          for (CallbackHandler handler : tmpHandlers)
+          {
+            handler.init();
+            logger.info("init handler: " + handler.getPath() + ", " + handler.getListener());
+          }
+    	}
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/03423e62/helix-core/src/main/java/org/apache/helix/participant/GenericLeaderStandbyModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/GenericLeaderStandbyModel.java b/helix-core/src/main/java/org/apache/helix/participant/GenericLeaderStandbyModel.java
index a638bc7..f45ba6c 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/GenericLeaderStandbyModel.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/GenericLeaderStandbyModel.java
@@ -24,6 +24,7 @@ import java.util.List;
 import org.apache.helix.HelixManager;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.HelixConstants.ChangeType;
+import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.model.Message;
 import org.apache.helix.participant.statemachine.StateModel;
 import org.apache.helix.participant.statemachine.StateModelInfo;
@@ -89,7 +90,31 @@ public class GenericLeaderStandbyModel extends StateModel
   {
     LOG.info("Become STANDBY from LEADER");
     HelixManager manager = context.getManager();
-    manager.removeListener(_particHolder);    
+    if (manager == null)
+    {
+      throw new IllegalArgumentException("Require HelixManager in notification conext");
+    }
+    
+    Builder keyBuilder = new Builder(manager.getClusterName());
+    for (ChangeType notificationType : _notificationTypes)
+    {
+      if (notificationType == ChangeType.LIVE_INSTANCE)
+      {
+        manager.removeListener(keyBuilder.liveInstances(), _particHolder);
+      }
+      else if (notificationType == ChangeType.CONFIG)
+      {
+        manager.removeListener(keyBuilder.instanceConfigs(), _particHolder);
+      }
+      else if (notificationType == ChangeType.EXTERNAL_VIEW)
+      {
+        manager.removeListener(keyBuilder.externalViews(), _particHolder);
+      }
+      else
+      {
+        LOG.error("Unsupport notificationType:" + notificationType.toString());
+      }
+    }
   }
 
   @Transition(to="OFFLINE",from="STANDBY")

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/03423e62/helix-core/src/test/java/org/apache/helix/Mocks.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/Mocks.java b/helix-core/src/test/java/org/apache/helix/Mocks.java
index 2603272..66e7fa3 100644
--- a/helix-core/src/test/java/org/apache/helix/Mocks.java
+++ b/helix-core/src/test/java/org/apache/helix/Mocks.java
@@ -384,7 +384,7 @@ public class Mocks {
 		}
 
 		@Override
-		public boolean removeListener(Object listener) {
+		public boolean removeListener(PropertyKey key, Object listener) {
 			// TODO Auto-generated method stub
 			return false;
 		}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/03423e62/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java b/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
index 1bced7f..c74d650 100644
--- a/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
+++ b/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
@@ -23,6 +23,7 @@ import java.io.BufferedReader;
 import java.io.InputStreamReader;
 import java.io.PrintWriter;
 import java.net.Socket;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
@@ -34,6 +35,7 @@ import org.I0Itec.zkclient.ZkConnection;
 import org.apache.helix.InstanceType;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.manager.zk.CallbackHandler;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZKHelixManager;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
@@ -352,5 +354,4 @@ public class ZkTestHelper
 	  return listenerMapBySession;
   }
   
-  
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/03423e62/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java b/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
index 25c139b..cad41d0 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
@@ -36,6 +36,7 @@ import org.apache.helix.InstanceType;
 import org.apache.helix.LiveInstanceChangeListener;
 import org.apache.helix.MessageListener;
 import org.apache.helix.PreConnectCallback;
+import org.apache.helix.PropertyKey;
 import org.apache.helix.ScopedConfigChangeListener;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
@@ -121,7 +122,7 @@ public class DummyClusterManager implements HelixManager
   }
 
   @Override
-  public boolean removeListener(Object listener)
+  public boolean removeListener(PropertyKey key, Object listener)
   {
     // TODO Auto-generated method stub
     return false;

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/03423e62/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java b/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
new file mode 100644
index 0000000..2ff2b1e
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
@@ -0,0 +1,197 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+import java.util.List;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZkHelixTestManager;
+import org.apache.helix.ZkTestHelper;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.manager.zk.CallbackHandler;
+import org.apache.helix.mock.controller.ClusterController;
+import org.apache.helix.mock.participant.MockParticipant;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
+	
+	@Test
+	public void testCbHandlerLeakOnParticipantSessionExpiry() throws Exception
+	{
+	    // Logger.getRootLogger().setLevel(Level.INFO);
+	    String className = TestHelper.getTestClassName();
+	    String methodName = TestHelper.getTestMethodName();
+	    String clusterName = className + "_" + methodName;
+	    int n = 2;
+
+	    System.out.println("START " + clusterName + " at "
+	        + new Date(System.currentTimeMillis()));
+
+	    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+	                            "localhost", // participant name prefix
+	                            "TestDB", // resource name prefix
+	                            1, // resources
+	                            32, // partitions per resource
+	                            n, // number of nodes
+	                            2, // replicas
+	                            "MasterSlave",
+	                            true); // do rebalance
+	    
+	    
+	    ClusterController controller =
+	        new ClusterController(clusterName, "controller_0", ZK_ADDR);
+	    controller.syncStart();
+
+	    // start participants
+	    MockParticipant[] participants = new MockParticipant[n];
+	    for (int i = 0; i < n; i++)
+	    {
+	      String instanceName = "localhost_" + (12918 + i);
+
+	      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+	      participants[i].syncStart();
+	    }
+
+	    boolean result =
+	        ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+	                                                                                 clusterName));
+	    Assert.assertTrue(result);
+	    ZkHelixTestManager controllerManager = controller.getManager();
+	    ZkHelixTestManager participantManagerToExpire = (ZkHelixTestManager)participants[1].getManager();
+
+	    // printHandlers(controllerManager);
+	    // printHandlers(participantManagerToExpire);
+	    int controllerHandlerNb = controllerManager.getHandlers().size();
+	    int particHandlerNb = participantManagerToExpire.getHandlers().size();
+	    Assert.assertEquals(controllerHandlerNb, 9, "HelixController should have 9 (5+2n) callback handlers for 2 (n) participant");
+	    Assert.assertEquals(particHandlerNb, 2, "HelixParticipant should have 2 (msg+cur-state) callback handlers");
+	    
+	    // expire the session of participant
+	    System.out.println("Expiring participant session...");
+	    String oldSessionId = participantManagerToExpire.getSessionId();
+	    
+	    ZkTestHelper.expireSession(participantManagerToExpire.getZkClient());
+	    String newSessionId = participantManagerToExpire.getSessionId();
+	    System.out.println("Expried participant session. oldSessionId: " + oldSessionId + ", newSessionId: " + newSessionId);
+
+	    result =
+	        ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+	                                                                                                   clusterName));
+	    Assert.assertTrue(result);
+	    // printHandlers(controllerManager);
+	    // printHandlers(participantManagerToExpire);
+	    int handlerNb = controllerManager.getHandlers().size();
+	    Assert.assertEquals(handlerNb, controllerHandlerNb, "controller callback handlers should not increase after participant session expiry");
+	    handlerNb = participantManagerToExpire.getHandlers().size();
+	    Assert.assertEquals(handlerNb, particHandlerNb, "participant callback handlers should not increase after participant session expiry");
+
+	    System.out.println("END " + clusterName + " at "
+	            + new Date(System.currentTimeMillis()));
+	}
+	
+	/** Verifies that expiring the controller's ZK session leaks no callback handlers on the controller or a participant. */
+	@Test
+	public void testCbHandlerLeakOnControllerSessionExpiry() throws Exception
+	{
+	    // Logger.getRootLogger().setLevel(Level.INFO);
+	    String className = TestHelper.getTestClassName();
+	    String methodName = TestHelper.getTestMethodName();
+	    String clusterName = className + "_" + methodName;
+	    int n = 2; // number of participants; the expected controller handler count below is derived from it
+
+	    System.out.println("START " + clusterName + " at "
+	        + new Date(System.currentTimeMillis()));
+
+	    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+	                            "localhost", // participant name prefix
+	                            "TestDB", // resource name prefix
+	                            1, // resources
+	                            32, // partitions per resource
+	                            n, // number of nodes
+	                            2, // replicas
+	                            "MasterSlave",
+	                            true); // do rebalance
+
+	    ClusterController controller =
+	        new ClusterController(clusterName, "controller_0", ZK_ADDR);
+	    controller.syncStart();
+
+	    // start participants
+	    MockParticipant[] participants = new MockParticipant[n];
+	    for (int i = 0; i < n; i++)
+	    {
+	      String instanceName = "localhost_" + (12918 + i);
+
+	      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+	      participants[i].syncStart();
+	    }
+
+	    boolean result =
+	        ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+	                                                                                 clusterName));
+	    Assert.assertTrue(result);
+	    ZkHelixTestManager controllerManager = controller.getManager();
+	    ZkHelixTestManager participantManager = participants[0].getManager();
+
+	    // printHandlers(controllerManager);
+	    int controllerHandlerNb = controllerManager.getHandlers().size();
+	    int particHandlerNb = participantManager.getHandlers().size();
+	    Assert.assertEquals(controllerHandlerNb, 5 + 2 * n, "HelixController should have " + (5 + 2 * n) + " (5+2n) callback handlers for " + n + " (n) participants");
+	    Assert.assertEquals(particHandlerNb, 2, "HelixParticipant should have 2 (msg+cur-state) callback handlers");
+
+
+	    // expire only the controller's session; participant sessions are left intact
+	    System.out.println("Expiring controller session...");
+	    String oldSessionId = controllerManager.getSessionId();
+
+	    ZkTestHelper.expireSession(controllerManager.getZkClient());
+	    String newSessionId = controllerManager.getSessionId();
+	    System.out.println("Expired controller session. oldSessionId: " + oldSessionId + ", newSessionId: " + newSessionId);
+	    // once the cluster converges again, handler counts must equal the pre-expiry counts
+	    result =
+	        ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+	                                                                                                   clusterName));
+	    Assert.assertTrue(result);
+	    // printHandlers(controllerManager);
+	    int handlerNb = controllerManager.getHandlers().size();
+	    Assert.assertEquals(handlerNb, controllerHandlerNb, "controller callback handlers should not change after controller session expiry");
+	    handlerNb = participantManager.getHandlers().size();
+	    Assert.assertEquals(handlerNb, particHandlerNb, "participant callback handlers should not change after controller session expiry");
+
+	    // NOTE(review): controller and participants are not stopped here; confirm whether cleanup is needed
+	    System.out.println("END " + clusterName + " at "
+	            + new Date(System.currentTimeMillis()));
+	}
+
+	/** Debug helper: prints the handler count, then each handler's path (cluster-name prefix stripped) and listener. */
+	static void printHandlers(ZkHelixTestManager manager)
+	{
+	    List<CallbackHandler> cbHandlers = manager.getHandlers();
+	    System.out.println("\n" + manager.getInstanceName() + " cb-handler#: " + cbHandlers.size());
+	    for (CallbackHandler cbHandler : cbHandlers)
+	    {
+	        String relativePath = cbHandler.getPath().substring(manager.getClusterName().length() + 1);
+	        System.out.println(relativePath + ": " + cbHandler.getListener());
+	    }
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/03423e62/helix-core/src/test/java/org/apache/helix/manager/zk/TestLiveInstanceBounce.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestLiveInstanceBounce.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestLiveInstanceBounce.java
index a2f0fcd..732e68a 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestLiveInstanceBounce.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestLiveInstanceBounce.java
@@ -64,15 +64,18 @@ public class TestLiveInstanceBounce extends ZkStandAloneCMTestBaseWithPropertySe
     boolean result = ClusterStateVerifier.verifyByPolling(
         new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME), 50 * 1000);
     Assert.assertTrue(result);
-    // When a new live instance is created, we still add current state listener to it thus number should increase by 2
-    for(int j = 0; j < 10; j++)
+
+    // When the new live instance is created, a current-state listener is added for it,
+    // and the current-state listener of the expired session is removed,
+    // so the total number of callback handlers stays unchanged.
+    for (int j = 0; j < 10; j++)
     {
-      if(controller.getHandlers().size() == (handlerSize + 2))
+      if(controller.getHandlers().size() == (handlerSize))
       {
         break;
       }
       Thread.sleep(400);
     }
-    Assert.assertEquals( controller.getHandlers().size(), handlerSize + 2);
+    Assert.assertEquals( controller.getHandlers().size(), handlerSize);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/03423e62/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java
index 093499d..c65ce24 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java
@@ -29,6 +29,7 @@ import org.apache.helix.ConfigScopeBuilder;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixException;
 import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.ZkUnitTestBase;
@@ -88,9 +89,10 @@ public class TestZkClusterManager extends ZkUnitTestBase
       // OK
     }
 
+    Builder keyBuilder = new Builder(controller.getClusterName());
     controller.addControllerListener(listener);
     AssertJUnit.assertTrue(listener.isControllerChangeListenerInvoked);
-    controller.removeListener(listener);
+    controller.removeListener(keyBuilder.controller(), listener);
 
     ZkHelixPropertyStore<ZNRecord> store = controller.getHelixPropertyStore();
     ZNRecord record = new ZNRecord("node_1");

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/03423e62/helix-core/src/test/java/org/apache/helix/mock/controller/ClusterController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/controller/ClusterController.java b/helix-core/src/test/java/org/apache/helix/mock/controller/ClusterController.java
index 511623f..199c543 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/controller/ClusterController.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/controller/ClusterController.java
@@ -24,6 +24,7 @@ import java.util.concurrent.CountDownLatch;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
+import org.apache.helix.ZkHelixTestManager;
 import org.apache.helix.controller.HelixControllerMain;
 import org.apache.helix.participant.DistClusterControllerStateModelFactory;
 import org.apache.helix.participant.StateMachineEngine;
@@ -41,7 +42,7 @@ public class ClusterController extends Thread
   private final String         _controllerMode;
   private final String         _zkAddr;
 
-  private HelixManager   _manager;
+  private ZkHelixTestManager   _manager;
 
   public ClusterController(String clusterName, String controllerName, String zkAddr) throws Exception
   {
@@ -58,20 +59,11 @@ public class ClusterController extends Thread
 
     if (_controllerMode.equals(HelixControllerMain.STANDALONE.toString()))
     {
-      _manager =
-          HelixManagerFactory.getZKHelixManager(clusterName,
-                                                controllerName,
-                                                InstanceType.CONTROLLER,
-                                                zkAddr);
+      _manager = new ZkHelixTestManager(clusterName, controllerName, InstanceType.CONTROLLER, zkAddr);
     }
     else if (_controllerMode.equals(HelixControllerMain.DISTRIBUTED.toString()))
     {
-      _manager =
-          HelixManagerFactory.getZKHelixManager(clusterName,
-                                                controllerName,
-                                                InstanceType.CONTROLLER_PARTICIPANT,
-                                                zkAddr);
-
+      _manager = new ZkHelixTestManager(clusterName, controllerName, InstanceType.CONTROLLER_PARTICIPANT, zkAddr);
     }
     else
     {
@@ -80,7 +72,7 @@ public class ClusterController extends Thread
     }
   }
 
-  public HelixManager getManager()
+  public ZkHelixTestManager getManager()
   {
     return _manager;
   }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/03423e62/helix-core/src/test/java/org/apache/helix/mock/participant/MockParticipant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/participant/MockParticipant.java b/helix-core/src/test/java/org/apache/helix/mock/participant/MockParticipant.java
index b3fdeb2..8b389fe 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/participant/MockParticipant.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/participant/MockParticipant.java
@@ -505,7 +505,7 @@ public class MockParticipant extends Thread
     }
   }
 
-  public HelixManager getManager()
+  public ZkHelixTestManager getManager()
   {
     return _manager;
   }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/03423e62/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java b/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
index 517dbdf..529b534 100644
--- a/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
+++ b/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
@@ -38,6 +38,7 @@ import org.apache.helix.InstanceType;
 import org.apache.helix.LiveInstanceChangeListener;
 import org.apache.helix.MessageListener;
 import org.apache.helix.PreConnectCallback;
+import org.apache.helix.PropertyKey;
 import org.apache.helix.ScopedConfigChangeListener;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
@@ -132,7 +133,7 @@ public class MockZKHelixManager implements HelixManager
   }
 
   @Override
-  public boolean removeListener(Object listener)
+  public boolean removeListener(PropertyKey key, Object listener)
   {
     // TODO Auto-generated method stub
     return false;