You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2013/11/11 22:10:01 UTC

[07/10] [HELIX-279] Apply gc handling fixes to ZKHelixManager

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index afd35e6..b71304a 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -19,18 +19,15 @@ package org.apache.helix.manager.zk;
  * under the License.
  */
 
-import java.lang.management.ManagementFactory;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Timer;
 import java.util.concurrent.TimeUnit;
 
+import org.I0Itec.zkclient.IZkStateListener;
 import org.I0Itec.zkclient.ZkConnection;
-import org.apache.helix.AccessOption;
 import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.ClusterMessagingService;
 import org.apache.helix.ConfigAccessor;
@@ -40,12 +37,12 @@ import org.apache.helix.CurrentStateChangeListener;
 import org.apache.helix.ExternalViewChangeListener;
 import org.apache.helix.HealthStateChangeListener;
 import org.apache.helix.HelixAdmin;
-import org.apache.helix.HelixConstants.ChangeType;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerProperties;
 import org.apache.helix.HelixTimerTask;
+import org.apache.helix.HelixConstants.ChangeType;
 import org.apache.helix.IdealStateChangeListener;
 import org.apache.helix.InstanceConfigChangeListener;
 import org.apache.helix.InstanceType;
@@ -59,141 +56,222 @@ import org.apache.helix.PropertyPathConfig;
 import org.apache.helix.PropertyType;
 import org.apache.helix.ScopedConfigChangeListener;
 import org.apache.helix.ZNRecord;
-import org.apache.helix.controller.restlet.ZKPropertyTransferServer;
+import org.apache.helix.controller.GenericHelixController;
 import org.apache.helix.healthcheck.HealthStatsAggregationTask;
 import org.apache.helix.healthcheck.HealthStatsAggregator;
 import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
 import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
 import org.apache.helix.healthcheck.ParticipantHealthReportTask;
 import org.apache.helix.messaging.DefaultMessagingService;
-import org.apache.helix.messaging.handling.MessageHandlerFactory;
-import org.apache.helix.model.CurrentState;
-import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.ConfigScope;
 import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
-import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.Message.MessageType;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.apache.helix.model.builder.ConfigScopeBuilder;
 import org.apache.helix.monitoring.ZKPathDataDumpTask;
-import org.apache.helix.participant.DistClusterControllerElection;
 import org.apache.helix.participant.HelixStateMachineEngine;
 import org.apache.helix.participant.StateMachineEngine;
