You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/08/18 21:28:35 UTC

git commit: [HELIX-499] Controller should listen for all config changes

Repository: helix
Updated Branches:
  refs/heads/master 325fe8b14 -> a298f23de


[HELIX-499] Controller should listen for all config changes


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/a298f23d
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/a298f23d
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/a298f23d

Branch: refs/heads/master
Commit: a298f23defb420aeee1a8d10b7f626433df97576
Parents: 325fe8b
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Mon Aug 18 12:24:34 2014 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Mon Aug 18 12:24:34 2014 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/helix/api/Cluster.java |  16 +-
 .../helix/api/accessor/ClusterAccessor.java     |   3 +-
 .../controller/GenericHelixController.java      |  54 ++-
 .../helix/controller/HelixControllerMain.java   |   3 +
 .../controller/stages/ClusterDataCache.java     |  24 +-
 .../manager/zk/ControllerManagerHelper.java     |   5 +
 .../helix/manager/zk/ZkCallbackHandler.java     |   4 +
 .../helix/manager/zk/ZkHelixConnection.java     |   9 +-
 .../helix/manager/zk/ZkHelixController.java     |   6 +
 .../org/apache/helix/task/TaskRebalancer.java   |  13 +-
 .../java/org/apache/helix/task/TaskUtil.java    |  33 ++
 .../integration/TestZkCallbackHandlerLeak.java  |  35 +-
 .../manager/TestZkCallbackHandlerLeak.java      | 474 -------------------
 13 files changed, 177 insertions(+), 502 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/a298f23d/helix-core/src/main/java/org/apache/helix/api/Cluster.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Cluster.java b/helix-core/src/main/java/org/apache/helix/api/Cluster.java
index 421ff60..2b59b27 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Cluster.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Cluster.java
@@ -34,6 +34,7 @@ import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.SpectatorId;
 import org.apache.helix.api.id.StateModelDefId;
 import org.apache.helix.controller.context.ControllerContext;
+import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.model.ClusterConstraints;
 import org.apache.helix.model.ClusterConstraints.ConstraintType;
 import org.apache.helix.model.StateModelDefinition;
