You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 00:26:41 UTC

[17/47] Refactoring from com.linkedin.helix to org.apache.helix

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZKHelixManager.java
deleted file mode 100644
index a8891d6..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZKHelixManager.java
+++ /dev/null
@@ -1,1098 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.manager.zk;
-
-import static com.linkedin.helix.HelixConstants.ChangeType.CONFIG;
-import static com.linkedin.helix.HelixConstants.ChangeType.CURRENT_STATE;
-import static com.linkedin.helix.HelixConstants.ChangeType.EXTERNAL_VIEW;
-import static com.linkedin.helix.HelixConstants.ChangeType.HEALTH;
-import static com.linkedin.helix.HelixConstants.ChangeType.IDEAL_STATE;
-import static com.linkedin.helix.HelixConstants.ChangeType.LIVE_INSTANCE;
-import static com.linkedin.helix.HelixConstants.ChangeType.MESSAGE;
-import static com.linkedin.helix.HelixConstants.ChangeType.MESSAGES_CONTROLLER;
-
-import java.lang.management.ManagementFactory;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-import java.util.Timer;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.TimeUnit;
-
-import org.I0Itec.zkclient.ZkConnection;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.Watcher.Event.EventType;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-
-import com.linkedin.helix.BaseDataAccessor;
-import com.linkedin.helix.ClusterMessagingService;
-import com.linkedin.helix.ConfigAccessor;
-import com.linkedin.helix.ConfigChangeListener;
-import com.linkedin.helix.ConfigScope.ConfigScopeProperty;
-import com.linkedin.helix.ControllerChangeListener;
-import com.linkedin.helix.CurrentStateChangeListener;
-import com.linkedin.helix.DataAccessor;
-import com.linkedin.helix.ExternalViewChangeListener;
-import com.linkedin.helix.HealthStateChangeListener;
-import com.linkedin.helix.HelixAdmin;
-import com.linkedin.helix.HelixConstants.ChangeType;
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.HelixException;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.HelixTimerTask;
-import com.linkedin.helix.IdealStateChangeListener;
-import com.linkedin.helix.InstanceType;
-import com.linkedin.helix.LiveInstanceChangeListener;
-import com.linkedin.helix.MessageListener;
-import com.linkedin.helix.PreConnectCallback;
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.PropertyPathConfig;
-import com.linkedin.helix.PropertyType;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.controller.restlet.ZKPropertyTransferServer;
-import com.linkedin.helix.healthcheck.HealthStatsAggregationTask;
-import com.linkedin.helix.healthcheck.ParticipantHealthReportCollector;
-import com.linkedin.helix.healthcheck.ParticipantHealthReportCollectorImpl;
-import com.linkedin.helix.messaging.DefaultMessagingService;
-import com.linkedin.helix.messaging.handling.MessageHandlerFactory;
-import com.linkedin.helix.model.CurrentState;
-import com.linkedin.helix.model.LiveInstance;
-import com.linkedin.helix.model.Message.MessageType;
-import com.linkedin.helix.model.StateModelDefinition;
-import com.linkedin.helix.monitoring.ZKPathDataDumpTask;
-import com.linkedin.helix.participant.DistClusterControllerElection;
-import com.linkedin.helix.participant.HelixStateMachineEngine;
-import com.linkedin.helix.participant.StateMachineEngine;
-import com.linkedin.helix.store.PropertyStore;
-import com.linkedin.helix.store.ZNRecordJsonSerializer;
-import com.linkedin.helix.store.zk.ZKPropertyStore;
-import com.linkedin.helix.store.zk.ZkHelixPropertyStore;
-import com.linkedin.helix.tools.PropertiesReader;
-
-public class ZKHelixManager implements HelixManager
-{
-  private static Logger                        logger                  =
-                                                                           Logger.getLogger(ZKHelixManager.class);
-  private static final int                     RETRY_LIMIT             = 3;
-  private static final int                     CONNECTIONTIMEOUT       = 60 * 1000;
-  private final String                         _clusterName;
-  private final String                         _instanceName;
-  private final String                         _zkConnectString;
-  private static final int                     DEFAULT_SESSION_TIMEOUT = 30 * 1000;
-  private ZKDataAccessor                       _accessor;
-  private ZKHelixDataAccessor                  _helixAccessor;
-  private ConfigAccessor                       _configAccessor;
-  protected ZkClient                           _zkClient;
-  private final List<CallbackHandler>          _handlers;
-  private final ZkStateChangeListener          _zkStateChangeListener;
-  private final InstanceType                   _instanceType;
-  volatile String                              _sessionId;
-  private Timer                                _timer;
-  private CallbackHandler                      _leaderElectionHandler;
-  private ParticipantHealthReportCollectorImpl _participantHealthCheckInfoCollector;
-  private final DefaultMessagingService        _messagingService;
-  private ZKHelixAdmin                         _managementTool;
-  private final String                         _version;
-  private final StateMachineEngine             _stateMachEngine;
-  private int                                  _sessionTimeout;
-  private PropertyStore<ZNRecord>              _propertyStore;
-  private ZkHelixPropertyStore<ZNRecord>       _helixPropertyStore;
-  private final List<HelixTimerTask>           _controllerTimerTasks;
-  private BaseDataAccessor<ZNRecord>           _baseDataAccessor;
-  List<PreConnectCallback>                     _preConnectCallbacks    =
-                                                                           new LinkedList<PreConnectCallback>();
-  ZKPropertyTransferServer                     _transferServer         = null;
-
-  public ZKHelixManager(String clusterName,
-                        String instanceName,
-                        InstanceType instanceType,
-                        String zkConnectString) throws Exception
-  {
-    logger.info("Create a zk-based cluster manager. clusterName:" + clusterName
-        + ", instanceName:" + instanceName + ", type:" + instanceType + ", zkSvr:"
-        + zkConnectString);
-    int sessionTimeoutInt = -1;
-    try
-    {
-      sessionTimeoutInt =
-          Integer.parseInt(System.getProperty("zk.session.timeout", ""
-              + DEFAULT_SESSION_TIMEOUT));
-    }
-    catch (NumberFormatException e)
-    {
-      logger.warn("Exception while parsing session timeout: "
-          + System.getProperty("zk.session.timeout", "" + DEFAULT_SESSION_TIMEOUT));
-    }
-    if (sessionTimeoutInt > 0)
-    {
-      _sessionTimeout = sessionTimeoutInt;
-    }
-    else
-    {
-      _sessionTimeout = DEFAULT_SESSION_TIMEOUT;
-    }
-    if (instanceName == null)
-    {
-      try
-      {
-        instanceName =
-            InetAddress.getLocalHost().getCanonicalHostName() + "-"
-                + instanceType.toString();
-      }
-      catch (UnknownHostException e)
-      {
-        // can ignore it
-        logger.info("Unable to get host name. Will set it to UNKNOWN, mostly ignorable",
-                    e);
-        instanceName = "UNKNOWN";
-      }
-    }
-
-    _clusterName = clusterName;
-    _instanceName = instanceName;
-    _instanceType = instanceType;
-    _zkConnectString = zkConnectString;
-    _zkStateChangeListener = new ZkStateChangeListener(this);
-    _timer = null;
-
-    _handlers = new ArrayList<CallbackHandler>();
-
-    _messagingService = new DefaultMessagingService(this);
-
-    _version =
-        new PropertiesReader("cluster-manager-version.properties").getProperty("clustermanager.version");
-
-    _stateMachEngine = new HelixStateMachineEngine(this);
-
-    // add all timer tasks
-    _controllerTimerTasks = new ArrayList<HelixTimerTask>();
-    if (_instanceType == InstanceType.CONTROLLER)
-    {
-      _controllerTimerTasks.add(new HealthStatsAggregationTask(this));
-    }
-  }
-
-  private boolean isInstanceSetup()
-  {
-    if (_instanceType == InstanceType.PARTICIPANT
-        || _instanceType == InstanceType.CONTROLLER_PARTICIPANT)
-    {
-      boolean isValid =
-          _zkClient.exists(PropertyPathConfig.getPath(PropertyType.CONFIGS,
-                                                      _clusterName,
-                                                      ConfigScopeProperty.PARTICIPANT.toString(),
-                                                      _instanceName))
-              && _zkClient.exists(PropertyPathConfig.getPath(PropertyType.MESSAGES,
-                                                             _clusterName,
-                                                             _instanceName))
-              && _zkClient.exists(PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
-                                                             _clusterName,
-                                                             _instanceName))
-              && _zkClient.exists(PropertyPathConfig.getPath(PropertyType.STATUSUPDATES,
-                                                             _clusterName,
-                                                             _instanceName))
-              && _zkClient.exists(PropertyPathConfig.getPath(PropertyType.ERRORS,
-                                                             _clusterName,
-                                                             _instanceName));
-
-      return isValid;
-    }
-    return true;
-  }
-
-  @Override
-  public void addIdealStateChangeListener(final IdealStateChangeListener listener) throws Exception
-  {
-    logger.info("ClusterManager.addIdealStateChangeListener()");
-    checkConnected();
-    final String path =
-        PropertyPathConfig.getPath(PropertyType.IDEALSTATES, _clusterName);
-    CallbackHandler callbackHandler =
-        createCallBackHandler(path,
-                              listener,
-                              new EventType[] { EventType.NodeDataChanged,
-                                  EventType.NodeDeleted, EventType.NodeCreated },
-                              IDEAL_STATE);
-    addListener(callbackHandler);
-  }
-
-  @Override
-  public void addLiveInstanceChangeListener(LiveInstanceChangeListener listener) throws Exception
-  {
-    logger.info("ClusterManager.addLiveInstanceChangeListener()");
-    checkConnected();
-    final String path = _helixAccessor.keyBuilder().liveInstances().getPath();
-    CallbackHandler callbackHandler =
-        createCallBackHandler(path,
-                              listener,
-                              new EventType[] { EventType.NodeDataChanged, EventType.NodeChildrenChanged,
-                                  EventType.NodeDeleted, EventType.NodeCreated },
-                              LIVE_INSTANCE);
-    addListener(callbackHandler);
-  }
-
-  @Override
-  public void addConfigChangeListener(ConfigChangeListener listener)
-  {
-    logger.info("ClusterManager.addConfigChangeListener()");
-    checkConnected();
-    final String path =
-        PropertyPathConfig.getPath(PropertyType.CONFIGS,
-                                   _clusterName,
-                                   ConfigScopeProperty.PARTICIPANT.toString());
-
-    CallbackHandler callbackHandler =
-        createCallBackHandler(path,
-                              listener,
-                              new EventType[] { EventType.NodeChildrenChanged },
-                              CONFIG);
-    addListener(callbackHandler);
-
-  }
-
-  // TODO: Decide if do we still need this since we are exposing
-  // ClusterMessagingService
-  @Override
-  public void addMessageListener(MessageListener listener, String instanceName)
-  {
-    logger.info("ClusterManager.addMessageListener() " + instanceName);
-    checkConnected();
-    final String path = _helixAccessor.keyBuilder().messages(instanceName).getPath();
-    CallbackHandler callbackHandler =
-        createCallBackHandler(path,
-                              listener,
-                              new EventType[] { EventType.NodeChildrenChanged,
-                                  EventType.NodeDeleted, EventType.NodeCreated },
-                              MESSAGE);
-    addListener(callbackHandler);
-  }
-
-  void addControllerMessageListener(MessageListener listener)
-  {
-    logger.info("ClusterManager.addControllerMessageListener()");
-    checkConnected();
-    final String path = _helixAccessor.keyBuilder().controllerMessages().getPath();
-
-    CallbackHandler callbackHandler =
-        createCallBackHandler(path,
-                              listener,
-                              new EventType[] { EventType.NodeChildrenChanged,
-                                  EventType.NodeDeleted, EventType.NodeCreated },
-                              MESSAGES_CONTROLLER);
-    addListener(callbackHandler);
-  }
-
-  @Override
-  public void addCurrentStateChangeListener(CurrentStateChangeListener listener,
-                                            String instanceName,
-                                            String sessionId)
-  {
-    logger.info("ClusterManager.addCurrentStateChangeListener() " + instanceName + " "
-        + sessionId);
-    checkConnected();
-    final String path =
-        _helixAccessor.keyBuilder().currentStates(instanceName, sessionId).getPath();
-
-    CallbackHandler callbackHandler =
-        createCallBackHandler(path,
-                              listener,
-                              new EventType[] { EventType.NodeChildrenChanged,
-                                  EventType.NodeDeleted, EventType.NodeCreated },
-                              CURRENT_STATE);
-    addListener(callbackHandler);
-  }
-
-  @Override
-  public void addHealthStateChangeListener(HealthStateChangeListener listener,
-                                           String instanceName)
-  {
-    // System.out.println("ZKClusterManager.addHealthStateChangeListener()");
-    // TODO: re-form this for stats checking
-    logger.info("ClusterManager.addHealthStateChangeListener()" + instanceName);
-    checkConnected();
-    final String path = _helixAccessor.keyBuilder().healthReports(instanceName).getPath();
-
-    CallbackHandler callbackHandler =
-        createCallBackHandler(path, listener, new EventType[] {
-            EventType.NodeChildrenChanged, EventType.NodeDataChanged,
-            EventType.NodeDeleted, EventType.NodeCreated }, HEALTH);
-    addListener(callbackHandler);
-  }
-
-  @Override
-  public void addExternalViewChangeListener(ExternalViewChangeListener listener)
-  {
-    logger.info("ClusterManager.addExternalViewChangeListener()");
-    checkConnected();
-    final String path = _helixAccessor.keyBuilder().externalViews().getPath();
-
-    CallbackHandler callbackHandler =
-        createCallBackHandler(path,
-                              listener,
-                              new EventType[] { EventType.NodeDataChanged,
-                                  EventType.NodeDeleted, EventType.NodeCreated },
-                              EXTERNAL_VIEW);
-    addListener(callbackHandler);
-  }
-
-  @Override
-  public DataAccessor getDataAccessor()
-  {
-    checkConnected();
-    return _accessor;
-  }
-
-  @Override
-  public HelixDataAccessor getHelixDataAccessor()
-  {
-    checkConnected();
-    return _helixAccessor;
-  }
-
-  @Override
-  public ConfigAccessor getConfigAccessor()
-  {
-    checkConnected();
-    return _configAccessor;
-  }
-
-  @Override
-  public String getClusterName()
-  {
-    return _clusterName;
-  }
-
-  @Override
-  public String getInstanceName()
-  {
-    return _instanceName;
-  }
-
-  @Override
-  public void connect() throws Exception
-  {
-    logger.info("ClusterManager.connect()");
-    if (_zkStateChangeListener.isConnected())
-    {
-      logger.warn("Cluster manager " + _clusterName + " " + _instanceName
-          + " already connected");
-      return;
-    }
-
-    try
-    {
-      createClient(_zkConnectString);
-      _messagingService.onConnected();
-    }
-    catch (Exception e)
-    {
-      logger.error(e);
-      disconnect();
-      throw e;
-    }
-  }
-
-  @Override
-  public void disconnect()
-  {
-
-    if (!isConnected())
-    {
-      logger.warn("ClusterManager " + _instanceName + " already disconnected");
-      return;
-    }
-
-    logger.info("disconnect " + _instanceName + "(" + _instanceType + ") from "
-        + _clusterName);
-
-    /**
-     * shutdown thread pool first to avoid reset() being invoked in the middle of state
-     * transition
-     */
-    _messagingService.getExecutor().shutDown();
-    resetHandlers();
-
-    _helixAccessor.shutdown();
-
-    if (_leaderElectionHandler != null)
-    {
-      _leaderElectionHandler.reset();
-    }
-
-    if (_participantHealthCheckInfoCollector != null)
-    {
-      _participantHealthCheckInfoCollector.stop();
-    }
-
-    if (_timer != null)
-    {
-      _timer.cancel();
-      _timer = null;
-    }
-
-    if (_instanceType == InstanceType.CONTROLLER)
-    {
-      stopTimerTasks();
-    }
-
-    if (_propertyStore != null)
-    {
-      _propertyStore.stop();
-    }
-
-    // unsubscribe accessor from controllerChange
-    _zkClient.unsubscribeAll();
-
-    _zkClient.close();
-
-    // HACK seems that zkClient is not sending DISCONNECT event
-    _zkStateChangeListener.disconnect();
-    logger.info("Cluster manager: " + _instanceName + " disconnected");
-  }
-
-  @Override
-  public String getSessionId()
-  {
-    checkConnected();
-    return _sessionId;
-  }
-
-  @Override
-  public boolean isConnected()
-  {
-    return _zkStateChangeListener.isConnected();
-  }
-
-  @Override
-  public long getLastNotificationTime()
-  {
-    return -1;
-  }
-
-  @Override
-  public void addControllerListener(ControllerChangeListener listener)
-  {
-    checkConnected();
-    final String path = _helixAccessor.keyBuilder().controller().getPath();
-    logger.info("Add controller listener at: " + path);
-    CallbackHandler callbackHandler =
-        createCallBackHandler(path,
-                              listener,
-                              new EventType[] { EventType.NodeChildrenChanged,
-                                  EventType.NodeDeleted, EventType.NodeCreated },
-                              ChangeType.CONTROLLER);
-
-    // System.out.println("add controller listeners to " + _instanceName +
-    // " for " + _clusterName);
-    // _handlers.add(callbackHandler);
-    addListener(callbackHandler);
-  }
-
-  @Override
-  public boolean removeListener(Object listener)
-  {
-    logger.info("remove listener: " + listener + " from " + _instanceName);
-
-    synchronized (this)
-    {
-      Iterator<CallbackHandler> iterator = _handlers.iterator();
-      while (iterator.hasNext())
-      {
-        CallbackHandler handler = iterator.next();
-        // simply compare reference
-        if (handler.getListener().equals(listener))
-        {
-          handler.reset();
-          iterator.remove();
-        }
-      }
-    }
-
-    return true;
-  }
-
-  private void addLiveInstance()
-  {
-    LiveInstance liveInstance = new LiveInstance(_instanceName);
-    liveInstance.setSessionId(_sessionId);
-    liveInstance.setHelixVersion(_version);
-    liveInstance.setLiveInstance(ManagementFactory.getRuntimeMXBean().getName());
-
-    logger.info("Add live instance: InstanceName: " + _instanceName + " Session id:"
-        + _sessionId);
-    Builder keyBuilder = _helixAccessor.keyBuilder();
-    if (!_helixAccessor.createProperty(keyBuilder.liveInstance(_instanceName),
-                                       liveInstance))
-    {
-      String errorMsg =
-          "Fail to create live instance node after waiting, so quit. instance:"
-              + _instanceName;
-      logger.warn(errorMsg);
-      throw new HelixException(errorMsg);
-
-    }
-    String currentStatePathParent =
-        PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
-                                   _clusterName,
-                                   _instanceName,
-                                   getSessionId());
-
-    if (!_zkClient.exists(currentStatePathParent))
-    {
-      _zkClient.createPersistent(currentStatePathParent);
-      logger.info("Creating current state path " + currentStatePathParent);
-    }
-  }
-
-  private void startStatusUpdatedumpTask()
-  {
-    long initialDelay = 30 * 60 * 1000;
-    long period = 120 * 60 * 1000;
-    int timeThresholdNoChange = 180 * 60 * 1000;
-
-    if (_timer == null)
-    {
-      _timer = new Timer(true);
-      _timer.scheduleAtFixedRate(new ZKPathDataDumpTask(this,
-                                                        _zkClient,
-                                                        timeThresholdNoChange),
-                                 initialDelay,
-                                 period);
-    }
-  }
-
-  private void createClient(String zkServers) throws Exception
-  {
-    String propertyStorePath =
-        PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, _clusterName);
-
-    // by default use ZNRecordStreamingSerializer except for paths within the property
-    // store which expects raw byte[] serialization/deserialization
-    PathBasedZkSerializer zkSerializer =
-        ChainedPathZkSerializer.builder(new ZNRecordStreamingSerializer())
-                               .serialize(propertyStorePath, new ByteArraySerializer())
-                               .build();
-
-    _zkClient = new ZkClient(zkServers, _sessionTimeout, CONNECTIONTIMEOUT, zkSerializer);
-    _accessor = new ZKDataAccessor(_clusterName, _zkClient);
-
-    ZkBaseDataAccessor<ZNRecord> baseDataAccessor =
-        new ZkBaseDataAccessor<ZNRecord>(_zkClient);
-    if (_instanceType == InstanceType.PARTICIPANT)
-    {
-      String curStatePath =
-          PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
-                                     _clusterName,
-                                     _instanceName);
-      _baseDataAccessor =
-          new ZkCacheBaseDataAccessor<ZNRecord>(baseDataAccessor,
-                                                Arrays.asList(curStatePath));
-    }
-    else if (_instanceType == InstanceType.CONTROLLER)
-    {
-      String extViewPath = PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW,
-
-      _clusterName);
-      _baseDataAccessor =
-          new ZkCacheBaseDataAccessor<ZNRecord>(baseDataAccessor,
-                                                Arrays.asList(extViewPath));
-
-    }
-    else
-    {
-      _baseDataAccessor = baseDataAccessor;
-    }
-
-    _helixAccessor =
-        new ZKHelixDataAccessor(_clusterName, _instanceType, _baseDataAccessor);
-    _configAccessor = new ConfigAccessor(_zkClient);
-    int retryCount = 0;
-
-    _zkClient.subscribeStateChanges(_zkStateChangeListener);
-    while (retryCount < RETRY_LIMIT)
-    {
-      try
-      {
-        _zkClient.waitUntilConnected(_sessionTimeout, TimeUnit.MILLISECONDS);
-        _zkStateChangeListener.handleStateChanged(KeeperState.SyncConnected);
-        _zkStateChangeListener.handleNewSession();
-        break;
-      }
-      catch (HelixException e)
-      {
-        logger.error("fail to createClient.", e);
-        throw e;
-      }
-      catch (Exception e)
-      {
-        retryCount++;
-
-        logger.error("fail to createClient. retry " + retryCount, e);
-        if (retryCount == RETRY_LIMIT)
-        {
-          throw e;
-        }
-      }
-    }
-  }
-
-  private CallbackHandler createCallBackHandler(String path,
-                                                Object listener,
-                                                EventType[] eventTypes,
-                                                ChangeType changeType)
-  {
-    if (listener == null)
-    {
-      throw new HelixException("Listener cannot be null");
-    }
-    return new CallbackHandler(this, _zkClient, path, listener, eventTypes, changeType);
-  }
-
-  /**
-   * This will be invoked when ever a new session is created<br/>
-   * 
-   * case 1: the cluster manager was a participant carry over current state, add live
-   * instance, and invoke message listener; case 2: the cluster manager was controller and
-   * was a leader before do leader election, and if it becomes leader again, invoke ideal
-   * state listener, current state listener, etc. if it fails to become leader in the new
-   * session, then becomes standby; case 3: the cluster manager was controller and was NOT
-   * a leader before do leader election, and if it becomes leader, instantiate and invoke
-   * ideal state listener, current state listener, etc. if if fails to become leader in
-   * the new session, stay as standby
-   */
-
-  protected void handleNewSession()
-  {
-    boolean isConnected = _zkClient.waitUntilConnected(CONNECTIONTIMEOUT, TimeUnit.MILLISECONDS);
-    while (!isConnected)
-    {
-      logger.error("Could NOT connect to zk server in " + CONNECTIONTIMEOUT + "ms. zkServer: "
-          + _zkConnectString + ", expiredSessionId: " + _sessionId + ", clusterName: "
-          + _clusterName);
-      isConnected = _zkClient.waitUntilConnected(CONNECTIONTIMEOUT, TimeUnit.MILLISECONDS);
-    }
-
-    ZkConnection zkConnection = ((ZkConnection) _zkClient.getConnection());
-    
-    synchronized (this)
-    {
-      _sessionId = Long.toHexString(zkConnection.getZookeeper().getSessionId());
-    }
-    _accessor.reset();
-    _baseDataAccessor.reset();
-
-    resetHandlers();
-
-    logger.info("Handling new session, session id:" + _sessionId + ", instance:"
-        + _instanceName + ", instanceTye: " + _instanceType + ", cluster: " + _clusterName);
-
-    logger.info(zkConnection.getZookeeper());
-
-    if (!ZKUtil.isClusterSetup(_clusterName, _zkClient))
-    {
-      throw new HelixException("Initial cluster structure is not set up for cluster:"
-          + _clusterName);
-    }
-    if (!isInstanceSetup())
-    {
-      throw new HelixException("Initial cluster structure is not set up for instance:"
-          + _instanceName + " instanceType:" + _instanceType);
-    }
-
-    if (_instanceType == InstanceType.PARTICIPANT
-        || _instanceType == InstanceType.CONTROLLER_PARTICIPANT)
-    {
-      handleNewSessionAsParticipant();
-    }
-
-    if (_instanceType == InstanceType.CONTROLLER
-        || _instanceType == InstanceType.CONTROLLER_PARTICIPANT)
-    {
-      addControllerMessageListener(_messagingService.getExecutor());
-      MessageHandlerFactory defaultControllerMsgHandlerFactory =
-          new DefaultControllerMessageHandlerFactory();
-      _messagingService.getExecutor()
-                       .registerMessageHandlerFactory(defaultControllerMsgHandlerFactory.getMessageType(),
-                                                      defaultControllerMsgHandlerFactory);
-      MessageHandlerFactory defaultSchedulerMsgHandlerFactory =
-          new DefaultSchedulerMessageHandlerFactory(this);
-      _messagingService.getExecutor()
-                       .registerMessageHandlerFactory(defaultSchedulerMsgHandlerFactory.getMessageType(),
-                                                      defaultSchedulerMsgHandlerFactory);
-      MessageHandlerFactory defaultParticipantErrorMessageHandlerFactory =
-          new DefaultParticipantErrorMessageHandlerFactory(this);
-      _messagingService.getExecutor()
-                       .registerMessageHandlerFactory(defaultParticipantErrorMessageHandlerFactory.getMessageType(),
-                                                      defaultParticipantErrorMessageHandlerFactory);
-
-      if (_leaderElectionHandler == null)
-      {
-        final String path =
-            PropertyPathConfig.getPath(PropertyType.CONTROLLER, _clusterName);
-
-        _leaderElectionHandler =
-            createCallBackHandler(path,
-                                  new DistClusterControllerElection(_zkConnectString),
-                                  new EventType[] { EventType.NodeChildrenChanged,
-                                      EventType.NodeDeleted, EventType.NodeCreated },
-                                  ChangeType.CONTROLLER);
-      }
-      else
-      {
-        _leaderElectionHandler.init();
-      }
-    }
-
-    if (_instanceType == InstanceType.PARTICIPANT
-        || _instanceType == InstanceType.CONTROLLER_PARTICIPANT
-        || (_instanceType == InstanceType.CONTROLLER && isLeader()))
-    {
-      initHandlers();
-    }
-  }
-
-  private void handleNewSessionAsParticipant()
-  {
-    // In case there is a live instance record on zookeeper
-    Builder keyBuilder = _helixAccessor.keyBuilder();
-
-    if (_helixAccessor.getProperty(keyBuilder.liveInstance(_instanceName)) != null)
-    {
-      logger.warn("Found another instance with same instanceName: " + _instanceName
-          + " in cluster " + _clusterName);
-      // Wait for a while, in case previous storage node exits unexpectedly
-      // and its liveinstance
-      // still hangs around until session timeout happens
-      try
-      {
-        Thread.sleep(_sessionTimeout + 5000);
-      }
-      catch (InterruptedException e)
-      {
-        logger.warn("Sleep interrupted while waiting for previous liveinstance to go away.",
-                    e);
-      }
-
-      if (_helixAccessor.getProperty(keyBuilder.liveInstance(_instanceName)) != null)
-      {
-        String errorMessage =
-            "instance " + _instanceName + " already has a liveinstance in cluster "
-                + _clusterName;
-        logger.error(errorMessage);
-        throw new HelixException(errorMessage);
-      }
-    }
-    // Invoke the PreConnectCallbacks
-    for (PreConnectCallback callback : _preConnectCallbacks)
-    {
-      callback.onPreConnect();
-    }
-    addLiveInstance();
-    carryOverPreviousCurrentState();
-
-    // In case the cluster manager is running as a participant, setup message
-    // listener
-    _messagingService.registerMessageHandlerFactory(MessageType.STATE_TRANSITION.toString(),
-                                                    _stateMachEngine);
-    addMessageListener(_messagingService.getExecutor(), _instanceName);
-    addControllerListener(_helixAccessor);
-
-    if (_participantHealthCheckInfoCollector == null)
-    {
-      _participantHealthCheckInfoCollector =
-          new ParticipantHealthReportCollectorImpl(this, _instanceName);
-      _participantHealthCheckInfoCollector.start();
-    }
-    // start the participant health check timer, also create zk path for health
-    // check info
-    String healthCheckInfoPath =
-        _helixAccessor.keyBuilder().healthReports(_instanceName).getPath();
-    if (!_zkClient.exists(healthCheckInfoPath))
-    {
-      _zkClient.createPersistent(healthCheckInfoPath, true);
-      logger.info("Creating healthcheck info path " + healthCheckInfoPath);
-    }
-  }
-
-  @Override
-  public void addPreConnectCallback(PreConnectCallback callback)
-  {
-    logger.info("Adding preconnect callback");
-    _preConnectCallbacks.add(callback);
-  }
-
-  private void resetHandlers()
-  {
-    synchronized (this)
-    {
-      // get a copy of the list and iterate over the copy list
-      // in case handler.reset() will modify the original handler list
-      List<CallbackHandler> handlers = new ArrayList<CallbackHandler>();
-      handlers.addAll(_handlers);
-
-      for (CallbackHandler handler : handlers)
-      {
-        handler.reset();
-        logger.info("reset handler: " + handler.getPath() + " by "
-            + Thread.currentThread().getName());
-      }
-    }
-  }
-
-  private void initHandlers()
-  {
-    // may add new currentState and message listeners during init()
-    // so make a copy and iterate over the copy
-    synchronized (this)
-    {
-      List<CallbackHandler> handlers = new ArrayList<CallbackHandler>();
-      handlers.addAll(_handlers);
-      for (CallbackHandler handler : handlers)
-      {
-        handler.init();
-      }
-    }
-  }
-
-  private void addListener(CallbackHandler handler)
-  {
-    synchronized (this)
-    {
-      _handlers.add(handler);
-      logger.info("add handler: " + handler.getPath() + " by "
-          + Thread.currentThread().getName());
-    }
-  }
-
-  @Override
-  public boolean isLeader()
-  {
-    if (!isConnected())
-    {
-      return false;
-    }
-
-    if (_instanceType != InstanceType.CONTROLLER)
-    {
-      return false;
-    }
-
-    Builder keyBuilder = _helixAccessor.keyBuilder();
-    LiveInstance leader = _helixAccessor.getProperty(keyBuilder.controllerLeader());
-    if (leader == null)
-    {
-      return false;
-    }
-    else
-    {
-      String leaderName = leader.getInstanceName();
-      // TODO need check sessionId also, but in distributed mode, leader's
-      // sessionId is
-      // not equal to
-      // the leader znode's sessionId field which is the sessionId of the
-      // controller_participant that
-      // successfully creates the leader node
-      if (leaderName == null || !leaderName.equals(_instanceName))
-      {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  private void carryOverPreviousCurrentState()
-  {
-    Builder keyBuilder = _helixAccessor.keyBuilder();
-
-    List<String> subPaths =
-        _helixAccessor.getChildNames(keyBuilder.sessions(_instanceName));
-    for (String previousSessionId : subPaths)
-    {
-      List<CurrentState> previousCurrentStates =
-          _helixAccessor.getChildValues(keyBuilder.currentStates(_instanceName,
-                                                                 previousSessionId));
-
-      for (CurrentState previousCurrentState : previousCurrentStates)
-      {
-        if (!previousSessionId.equalsIgnoreCase(_sessionId))
-        {
-          logger.info("Carrying over old session:" + previousSessionId + " resource "
-              + previousCurrentState.getId() + " to new session:" + _sessionId);
-          String stateModelDefRef = previousCurrentState.getStateModelDefRef();
-          if (stateModelDefRef == null)
-          {
-            logger.error("pervious current state doesn't have a state model def. skip it. prevCS: "
-                + previousCurrentState);
-            continue;
-          }
-          StateModelDefinition stateModel =
-              _helixAccessor.getProperty(keyBuilder.stateModelDef(stateModelDefRef));
-          for (String partitionName : previousCurrentState.getPartitionStateMap()
-                                                          .keySet())
-          {
-
-            previousCurrentState.setState(partitionName, stateModel.getInitialState());
-          }
-          previousCurrentState.setSessionId(_sessionId);
-          _helixAccessor.setProperty(keyBuilder.currentState(_instanceName,
-                                                             _sessionId,
-                                                             previousCurrentState.getId()),
-                                     previousCurrentState);
-        }
-      }
-    }
-    // Deleted old current state
-    for (String previousSessionId : subPaths)
-    {
-      if (!previousSessionId.equalsIgnoreCase(_sessionId))
-      {
-        String path =
-            _helixAccessor.keyBuilder()
-                          .currentStates(_instanceName, previousSessionId)
-                          .getPath();
-        logger.info("Deleting previous current state. path: " + path + "/"
-            + previousSessionId);
-        _zkClient.deleteRecursive(path);
-
-      }
-    }
-  }
-
-  @Deprecated
-  @Override
-  public synchronized PropertyStore<ZNRecord> getPropertyStore()
-  {
-    checkConnected();
-
-    if (_propertyStore == null)
-    {
-      String path = PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, _clusterName);
-
-      // reuse the existing zkClient because its serializer will use raw serialization
-      // for paths of the property store.
-      _propertyStore =
-          new ZKPropertyStore<ZNRecord>(_zkClient, new ZNRecordJsonSerializer(), path);
-    }
-
-    return _propertyStore;
-  }
-
-  @Override
-  public synchronized ZkHelixPropertyStore<ZNRecord> getHelixPropertyStore()
-  {
-    checkConnected();
-
-    if (_helixPropertyStore == null)
-    {
-      String path =
-          PropertyPathConfig.getPath(PropertyType.HELIX_PROPERTYSTORE, _clusterName);
-
-      _helixPropertyStore =
-          new ZkHelixPropertyStore<ZNRecord>(new ZkBaseDataAccessor<ZNRecord>(_zkClient),
-                                             path,
-                                             null);
-    }
-
-    return _helixPropertyStore;
-  }
-
-  @Override
-  public synchronized HelixAdmin getClusterManagmentTool()
-  {
-    checkConnected();
-    if (_zkClient != null)
-    {
-      _managementTool = new ZKHelixAdmin(_zkClient);
-    }
-    else
-    {
-      logger.error("Couldn't get ZKClusterManagementTool because zkClient is null");
-    }
-
-    return _managementTool;
-  }
-
-  @Override
-  public ClusterMessagingService getMessagingService()
-  {
-    // The caller can register message handler factories on messaging service before the
-    // helix manager is connected. Thus we do not do connected check here.
-    return _messagingService;
-  }
-
-  @Override
-  public ParticipantHealthReportCollector getHealthReportCollector()
-  {
-    checkConnected();
-    return _participantHealthCheckInfoCollector;
-  }
-
-  @Override
-  public InstanceType getInstanceType()
-  {
-    return _instanceType;
-  }
-
-  private void checkConnected()
-  {
-    if (!isConnected())
-    {
-      throw new HelixException("ClusterManager not connected. Call clusterManager.connect()");
-    }
-  }
-
-  @Override
-  public String getVersion()
-  {
-    return _version;
-  }
-
-  @Override
-  public StateMachineEngine getStateMachineEngine()
-  {
-    return _stateMachEngine;
-  }
-
-  protected List<CallbackHandler> getHandlers()
-  {
-    return _handlers;
-  }
-
-  // TODO: rename this and not expose this function as part of interface
-  @Override
-  public void startTimerTasks()
-  {
-    for (HelixTimerTask task : _controllerTimerTasks)
-    {
-      task.start();
-    }
-    startStatusUpdatedumpTask();
-  }
-
-  @Override
-  public void stopTimerTasks()
-  {
-    for (HelixTimerTask task : _controllerTimerTasks)
-    {
-      task.stop();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZKUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZKUtil.java b/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZKUtil.java
deleted file mode 100644
index 14040c6..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZKUtil.java
+++ /dev/null
@@ -1,353 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.manager.zk;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.I0Itec.zkclient.DataUpdater;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.data.Stat;
-
-import com.linkedin.helix.ConfigScope.ConfigScopeProperty;
-import com.linkedin.helix.PropertyPathConfig;
-import com.linkedin.helix.PropertyType;
-import com.linkedin.helix.ZNRecord;
-
-public final class ZKUtil
-{
-  private static Logger logger = Logger.getLogger(ZKUtil.class);
-  private static int RETRYLIMIT = 3;
-
-  private ZKUtil()
-  {
-  }
-
-  public static boolean isClusterSetup(String clusterName, ZkClient zkClient)
-  {
-    if (clusterName == null || zkClient == null)
-    {
-      return false;
-    }
-
-    boolean isValid =
-        zkClient.exists(PropertyPathConfig.getPath(PropertyType.IDEALSTATES, clusterName))
-            && zkClient.exists(PropertyPathConfig.getPath(PropertyType.CONFIGS,
-                                                          clusterName,
-                                                          ConfigScopeProperty.CLUSTER.toString(),
-                                                          clusterName))
-            && zkClient.exists(PropertyPathConfig.getPath(PropertyType.CONFIGS,
-                                                          clusterName,
-                                                          ConfigScopeProperty.PARTICIPANT.toString()))
-            && zkClient.exists(PropertyPathConfig.getPath(PropertyType.CONFIGS,
-                                                          clusterName,
-                                                          ConfigScopeProperty.RESOURCE.toString()))
-            && zkClient.exists(PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE,
-                                                          clusterName))
-            && zkClient.exists(PropertyPathConfig.getPath(PropertyType.LIVEINSTANCES,
-                                                          clusterName))
-            && zkClient.exists(PropertyPathConfig.getPath(PropertyType.INSTANCES,
-                                                          clusterName))
-            && zkClient.exists(PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW,
-                                                          clusterName))
-            && zkClient.exists(PropertyPathConfig.getPath(PropertyType.CONTROLLER,
-                                                          clusterName))
-            && zkClient.exists(PropertyPathConfig.getPath(PropertyType.STATEMODELDEFS,
-                                                          clusterName))
-            && zkClient.exists(PropertyPathConfig.getPath(PropertyType.MESSAGES_CONTROLLER,
-                                                          clusterName))
-            && zkClient.exists(PropertyPathConfig.getPath(PropertyType.ERRORS_CONTROLLER,
-                                                          clusterName))
-            && zkClient.exists(PropertyPathConfig.getPath(PropertyType.STATUSUPDATES_CONTROLLER,
-                                                          clusterName))
-            && zkClient.exists(PropertyPathConfig.getPath(PropertyType.HISTORY,
-                                                          clusterName));
-
-    return isValid;
-  }
-
-  public static void createChildren(ZkClient client,
-                                    String parentPath,
-                                    List<ZNRecord> list)
-  {
-    client.createPersistent(parentPath, true);
-    if (list != null)
-    {
-      for (ZNRecord record : list)
-      {
-        createChildren(client, parentPath, record);
-      }
-    }
-  }
-
-  public static void createChildren(ZkClient client,
-                                    String parentPath,
-                                    ZNRecord nodeRecord)
-  {
-    client.createPersistent(parentPath, true);
-
-    String id = nodeRecord.getId();
-    String temp = parentPath + "/" + id;
-    client.createPersistent(temp, nodeRecord);
-  }
-
-  public static void dropChildren(ZkClient client, String parentPath, List<ZNRecord> list)
-  {
-    // TODO: check if parentPath exists
-    if (list != null)
-    {
-      for (ZNRecord record : list)
-      {
-        dropChildren(client, parentPath, record);
-      }
-    }
-  }
-
-  public static void dropChildren(ZkClient client, String parentPath, ZNRecord nodeRecord)
-  {
-    // TODO: check if parentPath exists
-    String id = nodeRecord.getId();
-    String temp = parentPath + "/" + id;
-    client.deleteRecursive(temp);
-  }
-
-  public static List<ZNRecord> getChildren(ZkClient client, String path)
-  {
-    // parent watch will be set by zkClient
-    List<String> children = client.getChildren(path);
-    if (children == null || children.size() == 0)
-    {
-      return Collections.emptyList();
-    }
-
-    List<ZNRecord> childRecords = new ArrayList<ZNRecord>();
-    for (String child : children)
-    {
-      String childPath = path + "/" + child;
-      Stat newStat = new Stat();
-      ZNRecord record = client.readDataAndStat(childPath, newStat, true);
-      if (record != null)
-      {
-        record.setVersion(newStat.getVersion());
-        record.setCreationTime(newStat.getCtime());
-        record.setModifiedTime(newStat.getMtime());
-        childRecords.add(record);
-      }
-    }
-    return childRecords;
-  }
-
-  public static void updateIfExists(ZkClient client,
-                                    String path,
-                                    final ZNRecord record,
-                                    boolean mergeOnUpdate)
-  {
-    if (client.exists(path))
-    {
-      DataUpdater<Object> updater = new DataUpdater<Object>()
-      {
-        @Override
-        public Object update(Object currentData)
-        {
-          return record;
-        }
-      };
-      client.updateDataSerialized(path, updater);
-    }
-  }
-
-  public static void createOrUpdate(ZkClient client,
-                                    String path,
-                                    final ZNRecord record,
-                                    final boolean persistent,
-                                    final boolean mergeOnUpdate)
-  {
-    int retryCount = 0;
-    while (retryCount < RETRYLIMIT)
-    {
-      try
-      {
-        if (client.exists(path))
-        {
-          DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>()
-          {
-            @Override
-            public ZNRecord update(ZNRecord currentData)
-            {
-              if (currentData != null && mergeOnUpdate)
-              {
-                currentData.merge(record);
-                return currentData;
-              }
-              return record;
-            }
-          };
-          client.updateDataSerialized(path, updater);
-        }
-        else
-        {
-          CreateMode mode = (persistent) ? CreateMode.PERSISTENT : CreateMode.EPHEMERAL;
-          if (record.getDeltaList().size() > 0)
-          {
-            ZNRecord value = new ZNRecord(record.getId());
-            value.merge(record);
-            client.create(path, value, mode);
-          }
-          else
-          {
-            client.create(path, record, mode);
-          }
-        }
-        break;
-      }
-      catch (Exception e)
-      {
-        retryCount = retryCount + 1;
-        logger.warn("Exception trying to update " + path + " Exception:" + e.getMessage()
-            + ". Will retry.");
-      }
-    }
-  }
-
-  public static void asyncCreateOrUpdate(ZkClient client,
-                                         String path,
-                                         final ZNRecord record,
-                                         final boolean persistent,
-                                         final boolean mergeOnUpdate)
-  {
-    try
-    {
-      if (client.exists(path))
-      {
-        if (mergeOnUpdate)
-        {
-          ZNRecord curRecord = client.readData(path);
-          if (curRecord != null)
-          {
-            curRecord.merge(record);
-            client.asyncSetData(path, curRecord, -1, null);
-          }
-          else
-          {
-            client.asyncSetData(path, record, -1, null);
-          }
-        }
-        else
-        {
-          client.asyncSetData(path, record, -1, null);
-        }
-      }
-      else
-      {
-        CreateMode mode = (persistent) ? CreateMode.PERSISTENT : CreateMode.EPHEMERAL;
-        if (record.getDeltaList().size() > 0)
-        {
-          ZNRecord newRecord = new ZNRecord(record.getId());
-          newRecord.merge(record);
-          client.create(path, null, mode);
-
-          client.asyncSetData(path, newRecord, -1, null);
-        }
-        else
-        {
-          client.create(path, null, mode);
-
-          client.asyncSetData(path, record, -1, null);
-        }
-      }
-    }
-    catch (Exception e)
-    {
-      logger.error("Exception in async create or update " + path + ". Exception: "
-          + e.getMessage() + ". Give up.");
-    }
-  }
-
-  public static void createOrReplace(ZkClient client,
-                                     String path,
-                                     final ZNRecord record,
-                                     final boolean persistent)
-  {
-    int retryCount = 0;
-    while (retryCount < RETRYLIMIT)
-    {
-      try
-      {
-        if (client.exists(path))
-        {
-          DataUpdater<Object> updater = new DataUpdater<Object>()
-          {
-            @Override
-            public Object update(Object currentData)
-            {
-              return record;
-            }
-          };
-          client.updateDataSerialized(path, updater);
-        }
-        else
-        {
-          CreateMode mode = (persistent) ? CreateMode.PERSISTENT : CreateMode.EPHEMERAL;
-          client.create(path, record, mode);
-        }
-        break;
-      }
-      catch (Exception e)
-      {
-        retryCount = retryCount + 1;
-        logger.warn("Exception trying to createOrReplace " + path + " Exception:"
-            + e.getMessage() + ". Will retry.");
-      }
-    }
-  }
-
-  public static void subtract(ZkClient client,
-                              String path,
-                              final ZNRecord recordTosubtract)
-  {
-    int retryCount = 0;
-    while (retryCount < RETRYLIMIT)
-    {
-      try
-      {
-        if (client.exists(path))
-        {
-          DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>()
-          {
-            @Override
-            public ZNRecord update(ZNRecord currentData)
-            {
-              currentData.subtract(recordTosubtract);
-              return currentData;
-            }
-          };
-          client.updateDataSerialized(path, updater);
-          break;
-        }
-      }
-      catch (Exception e)
-      {
-        retryCount = retryCount + 1;
-        logger.warn("Exception trying to createOrReplace " + path + " Exception:"
-            + e.getMessage() + ". Will retry.");
-        e.printStackTrace();
-      }
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZNRecordSerializer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZNRecordSerializer.java b/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZNRecordSerializer.java
deleted file mode 100644
index 7575d40..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZNRecordSerializer.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.manager.zk;
-
-import java.io.ByteArrayInputStream;
-import java.io.StringWriter;
-import java.util.List;
-import java.util.Map;
-
-import org.I0Itec.zkclient.serialize.ZkSerializer;
-import org.apache.log4j.Logger;
-import org.codehaus.jackson.map.DeserializationConfig;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.SerializationConfig;
-
-import com.linkedin.helix.HelixException;
-import com.linkedin.helix.ZNRecord;
-
-public class ZNRecordSerializer implements ZkSerializer
-{
-  private static Logger logger = Logger.getLogger(ZNRecordSerializer.class);
-
-  private static int getListFieldBound(ZNRecord record)
-  {
-    int max = Integer.MAX_VALUE;
-    if (record.getSimpleFields().containsKey(ZNRecord.LIST_FIELD_BOUND))
-    {
-      String maxStr = record.getSimpleField(ZNRecord.LIST_FIELD_BOUND);
-      try
-      {
-        max = Integer.parseInt(maxStr);
-      }
-      catch (Exception e)
-      {
-        logger.error("IllegalNumberFormat for list field bound: " + maxStr);
-      }
-    }
-    return max;
-  }
-
-  @Override
-  public byte[] serialize(Object data)
-  {
-    if (!(data instanceof ZNRecord))
-    {
-      // null is NOT an instance of any class
-      logger.error("Input object must be of type ZNRecord but it is " + data + ". Will not write to zk");
-      throw new HelixException("Input object is not of type ZNRecord (was " + data + ")");
-    }
-
-    ZNRecord record = (ZNRecord) data;
-    
-    // apply retention policy
-    int max = getListFieldBound(record);
-    if (max < Integer.MAX_VALUE)
-    {
-      Map<String, List<String>> listMap = record.getListFields();
-      for (String key : listMap.keySet())
-      {
-        List<String> list = listMap.get(key);
-        if (list.size() > max)
-        {
-          listMap.put(key, list.subList(0, max));
-        }
-      }
-    }
-
-    // do serialization
-    ObjectMapper mapper = new ObjectMapper();
-    SerializationConfig serializationConfig = mapper.getSerializationConfig();
-    serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
-    serializationConfig.set(SerializationConfig.Feature.AUTO_DETECT_FIELDS, true);
-    serializationConfig.set(SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
-    StringWriter sw = new StringWriter();
-    try
-    {
-      mapper.writeValue(sw, data);
-    } catch (Exception e)
-    {
-      logger.error("Exception during data serialization. Will not write to zk. Data (first 1k): "
-          + sw.toString().substring(0, 1024), e);
-      throw new HelixException(e);
-    }
-    
-    if (sw.toString().getBytes().length > ZNRecord.SIZE_LIMIT)
-    {
-      logger.error("Data size larger than 1M, ZNRecord.id: " + record.getId() 
-          + ". Will not write to zk. Data (first 1k): " + sw.toString().substring(0, 1024));
-      throw new HelixException("Data size larger than 1M, ZNRecord.id: " + record.getId());
-    }
-    return sw.toString().getBytes();
-  }
-
-  @Override
-  public Object deserialize(byte[] bytes)
-  {
-    if (bytes == null || bytes.length == 0)
-    {
-      // reading a parent/null node
-      return null;
-    }
-
-    ObjectMapper mapper = new ObjectMapper();
-    ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
-
-    DeserializationConfig deserializationConfig = mapper.getDeserializationConfig();
-    deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_FIELDS, true);
-    deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_SETTERS, true);
-    deserializationConfig.set(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, true);
-    try
-    {
-      ZNRecord zn = mapper.readValue(bais, ZNRecord.class);
-      return zn;
-    } catch (Exception e)
-    {
-      logger.error("Exception during deserialization of bytes: " + new String(bytes), e);
-      return null;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZNRecordStreamingSerializer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZNRecordStreamingSerializer.java b/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZNRecordStreamingSerializer.java
deleted file mode 100644
index 32ede77..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZNRecordStreamingSerializer.java
+++ /dev/null
@@ -1,300 +0,0 @@
-package com.linkedin.helix.manager.zk;
-
-import java.io.ByteArrayInputStream;
-import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
-import org.I0Itec.zkclient.exception.ZkMarshallingError;
-import org.I0Itec.zkclient.serialize.ZkSerializer;
-import org.apache.log4j.Logger;
-import org.codehaus.jackson.JsonFactory;
-import org.codehaus.jackson.JsonGenerator;
-import org.codehaus.jackson.JsonParser;
-import org.codehaus.jackson.JsonToken;
-
-import com.linkedin.helix.HelixException;
-import com.linkedin.helix.ZNRecord;
-
-public class ZNRecordStreamingSerializer implements ZkSerializer
-{
-  private static Logger LOG = Logger.getLogger(ZNRecordStreamingSerializer.class);
-
-  private static int getListFieldBound(ZNRecord record)
-  {
-    int max = Integer.MAX_VALUE;
-    if (record.getSimpleFields().containsKey(ZNRecord.LIST_FIELD_BOUND))
-    {
-      String maxStr = record.getSimpleField(ZNRecord.LIST_FIELD_BOUND);
-      try
-      {
-        max = Integer.parseInt(maxStr);
-      }
-      catch (Exception e)
-      {
-        LOG.error("IllegalNumberFormat for list field bound: " + maxStr);
-      }
-    }
-    return max;
-  }
-  
-  @Override
-  public byte[] serialize(Object data) throws ZkMarshallingError
-  {
-    if (!(data instanceof ZNRecord))
-    {
-      // null is NOT an instance of any class
-      LOG.error("Input object must be of type ZNRecord but it is " + data + ". Will not write to zk");
-      throw new HelixException("Input object is not of type ZNRecord (was " + data + ")");
-    }
-
-    // apply retention policy on list field
-    ZNRecord record = (ZNRecord) data;
-    int max = getListFieldBound(record);
-    if (max < Integer.MAX_VALUE)
-    {
-      Map<String, List<String>> listMap = record.getListFields();
-      for (String key : listMap.keySet())
-      {
-        List<String> list = listMap.get(key);
-        if (list.size() > max)
-        {
-          listMap.put(key, list.subList(0, max));
-        }
-      }
-    }
-    
-    StringWriter sw = new StringWriter();
-    try
-    {
-      JsonFactory f = new JsonFactory();
-      JsonGenerator g = f.createJsonGenerator(sw);
-
-      g.writeStartObject();
-
-      // write id field
-      g.writeRaw("\n  ");
-      g.writeStringField("id", record.getId());
-
-      // write simepleFields
-      g.writeRaw("\n  ");
-      g.writeObjectFieldStart("simpleFields");
-      for (String key : record.getSimpleFields().keySet())
-      {
-        g.writeRaw("\n    ");
-        g.writeStringField(key, record.getSimpleField(key));
-      }
-      g.writeRaw("\n  ");
-      g.writeEndObject(); // for simpleFields
-
-      // write listFields
-      g.writeRaw("\n  ");
-      g.writeObjectFieldStart("listFields");
-      for (String key : record.getListFields().keySet())
-      {
-        // g.writeStringField(key, record.getListField(key).toString());
-
-        // g.writeObjectFieldStart(key);
-        g.writeRaw("\n    ");
-        g.writeArrayFieldStart(key);
-        List<String> list = record.getListField(key);
-        for (String listValue : list)
-        {
-          g.writeString(listValue);
-        }
-        // g.writeEndObject();
-        g.writeEndArray();
-
-      }
-      g.writeRaw("\n  ");
-      g.writeEndObject(); // for listFields
-
-      // write mapFields
-      g.writeRaw("\n  ");
-      g.writeObjectFieldStart("mapFields");
-      for (String key : record.getMapFields().keySet())
-      {
-        // g.writeStringField(key, record.getMapField(key).toString());
-        g.writeRaw("\n    ");
-        g.writeObjectFieldStart(key);
-        Map<String, String> map = record.getMapField(key);
-        for (String mapKey : map.keySet())
-        {
-          g.writeRaw("\n      ");
-          g.writeStringField(mapKey, map.get(mapKey));
-        }
-        g.writeRaw("\n    ");
-        g.writeEndObject();
-
-      }
-      g.writeRaw("\n  ");
-      g.writeEndObject(); // for mapFields
-
-      g.writeRaw("\n");
-      g.writeEndObject(); // for whole znrecord
-
-      // important: will force flushing of output, close underlying output
-      // stream
-      g.close();
-    }
-    catch (Exception e)
-    {
-      LOG.error("Exception during data serialization. Will not write to zk. Data (first 1k): "
-          + sw.toString().substring(0, 1024), e);
-      throw new HelixException(e);
-    }
-    
-    // check size
-    if (sw.toString().getBytes().length > ZNRecord.SIZE_LIMIT)
-    {
-      LOG.error("Data size larger than 1M, ZNRecord.id: " + record.getId() 
-          + ". Will not write to zk. Data (first 1k): " + sw.toString().substring(0, 1024));
-      throw new HelixException("Data size larger than 1M, ZNRecord.id: " + record.getId());
-    }
-    
-    return sw.toString().getBytes();
-  }
-
-  @Override
-  public Object deserialize(byte[] bytes) throws ZkMarshallingError
-  {
-    if (bytes == null || bytes.length == 0)
-    {
-      LOG.error("ZNode is empty.");
-      return null;
-    }
-    
-    ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
-    ZNRecord record = null;
-
-    try
-    {
-      JsonFactory f = new JsonFactory();
-      JsonParser jp = f.createJsonParser(bais);
-
-      jp.nextToken(); // will return JsonToken.START_OBJECT (verify?)
-      while (jp.nextToken() != JsonToken.END_OBJECT)
-      {
-        String fieldname = jp.getCurrentName();
-        jp.nextToken(); // move to value, or START_OBJECT/START_ARRAY
-        if ("id".equals(fieldname))
-        { 
-          // contains an object
-          record = new ZNRecord(jp.getText());
-        }
-        else if ("simpleFields".equals(fieldname))
-        {
-          while (jp.nextToken() != JsonToken.END_OBJECT)
-          {
-            String key = jp.getCurrentName();
-            jp.nextToken(); // move to value
-            record.setSimpleField(key, jp.getText());
-          }
-        }
-        else if ("mapFields".equals(fieldname))
-        {
-          // user.setVerified(jp.getCurrentToken() == JsonToken.VALUE_TRUE);
-          while (jp.nextToken() != JsonToken.END_OBJECT)
-          {
-            String key = jp.getCurrentName();
-            record.setMapField(key, new TreeMap<String, String>());
-            jp.nextToken(); // move to value
-
-            while (jp.nextToken() != JsonToken.END_OBJECT)
-            {
-              String mapKey = jp.getCurrentName();
-              jp.nextToken(); // move to value
-              record.getMapField(key).put(mapKey, jp.getText());
-            }
-          }
-
-        }
-        else if ("listFields".equals(fieldname))
-        {
-          // user.setUserImage(jp.getBinaryValue());
-          while (jp.nextToken() != JsonToken.END_OBJECT)
-          {
-            String key = jp.getCurrentName();
-            record.setListField(key, new ArrayList<String>());
-            jp.nextToken(); // move to value
-            while (jp.nextToken() != JsonToken.END_ARRAY)
-            {
-              record.getListField(key).add(jp.getText());
-            }
-
-          }
-
-        }
-        else
-        {
-          throw new IllegalStateException("Unrecognized field '" + fieldname + "'!");
-        }
-      }
-      jp.close(); // ensure resources get cleaned up timely and properly
-    }
-    catch (Exception e)
-    {
-      LOG.error("Exception during deserialization of bytes: " + new String(bytes), e);
-    }
-
-    return record;
-  }
-
-  public static void main(String[] args)
-  {
-    ZNRecord record = new ZNRecord("record");
-    final int recordSize = 10;
-    for (int i = 0; i < recordSize; i++)
-    {
-      record.setSimpleField("" + i, "" + i);
-      record.setListField("" + i, new ArrayList<String>());
-      for (int j = 0; j < recordSize; j++)
-      {
-        record.getListField("" + i).add("" + j);
-      }
-
-      record.setMapField("" + i, new TreeMap<String, String>());
-      for (int j = 0; j < recordSize; j++)
-      {
-        record.getMapField("" + i).put("" + j, "" + j);
-      }
-    }
-
-    ZNRecordStreamingSerializer serializer = new ZNRecordStreamingSerializer();
-    byte[] bytes = serializer.serialize(record);
-    System.out.println(new String(bytes));
-    ZNRecord record2 = (ZNRecord) serializer.deserialize(bytes);
-    System.out.println(record2);
-
-    long start = System.currentTimeMillis();
-    for (int i = 0; i < 100; i++)
-    {
-      bytes = serializer.serialize(record);
-      // System.out.println(new String(bytes));
-      record2 = (ZNRecord) serializer.deserialize(bytes);
-      // System.out.println(record2);
-    }
-    long end = System.currentTimeMillis();
-    System.out.println("ZNRecordStreamingSerializer time used: " + (end - start));
-
-    ZNRecordSerializer serializer2 = new ZNRecordSerializer();
-    bytes = serializer2.serialize(record);
-    // System.out.println(new String(bytes));
-    record2 = (ZNRecord) serializer2.deserialize(bytes);
-    // System.out.println(record2);
-
-    start = System.currentTimeMillis();
-    for (int i = 0; i < 100; i++)
-    {
-      bytes = serializer2.serialize(record);
-      // System.out.println(new String(bytes));
-      record2 = (ZNRecord) serializer2.deserialize(bytes);
-      // System.out.println(record2);
-    }
-    end = System.currentTimeMillis();
-    System.out.println("ZNRecordSerializer time used: " + (end - start));
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZkAsyncCallbacks.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZkAsyncCallbacks.java b/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZkAsyncCallbacks.java
deleted file mode 100644
index 9d8d179..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZkAsyncCallbacks.java
+++ /dev/null
@@ -1,171 +0,0 @@
-package com.linkedin.helix.manager.zk;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.AsyncCallback.DataCallback;
-import org.apache.zookeeper.AsyncCallback.StatCallback;
-import org.apache.zookeeper.AsyncCallback.StringCallback;
-import org.apache.zookeeper.AsyncCallback.VoidCallback;
-import org.apache.zookeeper.KeeperException.Code;
-import org.apache.zookeeper.data.Stat;
-
-public class ZkAsyncCallbacks
-{
-  private static Logger LOG = Logger.getLogger(ZkAsyncCallbacks.class);
-
-  static class GetDataCallbackHandler extends DefaultCallback implements DataCallback
-  {
-    byte[] _data;
-    Stat   _stat;
-
-    @Override
-    public void handle()
-    {
-      // TODO Auto-generated method stub
-    }
-
-    @Override
-    public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat)
-    {
-      if (rc == 0)
-      {
-        _data = data;
-        _stat = stat;
-      }
-      callback(rc, path, ctx);
-    }
-  }
-
-  static class SetDataCallbackHandler extends DefaultCallback implements StatCallback
-  {
-    Stat _stat;
-
-    @Override
-    public void handle()
-    {
-      // TODO Auto-generated method stub
-    }
-
-    @Override
-    public void processResult(int rc, String path, Object ctx, Stat stat)
-    {
-      if (rc == 0)
-      {
-        _stat = stat;
-      }
-      callback(rc, path, ctx);
-    }
-    
-    public Stat getStat()
-    {
-      return _stat;
-    }
-  }
-  
-  static class ExistsCallbackHandler extends DefaultCallback implements StatCallback
-  {
-    Stat _stat;
-
-    @Override
-    public void handle()
-    {
-      // TODO Auto-generated method stub
-    }
-
-    @Override
-    public void processResult(int rc, String path, Object ctx, Stat stat)
-    {
-      if (rc == 0)
-      {
-        _stat = stat;
-      }
-      callback(rc, path, ctx);
-    }
-
-  }
-
-  static class CreateCallbackHandler extends DefaultCallback implements StringCallback
-  {
-    @Override
-    public void processResult(int rc, String path, Object ctx, String name)
-    {
-      callback(rc, path, ctx);
-    }
-
-    @Override
-    public void handle()
-    {
-      // TODO Auto-generated method stub
-    }
-  }
-
-  static class DeleteCallbackHandler extends DefaultCallback implements VoidCallback
-  {
-    @Override
-    public void processResult(int rc, String path, Object ctx)
-    {
-      callback(rc, path, ctx);
-    }
-
-    @Override
-    public void handle()
-    {
-      // TODO Auto-generated method stub
-    }
-
-  }
-
-  /**
-   * Default callback for zookeeper async api
-   */
-  static abstract class DefaultCallback
-  {
-    AtomicBoolean _lock = new AtomicBoolean(false);
-    int           _rc   = -1;
-
-    public void callback(int rc, String path, Object ctx)
-    {
-      if (rc != 0)
-      {
-        LOG.warn(this + ", rc:" + Code.get(rc) + ", path: " + path);
-      }
-      _rc = rc;
-      handle();
-      
-      synchronized (_lock)
-      {
-        _lock.set(true);
-        _lock.notify();
-      }
-    }
-
-    public boolean waitForSuccess()
-    {
-      try
-      {
-        synchronized (_lock)
-        {
-          while (!_lock.get())
-          {
-            _lock.wait();
-          }
-        }
-      }
-      catch (InterruptedException e)
-      {
-        // TODO Auto-generated catch block
-        e.printStackTrace();
-      }
-      return true;
-    }
-
-    public int getRc()
-    {
-      return _rc;
-    }
-
-    abstract public void handle();
-  }
-
-}