-import org.apache.helix.participant.statemachine.ScheduledTaskStateModelFactory;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.log4j.Logger;
-import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.ZooKeeper.States;
+
+public class ZKHelixManager implements HelixManager, IZkStateListener {
+  private static Logger LOG = Logger.getLogger(ZKHelixManager.class);
+
+  public static final int FLAPPING_TIME_WINDIOW = 300000; // Default to 300 sec
+  public static final int MAX_DISCONNECT_THRESHOLD = 5;
+  public static final String ALLOW_PARTICIPANT_AUTO_JOIN = "allowParticipantAutoJoin";
 
-public class ZKHelixManager implements HelixManager {
-  private static Logger logger = Logger.getLogger(ZKHelixManager.class);
-  private static final int RETRY_LIMIT = 3;
-  private static final int CONNECTIONTIMEOUT = 60 * 1000;
+  protected final String _zkAddress;
   private final String _clusterName;
   private final String _instanceName;
-  private final String _zkConnectString;
-  private static final int DEFAULT_SESSION_TIMEOUT = 30 * 1000;
-  private ZKHelixDataAccessor _helixAccessor;
-  private ConfigAccessor _configAccessor;
-  protected ZkClient _zkClient;
-  protected final List<CallbackHandler> _handlers = new ArrayList<CallbackHandler>();
-  private final ZkStateChangeListener _zkStateChangeListener;
   private final InstanceType _instanceType;
-  volatile String _sessionId;
-  private Timer _timer;
-  private CallbackHandler _leaderElectionHandler;
-  private ParticipantHealthReportCollectorImpl _participantHealthCheckInfoCollector;
-  private ParticipantHealthReportTask _participantHealthReportTask;
-  private final DefaultMessagingService _messagingService;
-  private ZKHelixAdmin _managementTool;
-  private final String _version;
+  private final int _sessionTimeout;
+  private final List<PreConnectCallback> _preConnectCallbacks;
+  protected final List<CallbackHandler> _handlers;
   private final HelixManagerProperties _properties;
-  private final StateMachineEngine _stateMachEngine;
-  private int _sessionTimeout;
-  private ZkHelixPropertyStore<ZNRecord> _helixPropertyStore;
-  private final List<HelixTimerTask> _controllerTimerTasks;
+
+  /**
+   * helix version#
+   */
+  private final String _version;
+
+  protected ZkClient _zkclient = null;
+  private final DefaultMessagingService _messagingService;
+
   private BaseDataAccessor<ZNRecord> _baseDataAccessor;
-  List<PreConnectCallback> _preConnectCallbacks = new LinkedList<PreConnectCallback>();
-  ZKPropertyTransferServer _transferServer = null;
-  int _flappingTimeWindowMs;
-  int _maxDisconnectThreshold;
-  public static final int FLAPPING_TIME_WINDIOW = 300000; // Default to 300 sec
-  public static final int MAX_DISCONNECT_THRESHOLD = 5;
-  LiveInstanceInfoProvider _liveInstanceInfoProvider = null;
-  public static final String ALLOW_PARTICIPANT_AUTO_JOIN = "allowParticipantAutoJoin";
+  private ZKHelixDataAccessor _dataAccessor;
+  private final Builder _keyBuilder;
+  private ConfigAccessor _configAccessor;
+  private ZkHelixPropertyStore<ZNRecord> _helixPropertyStore;
+  protected LiveInstanceInfoProvider _liveInstanceInfoProvider = null;
 
-  public ZKHelixManager(String clusterName, String instanceName, InstanceType instanceType,
-      String zkConnectString) {
-    logger.info("Create a zk-based cluster manager. clusterName:" + clusterName + ", instanceName:"
-        + instanceName + ", type:" + instanceType + ", zkSvr:" + zkConnectString);
-    _flappingTimeWindowMs = FLAPPING_TIME_WINDIOW;
-    try {
-      _flappingTimeWindowMs =
-          Integer.parseInt(System.getProperty("helixmanager.flappingTimeWindow", ""
-              + FLAPPING_TIME_WINDIOW));
-    } catch (NumberFormatException e) {
-      logger.warn("Exception while parsing helixmanager.flappingTimeWindow: "
-          + System.getProperty("helixmanager.flappingTimeWindow", "" + FLAPPING_TIME_WINDIOW));
-    }
-    _maxDisconnectThreshold = MAX_DISCONNECT_THRESHOLD;
-    try {
-      _maxDisconnectThreshold =
-          Integer.parseInt(System.getProperty("helixmanager.maxDisconnectThreshold", ""
-              + MAX_DISCONNECT_THRESHOLD));
-    } catch (NumberFormatException e) {
-      logger.warn("Exception while parsing helixmanager.maxDisconnectThreshold: "
-          + System
-              .getProperty("helixmanager.maxDisconnectThreshold", "" + MAX_DISCONNECT_THRESHOLD));
-    }
-    int sessionTimeoutInt = -1;
-    try {
-      sessionTimeoutInt =
-          Integer.parseInt(System.getProperty("zk.session.timeout", "" + DEFAULT_SESSION_TIMEOUT));
-    } catch (NumberFormatException e) {
-      logger.warn("Exception while parsing session timeout: "
-          + System.getProperty("zk.session.timeout", "" + DEFAULT_SESSION_TIMEOUT));
+  private volatile String _sessionId;
+
+  /**
+   * Keep track of the timestamps at which the zk state became Disconnected.
+   * If, within a _flappingTimeWindowMs window, the zk state has become Disconnected
+   * more than _maxDisconnectThreshold times, disconnect the ZKHelixManager.
+   */
+  private final List<Long> _disconnectTimeHistory = new ArrayList<Long>();
+  private final int _flappingTimeWindowMs;
+  private final int _maxDisconnectThreshold;
+
+  /**
+   * participant fields
+   */
+  private final StateMachineEngine _stateMachineEngine;
+  private final ParticipantHealthReportCollectorImpl _participantHealthInfoCollector;
+  private final List<HelixTimerTask> _timerTasks = new ArrayList<HelixTimerTask>();
+
+  /**
+   * controller fields
+   */
+  private final GenericHelixController _controller = new GenericHelixController();
+  private CallbackHandler _leaderElectionHandler = null;
+  protected final List<HelixTimerTask> _controllerTimerTasks = new ArrayList<HelixTimerTask>();
+
+  /**
+   * status dump timer-task
+   */
+  static class StatusDumpTask extends HelixTimerTask {
+    Timer _timer = null;
+    final ZkClient zkclient;
+    final HelixManager helixController;
+
+    public StatusDumpTask(ZkClient zkclient, HelixManager helixController) {
+      this.zkclient = zkclient;
+      this.helixController = helixController;
+    }
+
+    @Override
+    public void start() {
+      long initialDelay = 30 * 60 * 1000;
+      long period = 120 * 60 * 1000;
+      int timeThresholdNoChange = 180 * 60 * 1000;
+
+      if (_timer == null) {
+        LOG.info("Start StatusDumpTask");
+        _timer = new Timer("StatusDumpTimerTask", true);
+        _timer.scheduleAtFixedRate(new ZKPathDataDumpTask(helixController, zkclient,
+            timeThresholdNoChange), initialDelay, period);
+      }
     }
-    if (sessionTimeoutInt > 0) {
-      _sessionTimeout = sessionTimeoutInt;
-    } else {
-      _sessionTimeout = DEFAULT_SESSION_TIMEOUT;
+
+    @Override
+    public void stop() {
+      if (_timer != null) {
+        LOG.info("Stop StatusDumpTask");
+        _timer.cancel();
+        _timer = null;
+      }
     }
+  }
+
+  public ZKHelixManager(String clusterName, String instanceName, InstanceType instanceType,
+      String zkAddress) {
+
+    LOG.info("Create a zk-based cluster manager. zkSvr: " + zkAddress + ", clusterName: "
+        + clusterName + ", instanceName: " + instanceName + ", type: " + instanceType);
+
+    _zkAddress = zkAddress;
+    _clusterName = clusterName;
+    _instanceType = instanceType;
+
     if (instanceName == null) {
       try {
         instanceName =
             InetAddress.getLocalHost().getCanonicalHostName() + "-" + instanceType.toString();
       } catch (UnknownHostException e) {
         // can ignore it
-        logger.info("Unable to get host name. Will set it to UNKNOWN, mostly ignorable", e);
+        LOG.info("Unable to get host name. Will set it to UNKNOWN, mostly ignorable", e);
         instanceName = "UNKNOWN";
       }
     }
 
-    _clusterName = clusterName;
     _instanceName = instanceName;
-    _instanceType = instanceType;
-    _zkConnectString = zkConnectString;
-    _zkStateChangeListener =
-        new ZkStateChangeListener(this, _flappingTimeWindowMs, _maxDisconnectThreshold);
-    _timer = null;
+    _preConnectCallbacks = new ArrayList<PreConnectCallback>();
+    _handlers = new ArrayList<CallbackHandler>();
+    _properties = new HelixManagerProperties("cluster-manager-version.properties");
+    _version = _properties.getVersion();
 
+    _keyBuilder = new Builder(clusterName);
     _messagingService = new DefaultMessagingService(this);
 
-    _properties = new HelixManagerProperties("cluster-manager-version.properties");
-    _version = _properties.getVersion();
+    /**
+     * use system property if available
+     */
+    _flappingTimeWindowMs =
+        getSystemPropertyAsInt("helixmanager.flappingTimeWindow",
+            ZKHelixManager.FLAPPING_TIME_WINDIOW);
+
+    _maxDisconnectThreshold =
+        getSystemPropertyAsInt("helixmanager.maxDisconnectThreshold",
+            ZKHelixManager.MAX_DISCONNECT_THRESHOLD);
 
-    _stateMachEngine = new HelixStateMachineEngine(this);
+    _sessionTimeout =
+        getSystemPropertyAsInt("zk.session.timeout", ZkClient.DEFAULT_SESSION_TIMEOUT);
+
+    /**
+     * instance type specific init
+     */
+    switch (instanceType) {
+    case PARTICIPANT:
+      _stateMachineEngine = new HelixStateMachineEngine(this);
+      _participantHealthInfoCollector =
+          new ParticipantHealthReportCollectorImpl(this, _instanceName);
+
+      _timerTasks.add(new ParticipantHealthReportTask(_participantHealthInfoCollector));
+
+      break;
+    case CONTROLLER:
+      _stateMachineEngine = null;
+      _participantHealthInfoCollector = null;
+      _controllerTimerTasks.add(new HealthStatsAggregationTask(new HealthStatsAggregator(this)));
+      _controllerTimerTasks.add(new StatusDumpTask(_zkclient, this));
+
+      break;
+    case CONTROLLER_PARTICIPANT:
+      _stateMachineEngine = new HelixStateMachineEngine(this);
+      _participantHealthInfoCollector =
+          new ParticipantHealthReportCollectorImpl(this, _instanceName);
+
+      _timerTasks.add(new ParticipantHealthReportTask(_participantHealthInfoCollector));
 
-    // add all timer tasks
-    _controllerTimerTasks = new ArrayList<HelixTimerTask>();
-    if (_instanceType == InstanceType.CONTROLLER) {
       _controllerTimerTasks.add(new HealthStatsAggregationTask(new HealthStatsAggregator(this)));
+      _controllerTimerTasks.add(new StatusDumpTask(_zkclient, this));
+
+      break;
+    case ADMINISTRATOR:
+    case SPECTATOR:
+      _stateMachineEngine = null;
+      _participantHealthInfoCollector = null;
+      break;
+    default:
+      throw new IllegalArgumentException("unrecognized type: " + instanceType);
     }
   }
 
+  private int getSystemPropertyAsInt(String propertyKey, int propertyDefaultValue) {
+    String valueString = System.getProperty(propertyKey, "" + propertyDefaultValue);
+
+    try {
+      int value = Integer.parseInt(valueString);
+      if (value > 0) {
+        return value;
+      }
+    } catch (NumberFormatException e) {
+      LOG.warn("Exception while parsing property: " + propertyKey + ", string: " + valueString
+          + ", using default value: " + propertyDefaultValue);
+    }
+
+    return propertyDefaultValue;
+  }
+
   @Override
   public boolean removeListener(PropertyKey key, Object listener) {
-    logger.info("Removing listener: " + listener + " on path: " + key.getPath() + " from cluster: "
+    LOG.info("Removing listener: " + listener + " on path: " + key.getPath() + " from cluster: "
         + _clusterName + " by instance: " + _instanceName);
 
     synchronized (this) {
@@ -216,7 +294,13 @@ public class ZKHelixManager implements HelixManager {
     return true;
   }
 
-  private void addListener(Object listener, PropertyKey propertyKey, ChangeType changeType,
+  void checkConnected() {
+    if (!isConnected()) {
+      throw new HelixException("HelixManager is not connected. Call HelixManager#connect()");
+    }
+  }
+
+  void addListener(Object listener, PropertyKey propertyKey, ChangeType changeType,
       EventType[] eventType) {
     checkConnected();
 
@@ -227,16 +311,18 @@ public class ZKHelixManager implements HelixManager {
         // compare property-key path and listener reference
         if (handler.getPath().equals(propertyKey.getPath())
             && handler.getListener().equals(listener)) {
-          logger.info("Listener: " + listener + " on path: " + propertyKey.getPath()
-              + " already exists. skip adding it");
+          LOG.info("Listener: " + listener + " on path: " + propertyKey.getPath()
+              + " already exists. skip add");
+
           return;
         }
       }
 
       CallbackHandler newHandler =
-          createCallBackHandler(propertyKey, listener, eventType, changeType);
+          new CallbackHandler(this, _zkclient, propertyKey, listener, eventType, changeType);
+
       _handlers.add(newHandler);
-      logger.info("Add listener: " + listener + " for type: " + type + " to path: "
+      LOG.info("Added listener: " + listener + " for type: " + type + " to path: "
           + newHandler.getPath());
     }
   }
@@ -259,7 +345,7 @@ public class ZKHelixManager implements HelixManager {
   }
 
   @Override
-  public void addConfigChangeListener(ConfigChangeListener listener) {
+  public void addConfigChangeListener(ConfigChangeListener listener) throws Exception {
     addListener(listener, new Builder(_clusterName).instanceConfigs(), ChangeType.INSTANCE_CONFIG,
         new EventType[] {
           EventType.NodeChildrenChanged
@@ -267,7 +353,8 @@ public class ZKHelixManager implements HelixManager {
   }
 
   @Override
-  public void addInstanceConfigChangeListener(InstanceConfigChangeListener listener) {
+  public void addInstanceConfigChangeListener(InstanceConfigChangeListener listener)
+      throws Exception {
     addListener(listener, new Builder(_clusterName).instanceConfigs(), ChangeType.INSTANCE_CONFIG,
         new EventType[] {
           EventType.NodeChildrenChanged
@@ -275,7 +362,8 @@ public class ZKHelixManager implements HelixManager {
   }
 
   @Override
-  public void addConfigChangeListener(ScopedConfigChangeListener listener, ConfigScopeProperty scope) {
+  public void addConfigChangeListener(ScopedConfigChangeListener listener, ConfigScopeProperty scope)
+      throws Exception {
     Builder keyBuilder = new Builder(_clusterName);
 
     PropertyKey propertyKey = null;
@@ -298,7 +386,7 @@ public class ZKHelixManager implements HelixManager {
         EventType.NodeChildrenChanged
       });
     } else {
-      logger.error("Can't add listener to config scope: " + scope);
+      LOG.error("Can't add listener to config scope: " + scope);
     }
   }
 
@@ -312,7 +400,8 @@ public class ZKHelixManager implements HelixManager {
         });
   }
 
-  void addControllerMessageListener(MessageListener listener) {
+  @Override
+  public void addControllerMessageListener(MessageListener listener) {
     addListener(listener, new Builder(_clusterName).controllerMessages(),
         ChangeType.MESSAGES_CONTROLLER, new EventType[] {
             EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated
@@ -321,7 +410,7 @@ public class ZKHelixManager implements HelixManager {
 
   @Override
   public void addCurrentStateChangeListener(CurrentStateChangeListener listener,
-      String instanceName, String sessionId) {
+      String instanceName, String sessionId) throws Exception {
     addListener(listener, new Builder(_clusterName).currentStates(instanceName, sessionId),
         ChangeType.CURRENT_STATE, new EventType[] {
             EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated
@@ -329,7 +418,8 @@ public class ZKHelixManager implements HelixManager {
   }
 
   @Override
-  public void addHealthStateChangeListener(HealthStateChangeListener listener, String instanceName) {
+  public void addHealthStateChangeListener(HealthStateChangeListener listener, String instanceName)
+      throws Exception {
     addListener(listener, new Builder(_clusterName).healthReports(instanceName), ChangeType.HEALTH,
         new EventType[] {
             EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated
@@ -337,7 +427,7 @@ public class ZKHelixManager implements HelixManager {
   }
 
   @Override
-  public void addExternalViewChangeListener(ExternalViewChangeListener listener) {
+  public void addExternalViewChangeListener(ExternalViewChangeListener listener) throws Exception {
     addListener(listener, new Builder(_clusterName).externalViews(), ChangeType.EXTERNAL_VIEW,
         new EventType[] {
             EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated
@@ -355,7 +445,7 @@ public class ZKHelixManager implements HelixManager {
   @Override
   public HelixDataAccessor getHelixDataAccessor() {
     checkConnected();
-    return _helixAccessor;
+    return _dataAccessor;
   }
 
   @Override
@@ -374,484 +464,158 @@ public class ZKHelixManager implements HelixManager {
     return _instanceName;
   }
 
-  @Override
-  public void connect() throws Exception {
-    logger.info("ClusterManager.connect()");
-    if (_zkStateChangeListener.isConnected()) {
-      logger.warn("Cluster manager " + _clusterName + " " + _instanceName + " already connected");
-      return;
-    }
-
-    try {
-      createClient(_zkConnectString);
-      _messagingService.onConnected();
-    } catch (Exception e) {
-      logger.error(e);
-      disconnect();
-      throw e;
-    }
-  }
-
-  @Override
-  public void disconnect() {
-    if (!isConnected()) {
-      logger.error("ClusterManager " + _instanceName + " already disconnected");
-      return;
-    }
-    disconnectInternal();
-  }
-
-  void disconnectInternal() {
-    // This function can be called when the connection are in bad state(e.g. flapping),
-    // in which isConnected() could be false and we want to disconnect from cluster.
-    logger.info("disconnect " + _instanceName + "(" + _instanceType + ") from " + _clusterName);
-
-    /**
-     * shutdown thread pool first to avoid reset() being invoked in the middle of state
-     * transition
-     */
-    _messagingService.getExecutor().shutdown();
-    resetHandlers();
-
-    _helixAccessor.shutdown();
-
-    if (_leaderElectionHandler != null) {
-      _leaderElectionHandler.reset();
-    }
-
-    if (_participantHealthCheckInfoCollector != null) {
-      _participantHealthReportTask.stop();
-    }
-
-    if (_timer != null) {
-      _timer.cancel();
-      _timer = null;
-    }
-
-    if (_instanceType == InstanceType.CONTROLLER) {
-      stopTimerTasks();
-    }
-
-    // unsubscribe accessor from controllerChange
-    _zkClient.unsubscribeAll();
-
-    _zkClient.close();
-
-    // HACK seems that zkClient is not sending DISCONNECT event
-    _zkStateChangeListener.disconnect();
-    logger.info("Cluster manager: " + _instanceName + " disconnected");
-
-  }
-
-  @Override
-  public String getSessionId() {
-    checkConnected();
-    return _sessionId;
-  }
-
-  @Override
-  public boolean isConnected() {
-    return _zkStateChangeListener.isConnected();
-  }
-
-  @Override
-  public long getLastNotificationTime() {
-    return -1;
-  }
-
-  private void addLiveInstance() {
-    LiveInstance liveInstance = new LiveInstance(_instanceName);
-    liveInstance.setSessionId(_sessionId);
-    liveInstance.setHelixVersion(_version);
-    liveInstance.setLiveInstance(ManagementFactory.getRuntimeMXBean().getName());
-
-    if (_liveInstanceInfoProvider != null) {
-      logger.info("invoking _liveInstanceInfoProvider");
-      ZNRecord additionalLiveInstanceInfo =
-          _liveInstanceInfoProvider.getAdditionalLiveInstanceInfo();
-      if (additionalLiveInstanceInfo != null) {
-        additionalLiveInstanceInfo.merge(liveInstance.getRecord());
-        ZNRecord mergedLiveInstance = new ZNRecord(additionalLiveInstanceInfo, _instanceName);
-        liveInstance = new LiveInstance(mergedLiveInstance);
-        logger.info("liveInstance content :" + _instanceName + " " + liveInstance.toString());
-      }
-    }
-
-    logger.info("Add live instance: InstanceName: " + _instanceName + " Session id:" + _sessionId);
-    Builder keyBuilder = _helixAccessor.keyBuilder();
-    if (!_helixAccessor.createProperty(keyBuilder.liveInstance(_instanceName), liveInstance)) {
-      String errorMsg =
-          "Fail to create live instance node after waiting, so quit. instance:" + _instanceName;
-      logger.warn(errorMsg);
-      throw new HelixException(errorMsg);
-
-    }
-    String currentStatePathParent =
-        PropertyPathConfig.getPath(PropertyType.CURRENTSTATES, _clusterName, _instanceName,
-            getSessionId());
-
-    if (!_zkClient.exists(currentStatePathParent)) {
-      _zkClient.createPersistent(currentStatePathParent);
-      logger.info("Creating current state path " + currentStatePathParent);
-    }
-  }
-
-  private void startStatusUpdatedumpTask() {
-    long initialDelay = 30 * 60 * 1000;
-    long period = 120 * 60 * 1000;
-    int timeThresholdNoChange = 180 * 60 * 1000;
+  BaseDataAccessor<ZNRecord> createBaseDataAccessor() {
+    ZkBaseDataAccessor<ZNRecord> baseDataAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkclient);
 
-    if (_timer == null) {
-      _timer = new Timer(true);
-      _timer.scheduleAtFixedRate(new ZKPathDataDumpTask(this, _zkClient, timeThresholdNoChange),
-          initialDelay, period);
-    }
+    return baseDataAccessor;
   }
 
-  private void createClient(String zkServers) throws Exception {
-    // by default use ZNRecordStreamingSerializer except for paths within the property
-    // store which expects raw byte[] serialization/deserialization
+  void createClient() throws Exception {
     PathBasedZkSerializer zkSerializer =
         ChainedPathZkSerializer.builder(new ZNRecordStreamingSerializer()).build();
 
-    _zkClient = new ZkClient(zkServers, _sessionTimeout, CONNECTIONTIMEOUT, zkSerializer);
+    _zkclient =
+        new ZkClient(_zkAddress, _sessionTimeout, ZkClient.DEFAULT_CONNECTION_TIMEOUT, zkSerializer);
 
-    ZkBaseDataAccessor<ZNRecord> baseDataAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
-    if (_instanceType == InstanceType.PARTICIPANT) {
-      String curStatePath =
-          PropertyPathConfig.getPath(PropertyType.CURRENTSTATES, _clusterName, _instanceName);
-      _baseDataAccessor =
-          new ZkCacheBaseDataAccessor<ZNRecord>(baseDataAccessor, Arrays.asList(curStatePath));
-    } else if (_instanceType == InstanceType.CONTROLLER) {
-      String extViewPath = PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW,
+    _baseDataAccessor = createBaseDataAccessor();
 
-      _clusterName);
-      _baseDataAccessor =
-          new ZkCacheBaseDataAccessor<ZNRecord>(baseDataAccessor, Arrays.asList(extViewPath));
+    _dataAccessor = new ZKHelixDataAccessor(_clusterName, _instanceType, _baseDataAccessor);
+    _configAccessor = new ConfigAccessor(_zkclient);
 
-    } else {
-      _baseDataAccessor = baseDataAccessor;
-    }
-
-    _helixAccessor = new ZKHelixDataAccessor(_clusterName, _instanceType, _baseDataAccessor);
-    _configAccessor = new ConfigAccessor(_zkClient);
     int retryCount = 0;
 
-    _zkClient.subscribeStateChanges(_zkStateChangeListener);
-    while (retryCount < RETRY_LIMIT) {
+    _zkclient.subscribeStateChanges(this);
+    while (retryCount < 3) {
       try {
-        _zkClient.waitUntilConnected(_sessionTimeout, TimeUnit.MILLISECONDS);
-        _zkStateChangeListener.handleStateChanged(KeeperState.SyncConnected);
-        _zkStateChangeListener.handleNewSession();
+        _zkclient.waitUntilConnected(_sessionTimeout, TimeUnit.MILLISECONDS);
+        handleStateChanged(KeeperState.SyncConnected);
+        handleNewSession();
         break;
       } catch (HelixException e) {
-        logger.error("fail to createClient.", e);
+        LOG.error("fail to createClient.", e);
         throw e;
       } catch (Exception e) {
         retryCount++;
 
-        logger.error("fail to createClient. retry " + retryCount, e);
-        if (retryCount == RETRY_LIMIT) {
+        LOG.error("fail to createClient. retry " + retryCount, e);
+        if (retryCount == 3) {
           throw e;
         }
       }
     }
   }
 
-  private CallbackHandler createCallBackHandler(PropertyKey propertyKey, Object listener,
-      EventType[] eventTypes, ChangeType changeType) {
-    if (listener == null) {
-      throw new HelixException("Listener cannot be null");
+  @Override
+  public void connect() throws Exception {
+    LOG.info("ClusterManager.connect()");
+    if (isConnected()) {
+      LOG.warn("Cluster manager: " + _instanceName + " for cluster: " + _clusterName
+          + " already connected. skip connect");
+      return;
     }
-    return new CallbackHandler(this, _zkClient, propertyKey, listener, eventTypes, changeType);
-  }
-
-  /**
-   * This will be invoked when ever a new session is created<br/>
-   * case 1: the cluster manager was a participant carry over current state, add live
-   * instance, and invoke message listener; case 2: the cluster manager was controller and
-   * was a leader before do leader election, and if it becomes leader again, invoke ideal
-   * state listener, current state listener, etc. if it fails to become leader in the new
-   * session, then becomes standby; case 3: the cluster manager was controller and was NOT
-   * a leader before do leader election, and if it becomes leader, instantiate and invoke
-   * ideal state listener, current state listener, etc. if if fails to become leader in
-   * the new session, stay as standby
-   */
 
-  protected void handleNewSession() {
-    boolean isConnected = _zkClient.waitUntilConnected(CONNECTIONTIMEOUT, TimeUnit.MILLISECONDS);
-    while (!isConnected) {
-      logger.error("Could NOT connect to zk server in " + CONNECTIONTIMEOUT + "ms. zkServer: "
-          + _zkConnectString + ", expiredSessionId: " + _sessionId + ", clusterName: "
-          + _clusterName);
-      isConnected = _zkClient.waitUntilConnected(CONNECTIONTIMEOUT, TimeUnit.MILLISECONDS);
+    try {
+      createClient();
+      _messagingService.onConnected();
+    } catch (Exception e) {
+      LOG.error("fail to connect " + _instanceName, e);
+      disconnect();
+      throw e;
     }
+  }
 
-    ZkConnection zkConnection = ((ZkConnection) _zkClient.getConnection());
-
-    synchronized (this) {
-      _sessionId = Long.toHexString(zkConnection.getZookeeper().getSessionId());
+  @Override
+  public void disconnect() {
+    if (_zkclient == null) {
+      LOG.info("instanceName: " + _instanceName + " already disconnected");
+      return;
     }
-    _baseDataAccessor.reset();
-
-    // reset all handlers so they have a chance to unsubscribe zk changes from zkclient
-    // abandon all callback-handlers added in expired session
-    resetHandlers();
 
-    logger.info("Handling new session, session id:" + _sessionId + ", instance:" + _instanceName
-        + ", instanceTye: " + _instanceType + ", cluster: " + _clusterName);
+    LOG.info("disconnect " + _instanceName + "(" + _instanceType + ") from " + _clusterName);
 
-    logger.info(zkConnection.getZookeeper());
-
-    if (!ZKUtil.isClusterSetup(_clusterName, _zkClient)) {
-      throw new HelixException("Initial cluster structure is not set up for cluster:"
-          + _clusterName);
-    }
-    // Read cluster config and see if instance can auto join the cluster
-    boolean autoJoin = false;
     try {
-      HelixConfigScope scope =
-          new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(getClusterName())
-              .build();
-      autoJoin = Boolean.parseBoolean(getConfigAccessor().get(scope, ALLOW_PARTICIPANT_AUTO_JOIN));
-      logger.info("Auto joining " + _clusterName + " is true");
-    } catch (Exception e) {
-    }
-    if (!ZKUtil.isInstanceSetup(_zkClient, _clusterName, _instanceName, _instanceType)) {
-      if (!autoJoin) {
-        throw new HelixException("Initial cluster structure is not set up for instance:"
-            + _instanceName + " instanceType:" + _instanceType);
-      } else {
-        logger.info("Auto joining instance " + _instanceName);
-        InstanceConfig instanceConfig = new InstanceConfig(_instanceName);
-        String hostName = _instanceName;
-        String port = "";
-        int lastPos = _instanceName.lastIndexOf("_");
-        if (lastPos > 0) {
-          hostName = _instanceName.substring(0, lastPos);
-          port = _instanceName.substring(lastPos + 1);
-        }
-        instanceConfig.setHostName(hostName);
-        instanceConfig.setPort(port);
-        instanceConfig.setInstanceEnabled(true);
-        getClusterManagmentTool().addInstance(_clusterName, instanceConfig);
-      }
-    }
+      /**
+       * stop all timer tasks
+       */
+      stopTimerTasks();
 
-    if (_instanceType == InstanceType.PARTICIPANT
-        || _instanceType == InstanceType.CONTROLLER_PARTICIPANT) {
-      handleNewSessionAsParticipant();
-    }
+      /**
+       * shutdown thread pool first to avoid reset() being invoked in the middle of state
+       * transition
+       */
+      _messagingService.getExecutor().shutdown();
+
+      // TODO reset user defined handlers only
+      resetHandlers();
 
-    if (_instanceType == InstanceType.CONTROLLER
-        || _instanceType == InstanceType.CONTROLLER_PARTICIPANT) {
-      addControllerMessageListener(_messagingService.getExecutor());
-      MessageHandlerFactory defaultControllerMsgHandlerFactory =
-          new DefaultControllerMessageHandlerFactory();
-      _messagingService.getExecutor().registerMessageHandlerFactory(
-          defaultControllerMsgHandlerFactory.getMessageType(), defaultControllerMsgHandlerFactory);
-      MessageHandlerFactory defaultSchedulerMsgHandlerFactory =
-          new DefaultSchedulerMessageHandlerFactory(this);
-      _messagingService.getExecutor().registerMessageHandlerFactory(
-          defaultSchedulerMsgHandlerFactory.getMessageType(), defaultSchedulerMsgHandlerFactory);
-      MessageHandlerFactory defaultParticipantErrorMessageHandlerFactory =
-          new DefaultParticipantErrorMessageHandlerFactory(this);
-      _messagingService.getExecutor().registerMessageHandlerFactory(
-          defaultParticipantErrorMessageHandlerFactory.getMessageType(),
-          defaultParticipantErrorMessageHandlerFactory);
+      _dataAccessor.shutdown();
 
       if (_leaderElectionHandler != null) {
         _leaderElectionHandler.reset();
-        _leaderElectionHandler.init();
-      } else {
-        _leaderElectionHandler =
-            createCallBackHandler(new Builder(_clusterName).controller(),
-                new DistClusterControllerElection(_zkConnectString), new EventType[] {
-                    EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated
-                }, ChangeType.CONTROLLER);
       }
-    }
 
-    if (_instanceType == InstanceType.PARTICIPANT
-        || _instanceType == InstanceType.CONTROLLER_PARTICIPANT
-        || (_instanceType == InstanceType.CONTROLLER && isLeader())) {
-      initHandlers();
+    } finally {
+      _zkclient.close();
+      _zkclient = null;
+      LOG.info("Cluster manager: " + _instanceName + " disconnected");
     }
   }
 
-  private void handleNewSessionAsParticipant() {
-    // In case there is a live instance record on zookeeper
-    Builder keyBuilder = _helixAccessor.keyBuilder();
-
-    if (_helixAccessor.getProperty(keyBuilder.liveInstance(_instanceName)) != null) {
-      logger.warn("Found another instance with same instanceName: " + _instanceName
-          + " in cluster " + _clusterName);
-      // Wait for a while, in case previous storage node exits unexpectedly
-      // and its liveinstance
-      // still hangs around until session timeout happens
-      try {
-        Thread.sleep(_sessionTimeout + 5000);
-      } catch (InterruptedException e) {
-        logger.warn("Sleep interrupted while waiting for previous liveinstance to go away.", e);
-      }
+  @Override
+  public String getSessionId() {
+    checkConnected();
+    return _sessionId;
+  }
 
-      if (_helixAccessor.getProperty(keyBuilder.liveInstance(_instanceName)) != null) {
-        String errorMessage =
-            "instance " + _instanceName + " already has a liveinstance in cluster " + _clusterName;
-        logger.error(errorMessage);
-        throw new HelixException(errorMessage);
-      }
+  @Override
+  public boolean isConnected() {
+    if (_zkclient == null) {
+      return false;
     }
-    // Invoke the PreConnectCallbacks
-    for (PreConnectCallback callback : _preConnectCallbacks) {
-      callback.onPreConnect();
+    ZkConnection zkconnection = (ZkConnection) _zkclient.getConnection();
+    if (zkconnection != null) {
+      States state = zkconnection.getZookeeperState();
+      return state == States.CONNECTED;
     }
-    addLiveInstance();
-    carryOverPreviousCurrentState();
-
-    // In case the cluster manager is running as a participant, setup message
-    // listener
-    _messagingService.registerMessageHandlerFactory(MessageType.STATE_TRANSITION.toString(),
-        _stateMachEngine);
-    addMessageListener(_messagingService.getExecutor(), _instanceName);
-    addControllerListener(_helixAccessor);
-
-    ScheduledTaskStateModelFactory stStateModelFactory =
-        new ScheduledTaskStateModelFactory(_messagingService.getExecutor());
-    _stateMachEngine.registerStateModelFactory(
-        DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, stStateModelFactory);
+    return false;
+  }
 
-    if (_participantHealthCheckInfoCollector == null) {
-      _participantHealthCheckInfoCollector =
-          new ParticipantHealthReportCollectorImpl(this, _instanceName);
-      _participantHealthReportTask =
-          new ParticipantHealthReportTask(_participantHealthCheckInfoCollector);
-      _participantHealthReportTask.start();
-    }
-    // start the participant health check timer, also create zk path for health
-    // check info
-    String healthCheckInfoPath = _helixAccessor.keyBuilder().healthReports(_instanceName).getPath();
-    if (!_zkClient.exists(healthCheckInfoPath)) {
-      _zkClient.createPersistent(healthCheckInfoPath, true);
-      logger.info("Creating healthcheck info path " + healthCheckInfoPath);
-    }
+  @Override
+  public long getLastNotificationTime() {
+    return 0;
   }
 
   @Override
   public void addPreConnectCallback(PreConnectCallback callback) {
-    logger.info("Adding preconnect callback");
+    LOG.info("Adding preconnect callback: " + callback);
     _preConnectCallbacks.add(callback);
   }
 
-  private void resetHandlers() {
-    synchronized (this) {
-      if (_handlers != null) {
-        // get a copy of the list and iterate over the copy list
-        // in case handler.reset() will modify the original handler list
-        List<CallbackHandler> tmpHandlers = new ArrayList<CallbackHandler>();
-        tmpHandlers.addAll(_handlers);
-
-        for (CallbackHandler handler : tmpHandlers) {
-          handler.reset();
-          logger.info("reset handler: " + handler.getPath() + ", " + handler.getListener());
-        }
-      }
-    }
-  }
-
-  private void initHandlers() {
-    synchronized (this) {
-      if (_handlers != null) {
-        // may add new currentState and message listeners during init()
-        // so make a copy and iterate over the copy
-        List<CallbackHandler> tmpHandlers = new ArrayList<CallbackHandler>();
-        tmpHandlers.addAll(_handlers);
-        for (CallbackHandler handler : tmpHandlers) {
-          handler.init();
-          logger.info("init handler: " + handler.getPath() + ", " + handler.getListener());
-        }
-      }
-    }
-  }
-
   @Override
   public boolean isLeader() {
-    if (!isConnected()) {
+    if (_instanceType != InstanceType.CONTROLLER
+        && _instanceType != InstanceType.CONTROLLER_PARTICIPANT) {
       return false;
     }
 
-    if (_instanceType != InstanceType.CONTROLLER) {
-      return false;
-    }
-
-    Builder keyBuilder = _helixAccessor.keyBuilder();
-    LiveInstance leader = _helixAccessor.getProperty(keyBuilder.controllerLeader());
-    if (leader == null) {
+    if (!isConnected()) {
       return false;
-    } else {
-      String leaderName = leader.getInstanceName();
-      // TODO need check sessionId also, but in distributed mode, leader's
-      // sessionId is
-      // not equal to
-      // the leader znode's sessionId field which is the sessionId of the
-      // controller_participant that
-      // successfully creates the leader node
-      if (leaderName == null || !leaderName.equals(_instanceName)) {
-        return false;
-      }
     }
-    return true;
-  }
-
-  /**
-   * carry over current-states from last sessions
-   * set to initial state for current session only when the state doesn't exist in current session
-   */
-  private void carryOverPreviousCurrentState() {
-    Builder keyBuilder = _helixAccessor.keyBuilder();
-    List<String> sessions = _helixAccessor.getChildNames(keyBuilder.sessions(_instanceName));
 
-    // carry-over
-    for (String session : sessions) {
-      if (session.equals(_sessionId)) {
-        continue;
-      }
-
-      List<CurrentState> lastCurStates =
-          _helixAccessor.getChildValues(keyBuilder.currentStates(_instanceName, session));
-
-      for (CurrentState lastCurState : lastCurStates) {
-        logger.info("Carrying over old session: " + session + ", resource: " + lastCurState.getId()
-            + " to current session: " + _sessionId);
-        String stateModelDefRef = lastCurState.getStateModelDefRef();
-        if (stateModelDefRef == null) {
-          logger
-              .error("skip carry-over because previous current state doesn't have a state model definition. previous current-state: "
-                  + lastCurState);
-          continue;
+    try {
+      LiveInstance leader = _dataAccessor.getProperty(_keyBuilder.controllerLeader());
+      if (leader != null) {
+        String leaderName = leader.getInstanceName();
+        String sessionId = leader.getSessionId();
+        if (leaderName != null && leaderName.equals(_instanceName) && sessionId != null
+            && sessionId.equals(_sessionId)) {
+          return true;
         }
-        StateModelDefinition stateModel =
-            _helixAccessor.getProperty(keyBuilder.stateModelDef(stateModelDefRef));
-
-        String curStatePath =
-            keyBuilder.currentState(_instanceName, _sessionId, lastCurState.getResourceName())
-                .getPath();
-        _helixAccessor.getBaseDataAccessor().update(curStatePath,
-            new CurStateCarryOverUpdater(_sessionId, stateModel.getInitialState(), lastCurState),
-            AccessOption.PERSISTENT);
-      }
-    }
-
-    // remove previous current states
-    for (String session : sessions) {
-      if (session.equals(_sessionId)) {
-        continue;
       }
-
-      String path = _helixAccessor.keyBuilder().currentStates(_instanceName, session).getPath();
-      logger.info("Removing current states from previous sessions. path: " + path);
-      _zkClient.deleteRecursive(path);
+    } catch (Exception e) {
+      // log
     }
+    return false;
   }
 
   @Override
@@ -862,7 +626,7 @@ public class ZKHelixManager implements HelixManager {
       String path = PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, _clusterName);
 
       _helixPropertyStore =
-          new ZkHelixPropertyStore<ZNRecord>(new ZkBaseDataAccessor<ZNRecord>(_zkClient), path,
+          new ZkHelixPropertyStore<ZNRecord>(new ZkBaseDataAccessor<ZNRecord>(_zkclient), path,
               null);
     }
 
@@ -872,13 +636,12 @@ public class ZKHelixManager implements HelixManager {
   @Override
   public synchronized HelixAdmin getClusterManagmentTool() {
     checkConnected();
-    if (_zkClient != null) {
-      _managementTool = new ZKHelixAdmin(_zkClient);
-    } else {
-      logger.error("Couldn't get ZKClusterManagementTool because zkClient is null");
+    if (_zkclient != null) {
+      return new ZKHelixAdmin(_zkclient);
     }
 
-    return _managementTool;
+    LOG.error("Couldn't get ZKClusterManagementTool because zkclient is null");
+    return null;
   }
 
   @Override
@@ -891,7 +654,7 @@ public class ZKHelixManager implements HelixManager {
   @Override
   public ParticipantHealthReportCollector getHealthReportCollector() {
     checkConnected();
-    return _participantHealthCheckInfoCollector;
+    return _participantHealthInfoCollector;
   }
 
   @Override
@@ -899,12 +662,6 @@ public class ZKHelixManager implements HelixManager {
     return _instanceType;
   }
 
-  private void checkConnected() {
-    if (!isConnected()) {
-      throw new HelixException("ClusterManager not connected. Call clusterManager.connect()");
-    }
-  }
-
   @Override
   public String getVersion() {
     return _version;
@@ -917,21 +674,20 @@ public class ZKHelixManager implements HelixManager {
 
   @Override
   public StateMachineEngine getStateMachineEngine() {
-    return _stateMachEngine;
+    return _stateMachineEngine;
   }
 
   // TODO: rename this and not expose this function as part of interface
   @Override
   public void startTimerTasks() {
-    for (HelixTimerTask task : _controllerTimerTasks) {
+    for (HelixTimerTask task : _timerTasks) {
       task.start();
     }
-    startStatusUpdatedumpTask();
   }
 
   @Override
   public void stopTimerTasks() {
-    for (HelixTimerTask task : _controllerTimerTasks) {
+    for (HelixTimerTask task : _timerTasks) {
       task.stop();
     }
   }
@@ -940,4 +696,209 @@ public class ZKHelixManager implements HelixManager {
   public void setLiveInstanceInfoProvider(LiveInstanceInfoProvider liveInstanceInfoProvider) {
     _liveInstanceInfoProvider = liveInstanceInfoProvider;
   }
+
+  /**
+   * block until zk is connected and a non-zero session-id has been read into _sessionId.
+   * zkconnection may be lost right after the read; a stale session-id is acceptable because
+   * another handle-new-session callback is expected to correct it.
+   */
+  void waitUntilConnected() {
+    boolean isConnected;
+    do {
+      isConnected =
+          _zkclient.waitUntilConnected(ZkClient.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
+      if (!isConnected) {
+        LOG.error("fail to connect zkserver: " + _zkAddress + " in "
+            + ZkClient.DEFAULT_CONNECTION_TIMEOUT + "ms. expiredSessionId: " + _sessionId
+            + ", clusterName: " + _clusterName);
+        continue;
+      }
+
+      ZkConnection zkConnection = ((ZkConnection) _zkclient.getConnection());
+      _sessionId = Long.toHexString(zkConnection.getZookeeper().getSessionId());
+
+      // "continue" above jumps to the loop condition, so the condition must re-check
+      // isConnected; otherwise a timed-out wait would exit the loop with the old session-id.
+      // the connection may also be lost again right after session-id is read, so keep
+      // looping until session-id is non-zero
+    } while (!isConnected || "0".equals(_sessionId));
+
+    LOG.info("Handling new session, session id: " + _sessionId + ", instance: " + _instanceName
+        + ", instanceTye: " + _instanceType + ", cluster: " + _clusterName + ", zkconnection: "
+        + ((ZkConnection) _zkclient.getConnection()).getZookeeper());
+  }
+
+  /** Calls init() on each handler of the given list (if any) under this manager's monitor. */
+  synchronized void initHandlers(List<CallbackHandler> handlers) {
+    if (handlers == null) {
+      return;
+    }
+    for (CallbackHandler current : handlers) {
+      current.init();
+      LOG.info("init handler: " + current.getPath() + ", " + current.getListener());
+    }
+  }
+
+  void resetHandlers() {
+    // calls reset() on every handler currently in _handlers (if the list exists),
+    // holding this manager's monitor for the whole pass
+    synchronized (this) {
+      if (_handlers != null) {
+        // walk a snapshot of the list rather than the list itself,
+        // in case handler.reset() modifies the original handler list
+        List<CallbackHandler> snapshot = new ArrayList<CallbackHandler>(_handlers);
+        for (CallbackHandler current : snapshot) {
+          current.reset();
+          LOG.info("reset handler: " + current.getPath() + ", " + current.getListener());
+        }
+      }
+    }
+  }
+
+  /**
+   * Decide whether the zk connection is flapping. Timestamps that are more than
+   * _flappingTimeWindowMs older than the most recent entry are removed from
+   * _disconnectTimeHistory first.
+   * @return true if more than _maxDisconnectThreshold timestamps remain, false otherwise
+   */
+  boolean isFlapping() {
+    if (_disconnectTimeHistory.size() == 0) {
+      return false;
+    }
+    long mostRecentTimestamp = _disconnectTimeHistory.get(_disconnectTimeHistory.size() - 1);
+
+    // Drop history timestamps older than (most recent timestamp - _flappingTimeWindowMs)
+    while ((_disconnectTimeHistory.get(0) + _flappingTimeWindowMs) < mostRecentTimestamp) {
+      _disconnectTimeHistory.remove(0);
+    }
+    return _disconnectTimeHistory.size() > _maxDisconnectThreshold;
+  }
+
+  /**
+   * Log zk state changes; on Disconnected, record the time and disconnect if isFlapping()
+   */
+  @Override
+  public void handleStateChanged(KeeperState state) throws Exception {
+    switch (state) {
+    case SyncConnected:
+      ZkConnection zkConnection = (ZkConnection) _zkclient.getConnection();
+      LOG.info("KeeperState: " + state + ", zookeeper:" + zkConnection.getZookeeper());
+      break;
+    case Disconnected:
+      LOG.info("KeeperState:" + state + ", disconnectedSessionId: " + _sessionId + ", instance: "
+          + _instanceName + ", type: " + _instanceType);
+      // remember when this disconnect happened, then check the history to decide
+      // whether the helix-manager itself should be disconnected
+      _disconnectTimeHistory.add(System.currentTimeMillis());
+      if (isFlapping()) {
+        LOG.error("instanceName: " + _instanceName + " is flapping. diconnect it. "
+            + " maxDisconnectThreshold: " + _maxDisconnectThreshold + " disconnects in "
+            + _flappingTimeWindowMs + "ms.");
+        disconnect();
+      }
+      break;
+    case Expired:
+      LOG.info("KeeperState:" + state + ", expiredSessionId: " + _sessionId + ", instance: "
+          + _instanceName + ", type: " + _instanceType);
+      break;
+    default:
+      break;
+    }
+  }
+
+  /**
+   * Set this manager up on a new zk session: wait for a session-id, clean up timer tasks and
+   * handlers, check the cluster, run instance-type specific setup, restart tasks and handlers.
+   */
+  @Override
+  public void handleNewSession() throws Exception {
+    waitUntilConnected();
+
+    // stop all timer tasks and reset all handlers, to make sure cleanup is completed for the
+    // previous session. NOTE(review): this comment used to say "disconnect if fail to
+    // cleanup", but nothing here disconnects; an exception thrown by these calls simply
+    // propagates to the caller - confirm that is intended
+    stopTimerTasks();
+    if (_leaderElectionHandler != null) {
+      _leaderElectionHandler.reset();
+    }
+    resetHandlers();
+
+    // clean up write-through cache
+    _baseDataAccessor.reset();
+
+    // from here on, we are dealing with the new session
+    if (!ZKUtil.isClusterSetup(_clusterName, _zkclient)) {
+      throw new HelixException("Cluster structure is not set up for cluster: " + _clusterName);
+    }
+
+    switch (_instanceType) {
+    case PARTICIPANT:
+      handleNewSessionAsParticipant();
+      break;
+    case CONTROLLER:
+      handleNewSessionAsController();
+      break;
+    case CONTROLLER_PARTICIPANT:
+      handleNewSessionAsParticipant();
+      handleNewSessionAsController();
+      break;
+    case ADMINISTRATOR:
+    case SPECTATOR:
+    default:
+      break;
+    }
+
+    startTimerTasks();
+
+    /**
+     * init handlers
+     * ok to init message handler and data-accessor twice
+     * the second init will be skipped (see CallbackHandler)
+     */
+    initHandlers(_handlers);
+  }
+
+  /**
+   * Participant part of new-session handling. Calls, in order, on a ParticipantManagerHelper:
+   * joinCluster(), (then the registered PreConnectCallbacks), createLiveInstance(),
+   * carryOverPreviousCurrentState(), setupMsgHandler() and createHealthCheckPath().
+   */
+  void handleNewSessionAsParticipant() throws Exception {
+    // auto-join (presumably handled inside joinCluster(); verify in
+    // ParticipantManagerHelper)
+    ParticipantManagerHelper participantHelper =
+        new ParticipantManagerHelper(this, _zkclient, _sessionTimeout, _liveInstanceInfoProvider);
+    participantHelper.joinCluster();
+
+    // invoke PreConnectCallbacks; this happens before the live instance is created
+    for (PreConnectCallback callback : _preConnectCallbacks) {
+      callback.onPreConnect();
+    }
+
+    participantHelper.createLiveInstance();
+    participantHelper.carryOverPreviousCurrentState();
+
+    // setup message listener
+    participantHelper.setupMsgHandler();
+
+    // NOTE(review): this was commented as "start health check timer task", but the call
+    // is named createHealthCheckPath() - confirm against ParticipantManagerHelper which
+    // one it actually does
+    participantHelper.createHealthCheckPath();
+  }
+
+  void handleNewSessionAsController() {
+    // no leader-election handler yet: create one; otherwise call init() on the existing one
+    if (_leaderElectionHandler == null) {
+      _leaderElectionHandler = new CallbackHandler(this, _zkclient, _keyBuilder.controller(),
+              new DistributedLeaderElection(this, _controller, _controllerTimerTasks),
+              new EventType[] {
+                  EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated
+              }, ChangeType.CONTROLLER);
+    } else {
+      _leaderElectionHandler.init();
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/main/java/org/apache/helix/manager/zk/ZkAsyncCallbacks.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkAsyncCallbacks.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkAsyncCallbacks.java
index 5865863..755ca52 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkAsyncCallbacks.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkAsyncCallbacks.java
@@ -143,8 +143,7 @@ public class ZkAsyncCallbacks {
           }
         }
       } catch (InterruptedException e) {
-        // TODO Auto-generated catch block
-        e.printStackTrace();
+        LOG.error("Interrupted waiting for success", e);
       }
       return true;
     }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/main/java/org/apache/helix/manager/zk/ZkStateChangeListener.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkStateChangeListener.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkStateChangeListener.java
deleted file mode 100644
index e11444b..0000000
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkStateChangeListener.java
+++ /dev/null
@@ -1,127 +0,0 @@
-package org.apache.helix.manager.zk;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.LinkedList;
-import java.util.List;
-
-import org.I0Itec.zkclient.IZkStateListener;
-import org.I0Itec.zkclient.ZkConnection;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-
-public class ZkStateChangeListener implements IZkStateListener {
-  private volatile boolean _isConnected;
-  private volatile boolean _hasSessionExpired;
-  private final ZKHelixManager _zkHelixManager;
-
-  // Keep track of timestamps that zk State has become Disconnected
-  // If in a _timeWindowLengthMs window zk State has become Disconnected
-  // for more than_maxDisconnectThreshold times disconnect the zkHelixManager
-  List<Long> _disconnectTimeHistory = new LinkedList<Long>();
-  int _timeWindowLengthMs;
-  int _maxDisconnectThreshold;
-
-  private static Logger logger = Logger.getLogger(ZkStateChangeListener.class);
-
-  public ZkStateChangeListener(ZKHelixManager zkHelixManager, int timeWindowLengthMs,
-      int maxDisconnectThreshold) {
-    this._zkHelixManager = zkHelixManager;
-    _timeWindowLengthMs = timeWindowLengthMs;
-    // _maxDisconnectThreshold min value is 1.
-    // We don't want to disconnect from zk for the first time zkState become Disconnected
-    _maxDisconnectThreshold = maxDisconnectThreshold > 0 ? maxDisconnectThreshold : 1;
-  }
-
-  @Override
-  public void handleNewSession() {
-    // TODO:bug in zkclient .
-    // zkclient does not invoke handleStateChanged when a session expires but
-    // directly invokes handleNewSession
-    _isConnected = true;
-    _hasSessionExpired = false;
-    _zkHelixManager.handleNewSession();
-  }
-
-  @Override
-  public void handleStateChanged(KeeperState keeperState) throws Exception {
-    switch (keeperState) {
-    case SyncConnected:
-      ZkConnection zkConnection = ((ZkConnection) _zkHelixManager._zkClient.getConnection());
-      logger.info("KeeperState: " + keeperState + ", zookeeper:" + zkConnection.getZookeeper());
-      _isConnected = true;
-      break;
-    case Disconnected:
-      logger.info("KeeperState:" + keeperState + ", disconnectedSessionId: "
-          + _zkHelixManager._sessionId + ", instance: " + _zkHelixManager.getInstanceName()
-          + ", type: " + _zkHelixManager.getInstanceType());
-
-      _isConnected = false;
-      // Track the time stamp that the disconnected happens, then check history and see if
-      // we should disconnect the _zkHelixManager
-      _disconnectTimeHistory.add(System.currentTimeMillis());
-      if (isFlapping()) {
-        logger.error("isFlapping() returns true, so disconnect the helix manager. "
-            + _zkHelixManager.getInstanceName() + " " + _maxDisconnectThreshold
-            + " disconnects in " + _timeWindowLengthMs + " Ms.");
-        _zkHelixManager.disconnectInternal();
-      }
-      break;
-    case Expired:
-      logger.info("KeeperState:" + keeperState + ", expiredSessionId: "
-          + _zkHelixManager._sessionId + ", instance: " + _zkHelixManager.getInstanceName()
-          + ", type: " + _zkHelixManager.getInstanceType());
-
-      _isConnected = false;
-      _hasSessionExpired = true;
-      break;
-    }
-  }
-
-  boolean isConnected() {
-    return _isConnected;
-  }
-
-  void disconnect() {
-    _isConnected = false;
-  }
-
-  boolean hasSessionExpired() {
-    return _hasSessionExpired;
-  }
-
-  /**
-   * If zk state has changed into Disconnected for _maxDisconnectThreshold times during previous
-   * _timeWindowLengthMs Ms
-   * time window, we think that there are something wrong going on and disconnect the zkHelixManager
-   * from zk.
-   */
-  boolean isFlapping() {
-    if (_disconnectTimeHistory.size() == 0) {
-      return false;
-    }
-    long mostRecentTimestamp = _disconnectTimeHistory.get(_disconnectTimeHistory.size() - 1);
-    // Remove disconnect history timestamp that are older than _timeWindowLengthMs ago
-    while ((_disconnectTimeHistory.get(0) + _timeWindowLengthMs) < mostRecentTimestamp) {
-      _disconnectTimeHistory.remove(0);
-    }
-    return _disconnectTimeHistory.size() > _maxDisconnectThreshold;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java b/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
index 73b69a8..422d35e 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
@@ -117,9 +117,6 @@ public class DefaultMessagingService implements ClusterMessagingService {
         Builder keyBuilder = accessor.keyBuilder();
 
         if (receiverType == InstanceType.CONTROLLER) {
-          // _manager.getDataAccessor().setProperty(PropertyType.MESSAGES_CONTROLLER,
-          // tempMessage,
-          // tempMessage.getId());
           accessor.setProperty(keyBuilder.controllerMessage(tempMessage.getId()), tempMessage);
         }
 
@@ -137,6 +134,7 @@ public class DefaultMessagingService implements ClusterMessagingService {
     return totalMessageCount;
   }
 
+  @Override
   public Map<InstanceType, List<Message>> generateMessage(final Criteria recipientCriteria,
       final Message message) {
     Map<InstanceType, List<Message>> messagesToSendMap = new HashMap<InstanceType, List<Message>>();

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java b/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
index 8672e7e..6da3dc2 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
@@ -32,7 +32,6 @@ import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
 
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 
 /**
@@ -94,20 +93,18 @@ public class ResourceAssignment extends HelixProperty {
   /**
    * Get the participant, state pairs for a partition
    * @param partition the Partition to look up
-   * @return immutable map of (participant id, state)
+   * @return map of (participant id, state)
    */
   public Map<ParticipantId, State> getReplicaMap(PartitionId partitionId) {
     Map<String, String> rawReplicaMap = _record.getMapField(partitionId.stringify());
-    if (rawReplicaMap == null) {
-      return Collections.emptyMap();
-    }
-    ImmutableMap.Builder<ParticipantId, State> builder =
-        new ImmutableMap.Builder<ParticipantId, State>();
-    for (String participantName : rawReplicaMap.keySet()) {
-      builder.put(ParticipantId.from(participantName),
-          State.from(rawReplicaMap.get(participantName)));
+    Map<ParticipantId, State> replicaMap = Maps.newHashMap();
+    if (rawReplicaMap != null) {
+      for (String participantName : rawReplicaMap.keySet()) {
+        replicaMap.put(ParticipantId.from(participantName),
+            State.from(rawReplicaMap.get(participantName)));
+      }
     }
-    return builder.build();
+    return replicaMap;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/Mocks.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/Mocks.java b/helix-core/src/test/java/org/apache/helix/Mocks.java
index 0558764..919eb00 100644
--- a/helix-core/src/test/java/org/apache/helix/Mocks.java
+++ b/helix-core/src/test/java/org/apache/helix/Mocks.java
@@ -472,6 +472,12 @@ public class Mocks {
       return _properties;
     }
 
+    @Override
+    public void addControllerMessageListener(MessageListener listener) {
+      // no-op in this mock: the listener is neither stored nor invoked
+
+    }
+
   }
 
   public static class MockAccessor implements HelixDataAccessor // DataAccessor

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/TestHelper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestHelper.java b/helix-core/src/test/java/org/apache/helix/TestHelper.java
index 4bdd423..871d717 100644
--- a/helix-core/src/test/java/org/apache/helix/TestHelper.java
+++ b/helix-core/src/test/java/org/apache/helix/TestHelper.java
@@ -50,7 +50,6 @@ import org.apache.helix.api.id.MessageId;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.controller.HelixControllerMain;
 import org.apache.helix.integration.manager.ZkTestManager;
 import org.apache.helix.manager.zk.CallbackHandler;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
@@ -65,8 +64,6 @@ import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.MessageType;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.model.StateModelDefinition.StateModelDefinitionProperty;
-import org.apache.helix.participant.DistClusterControllerStateModelFactory;
-import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.store.zk.ZNode;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.util.ZKClientPool;
@@ -131,7 +128,7 @@ public class TestHelper {
           try {
             zkClient.deleteRecursive(rootNamespace);
           } catch (Exception e) {
-            LOG.error("fail to deleteRecursive path:" + rootNamespace + "\nexception:" + e);
+            LOG.error("fail to deleteRecursive path:" + rootNamespace, e);
           }
         }
       }
@@ -152,90 +149,6 @@ public class TestHelper {
     }
   }
 
-  public static StartCMResult startDummyProcess(final String zkAddr, final String clusterName,
-      final String instanceName) throws Exception {
-    StartCMResult result = new StartCMResult();
-    ZkHelixTestManager manager = null;
-    manager = new ZkHelixTestManager(clusterName, instanceName, InstanceType.PARTICIPANT, zkAddr);
-    result._manager = manager;
-    Thread thread = new Thread(new DummyProcessThread(manager, instanceName));
-    result._thread = thread;
-    thread.start();
-
-    return result;
-  }
-
-  private static ZkHelixTestManager startHelixController(final String zkConnectString,
-      final String clusterName, final String controllerName, final String controllerMode) {
-    ZkHelixTestManager manager = null;
-    try {
-      if (controllerMode.equalsIgnoreCase(HelixControllerMain.STANDALONE)) {
-        manager =
-            new ZkHelixTestManager(clusterName, controllerName, InstanceType.CONTROLLER,
-                zkConnectString);
-        manager.connect();
-      } else if (controllerMode.equalsIgnoreCase(HelixControllerMain.DISTRIBUTED)) {
-        manager =
-            new ZkHelixTestManager(clusterName, controllerName,
-                InstanceType.CONTROLLER_PARTICIPANT, zkConnectString);
-
-        DistClusterControllerStateModelFactory stateModelFactory =
-            new DistClusterControllerStateModelFactory(zkConnectString);
-
-        StateMachineEngine stateMach = manager.getStateMachineEngine();
-        stateMach.registerStateModelFactory("LeaderStandby", stateModelFactory);
-        manager.connect();
-      } else {
-        LOG.error("cluster controller mode:" + controllerMode + " NOT supported");
-      }
-    } catch (Exception e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
-    }
-
-    return manager;
-  }
-
-  // TODO refactor this
-  public static StartCMResult startController(final String clusterName,
-      final String controllerName, final String zkConnectString, final String controllerMode)
-      throws Exception {
-    final StartCMResult result = new StartCMResult();
-    final ZkHelixTestManager manager =
-        startHelixController(zkConnectString, clusterName, controllerName, controllerMode);
-    result._manager = manager;
-
-    Thread thread = new Thread(new Runnable() {
-      @Override
-      public void run() {
-        // ClusterManager manager = null;
-
-        try {
-
-          Thread.currentThread().join();
-        } catch (InterruptedException e) {
-          String msg =
-              "controller:" + controllerName + ", " + Thread.currentThread().getName()
-                  + " interrupted";
-          LOG.info(msg);
-          // System.err.println(msg);
-        } catch (Exception e) {
-          e.printStackTrace();
-        }
-      }
-    });
-
-    thread.start();
-    result._thread = thread;
-    return result;
-  }
-
-  public static class StartCMResult {
-    public Thread _thread;
-    public ZkHelixTestManager _manager;
-
-  }
-
   public static void setupEmptyCluster(ZkClient zkClient, String clusterName) {
     ZKHelixAdmin admin = new ZKHelixAdmin(zkClient);
     admin.addCluster(clusterName, true);
@@ -251,11 +164,6 @@ public class TestHelper {
     return set;
   }
 
-  // public static void verifyWithTimeout(String verifierName, Object... args)
-  // {
-  // verifyWithTimeout(verifierName, 30 * 1000, args);
-  // }
-
   /**
    * generic method for verification with a timeout
    * @param verifierName
@@ -292,8 +200,7 @@ public class TestHelper {
 
       Assert.assertTrue(result);
     } catch (Exception e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
+      LOG.error("Exception in verify: " + verifierName, e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/TestZkClientWrapper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestZkClientWrapper.java b/helix-core/src/test/java/org/apache/helix/TestZkClientWrapper.java
index 37c68a2..0decbd8 100644
--- a/helix-core/src/test/java/org/apache/helix/TestZkClientWrapper.java
+++ b/helix-core/src/test/java/org/apache/helix/TestZkClientWrapper.java
@@ -23,6 +23,7 @@ import org.I0Itec.zkclient.IZkStateListener;
 import org.I0Itec.zkclient.ZkConnection;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkClient;
+import org.apache.log4j.Logger;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
@@ -34,6 +35,8 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 public class TestZkClientWrapper extends ZkUnitTestBase {
+  private static Logger LOG = Logger.getLogger(TestZkClientWrapper.class);
+
   ZkClient _zkClient;
 
   @BeforeClass
@@ -69,7 +72,7 @@ public class TestZkClientWrapper extends ZkUnitTestBase {
   }
 
   @Test()
-  void testSessioExpire() {
+  void testSessioExpire() throws Exception {
     IZkStateListener listener = new IZkStateListener() {
 
       @Override
@@ -82,31 +85,27 @@ public class TestZkClientWrapper extends ZkUnitTestBase {
         System.out.println("In Old connection New session");
       }
     };
+
     _zkClient.subscribeStateChanges(listener);
     ZkConnection connection = ((ZkConnection) _zkClient.getConnection());
     ZooKeeper zookeeper = connection.getZookeeper();
     System.out.println("old sessionId= " + zookeeper.getSessionId());
-    try {
-      Watcher watcher = new Watcher() {
-        @Override
-        public void process(WatchedEvent event) {
-          System.out.println("In New connection In process event:" + event);
-        }
-      };
-      ZooKeeper newZookeeper =
-          new ZooKeeper(connection.getServers(), zookeeper.getSessionTimeout(), watcher,
-              zookeeper.getSessionId(), zookeeper.getSessionPasswd());
-      Thread.sleep(3000);
-      System.out.println("New sessionId= " + newZookeeper.getSessionId());
-      Thread.sleep(3000);
-      newZookeeper.close();
-      Thread.sleep(10000);
-      connection = ((ZkConnection) _zkClient.getConnection());
-      zookeeper = connection.getZookeeper();
-      System.out.println("After session expiry sessionId= " + zookeeper.getSessionId());
-    } catch (Exception e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
-    }
+    Watcher watcher = new Watcher() {
+      @Override
+      public void process(WatchedEvent event) {
+        System.out.println("In New connection In process event:" + event);
+      }
+    };
+    ZooKeeper newZookeeper =
+        new ZooKeeper(connection.getServers(), zookeeper.getSessionTimeout(), watcher,
+            zookeeper.getSessionId(), zookeeper.getSessionPasswd());
+    Thread.sleep(3000);
+    System.out.println("New sessionId= " + newZookeeper.getSessionId());
+    Thread.sleep(3000);
+    newZookeeper.close();
+    Thread.sleep(10000);
+    connection = ((ZkConnection) _zkClient.getConnection());
+    zookeeper = connection.getZookeeper();
+    System.out.println("After session expiry sessionId= " + zookeeper.getSessionId());
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/TestZnodeModify.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestZnodeModify.java b/helix-core/src/test/java/org/apache/helix/TestZnodeModify.java
index 9c71fdd..fdf6e72 100644
--- a/helix-core/src/test/java/org/apache/helix/TestZnodeModify.java
+++ b/helix-core/src/test/java/org/apache/helix/TestZnodeModify.java
@@ -215,8 +215,7 @@ public class TestZnodeModify extends ZkUnitTestBase {
           zkClient.createPersistent(pathChild1, true);
           zkClient.writeData(pathChild1, record);
         } catch (InterruptedException e) {
-          // TODO Auto-generated catch block
-          e.printStackTrace();
+          logger.error("Interrupted sleep", e);
         }
       }
     }.start();

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/ZkHelixTestManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/ZkHelixTestManager.java b/helix-core/src/test/java/org/apache/helix/ZkHelixTestManager.java
deleted file mode 100644
index b660a1d..0000000
--- a/helix-core/src/test/java/org/apache/helix/ZkHelixTestManager.java
+++ /dev/null
@@ -1,44 +0,0 @@
-package org.apache.helix;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.List;
-
-import org.apache.helix.manager.zk.CallbackHandler;
-import org.apache.helix.manager.zk.ZKHelixManager;
-import org.apache.helix.manager.zk.ZkClient;
-
-// ZkHelixManager used for test only. expose more class members
-public class ZkHelixTestManager extends ZKHelixManager {
-
-  public ZkHelixTestManager(String clusterName, String instanceName, InstanceType instanceType,
-      String zkConnectString) throws Exception {
-    super(clusterName, instanceName, instanceType, zkConnectString);
-    // TODO Auto-generated constructor stub
-  }
-
-  public ZkClient getZkClient() {
-    return _zkClient;
-  }
-
-  public List<CallbackHandler> getHandlers() {
-    return _handlers;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java b/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
index 369ad68..0d597f6 100644
--- a/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
+++ b/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
@@ -39,10 +39,10 @@ import org.apache.helix.controller.stages.ClusterEvent;
 import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
 import org.apache.helix.controller.stages.BestPossibleStateOutput;
 import org.apache.helix.controller.stages.ResourceCurrentState;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.mock.controller.ClusterController;
-import org.apache.helix.mock.participant.MockParticipant;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.tools.ClusterStateVerifier;
@@ -59,8 +59,8 @@ public class TestNewStages extends ZkUnitTestBase {
   final int n = 2;
   final int p = 8;
   final int r = 2;
-  MockParticipant[] _participants = new MockParticipant[n];
-  ClusterController _controller;
+  MockParticipantManager[] _participants = new MockParticipantManager[n];
+  ClusterControllerManager _controller;
 
   ClusterId _clusterId;
   HelixDataAccessor _dataAccessor;
@@ -217,14 +217,14 @@ public class TestNewStages extends ZkUnitTestBase {
         r, // replicas
         "MasterSlave", true); // do rebalance
 
-    _controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    _controller = new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
     _controller.syncStart();
 
     // start participants
     for (int i = 0; i < n; i++) {
       String instanceName = "localhost_" + (12918 + i);
 
-      _participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+      _participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
       _participants[i].syncStart();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java b/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
index 26da8ee..fc9b7d5 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
@@ -254,4 +254,10 @@ public class DummyClusterManager implements HelixManager {
     // TODO Auto-generated method stub
     return null;
   }
+
+  @Override
+  public void addControllerMessageListener(MessageListener listener) {
+    // TODO Auto-generated method stub
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
index ba61361..dd5f441 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
@@ -20,6 +20,7 @@ package org.apache.helix.controller.stages;
  */
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
@@ -31,9 +32,11 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.api.State;
 import org.apache.helix.api.accessor.ClusterAccessor;
 import org.apache.helix.api.id.ClusterId;
 import org.apache.helix.api.id.MessageId;
+import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.controller.pipeline.Pipeline;
@@ -43,8 +46,10 @@ import org.apache.helix.model.ClusterConstraints;
 import org.apache.helix.model.ClusterConstraints.ConstraintAttribute;
 import org.apache.helix.model.ClusterConstraints.ConstraintType;
 import org.apache.helix.model.ConstraintItem;
+import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.model.ResourceAssignment;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -62,7 +67,7 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
 
     // ideal state: node0 is MASTER, node1 is SLAVE
     // replica=2 means 1 master and 1 slave
-    setupIdealState(clusterName, new int[] {
+    List<IdealState> idealStates = setupIdealState(clusterName, new int[] {
         0, 1
     }, new String[] {
       "TestDB"
@@ -78,6 +83,10 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
     ClusterEvent event = new ClusterEvent("testEvent");
     event.addAttribute("helixmanager", manager);
 
+    // get an empty best possible output for the partitions
+    BestPossibleStateOutput bestPossOutput = getEmptyBestPossibleStateOutput(idealStates);
+    event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.toString(), bestPossOutput);
+
     MessageThrottleStage throttleStage = new MessageThrottleStage();
     try {
       runStage(event, throttleStage);
@@ -138,7 +147,7 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
 
     // ideal state: node0 is MASTER, node1 is SLAVE
     // replica=2 means 1 master and 1 slave
-    setupIdealState(clusterName, new int[] {
+    List<IdealState> idealStates = setupIdealState(clusterName, new int[] {
         0, 1
     }, new String[] {
       "TestDB"
@@ -270,6 +279,10 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
     ClusterEvent event = new ClusterEvent("testEvent");
     event.addAttribute("helixmanager", manager);
 
+    // get an empty best possible output for the partitions
+    BestPossibleStateOutput bestPossOutput = getEmptyBestPossibleStateOutput(idealStates);
+    event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.toString(), bestPossOutput);
+
     Pipeline dataRefresh = new Pipeline();
     dataRefresh.addStage(new ReadClusterDataStage());
     runPipeline(event, dataRefresh);
@@ -329,5 +342,18 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
     return false;
   }
 
+  private BestPossibleStateOutput getEmptyBestPossibleStateOutput(List<IdealState> idealStates) {
+    BestPossibleStateOutput output = new BestPossibleStateOutput();
+    for (IdealState idealState : idealStates) {
+      ResourceId resourceId = idealState.getResourceId();
+      ResourceAssignment assignment = new ResourceAssignment(resourceId);
+      for (PartitionId partitionId : idealState.getPartitionIdSet()) {
+        Map<ParticipantId, State> emptyMap = Collections.emptyMap();
+        assignment.addReplicaMap(partitionId, emptyMap);
+      }
+      output.setResourceAssignment(resourceId, assignment);
+    }
+    return output;
+  }
   // add pending message test case
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/controller/stages/TestParseInfoFromAlert.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestParseInfoFromAlert.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestParseInfoFromAlert.java
index 8e7b85a..2dbf5f6 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestParseInfoFromAlert.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestParseInfoFromAlert.java
@@ -27,8 +27,8 @@ import org.testng.annotations.Test;
 public class TestParseInfoFromAlert extends ZkStandAloneCMTestBase {
   @Test
   public void TestParse() {
-    String controllerName = CONTROLLER_PREFIX + "_0";
-    HelixManager manager = _startCMResultMap.get(controllerName)._manager;
+
+    HelixManager manager = _controller;
 
     String instanceName =
         StatsAggregationStage.parseInstanceName("localhost_12918.TestStat@DB=123.latency", manager);
@@ -50,6 +50,7 @@ public class TestParseInfoFromAlert extends ZkStandAloneCMTestBase {
     String partitionName =
         StatsAggregationStage.parsePartitionName(
             "localhost_12918.TestStat@DB=TestDB;Partition=TestDB_22.latency", manager);
+
     Assert.assertTrue(partitionName.equals("TestDB_22"));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
index 4129f66..2e17ad3 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
@@ -37,6 +37,16 @@ import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.SessionId;
 import org.apache.helix.controller.HelixControllerMain;
 import org.apache.helix.controller.pipeline.Pipeline;
+import org.apache.helix.controller.stages.AttributeName;
+import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
+import org.apache.helix.controller.stages.ClusterEvent;
+import org.apache.helix.controller.stages.CurrentStateComputationStage;
+import org.apache.helix.controller.stages.MessageSelectionStage;
+import org.apache.helix.controller.stages.MessageThrottleStage;
+import org.apache.helix.controller.stages.ReadClusterDataStage;
+import org.apache.helix.controller.stages.ResourceComputationStage;
+import org.apache.helix.controller.stages.TaskAssignmentStage;
+import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
@@ -138,6 +148,9 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
 
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+    HelixManager manager = new DummyClusterManager(clusterName, accessor);
+    ClusterEvent event = new ClusterEvent("testEvent");
+
     final String resourceName = "testResource_dup";
     String[] resourceGroups = new String[] {
       resourceName
@@ -161,8 +174,10 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     ClusterAccessor clusterAccessor = new ClusterAccessor(ClusterId.from(clusterName), accessor);
     clusterAccessor.initClusterStructure();
 
-    TestHelper
-        .startController(clusterName, "controller_0", ZK_ADDR, HelixControllerMain.STANDALONE);
+
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+    controller.syncStart();
 
     // round1: controller sends O->S to both node0 and node1
     Thread.sleep(1000);