@@ -79,6 +80,8 @@ public class Cluster {
 
   private final ClusterConfig _config;
 
+  private final ClusterDataCache _cache;
+
   /**
    * construct a cluster
    * @param id
@@ -98,7 +101,7 @@ public class Cluster {
       ControllerId leaderId, Map<ConstraintType, ClusterConstraints> constraintMap,
       Map<StateModelDefId, StateModelDefinition> stateModelMap,
       Map<ContextId, ControllerContext> contextMap, UserConfig userConfig, boolean isPaused,
-      boolean autoJoinAllowed) {
+      boolean autoJoinAllowed, ClusterDataCache cache) {
 
     // build the config
     // Guava's transform and "copy" functions really return views so the maps all reflect each other
@@ -140,6 +143,8 @@ public class Cluster {
 
     _contextMap = ImmutableMap.copyOf(contextMap);
 
+    _cache = cache;
+
     // TODO impl this when we persist controllers and spectators on zookeeper
     _controllerMap = ImmutableMap.copyOf(controllerMap);
     _spectatorMap = Collections.emptyMap();
@@ -286,4 +291,13 @@ public class Cluster {
   public ClusterConfig getConfig() {
     return _config;
   }
+
+  /**
+   * Get a ClusterDataCache object which is a flattened version of the physical properties read to
+   * build this object.
+   * @return ClusterDataCache
+   */
+  public ClusterDataCache getCache() {
+    return _cache;
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/a298f23d/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
index ddf809a..eac2bf8 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
@@ -244,7 +244,8 @@ public class ClusterAccessor {
 
     // create the cluster snapshot object
     return new Cluster(_clusterId, resourceMap, participantMap, controllerMap, leaderId,
-        clusterConstraintMap, stateModelMap, contextMap, userConfig, isPaused, autoJoinAllowed);
+        clusterConstraintMap, stateModelMap, contextMap, userConfig, isPaused, autoJoinAllowed,
+        _cache);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/helix/blob/a298f23d/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index f1c2583..d36b6f5 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -34,6 +34,7 @@ import org.apache.helix.CurrentStateChangeListener;
 import org.apache.helix.ExternalViewChangeListener;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
+import org.apache.helix.HelixProperty;
 import org.apache.helix.IdealStateChangeListener;
 import org.apache.helix.InstanceConfigChangeListener;
 import org.apache.helix.LiveInstanceChangeListener;
@@ -41,6 +42,7 @@ import org.apache.helix.MessageListener;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.NotificationContext.Type;
 import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.ScopedConfigChangeListener;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.api.id.SessionId;
 import org.apache.helix.controller.pipeline.Pipeline;
@@ -62,16 +64,21 @@ import org.apache.helix.controller.stages.ReadClusterDataStage;
 import org.apache.helix.controller.stages.ResourceComputationStage;
 import org.apache.helix.controller.stages.ResourceValidationStage;
 import org.apache.helix.controller.stages.TaskAssignmentStage;
+import org.apache.helix.model.ClusterConstraints;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.PauseSignal;
+import org.apache.helix.model.ResourceConfiguration;
 import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
 import org.apache.log4j.Logger;
 
+import com.google.common.collect.Lists;
+
 /**
  * Cluster Controllers main goal is to keep the cluster state as close as possible to Ideal State.
  * It does this by listening to changes in cluster state and scheduling new tasks to get cluster
@@ -87,7 +94,8 @@ import org.apache.log4j.Logger;
  */
 public class GenericHelixController implements IdealStateChangeListener,
     LiveInstanceChangeListener, MessageListener, CurrentStateChangeListener,
-    ExternalViewChangeListener, ControllerChangeListener, InstanceConfigChangeListener {
+    ExternalViewChangeListener, ControllerChangeListener, InstanceConfigChangeListener,
+    ScopedConfigChangeListener {
   private static final Logger logger = Logger.getLogger(GenericHelixController.class.getName());
   volatile boolean init = false;
   private final PipelineRegistry _registry;
@@ -213,6 +221,8 @@ public class GenericHelixController implements IdealStateChangeListener,
       registry.register("idealStateChange", dataRefresh, rebalancePipeline);
       registry.register("currentStateChange", dataRefresh, rebalancePipeline, externalViewPipeline);
       registry.register("configChange", dataRefresh, rebalancePipeline);
+      registry.register("instanceConfigChange", dataRefresh, rebalancePipeline);
+      registry.register("resourceConfigChange", dataRefresh, rebalancePipeline);
       registry.register("liveInstanceChange", dataRefresh, rebalancePipeline, externalViewPipeline);
 
       registry.register("messageChange", dataRefresh, rebalancePipeline);
@@ -443,7 +453,7 @@ public class GenericHelixController implements IdealStateChangeListener,
     }
     _cache.setInstanceConfigs(configs);
 
-    ClusterEvent event = new ClusterEvent("configChange");
+    ClusterEvent event = new ClusterEvent("instanceConfigChange");
     event.addAttribute("changeContext", changeContext);
     event.addAttribute("helixmanager", changeContext.getManager());
     event.addAttribute("eventData", configs);
@@ -452,6 +462,46 @@ public class GenericHelixController implements IdealStateChangeListener,
   }
 
   @Override
+  public void onConfigChange(List<HelixProperty> configs, NotificationContext context) {
+    logger.info("START: GenericClusterController.onConfigChange()");
+    if (context == null || context.getType() != Type.CALLBACK) {
+      _cache.requireFullRefresh();
+    }
+
+    if (configs == null) {
+      configs = Collections.emptyList();
+    }
+
+    String eventName;
+    String path = context.getPathChanged();
+    if (path.contains(ConfigScopeProperty.RESOURCE.toString())) {
+      List<ResourceConfiguration> resourceConfigs = Lists.newArrayList();
+      for (HelixProperty property : configs) {
+        resourceConfigs.add(new ResourceConfiguration(property.getRecord()));
+      }
+      _cache.setResourceConfigs(resourceConfigs);
+      eventName = "resourceConfigChange";
+    } else if (path.contains(ConfigScopeProperty.CONSTRAINT.toString())) {
+      List<ClusterConstraints> constraints = Lists.newArrayList();
+      for (HelixProperty property : configs) {
+        constraints.add(new ClusterConstraints(property.getRecord()));
+      }
+      _cache.setConstraints(constraints);
+      eventName = "constraintChange";
+    } else {
+      logger.warn("Controller received event for unsupported path: " + path);
+      eventName = "configChange";
+    }
+
+    ClusterEvent event = new ClusterEvent(eventName);
+    event.addAttribute("changeContext", context);
+    event.addAttribute("helixmanager", context.getManager());
+    event.addAttribute("eventData", configs);
+    _eventQueue.put(event);
+    logger.info("END: GenericClusterController.onConfigChange()");
+  }
+
+  @Override
   public void onControllerChange(NotificationContext changeContext) {
     logger.info("START: GenericClusterController.onControllerChange()");
     _cache.requireFullRefresh();

http://git-wip-us.apache.org/repos/asf/helix/blob/a298f23d/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java b/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java
index ca540c5..b652f35 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java
@@ -49,6 +49,7 @@ import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
 import org.apache.helix.api.id.StateModelDefId;
 import org.apache.helix.manager.zk.HelixManagerShutdownHook;
+import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
 import org.apache.helix.participant.DistClusterControllerStateModelFactory;
 import org.apache.helix.participant.StateMachineEngine;
 import org.apache.log4j.Logger;
@@ -134,6 +135,8 @@ public class HelixControllerMain {
       GenericHelixController controller) {
     try {
       manager.addInstanceConfigChangeListener(controller);
+      manager.addConfigChangeListener(controller, ConfigScopeProperty.RESOURCE);
+      manager.addConfigChangeListener(controller, ConfigScopeProperty.CONSTRAINT);
       manager.addLiveInstanceChangeListener(controller);
       manager.addIdealStateChangeListener(controller);
       // no need for controller to listen on external-view

http://git-wip-us.apache.org/repos/asf/helix/blob/a298f23d/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index 877baf2..53ddd19 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -64,10 +64,12 @@ public class ClusterDataCache {
   Map<String, InstanceConfig> _instanceConfigMap;
   Map<String, InstanceConfig> _instanceConfigCacheMap;
   Map<String, ClusterConstraints> _constraintMap;
+  Map<String, ClusterConstraints> _constraintCacheMap;
   Map<String, Map<String, Map<String, CurrentState>>> _currentStateMap;
   Map<String, Map<String, Message>> _messageMap;
   Map<String, Map<String, String>> _idealStateRuleMap;
   Map<String, ResourceConfiguration> _resourceConfigMap;
+  Map<String, ResourceConfiguration> _resourceConfigCacheMap;
   Map<String, ControllerContextHolder> _controllerContextMap;
   PauseSignal _pause;
   LiveInstance _leader;
@@ -97,17 +99,20 @@ public class ClusterDataCache {
       _idealStateCacheMap = accessor.getChildValuesMap(keyBuilder.idealStates());
       _liveInstanceCacheMap = accessor.getChildValuesMap(keyBuilder.liveInstances());
       _instanceConfigCacheMap = accessor.getChildValuesMap(keyBuilder.instanceConfigs());
+      _resourceConfigCacheMap = accessor.getChildValuesMap(keyBuilder.resourceConfigs());
+      _constraintCacheMap = accessor.getChildValuesMap(keyBuilder.constraints());
     }
     _idealStateMap = Maps.newHashMap(_idealStateCacheMap);
     _liveInstanceMap = Maps.newHashMap(_liveInstanceCacheMap);
     _instanceConfigMap = Maps.newHashMap(_instanceConfigCacheMap);
+    _resourceConfigMap = Maps.newHashMap(_resourceConfigCacheMap);
+    _constraintMap = Maps.newHashMap(_constraintCacheMap);
 
     for (LiveInstance instance : _liveInstanceMap.values()) {
       LOG.trace("live instance: " + instance.getInstanceName() + " " + instance.getSessionId());
     }
 
     _stateModelDefMap = accessor.getChildValuesMap(keyBuilder.stateModelDefs());
-    _constraintMap = accessor.getChildValuesMap(keyBuilder.constraints());
 
     Map<String, Map<String, Message>> msgMap = new HashMap<String, Map<String, Message>>();
     List<PropertyKey> newMessageKeys = Lists.newLinkedList();
@@ -200,7 +205,6 @@ public class ClusterDataCache {
     _currentStateMap = Collections.unmodifiableMap(allCurStateMap);
 
     // New in 0.7: Read more information for the benefit of user-defined rebalancers
-    _resourceConfigMap = accessor.getChildValuesMap(keyBuilder.resourceConfigs());
     _controllerContextMap = accessor.getChildValuesMap(keyBuilder.controllerContexts());
 
     // Read all single properties together
@@ -370,6 +374,22 @@ public class ClusterDataCache {
     _instanceConfigCacheMap = instanceConfigMap;
   }
 
+  public synchronized void setResourceConfigs(List<ResourceConfiguration> resourceConfigs) {
+    Map<String, ResourceConfiguration> resourceConfigMap = Maps.newHashMap();
+    for (ResourceConfiguration resourceConfig : resourceConfigs) {
+      resourceConfigMap.put(resourceConfig.getId(), resourceConfig);
+    }
+    _resourceConfigCacheMap = resourceConfigMap;
+  }
+
+  public synchronized void setConstraints(List<ClusterConstraints> constraints) {
+    Map<String, ClusterConstraints> constraintMap = Maps.newHashMap();
+    for (ClusterConstraints constraint : constraints) {
+      constraintMap.put(constraint.getId(), constraint);
+    }
+    _constraintCacheMap = constraintMap;
+  }
+
   /**
    * Some partitions might be disabled on specific nodes.
    * This method allows one to fetch the set of nodes where a given partition is disabled

http://git-wip-us.apache.org/repos/asf/helix/blob/a298f23d/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java
index 623b874..e2def60 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java
@@ -28,6 +28,7 @@ import org.apache.helix.PropertyKey;
 import org.apache.helix.controller.GenericHelixController;
 import org.apache.helix.messaging.DefaultMessagingService;
 import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
 import org.apache.log4j.Logger;
 
 /**
@@ -73,6 +74,8 @@ public class ControllerManagerHelper {
        * setup generic-controller
        */
       _manager.addInstanceConfigChangeListener(controller);
+      _manager.addConfigChangeListener(controller, ConfigScopeProperty.RESOURCE);
+      _manager.addConfigChangeListener(controller, ConfigScopeProperty.CONSTRAINT);
       _manager.addLiveInstanceChangeListener(controller);
       _manager.addIdealStateChangeListener(controller);
       // no need for controller to listen on external-view
@@ -92,6 +95,8 @@ public class ControllerManagerHelper {
      * reset generic-controller
      */
     _manager.removeListener(keyBuilder.instanceConfigs(), controller);
+    _manager.removeListener(keyBuilder.resourceConfigs(), controller);
+    _manager.removeListener(keyBuilder.constraints(), controller);
     _manager.removeListener(keyBuilder.liveInstances(), controller);
     _manager.removeListener(keyBuilder.idealStates(), controller);
     _manager.removeListener(keyBuilder.controller(), controller);

http://git-wip-us.apache.org/repos/asf/helix/blob/a298f23d/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCallbackHandler.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCallbackHandler.java
index ee420b9..b953e0b 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCallbackHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCallbackHandler.java
@@ -332,6 +332,7 @@ public class ZkCallbackHandler implements IZkChildListener, IZkDataListener {
     try {
       NotificationContext changeContext = new NotificationContext(_manager);
       changeContext.setType(NotificationContext.Type.INIT);
+      changeContext.setPathChanged(_path);
       invoke(changeContext);
     } catch (Exception e) {
       String msg = "Exception while invoking init callback for listener:" + _listener;
@@ -346,6 +347,7 @@ public class ZkCallbackHandler implements IZkChildListener, IZkDataListener {
       if (dataPath != null && dataPath.startsWith(_path)) {
         NotificationContext changeContext = new NotificationContext(_manager);
         changeContext.setType(NotificationContext.Type.CALLBACK);
+        changeContext.setPathChanged(_path);
         invoke(changeContext);
       }
     } catch (Exception e) {
@@ -399,6 +401,7 @@ public class ZkCallbackHandler implements IZkChildListener, IZkDataListener {
         } else {
           changeContext.setType(NotificationContext.Type.CALLBACK);
         }
+        changeContext.setPathChanged(_path);
         invoke(changeContext);
       }
     } catch (Exception e) {
@@ -416,6 +419,7 @@ public class ZkCallbackHandler implements IZkChildListener, IZkDataListener {
     try {
       NotificationContext changeContext = new NotificationContext(_manager);
       changeContext.setType(NotificationContext.Type.FINALIZE);
+      changeContext.setPathChanged(_path);
       invoke(changeContext);
     } catch (Exception e) {
       String msg = "Exception while resetting the listener:" + _listener;

http://git-wip-us.apache.org/repos/asf/helix/blob/a298f23d/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixConnection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixConnection.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixConnection.java
index 0a9dc94..38332c5 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixConnection.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixConnection.java
@@ -37,7 +37,6 @@ import org.apache.helix.ControllerChangeListener;
 import org.apache.helix.CurrentStateChangeListener;
 import org.apache.helix.ExternalViewChangeListener;
 import org.apache.helix.HelixAdmin;
-import org.apache.helix.HelixMultiClusterController;
 import org.apache.helix.HelixConnection;
 import org.apache.helix.HelixConnectionStateListener;
 import org.apache.helix.HelixConstants.ChangeType;
@@ -45,6 +44,7 @@ import org.apache.helix.HelixController;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerProperties;
+import org.apache.helix.HelixMultiClusterController;
 import org.apache.helix.HelixParticipant;
 import org.apache.helix.HelixRole;
 import org.apache.helix.IdealStateChangeListener;
@@ -210,7 +210,8 @@ public class ZkHelixConnection implements HelixConnection, IZkStateListener {
   }
 
   @Override
-  public HelixMultiClusterController createAutoController(ClusterId clusterId, ControllerId controllerId) {
+  public HelixMultiClusterController createAutoController(ClusterId clusterId,
+      ControllerId controllerId) {
     return new ZkHelixMultiClusterController(this, clusterId, controllerId);
   }
 
@@ -327,6 +328,8 @@ public class ZkHelixConnection implements HelixConnection, IZkStateListener {
     case RESOURCE:
       propertyKey = keyBuilder.resourceConfigs();
       break;
+    case CONSTRAINT:
+      propertyKey = keyBuilder.constraints();
     default:
       break;
     }
@@ -502,7 +505,7 @@ public class ZkHelixConnection implements HelixConnection, IZkStateListener {
   @Override
   public ClusterMessagingService createMessagingService(HelixRole role) {
     HelixManager manager = new ZKHelixManager(role);
-     return new DefaultMessagingService(manager);
+    return new DefaultMessagingService(manager);
   }
 
   void addListener(HelixRole role, Object listener, PropertyKey propertyKey, ChangeType changeType,

http://git-wip-us.apache.org/repos/asf/helix/blob/a298f23d/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java
index 0698945..295b69c 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java
@@ -39,6 +39,7 @@ import org.apache.helix.api.id.Id;
 import org.apache.helix.controller.GenericHelixController;
 import org.apache.helix.messaging.DefaultMessagingService;
 import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.monitoring.StatusDumpTask;
 import org.apache.log4j.Logger;
@@ -226,6 +227,9 @@ public class ZkHelixController implements HelixController {
        * setup generic-controller
        */
       _connection.addInstanceConfigChangeListener(this, pipeline, _clusterId);
+      _connection.addConfigChangeListener(this, pipeline, _clusterId, ConfigScopeProperty.RESOURCE);
+      _connection.addConfigChangeListener(this, pipeline, _clusterId,
+          ConfigScopeProperty.CONSTRAINT);
       _connection.addLiveInstanceChangeListener(this, pipeline, _clusterId);
       _connection.addIdealStateChangeListener(this, pipeline, _clusterId);
       _connection.addControllerListener(this, pipeline, _clusterId);
@@ -242,6 +246,8 @@ public class ZkHelixController implements HelixController {
      * reset generic-controller
      */
     _connection.removeListener(this, pipeline, keyBuilder.instanceConfigs());
+    _connection.removeListener(this, pipeline, keyBuilder.resourceConfigs());
+    _connection.removeListener(this, pipeline, keyBuilder.constraints());
     _connection.removeListener(this, pipeline, keyBuilder.liveInstances());
     _connection.removeListener(this, pipeline, keyBuilder.idealStates());
     _connection.removeListener(this, pipeline, keyBuilder.controller());

http://git-wip-us.apache.org/repos/asf/helix/blob/a298f23d/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index 8f861cd..af89944 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -52,6 +52,7 @@ import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
 import org.apache.helix.controller.stages.ResourceCurrentState;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.ResourceConfiguration;
 import org.apache.log4j.Logger;
 
 import com.google.common.base.Joiner;
@@ -124,11 +125,19 @@ public abstract class TaskRebalancer implements HelixRebalancer {
     final String resourceName = resource.getId().toString();
 
     // Fetch job configuration
-    JobConfig jobCfg = TaskUtil.getJobCfg(_manager, resourceName);
+    Map<String, ResourceConfiguration> resourceConfigs =
+        clusterData.getCache().getResourceConfigs();
+    JobConfig jobCfg = TaskUtil.getJobCfg(resourceConfigs.get(resourceName));
+    if (jobCfg == null) {
+      return emptyAssignment(resourceName, currStateOutput);
+    }
     String workflowResource = jobCfg.getWorkflow();
 
     // Fetch workflow configuration and context
-    WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_manager, workflowResource);
+    WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(resourceConfigs.get(workflowResource));
+    if (workflowCfg == null) {
+      return emptyAssignment(resourceName, currStateOutput);
+    }
     WorkflowContext workflowCtx = TaskUtil.getWorkflowContext(_manager, workflowResource);
 
     // Initialize workflow context if needed

http://git-wip-us.apache.org/repos/asf/helix/blob/a298f23d/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index 17b610d..cca7d76 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -68,6 +68,16 @@ public class TaskUtil {
    */
   public static JobConfig getJobCfg(HelixManager manager, String jobResource) {
     HelixProperty jobResourceConfig = getResourceConfig(manager, jobResource);
+    return getJobCfg(jobResourceConfig);
+  }
+
+  /**
+   * Parses job resource configurations directly from a property into a {@link JobConfig}.
+   * @param jobResourceConfig the property containing the configuration
+   * @return A {@link JobConfig} object if the property valid configurations for the job, null
+   *         otherwise.
+   */
+  public static JobConfig getJobCfg(HelixProperty jobResourceConfig) {
     if (jobResourceConfig == null) {
       return null;
     }
@@ -93,6 +103,29 @@ public class TaskUtil {
    */
   public static WorkflowConfig getWorkflowCfg(HelixManager manager, String workflowResource) {
     Map<String, String> workflowCfg = getResourceConfigMap(manager, workflowResource);
+    return getWorkflowCfg(workflowCfg);
+  }
+
+  /**
+   * Parses workflow resource configurations in Helix into a {@link WorkflowConfig} object.
+   * @param workflowResourceConfig the proeprty containing the configurations
+   * @return A {@link WorkflowConfig} object if the property contains valid configurations for the
+   *         workflow, null otherwise.
+   */
+  public static WorkflowConfig getWorkflowCfg(HelixProperty workflowResourceConfig) {
+    if (workflowResourceConfig == null) {
+      return null;
+    }
+    return getWorkflowCfg(workflowResourceConfig.getRecord().getSimpleFields());
+  }
+
+  /**
+   * Parses a key-value map into a {@link WorkflowConfig} object.
+   * @param workflowCfg the map of configurations
+   * @return A {@link WorkflowConfig} object if the map contains valid configurations for the
+   *         workflow, null otherwise.
+   */
+  private static WorkflowConfig getWorkflowCfg(Map<String, String> workflowCfg) {
     if (workflowCfg == null) {
       return null;
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/a298f23d/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java b/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
index 9e8fd85..190f739 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
@@ -31,8 +31,8 @@ import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZkTestHelper;
-import org.apache.helix.manager.zk.MockParticipant;
 import org.apache.helix.manager.zk.MockController;
+import org.apache.helix.manager.zk.MockParticipant;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.testutil.ZkTestBase;
 import org.apache.helix.tools.ClusterStateVerifier;
@@ -60,8 +60,7 @@ public class TestZkCallbackHandlerLeak extends ZkTestBase {
         2, // replicas
         "MasterSlave", true); // do rebalance
 
-    final MockController controller =
-        new MockController(_zkaddr, clusterName, "controller_0");
+    final MockController controller = new MockController(_zkaddr, clusterName, "controller_0");
     controller.syncStart();
 
     // start participants
@@ -92,10 +91,10 @@ public class TestZkCallbackHandlerLeak extends ZkTestBase {
 
         // controller should have 5 + 2n + m + (m+2)n zk-watchers
         // where n is number of nodes and m is number of resources
-        return watchPaths.size() == (6 + 5 * n);
+        return watchPaths.size() == (8 + 5 * n);
       }
     }, 500);
-    Assert.assertTrue(result, "Controller should have 6 + 5*n zk-watchers.");
+    Assert.assertTrue(result, "Controller should have 8 + 5*n zk-watchers.");
 
     // check participant zk-watchers
     result = TestHelper.verify(new TestHelper.Verifier() {
@@ -117,8 +116,8 @@ public class TestZkCallbackHandlerLeak extends ZkTestBase {
     TestHelper.printHandlers(participantManagerToExpire, participantManagerToExpire.getHandlers());
     int controllerHandlerNb = controller.getHandlers().size();
     int particHandlerNb = participantManagerToExpire.getHandlers().size();
-    Assert.assertEquals(controllerHandlerNb, 10,
-        "HelixController should have 10 (6+2n) callback handlers for 2 (n) participant");
+    Assert.assertEquals(controllerHandlerNb, 12,
+        "HelixController should have 10 (8+2n) callback handlers for 2 (n) participant");
     Assert.assertEquals(particHandlerNb, 1,
         "HelixParticipant should have 1 (msg->HelixTaskExecutor) callback handlers");
 
@@ -147,10 +146,10 @@ public class TestZkCallbackHandlerLeak extends ZkTestBase {
 
         // controller should have 5 + 2n + m + (m+2)n zk-watchers
         // where n is number of nodes and m is number of resources
-        return watchPaths.size() == (6 + 5 * n);
+        return watchPaths.size() == (8 + 5 * n);
       }
     }, 500);
-    Assert.assertTrue(result, "Controller should have 6 + 5*n zk-watchers after session expiry.");
+    Assert.assertTrue(result, "Controller should have 8 + 5*n zk-watchers after session expiry.");
 
     // check participant zk-watchers
     result = TestHelper.verify(new TestHelper.Verifier() {
@@ -205,8 +204,7 @@ public class TestZkCallbackHandlerLeak extends ZkTestBase {
         2, // replicas
         "MasterSlave", true); // do rebalance
 
-    final MockController controller =
-        new MockController(_zkaddr, clusterName, "controller_0");
+    final MockController controller = new MockController(_zkaddr, clusterName, "controller_0");
     controller.syncStart();
 
     // start participants
@@ -244,8 +242,8 @@ public class TestZkCallbackHandlerLeak extends ZkTestBase {
     int controllerHandlerNb = controller.getHandlers().size();
     int particHandlerNb = participantManager.getHandlers().size();
     TestHelper.printHandlers(controller, controller.getHandlers());
-    Assert.assertEquals(controllerHandlerNb, 10,
-        "HelixController should have 10 (6+2n) callback handlers for 2 participant, but was "
+    Assert.assertEquals(controllerHandlerNb, 12,
+        "HelixController should have 12 (6+2n) callback handlers for 2 participant, but was "
             + controllerHandlerNb);
     TestHelper.printHandlers(participantManager, participantManager.getHandlers());
     Assert.assertEquals(particHandlerNb, 1,
@@ -277,10 +275,10 @@ public class TestZkCallbackHandlerLeak extends ZkTestBase {
 
         // controller should have 5 + 2n + m + (m+2)n zk-watchers
         // where n is number of nodes and m is number of resources
-        return watchPaths.size() == (6 + 5 * n);
+        return watchPaths.size() == (8 + 5 * n);
       }
     }, 500);
-    Assert.assertTrue(result, "Controller should have 6 + 5*n zk-watchers after session expiry.");
+    Assert.assertTrue(result, "Controller should have 8 + 5*n zk-watchers after session expiry.");
 
     // check participant zk-watchers
     result = TestHelper.verify(new TestHelper.Verifier() {
@@ -301,8 +299,11 @@ public class TestZkCallbackHandlerLeak extends ZkTestBase {
     // printHandlers(controllerManager);
     int handlerNb = controller.getHandlers().size();
     TestHelper.printHandlers(controller, controller.getHandlers());
-    Assert.assertEquals(handlerNb, controllerHandlerNb,
-        "controller callback handlers should not increase after participant session expiry");
+    Assert
+        .assertTrue(
+            handlerNb <= controllerHandlerNb,
+            "controller callback handlers should not increase after participant session expiry (expected no more than "
+                + controllerHandlerNb + ", found " + handlerNb + ")");
     handlerNb = participantManager.getHandlers().size();
     TestHelper.printHandlers(participantManager, participantManager.getHandlers());
     Assert.assertEquals(handlerNb, particHandlerNb,

http://git-wip-us.apache.org/repos/asf/helix/blob/a298f23d/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkCallbackHandlerLeak.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkCallbackHandlerLeak.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkCallbackHandlerLeak.java
deleted file mode 100644
index 1393231..0000000
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkCallbackHandlerLeak.java
+++ /dev/null
@@ -1,474 +0,0 @@
-package org.apache.helix.integration.manager;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.I0Itec.zkclient.IZkChildListener;
-import org.I0Itec.zkclient.IZkDataListener;
-import org.apache.helix.CurrentStateChangeListener;
-import org.apache.helix.NotificationContext;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.TestHelper;
-import org.apache.helix.ZkTestHelper;
-import org.apache.helix.manager.zk.MockParticipant;
-import org.apache.helix.manager.zk.MockController;
-import org.apache.helix.model.CurrentState;
-import org.apache.helix.testutil.ZkTestBase;
-import org.apache.helix.tools.ClusterStateVerifier;
-import org.apache.log4j.Logger;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-public class TestZkCallbackHandlerLeak extends ZkTestBase {
-  private static Logger LOG = Logger.getLogger(TestZkCallbackHandlerLeak.class);
-
-  @Test
-  public void testCbHdlrLeakOnParticipantSessionExpiry() throws Exception {
-    // Logger.getRootLogger().setLevel(Level.INFO);
-    String className = TestHelper.getTestClassName();
-    String methodName = TestHelper.getTestMethodName();
-    String clusterName = className + "_" + methodName;
-    final int n = 2;
-
-    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
-
-    TestHelper.setupCluster(clusterName, _zkaddr, 12918, // participant port
-        "localhost", // participant name prefix
-        "TestDB", // resource name prefix
-        1, // resources
-        32, // partitions per resource
-        n, // number of nodes
-        2, // replicas
-        "MasterSlave", true); // do rebalance
-
-    // start controller
-    final MockController controller = new MockController(_zkaddr, clusterName, "controller");
-    controller.connect();
-
-    // start participants
-    MockParticipant[] participants = new MockParticipant[n];
-    for (int i = 0; i < n; i++) {
-      String instanceName = "localhost_" + (12918 + i);
-
-      participants[i] = new MockParticipant(_zkaddr, clusterName, instanceName);
-      participants[i].syncStart();
-    }
-
-    boolean result =
-        ClusterStateVerifier
-            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(_zkaddr,
-                clusterName));
-    Assert.assertTrue(result);
-
-    // check controller zk-watchers
-    result = TestHelper.verify(new TestHelper.Verifier() {
-
-      @Override
-      public boolean verify() throws Exception {
-        Map<String, Set<String>> watchers = ZkTestHelper.getListenersBySession(_zkaddr);
-        LOG.debug("all watchers: " + watchers);
-        Set<String> watchPaths = watchers.get("0x" + controller.getSessionId());
-        LOG.debug("controller watch paths: " + watchPaths);
-
-        // controller should have 5 + 2n + m + (m+2)n zk-watchers
-        // where n is number of nodes and m is number of resources
-        return watchPaths.size() == (6 + 5 * n);
-      }
-    }, 500);
-    Assert.assertTrue(result, "Controller should have 6 + 5*n zk-watchers.");
-
-    // check participant zk-watchers
-    final MockParticipant participantManagerToExpire = participants[0];
-    result = TestHelper.verify(new TestHelper.Verifier() {
-
-      @Override
-      public boolean verify() throws Exception {
-        Map<String, Set<String>> watchers = ZkTestHelper.getListenersBySession(_zkaddr);
-        Set<String> watchPaths = watchers.get("0x" + participantManagerToExpire.getSessionId());
-        LOG.debug("participant watch paths: " + watchPaths);
-
-        // participant should have 1 zk-watcher: 1 for MESSAGE
-        return watchPaths.size() == 1;
-      }
-    }, 500);
-    Assert.assertTrue(result, "Participant should have 1 zk-watcher.");
-
-    // check HelixManager#_handlers
-    // printHandlers(controllerManager);
-    // printHandlers(participantManagerToExpire);
-    int controllerHandlerNb = controller.getHandlers().size();
-    int particHandlerNb = participantManagerToExpire.getHandlers().size();
-    Assert.assertEquals(controllerHandlerNb, (6 + 2 * n),
-        "HelixController should have 10 (5+2n) callback handlers for 2 (n) participant");
-    Assert.assertEquals(particHandlerNb, 1,
-        "HelixParticipant should have 1 (msg->HelixTaskExecutor) callback handlers");
-
-    // expire the session of participant
-    LOG.debug("Expiring participant session...");
-    String oldSessionId = participantManagerToExpire.getSessionId();
-
-    ZkTestHelper.expireSession(participantManagerToExpire.getZkClient());
-    String newSessionId = participantManagerToExpire.getSessionId();
-    LOG.debug("Expried participant session. oldSessionId: " + oldSessionId + ", newSessionId: "
-        + newSessionId);
-
-    result =
-        ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
-            _zkaddr, clusterName));
-    Assert.assertTrue(result);
-
-    // check controller zk-watchers
-    result = TestHelper.verify(new TestHelper.Verifier() {
-
-      @Override
-      public boolean verify() throws Exception {
-        Map<String, Set<String>> watchers = ZkTestHelper.getListenersBySession(_zkaddr);
-        Set<String> watchPaths = watchers.get("0x" + controller.getSessionId());
-        LOG.debug("controller watch paths after session expiry: " + watchPaths);
-
-        // controller should have 5 + 2n + m + (m+2)n zk-watchers
-        // where n is number of nodes and m is number of resources
-        return watchPaths.size() == (6 + 5 * n);
-      }
-    }, 500);
-    Assert.assertTrue(result, "Controller should have 6 + 5*n zk-watchers after session expiry.");
-
-    // check participant zk-watchers
-    result = TestHelper.verify(new TestHelper.Verifier() {
-
-      @Override
-      public boolean verify() throws Exception {
-        Map<String, Set<String>> watchers = ZkTestHelper.getListenersBySession(_zkaddr);
-        Set<String> watchPaths = watchers.get("0x" + participantManagerToExpire.getSessionId());
-        LOG.debug("participant watch paths after session expiry: " + watchPaths);
-
-        // participant should have 1 zk-watcher: 1 for MESSAGE
-        return watchPaths.size() == 1;
-      }
-    }, 500);
-    Assert.assertTrue(result, "Participant should have 1 zk-watcher after session expiry.");
-
-    // check handlers
-    // printHandlers(controllerManager);
-    // printHandlers(participantManagerToExpire);
-    int handlerNb = controller.getHandlers().size();
-    Assert.assertEquals(handlerNb, controllerHandlerNb,
-        "controller callback handlers should not increase after participant session expiry");
-    handlerNb = participantManagerToExpire.getHandlers().size();
-    Assert.assertEquals(handlerNb, particHandlerNb,
-        "participant callback handlers should not increase after participant session expiry");
-
-    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
-  }
-
-  @Test
-  public void testCbHdlrLeakOnControllerSessionExpiry() throws Exception {
-    // Logger.getRootLogger().setLevel(Level.INFO);
-    String className = TestHelper.getTestClassName();
-    String methodName = TestHelper.getTestMethodName();
-    String clusterName = className + "_" + methodName;
-    final int n = 2;
-
-    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
-
-    TestHelper.setupCluster(clusterName, _zkaddr, 12918, // participant port
-        "localhost", // participant name prefix
-        "TestDB", // resource name prefix
-        1, // resources
-        32, // partitions per resource
-        n, // number of nodes
-        2, // replicas
-        "MasterSlave", true); // do rebalance
-
-    final MockController controller = new MockController(_zkaddr, clusterName, "controller");
-    controller.syncStart();
-
-    // start participants
-    MockParticipant[] participants = new MockParticipant[n];
-    for (int i = 0; i < n; i++) {
-      String instanceName = "localhost_" + (12918 + i);
-
-      participants[i] = new MockParticipant(_zkaddr, clusterName, instanceName);
-      participants[i].syncStart();
-    }
-
-    boolean result =
-        ClusterStateVerifier
-            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(_zkaddr,
-                clusterName));
-    Assert.assertTrue(result);
-
-    // wait until we get all the listeners registered
-    final MockParticipant participantManager = participants[0];
-    result = TestHelper.verify(new TestHelper.Verifier() {
-
-      @Override
-      public boolean verify() throws Exception {
-        int controllerHandlerNb = controller.getHandlers().size();
-        int particHandlerNb = participantManager.getHandlers().size();
-        if (controllerHandlerNb == 10 && particHandlerNb == 2)
-          return true;
-        else
-          return false;
-      }
-    }, 1000);
-
-    int controllerHandlerNb = controller.getHandlers().size();
-    int particHandlerNb = participantManager.getHandlers().size();
-    TestHelper.printHandlers(controller, controller.getHandlers());
-    TestHelper.printHandlers(participantManager, participantManager.getHandlers());
-    Assert.assertEquals(controllerHandlerNb, (6 + 2 * n),
-        "HelixController should have 10 (6+2n) callback handlers for 2 participant, but was "
-            + controllerHandlerNb);
-    Assert.assertEquals(particHandlerNb, 1,
-        "HelixParticipant should have 1 (msg->HelixTaskExecutor) callback handler, but was "
-            + particHandlerNb);
-
-    // expire controller
-    LOG.debug("Expiring controller session...");
-    String oldSessionId = controller.getSessionId();
-
-    ZkTestHelper.expireSession(controller.getZkClient());
-    String newSessionId = controller.getSessionId();
-    LOG.debug("Expired controller session. oldSessionId: " + oldSessionId + ", newSessionId: "
-        + newSessionId);
-
-    result =
-        ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
-            _zkaddr, clusterName));
-    Assert.assertTrue(result);
-
-    // check controller zk-watchers
-    result = TestHelper.verify(new TestHelper.Verifier() {
-
-      @Override
-      public boolean verify() throws Exception {
-        Map<String, Set<String>> watchers = ZkTestHelper.getListenersBySession(_zkaddr);
-        Set<String> watchPaths = watchers.get("0x" + controller.getSessionId());
-        // System.out.println("controller watch paths after session expiry: " +
-        // watchPaths);
-
-        // controller should have 5 + 2n + m + (m+2)n zk-watchers
-        // where n is number of nodes and m is number of resources
-        return watchPaths.size() == (6 + 5 * n);
-      }
-    }, 500);
-    Assert.assertTrue(result, "Controller should have 6 + 5*n zk-watchers after session expiry.");
-
-    // check participant zk-watchers
-    result = TestHelper.verify(new TestHelper.Verifier() {
-
-      @Override
-      public boolean verify() throws Exception {
-        Map<String, Set<String>> watchers = ZkTestHelper.getListenersBySession(_zkaddr);
-        Set<String> watchPaths = watchers.get("0x" + participantManager.getSessionId());
-        LOG.debug("participant watch paths after session expiry: " + watchPaths);
-
-        // participant should have 1 zk-watcher: 1 for MESSAGE
-        return watchPaths.size() == 1;
-      }
-    }, 500);
-    Assert.assertTrue(result, "Participant should have 1 zk-watcher after session expiry.");
-
-    // check HelixManager#_handlers
-    // printHandlers(controllerManager);
-    int handlerNb = controller.getHandlers().size();
-    TestHelper.printHandlers(controller, controller.getHandlers());
-    Assert.assertEquals(handlerNb, controllerHandlerNb,
-        "controller callback handlers should not increase after participant session expiry");
-    handlerNb = participantManager.getHandlers().size();
-    TestHelper.printHandlers(participantManager, participantManager.getHandlers());
-    Assert.assertEquals(handlerNb, particHandlerNb,
-        "participant callback handlers should not increase after participant session expiry");
-
-    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
-  }
-
-  @Test
-  public void testRemoveUserCbHdlrOnPathRemoval() throws Exception {
-    String className = TestHelper.getTestClassName();
-    String methodName = TestHelper.getTestMethodName();
-    String clusterName = className + "_" + methodName;
-    final int n = 3;
-    final String zkAddr = _zkaddr;
-    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
-
-    TestHelper.setupCluster(clusterName, zkAddr, 12918, "localhost", "TestDB", 1, // resource
-        32, // partitions
-        n, // nodes
-        2, // replicas
-        "MasterSlave", true);
-
-    final MockController controller = new MockController(zkAddr, clusterName, "controller");
-    controller.syncStart();
-
-    MockParticipant[] participants = new MockParticipant[n];
-    for (int i = 0; i < n; i++) {
-      String instanceName = "localhost_" + (12918 + i);
-      participants[i] = new MockParticipant(zkAddr, clusterName, instanceName);
-      participants[i].syncStart();
-
-      // register a controller listener on participant_0
-      if (i == 0) {
-        MockParticipant manager = participants[0];
-        manager.addCurrentStateChangeListener(new CurrentStateChangeListener() {
-          @Override
-          public void onStateChange(String instanceName, List<CurrentState> statesInfo,
-              NotificationContext changeContext) {
-            // To change body of implemented methods use File | Settings | File Templates.
-            // System.out.println(instanceName + " on current-state change, type: " +
-            // changeContext.getType());
-          }
-        }, manager.getInstanceName(), manager.getSessionId());
-      }
-    }
-
-    Boolean result =
-        ClusterStateVerifier
-            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(zkAddr,
-                clusterName));
-    Assert.assertTrue(result);
-
-    MockParticipant participantToExpire = participants[0];
-    String oldSessionId = participantToExpire.getSessionId();
-    PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName);
-
-    // check manager#hanlders
-    Assert.assertEquals(participantToExpire.getHandlers().size(), 2,
-        "Should have 2 handlers: CURRENTSTATE/{sessionId}, and MESSAGES");
-
-    // check zkclient#listeners
-    Map<String, Set<IZkDataListener>> dataListeners =
-        ZkTestHelper.getZkDataListener(participantToExpire.getZkClient());
-    Map<String, Set<IZkChildListener>> childListeners =
-        ZkTestHelper.getZkChildListener(participantToExpire.getZkClient());
-    // printZkListeners(participantToExpire.getZkClient());
-    Assert.assertEquals(dataListeners.size(), 1,
-        "Should have 1 path (CURRENTSTATE/{sessionId}/TestDB0) which has 1 data-listeners");
-    String path =
-        keyBuilder.currentState(participantToExpire.getInstanceName(), oldSessionId, "TestDB0")
-            .getPath();
-    Assert.assertEquals(dataListeners.get(path).size(), 1, "Should have 1 data-listeners on path: "
-        + path);
-    Assert
-        .assertEquals(childListeners.size(), 2,
-            "Should have 2 paths (CURRENTSTATE/{sessionId}, and MESSAGES) each of which has 1 child-listener");
-    path = keyBuilder.currentStates(participantToExpire.getInstanceName(), oldSessionId).getPath();
-    Assert.assertEquals(childListeners.get(path).size(), 1,
-        "Should have 1 child-listener on path: " + path);
-    path = keyBuilder.messages(participantToExpire.getInstanceName()).getPath();
-    Assert.assertEquals(childListeners.get(path).size(), 1,
-        "Should have 1 child-listener on path: " + path);
-    path = keyBuilder.controller().getPath();
-    Assert.assertNull(childListeners.get(path), "Should have no child-listener on path: " + path);
-
-    // check zookeeper#watches on client side
-    Map<String, List<String>> watchPaths =
-        ZkTestHelper.getZkWatch(participantToExpire.getZkClient());
-    LOG.debug("localhost_12918 zk-client side watchPaths: " + watchPaths);
-    Assert
-        .assertEquals(watchPaths.get("dataWatches").size(), 3,
-            "Should have 3 data-watches: CURRENTSTATE/{sessionId}, CURRENTSTATE/{sessionId}/TestDB, MESSAGES");
-    Assert.assertEquals(watchPaths.get("childWatches").size(), 2,
-        "Should have 2 child-watches: MESSAGES, and CURRENTSTATE/{sessionId}");
-
-    // expire localhost_12918
-    System.out.println("Expire participant: " + participantToExpire.getInstanceName()
-        + ", session: " + participantToExpire.getSessionId());
-    ZkTestHelper.expireSession(participantToExpire.getZkClient());
-    String newSessionId = participantToExpire.getSessionId();
-    System.out.println(participantToExpire.getInstanceName() + " oldSessionId: " + oldSessionId
-        + ", newSessionId: " + newSessionId);
-    result =
-        ClusterStateVerifier
-            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(zkAddr,
-                clusterName));
-    Assert.assertTrue(result);
-
-    // check manager#hanlders
-    Assert
-        .assertEquals(
-            participantToExpire.getHandlers().size(),
-            1,
-            "Should have 1 handler: MESSAGES. CURRENTSTATE/{sessionId} handler should be removed by CallbackHandler#handleChildChange()");
-
-    // check zkclient#listeners
-    dataListeners = ZkTestHelper.getZkDataListener(participantToExpire.getZkClient());
-    childListeners = ZkTestHelper.getZkChildListener(participantToExpire.getZkClient());
-    // printZkListeners(participantToExpire.getZkClient());
-    Assert.assertTrue(dataListeners.isEmpty(), "Should have no data-listeners");
-    Assert
-        .assertEquals(
-            childListeners.size(),
-            2,
-            "Should have 2 paths (CURRENTSTATE/{oldSessionId}, and MESSAGES). "
-                + "MESSAGE has 1 child-listener each. CURRENTSTATE/{oldSessionId} doesn't have listener (ZkClient doesn't remove empty childListener set. probably a ZkClient bug. see ZkClient#unsubscribeChildChange())");
-    path = keyBuilder.currentStates(participantToExpire.getInstanceName(), oldSessionId).getPath();
-    Assert.assertEquals(childListeners.get(path).size(), 0,
-        "Should have no child-listener on path: " + path);
-    path = keyBuilder.messages(participantToExpire.getInstanceName()).getPath();
-    Assert.assertEquals(childListeners.get(path).size(), 1,
-        "Should have 1 child-listener on path: " + path);
-    path = keyBuilder.controller().getPath();
-    Assert.assertNull(childListeners.get(path), "Should have no child-listener on path: " + path);
-
-    // check zookeeper#watches on client side
-    watchPaths = ZkTestHelper.getZkWatch(participantToExpire.getZkClient());
-    LOG.debug("localhost_12918 zk-client side watchPaths: " + watchPaths);
-    Assert.assertEquals(watchPaths.get("dataWatches").size(), 1,
-        "Should have 1 data-watch: MESSAGES");
-    Assert.assertEquals(watchPaths.get("childWatches").size(), 1,
-        "Should have 1 child-watch: MESSAGES");
-    Assert
-        .assertEquals(watchPaths.get("existWatches").size(), 2,
-            "Should have 2 exist-watches: CURRENTSTATE/{oldSessionId} and CURRENTSTATE/{oldSessionId}/TestDB0");
-
-    // another session expiry on localhost_12918 should clear the two exist-watches on
-    // CURRENTSTATE/{oldSessionId}
-    System.out.println("Expire participant: " + participantToExpire.getInstanceName()
-        + ", session: " + participantToExpire.getSessionId());
-    ZkTestHelper.expireSession(participantToExpire.getZkClient());
-    result =
-        ClusterStateVerifier
-            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(zkAddr,
-                clusterName));
-    Assert.assertTrue(result);
-
-    // check zookeeper#watches on client side
-    watchPaths = ZkTestHelper.getZkWatch(participantToExpire.getZkClient());
-    LOG.debug("localhost_12918 zk-client side watchPaths: " + watchPaths);
-    Assert.assertEquals(watchPaths.get("dataWatches").size(), 1,
-        "Should have 1 data-watch: MESSAGES");
-    Assert.assertEquals(watchPaths.get("childWatches").size(), 1,
-        "Should have 1 child-watch: MESSAGES");
-    Assert
-        .assertEquals(
-            watchPaths.get("existWatches").size(),
-            0,
-            "Should have no exist-watches. exist-watches on CURRENTSTATE/{oldSessionId} and CURRENTSTATE/{oldSessionId}/TestDB0 should be cleared during handleNewSession");
-
-    // Thread.sleep(1000);
-    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
-  }
-}