You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 00:26:41 UTC

[15/47] Refactoring from com.linkedin.helix to org.apache.helix

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/manager/zk/CallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/manager/zk/CallbackHandler.java b/helix-core/src/main/java/com/linkedin/helix/manager/zk/CallbackHandler.java
deleted file mode 100644
index 5cb6724..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/manager/zk/CallbackHandler.java
+++ /dev/null
@@ -1,399 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.manager.zk;
-
-import static com.linkedin.helix.HelixConstants.ChangeType.CONFIG;
-import static com.linkedin.helix.HelixConstants.ChangeType.CURRENT_STATE;
-import static com.linkedin.helix.HelixConstants.ChangeType.EXTERNAL_VIEW;
-import static com.linkedin.helix.HelixConstants.ChangeType.IDEAL_STATE;
-import static com.linkedin.helix.HelixConstants.ChangeType.LIVE_INSTANCE;
-import static com.linkedin.helix.HelixConstants.ChangeType.MESSAGE;
-import static com.linkedin.helix.HelixConstants.ChangeType.MESSAGES_CONTROLLER;
-
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.I0Itec.zkclient.IZkChildListener;
-import org.I0Itec.zkclient.IZkDataListener;
-import org.I0Itec.zkclient.exception.ZkNoNodeException;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.Watcher.Event.EventType;
-
-import com.linkedin.helix.ConfigChangeListener;
-import com.linkedin.helix.ControllerChangeListener;
-import com.linkedin.helix.CurrentStateChangeListener;
-import com.linkedin.helix.ExternalViewChangeListener;
-import com.linkedin.helix.HealthStateChangeListener;
-import com.linkedin.helix.HelixConstants.ChangeType;
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.IdealStateChangeListener;
-import com.linkedin.helix.LiveInstanceChangeListener;
-import com.linkedin.helix.MessageListener;
-import com.linkedin.helix.NotificationContext;
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.PropertyPathConfig;
-import com.linkedin.helix.model.CurrentState;
-import com.linkedin.helix.model.ExternalView;
-import com.linkedin.helix.model.HealthStat;
-import com.linkedin.helix.model.IdealState;
-import com.linkedin.helix.model.InstanceConfig;
-import com.linkedin.helix.model.LiveInstance;
-import com.linkedin.helix.model.Message;
-
-public class CallbackHandler implements IZkChildListener, IZkDataListener
-
-{
-
-  private static Logger logger = Logger.getLogger(CallbackHandler.class);
-
-  private final String _path;
-  private final Object _listener;
-  private final EventType[] _eventTypes;
-  private final HelixDataAccessor _accessor;
-  private final ChangeType _changeType;
-  private final ZkClient _zkClient;
-  private final AtomicLong lastNotificationTimeStamp;
-  private final HelixManager _manager;
-
-  public CallbackHandler(HelixManager manager, ZkClient client, String path,
-                         Object listener, EventType[] eventTypes, ChangeType changeType)
-  {
-    this._manager = manager;
-    this._accessor = manager.getHelixDataAccessor();
-    this._zkClient = client;
-    this._path = path;
-    this._listener = listener;
-    this._eventTypes = eventTypes;
-    this._changeType = changeType;
-    lastNotificationTimeStamp = new AtomicLong(System.nanoTime());
-    init();
-  }
-
-  public Object getListener()
-  {
-    return _listener;
-  }
-
-  public String getPath()
-  {
-    return _path;
-  }
-
-  public void invoke(NotificationContext changeContext) throws Exception
-  {
-    // This allows the listener to work with one change at a time
-    synchronized (_manager)
-    {
-      Builder keyBuilder = _accessor.keyBuilder();
-      long start = System.currentTimeMillis();
-      if (logger.isInfoEnabled())
-      {
-        logger.info(Thread.currentThread().getId() + " START:INVOKE "
-        // + changeContext.getPathChanged()
-            + _path + " listener:" + _listener.getClass().getCanonicalName());
-      }
-
-      if (_changeType == IDEAL_STATE)
-      {
-
-        IdealStateChangeListener idealStateChangeListener =
-            (IdealStateChangeListener) _listener;
-        subscribeForChanges(changeContext, _path, true, true);
-        List<IdealState> idealStates = _accessor.getChildValues(keyBuilder.idealStates());
-
-        idealStateChangeListener.onIdealStateChange(idealStates, changeContext);
-
-      }
-      else if (_changeType == CONFIG)
-      {
-
-        ConfigChangeListener configChangeListener = (ConfigChangeListener) _listener;
-        subscribeForChanges(changeContext, _path, true, true);
-        List<InstanceConfig> configs =
-            _accessor.getChildValues(keyBuilder.instanceConfigs());
-
-        configChangeListener.onConfigChange(configs, changeContext);
-
-      }
-      else if (_changeType == LIVE_INSTANCE)
-      {
-        LiveInstanceChangeListener liveInstanceChangeListener =
-            (LiveInstanceChangeListener) _listener;
-        subscribeForChanges(changeContext, _path, true, true);
-        List<LiveInstance> liveInstances =
-            _accessor.getChildValues(keyBuilder.liveInstances());
-
-        liveInstanceChangeListener.onLiveInstanceChange(liveInstances, changeContext);
-
-      }
-      else if (_changeType == CURRENT_STATE)
-      {
-        CurrentStateChangeListener currentStateChangeListener;
-        currentStateChangeListener = (CurrentStateChangeListener) _listener;
-        subscribeForChanges(changeContext, _path, true, true);
-        String instanceName = PropertyPathConfig.getInstanceNameFromPath(_path);
-        String[] pathParts = _path.split("/");
-
-        // TODO: fix this
-        List<CurrentState> currentStates =
-            _accessor.getChildValues(keyBuilder.currentStates(instanceName,
-                                                              pathParts[pathParts.length - 1]));
-
-        currentStateChangeListener.onStateChange(instanceName,
-                                                 currentStates,
-                                                 changeContext);
-
-      }
-      else if (_changeType == MESSAGE)
-      {
-        MessageListener messageListener = (MessageListener) _listener;
-        subscribeForChanges(changeContext, _path, true, false);
-        String instanceName = PropertyPathConfig.getInstanceNameFromPath(_path);
-        List<Message> messages =
-            _accessor.getChildValues(keyBuilder.messages(instanceName));
-
-        messageListener.onMessage(instanceName, messages, changeContext);
-
-      }
-      else if (_changeType == MESSAGES_CONTROLLER)
-      {
-        MessageListener messageListener = (MessageListener) _listener;
-        subscribeForChanges(changeContext, _path, true, false);
-        List<Message> messages =
-            _accessor.getChildValues(keyBuilder.controllerMessages());
-
-        messageListener.onMessage(_manager.getInstanceName(), messages, changeContext);
-
-      }
-      else if (_changeType == EXTERNAL_VIEW)
-      {
-        ExternalViewChangeListener externalViewListener =
-            (ExternalViewChangeListener) _listener;
-        subscribeForChanges(changeContext, _path, true, true);
-        List<ExternalView> externalViewList =
-            _accessor.getChildValues(keyBuilder.externalViews());
-
-        externalViewListener.onExternalViewChange(externalViewList, changeContext);
-      }
-      else if (_changeType == ChangeType.CONTROLLER)
-      {
-        ControllerChangeListener controllerChangelistener =
-            (ControllerChangeListener) _listener;
-        subscribeForChanges(changeContext, _path, true, false);
-        controllerChangelistener.onControllerChange(changeContext);
-      }
-      else if (_changeType == ChangeType.HEALTH)
-      {
-        HealthStateChangeListener healthStateChangeListener =
-            (HealthStateChangeListener) _listener;
-        subscribeForChanges(changeContext, _path, true, true); // TODO: figure out
-        // settings here
-        String instanceName = PropertyPathConfig.getInstanceNameFromPath(_path);
-
-        List<HealthStat> healthReportList =
-            _accessor.getChildValues(keyBuilder.healthReports(instanceName));
-
-        healthStateChangeListener.onHealthChange(instanceName,
-                                                 healthReportList,
-                                                 changeContext);
-      }
-      long end = System.currentTimeMillis();
-      if (logger.isInfoEnabled())
-      {
-        logger.info(Thread.currentThread().getId() + " END:INVOKE " + _path
-            + " listener:" + _listener.getClass().getCanonicalName() + " Took: "
-            + (end - start));
-      }
-    }
-  }
-
-  private void subscribeForChanges(NotificationContext context,
-                                   String path,
-                                   boolean watchParent,
-                                   boolean watchChild)
-  {
-    NotificationContext.Type type = context.getType();
-    if (watchParent && type == NotificationContext.Type.INIT)
-    {
-      logger.info(_manager.getInstanceName() + " subscribe child change@" + path);
-      _zkClient.subscribeChildChanges(path, this);
-    }
-    else if (watchParent && type == NotificationContext.Type.FINALIZE)
-    {
-      logger.info(_manager.getInstanceName() + " UNsubscribe child change@" + path);
-      _zkClient.unsubscribeChildChanges(path, this);
-    }
-
-    if (watchChild)
-    {
-      try
-      {
-        List<String> childNames = _zkClient.getChildren(path);
-        if (childNames == null || childNames.size() == 0)
-        {
-          return;
-        }
-
-        for (String childName : childNames)
-        {
-          String childPath = path + "/" + childName;
-          if (type == NotificationContext.Type.INIT
-              || type == NotificationContext.Type.CALLBACK)
-          {
-            if (logger.isDebugEnabled())
-            {
-              logger.debug(_manager.getInstanceName() + " subscribe data change@" + path);
-            }
-            _zkClient.subscribeDataChanges(childPath, this);
-
-          }
-          else if (type == NotificationContext.Type.FINALIZE)
-          {
-            logger.info(_manager.getInstanceName() + " UNsubscribe data change@" + path);
-            _zkClient.unsubscribeDataChanges(childPath, this);
-          }
-
-          subscribeForChanges(context, childPath, watchParent, watchChild);
-        }
-      }
-      catch (ZkNoNodeException e)
-      {
-        logger.warn("fail to subscribe data change@" + path);
-      }
-    }
-
-  }
-
-  public EventType[] getEventTypes()
-  {
-    return _eventTypes;
-  }
-
-  /**
-   * Invoke the listener so that it sets up the initial values from the zookeeper if any
-   * exists
-   * 
-   */
-  public void init()
-  {
-    updateNotificationTime(System.nanoTime());
-    try
-    {
-      NotificationContext changeContext = new NotificationContext(_manager);
-      changeContext.setType(NotificationContext.Type.INIT);
-      invoke(changeContext);
-    }
-    catch (Exception e)
-    {
-      ZKExceptionHandler.getInstance().handle(e);
-    }
-  }
-
-  @Override
-  public void handleDataChange(String dataPath, Object data)
-  {
-    try
-    {
-      updateNotificationTime(System.nanoTime());
-      if (dataPath != null && dataPath.startsWith(_path))
-      {
-        NotificationContext changeContext = new NotificationContext(_manager);
-        changeContext.setType(NotificationContext.Type.CALLBACK);
-        invoke(changeContext);
-      }
-    }
-    catch (Exception e)
-    {
-      ZKExceptionHandler.getInstance().handle(e);
-    }
-  }
-
-  @Override
-  public void handleDataDeleted(String dataPath)
-  {
-    try
-    {
-      updateNotificationTime(System.nanoTime());
-      if (dataPath != null && dataPath.startsWith(_path))
-      {
-        NotificationContext changeContext = new NotificationContext(_manager);
-        changeContext.setType(NotificationContext.Type.CALLBACK);
-        _zkClient.unsubscribeChildChanges(dataPath, this);
-        invoke(changeContext);
-      }
-    }
-    catch (Exception e)
-    {
-      ZKExceptionHandler.getInstance().handle(e);
-    }
-  }
-
-  @Override
-  public void handleChildChange(String parentPath, List<String> currentChilds)
-  {
-    try
-    {
-      updateNotificationTime(System.nanoTime());
-      if (parentPath != null && parentPath.startsWith(_path))
-      {
-        NotificationContext changeContext = new NotificationContext(_manager);
-        changeContext.setType(NotificationContext.Type.CALLBACK);
-        invoke(changeContext);
-      }
-    }
-    catch (Exception e)
-    {
-      ZKExceptionHandler.getInstance().handle(e);
-    }
-  }
-
-  /**
-   * Invoke the listener for the last time so that the listener could clean up resources
-   * 
-   */
-  public void reset()
-  {
-    try
-    {
-      NotificationContext changeContext = new NotificationContext(_manager);
-      changeContext.setType(NotificationContext.Type.FINALIZE);
-      invoke(changeContext);
-    }
-    catch (Exception e)
-    {
-      ZKExceptionHandler.getInstance().handle(e);
-    }
-  }
-
-  private void updateNotificationTime(long nanoTime)
-  {
-    long l = lastNotificationTimeStamp.get();
-    while (nanoTime > l)
-    {
-      boolean b = lastNotificationTimeStamp.compareAndSet(l, nanoTime);
-      if (b)
-      {
-        break;
-      }
-      else
-      {
-        l = lastNotificationTimeStamp.get();
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/manager/zk/ChainedPathZkSerializer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/manager/zk/ChainedPathZkSerializer.java b/helix-core/src/main/java/com/linkedin/helix/manager/zk/ChainedPathZkSerializer.java
deleted file mode 100644
index 659eaec..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/manager/zk/ChainedPathZkSerializer.java
+++ /dev/null
@@ -1,131 +0,0 @@
-package com.linkedin.helix.manager.zk;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.I0Itec.zkclient.exception.ZkMarshallingError;
-import org.I0Itec.zkclient.serialize.ZkSerializer;
-
-public class ChainedPathZkSerializer implements PathBasedZkSerializer
-{
-
-  public static class Builder
-  {
-    private final ZkSerializer _defaultSerializer;
-    private List<ChainItem> _items = new ArrayList<ChainItem>();
-
-    private Builder(ZkSerializer defaultSerializer)
-    {
-      _defaultSerializer = defaultSerializer;
-    }
-
-    /**
-     * Add a serializing strategy for the given path prefix
-     * The most specific path will triumph over a more generic (shorter)
-     * one regardless of the ordering of the calls.
-     */
-    public Builder serialize(String path, ZkSerializer withSerializer)
-    {
-      _items.add(new ChainItem(normalize(path), withSerializer));
-      return this;
-    }
-    
-    /**
-     * Builds the serializer with the given strategies and default serializer.
-     */
-    public ChainedPathZkSerializer build() {
-      return new ChainedPathZkSerializer(_defaultSerializer, _items);
-    }
-  }
-  
-  /**
-   * Create a builder that will use the given serializer by default
-   * if no other strategy is given to solve the path in question.
-   */
-  public static Builder builder(ZkSerializer defaultSerializer) 
-  {
-    return new Builder(defaultSerializer);
-  }
-
-  private final List<ChainItem> _items;
-  private final ZkSerializer _defaultSerializer;
-
-  private ChainedPathZkSerializer(ZkSerializer defaultSerializer, List<ChainItem> items)
-  {
-    _items = items;
-    // sort by longest paths first
-    // if two items would match one would be prefix of the other
-    // and the longest must be more specific
-    Collections.sort(_items);
-    _defaultSerializer = defaultSerializer;
-  }
-
-  @Override
-  public byte[] serialize(Object data, String path) throws ZkMarshallingError
-  {
-    for (ChainItem item : _items)
-    {
-      if (item.matches(path)) return item._serializer.serialize(data);
-    }
-    return _defaultSerializer.serialize(data);
-  }
-
-  @Override
-  public Object deserialize(byte[] bytes, String path)
-      throws ZkMarshallingError
-  {
-    for (ChainItem item : _items)
-    {
-      if (item.matches(path)) return item._serializer.deserialize(bytes);
-    }
-    return _defaultSerializer.deserialize(bytes);
-  }
-
-  private static class ChainItem implements Comparable<ChainItem>
-  {
-    final String _path;
-    final ZkSerializer _serializer;
-
-    ChainItem(String path, ZkSerializer serializer)
-    {
-      _path = path;
-      _serializer = serializer;
-    }
-
-    boolean matches(String path)
-    {
-      if (_path.equals(path))
-      {
-        return true;
-      } 
-      else if (path.length() > _path.length())
-      {
-        if (path.startsWith(_path) && path.charAt(_path.length()) == '/') 
-        {
-          return true;
-        }
-      }
-      return false;
-    }
-
-    @Override
-    public int compareTo(ChainItem o)
-    {
-      return o._path.length() - _path.length();
-    }
-  }
-  
-  private static String normalize(String path) {
-    if (!path.startsWith("/")) {
-      // ensure leading slash
-      path = "/" + path;
-    }
-    if (path.endsWith("/")) {
-      // remove trailing slash
-      path = path.substring(0, path.length()-1);
-    }
-    return path;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/manager/zk/DefaultControllerMessageHandlerFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/manager/zk/DefaultControllerMessageHandlerFactory.java b/helix-core/src/main/java/com/linkedin/helix/manager/zk/DefaultControllerMessageHandlerFactory.java
deleted file mode 100644
index 4fc6b98..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/manager/zk/DefaultControllerMessageHandlerFactory.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.manager.zk;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.HelixException;
-import com.linkedin.helix.NotificationContext;
-import com.linkedin.helix.messaging.handling.HelixTaskResult;
-import com.linkedin.helix.messaging.handling.MessageHandler;
-import com.linkedin.helix.messaging.handling.MessageHandlerFactory;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.model.Message.MessageType;
-
-public class DefaultControllerMessageHandlerFactory implements
-    MessageHandlerFactory
-{
-  private static Logger _logger = Logger.getLogger(DefaultControllerMessageHandlerFactory.class);
-  @Override
-  public MessageHandler createHandler(Message message,
-      NotificationContext context)
-  {
-    String type = message.getMsgType();
-    
-    if(!type.equals(getMessageType()))
-    {
-      throw new HelixException("Unexpected msg type for message "+message.getMsgId()
-          +" type:" + message.getMsgType());
-    }
-    
-    return new DefaultControllerMessageHandler(message, context);
-  }
-
-  @Override
-  public String getMessageType()
-  {
-    return MessageType.CONTROLLER_MSG.toString();
-  }
-
-  @Override
-  public void reset()
-  {
-
-  }
-  
-  public static class DefaultControllerMessageHandler extends MessageHandler
-  {
-    public DefaultControllerMessageHandler(Message message,
-        NotificationContext context)
-    {
-      super(message, context);
-    }
-
-    @Override
-    public HelixTaskResult handleMessage() throws InterruptedException
-    {
-      String type = _message.getMsgType();
-      HelixTaskResult result = new HelixTaskResult();
-      if(!type.equals(MessageType.CONTROLLER_MSG.toString()))
-      {
-        throw new HelixException("Unexpected msg type for message "+_message.getMsgId()
-            +" type:" + _message.getMsgType());
-      }
-      result.getTaskResultMap().put("ControllerResult", "msg "+ _message.getMsgId() + " from "+_message.getMsgSrc() + " processed");
-      result.setSuccess(true);
-      return result;
-    }
-
-    @Override
-    public void onError(Exception e, ErrorCode code, ErrorType type)
-    {
-      _logger.error("Message handling pipeline get an exception. MsgId:" + _message.getMsgId(), e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/manager/zk/DefaultParticipantErrorMessageHandlerFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/manager/zk/DefaultParticipantErrorMessageHandlerFactory.java b/helix-core/src/main/java/com/linkedin/helix/manager/zk/DefaultParticipantErrorMessageHandlerFactory.java
deleted file mode 100644
index ec7c2f7..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/manager/zk/DefaultParticipantErrorMessageHandlerFactory.java
+++ /dev/null
@@ -1,128 +0,0 @@
-package com.linkedin.helix.manager.zk;
-
-
-import java.util.Arrays;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.HelixException;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.NotificationContext;
-import com.linkedin.helix.messaging.handling.HelixTaskResult;
-import com.linkedin.helix.messaging.handling.MessageHandler;
-import com.linkedin.helix.messaging.handling.MessageHandlerFactory;
-import com.linkedin.helix.model.Message;
-
-/**
- * DefaultParticipantErrorMessageHandlerFactory works on controller side.
- * When the participant detects a critical error, it will send the PARTICIPANT_ERROR_REPORT
- * Message to the controller, specifying whether it want to disable the instance or
- * disable the partition. The controller have a chance to do whatever make sense at that point,
- * and then disable the corresponding partition or the instance. More configs per resource will
- * be added to customize the controller behavior.
- * */
-public class DefaultParticipantErrorMessageHandlerFactory implements
-  MessageHandlerFactory
-{
-  public enum ActionOnError
-  {
-    DISABLE_PARTITION, DISABLE_RESOURCE, DISABLE_INSTANCE
-  }
-
-  public static final String ACTIONKEY = "ActionOnError";
-
-  private static Logger _logger = Logger
-    .getLogger(DefaultParticipantErrorMessageHandlerFactory.class);
-  final HelixManager _manager;
-
-  public DefaultParticipantErrorMessageHandlerFactory(HelixManager manager)
-  {
-    _manager = manager;
-  }
-
-  public static class DefaultParticipantErrorMessageHandler extends MessageHandler
-  {
-    final HelixManager _manager;
-    public DefaultParticipantErrorMessageHandler(Message message,
-        NotificationContext context,  HelixManager manager)
-    {
-       super(message, context);
-       _manager = manager;
-    }
-
-    @Override
-    public HelixTaskResult handleMessage() throws InterruptedException
-    {
-      HelixTaskResult result = new HelixTaskResult();
-      result.setSuccess(true);
-      // TODO : consider unify this with StatsAggregationStage.executeAlertActions()
-      try
-      {
-        ActionOnError actionOnError
-          = ActionOnError.valueOf(_message.getRecord().getSimpleField(ACTIONKEY));
-
-        if(actionOnError == ActionOnError.DISABLE_INSTANCE)
-        {
-          _manager.getClusterManagmentTool().enableInstance(_manager.getClusterName(), _message.getMsgSrc(), false);
-          _logger.info("Instance " + _message.getMsgSrc() + " disabled");
-        }
-        else if(actionOnError == ActionOnError.DISABLE_PARTITION)
-        {
-          _manager.getClusterManagmentTool().enablePartition(false, _manager.getClusterName(), _message.getMsgSrc(),
-              _message.getResourceName(), Arrays.asList( _message.getPartitionName()));
-          _logger.info("partition " + _message.getPartitionName() + " disabled");
-        }
-        else if (actionOnError == ActionOnError.DISABLE_RESOURCE)
-        {
-          // NOT IMPLEMENTED, or we can disable all partitions
-          //_manager.getClusterManagmentTool().en(_manager.getClusterName(), _manager.getInstanceName(),
-          //    _message.getResourceName(), _message.getPartitionName(), false);
-          _logger.info("resource " + _message.getResourceName() + " disabled");
-        }
-      }
-      catch(Exception e)
-      {
-        _logger.error("", e);
-        result.setSuccess(false);
-        result.setException(e);
-      }
-      return result;
-    }
-
-    @Override
-    public void onError(Exception e, ErrorCode code, ErrorType type)
-    {
-      _logger.error("Message handling pipeline get an exception. MsgId:"
-          + _message.getMsgId(), e);
-    }
-
-  }
-
-  @Override
-  public MessageHandler createHandler(Message message,
-      NotificationContext context)
-  {
-    String type = message.getMsgType();
-
-    if(!type.equals(getMessageType()))
-    {
-      throw new HelixException("Unexpected msg type for message "+message.getMsgId()
-          +" type:" + message.getMsgType());
-    }
-
-    return new DefaultParticipantErrorMessageHandler(message, context, _manager);
-  }
-
-  @Override
-  public String getMessageType()
-  {
-    return Message.MessageType.PARTICIPANT_ERROR_REPORT.toString();
-  }
-
-  @Override
-  public void reset()
-  {
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java b/helix-core/src/main/java/com/linkedin/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
deleted file mode 100644
index 0b19eca..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
+++ /dev/null
@@ -1,277 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.manager.zk;
-
-import java.io.StringReader;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.log4j.Logger;
-import org.codehaus.jackson.map.ObjectMapper;
-
-import com.linkedin.helix.Criteria;
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.HelixException;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.NotificationContext;
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.messaging.AsyncCallback;
-import com.linkedin.helix.messaging.handling.HelixTaskResult;
-import com.linkedin.helix.messaging.handling.MessageHandler;
-import com.linkedin.helix.messaging.handling.MessageHandlerFactory;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.model.Message.MessageType;
-import com.linkedin.helix.model.StatusUpdate;
-import com.linkedin.helix.util.StatusUpdateUtil;
-
-/*
- * TODO: The current implementation is temporary for backup handler testing only and it does not 
- * do any throttling. 
- * 
- */
-public class DefaultSchedulerMessageHandlerFactory implements
-    MessageHandlerFactory
-{
-  public static final String WAIT_ALL = "WAIT_ALL";
-  public static final String SCHEDULER_MSG_ID = "SchedulerMessageId";
-  public static class SchedulerAsyncCallback extends AsyncCallback
-  {
-    StatusUpdateUtil _statusUpdateUtil = new StatusUpdateUtil();
-    Message _originalMessage;
-    HelixManager _manager;
-    final Map<String, Map<String, String>> _resultSummaryMap = new ConcurrentHashMap<String, Map<String, String>>();
-
-    public SchedulerAsyncCallback(Message originalMessage, HelixManager manager)
-    {
-      _originalMessage = originalMessage;
-      _manager = manager;
-    }
-
-    @Override
-    public void onTimeOut()
-    {
-      _logger.info("Scheduler msg timeout " + _originalMessage.getMsgId()
-          + " timout with " + _timeout + " Ms");
-
-      _statusUpdateUtil.logError(_originalMessage,
-          SchedulerAsyncCallback.class, "Task timeout",
-          _manager.getHelixDataAccessor());
-      addSummary(_resultSummaryMap, _originalMessage, _manager, true);
-    }
-
-    @Override
-    public void onReplyMessage(Message message)
-    {
-      _logger.info("Update for scheduler msg " + _originalMessage.getMsgId()
-          + " Message " + message.getMsgSrc() + " id "
-          + message.getCorrelationId() + " completed");
-      String key = "MessageResult " + message.getMsgSrc() + " "
-          + UUID.randomUUID();
-      _resultSummaryMap.put(key, message.getResultMap());
-
-      if (this.isDone())
-      {
-        _logger.info("Scheduler msg " + _originalMessage.getMsgId()
-            + " completed");
-        _statusUpdateUtil.logInfo(_originalMessage,
-            SchedulerAsyncCallback.class, "Scheduler task completed",
-            _manager.getHelixDataAccessor());
-        addSummary(_resultSummaryMap, _originalMessage, _manager, false);
-      }
-    }
-
-    private void addSummary(Map<String, Map<String, String>> _resultSummaryMap,
-        Message originalMessage, HelixManager manager, boolean timeOut)
-    {
-      Map<String, String> summary = new TreeMap<String, String>();
-      summary.put("TotalMessages:", "" + _resultSummaryMap.size());
-      summary.put("Timeout", "" + timeOut);
-      _resultSummaryMap.put("Summary", summary);
-
-      HelixDataAccessor accessor = manager.getHelixDataAccessor();
-      Builder keyBuilder = accessor.keyBuilder();
-      ZNRecord statusUpdate = accessor.getProperty(
-          keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(),
-              originalMessage.getMsgId())).getRecord();
-
-      statusUpdate.getMapFields().putAll(_resultSummaryMap);
-      accessor.setProperty(keyBuilder.controllerTaskStatus(
-          MessageType.SCHEDULER_MSG.toString(), originalMessage.getMsgId()),
-          new StatusUpdate(statusUpdate));
-
-    }
-  }
-
-  private static Logger _logger = Logger
-      .getLogger(DefaultSchedulerMessageHandlerFactory.class);
-  HelixManager _manager;
-
-  public DefaultSchedulerMessageHandlerFactory(HelixManager manager)
-  {
-    _manager = manager;
-  }
-
-  @Override
-  public MessageHandler createHandler(Message message,
-      NotificationContext context)
-  {
-    String type = message.getMsgType();
-
-    if (!type.equals(getMessageType()))
-    {
-      throw new HelixException("Unexpected msg type for message "
-          + message.getMsgId() + " type:" + message.getMsgType());
-    }
-
-    return new DefaultSchedulerMessageHandler(message, context, _manager);
-  }
-
-  @Override
-  public String getMessageType()
-  {
-    return MessageType.SCHEDULER_MSG.toString();
-  }
-
-  @Override
-  public void reset()
-  {
-  }
-
-  public static class DefaultSchedulerMessageHandler extends MessageHandler
-  {
-    HelixManager _manager;
-
-    public DefaultSchedulerMessageHandler(Message message,
-        NotificationContext context, HelixManager manager)
-    {
-      super(message, context);
-      _manager = manager;
-    }
-
-    @Override
-    public HelixTaskResult handleMessage() throws InterruptedException
-    {
-      String type = _message.getMsgType();
-      HelixTaskResult result = new HelixTaskResult();
-      if (!type.equals(MessageType.SCHEDULER_MSG.toString()))
-      {
-        throw new HelixException("Unexpected msg type for message "
-            + _message.getMsgId() + " type:" + _message.getMsgType());
-      }
-      // Parse timeout value
-      int timeOut = -1;
-      if (_message.getRecord().getSimpleFields().containsKey("TIMEOUT"))
-      {
-        try
-        {
-          timeOut = Integer.parseInt(_message.getRecord().getSimpleFields()
-              .get("TIMEOUT"));
-        } catch (Exception e)
-        {
-        }
-      }
-
-      // Parse the message template
-      ZNRecord record = new ZNRecord("templateMessage");
-      record.getSimpleFields().putAll(
-          _message.getRecord().getMapField("MessageTemplate"));
-      Message messageTemplate = new Message(record);
-
-      // Parse the criteria
-      StringReader sr = new StringReader(_message.getRecord().getSimpleField(
-          "Criteria"));
-      ObjectMapper mapper = new ObjectMapper();
-      Criteria recipientCriteria;
-      try
-      {
-        recipientCriteria = mapper.readValue(sr, Criteria.class);
-      } catch (Exception e)
-      {
-        _logger.error("", e);
-        result.setException(e);
-        result.setSuccess(false);
-        return result;
-      }
-      _logger.info("Scheduler sending message, criteria:" + recipientCriteria);
-      
-      boolean waitAll = false;
-      if(_message.getRecord().getSimpleField(DefaultSchedulerMessageHandlerFactory.WAIT_ALL) !=null)
-      {
-        try
-        {
-          waitAll = Boolean.parseBoolean(_message.getRecord().getSimpleField(DefaultSchedulerMessageHandlerFactory.WAIT_ALL));
-        }
-        catch(Exception e)
-        {
-          _logger.warn("",e);
-        }
-      }
-      // Send all messages.
-      
-      int nMsgsSent = 0;
-      SchedulerAsyncCallback callback = new SchedulerAsyncCallback(_message, _manager);
-      if(waitAll)
-      {
-        nMsgsSent = _manager.getMessagingService().sendAndWait(recipientCriteria,
-            messageTemplate, 
-            callback,
-            timeOut);
-      }
-      else
-      {
-        nMsgsSent = _manager.getMessagingService().send(recipientCriteria,
-            messageTemplate, 
-            callback,
-            timeOut);
-      }
-      HelixDataAccessor accessor = _manager.getHelixDataAccessor();
-      Builder keyBuilder = accessor.keyBuilder();
-
-      // Record the number of messages sent into status updates
-      Map<String, String> sendSummary = new HashMap<String, String>();
-      sendSummary.put("MessageCount", "" + nMsgsSent);
-      
-      ZNRecord statusUpdate = accessor.getProperty(
-          keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(),
-              _message.getMsgId())).getRecord();
-
-      statusUpdate.getMapFields().put("SentMessageCount", sendSummary);
-
-      accessor.setProperty(keyBuilder.controllerTaskStatus(
-          MessageType.SCHEDULER_MSG.toString(), _message.getMsgId()),
-          new StatusUpdate(statusUpdate));
-
-      result.getTaskResultMap().put(
-          "ControllerResult",
-          "msg " + _message.getMsgId() + " from " + _message.getMsgSrc()
-              + " processed");
-      result.getTaskResultMap().put(SCHEDULER_MSG_ID, _message.getMsgId());
-      result.setSuccess(true);
-      return result;
-    }
-
-    @Override
-    public void onError(Exception e, ErrorCode code, ErrorType type)
-    {
-      _logger.error("Message handling pipeline get an exception. MsgId:"
-          + _message.getMsgId(), e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/manager/zk/HelixGroupCommit.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/manager/zk/HelixGroupCommit.java b/helix-core/src/main/java/com/linkedin/helix/manager/zk/HelixGroupCommit.java
deleted file mode 100644
index 7edc8ab..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/manager/zk/HelixGroupCommit.java
+++ /dev/null
@@ -1,174 +0,0 @@
-package com.linkedin.helix.manager.zk;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.I0Itec.zkclient.DataUpdater;
-import org.I0Itec.zkclient.exception.ZkBadVersionException;
-import org.I0Itec.zkclient.exception.ZkNoNodeException;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.data.Stat;
-
-public class HelixGroupCommit<T>
-{
-  private static Logger LOG = Logger.getLogger(HelixGroupCommit.class);
-
-  private static class Queue<T>
-  {
-    final AtomicReference<Thread>      _running = new AtomicReference<Thread>();
-    final ConcurrentLinkedQueue<Entry<T>> _pending = new ConcurrentLinkedQueue<Entry<T>>();
-  }
-
-  private static class Entry<T>
-  {
-    final String         _key;
-    final DataUpdater<T> _updater;
-    AtomicBoolean        _sent = new AtomicBoolean(false);
-
-    Entry(String key, DataUpdater<T> updater)
-    {
-      _key = key;
-      _updater = updater;
-    }
-  }
-
-  private final Queue<T>[] _queues = new Queue[100];
-
-  public HelixGroupCommit()
-  {
-    // Don't use Arrays.fill();
-    for (int i = 0; i < _queues.length; ++i)
-    {
-      _queues[i] = new Queue<T>();
-    }
-  }
-
-  private Queue<T> getQueue(String key)
-  {
-    return _queues[(key.hashCode() & Integer.MAX_VALUE) % _queues.length];
-  }
-
-  public boolean commit(ZkBaseDataAccessor<T> accessor,
-                        int options,
-                        String key,
-                        DataUpdater<T> updater)
-  {
-    Queue<T> queue = getQueue(key);
-    Entry<T> entry = new Entry<T>(key, updater);
-
-    queue._pending.add(entry);
-
-    while (!entry._sent.get())
-    {
-      if (queue._running.compareAndSet(null, Thread.currentThread()))
-      {
-        ArrayList<Entry<T>> processed = new ArrayList<Entry<T>>();
-        try
-        {
-          Entry<T> first = queue._pending.peek();
-          if (first == null)
-          {
-            return true;
-          }
-
-          // remove from queue
-          // Entry first = queue._pending.poll();
-          // processed.add(first);
-
-          String mergedKey = first._key;
-
-          boolean retry;
-          do
-          {
-            retry = false;
-
-            try
-            {
-              T merged = null;
-
-              Stat readStat = new Stat();
-              try
-              {
-                // accessor will fallback to zk if not found in cache
-                merged = accessor.get(mergedKey, readStat, options);
-              }
-              catch (ZkNoNodeException e)
-              {
-                // OK.
-              }
-
-              // updater should handler merged == null
-              // merged = first._updater.update(merged);
-
-              // iterate over processed if we are retrying
-              Iterator<Entry<T>> it = processed.iterator();
-              while (it.hasNext())
-              {
-                Entry<T> ent = it.next();
-                if (!ent._key.equals(mergedKey))
-                {
-                  continue;
-                }
-                merged = ent._updater.update(merged);
-                // System.out.println("After merging:" + merged);
-              }
-
-              // iterate over queue._pending for newly coming requests
-              it = queue._pending.iterator();
-              while (it.hasNext())
-              {
-                Entry<T> ent = it.next();
-                if (!ent._key.equals(mergedKey))
-                {
-                  continue;
-                }
-                processed.add(ent);
-                merged = ent._updater.update(merged);
-                // System.out.println("After merging:" + merged);
-                it.remove();
-              }
-              // System.out.println("size:"+ processed.size());
-              accessor.set(mergedKey, merged, null, null, readStat.getVersion(), options);
-            }
-            catch (ZkBadVersionException e)
-            {
-              retry = true;
-            }
-          }
-          while (retry);
-        }
-        finally
-        {
-          queue._running.set(null);
-          for (Entry<T> e : processed)
-          {
-            synchronized (e)
-            {
-              e._sent.set(true);
-              e.notify();
-            }
-          }
-        }
-      }
-      else
-      {
-        synchronized (entry)
-        {
-          try
-          {
-            entry.wait(10);
-          }
-          catch (InterruptedException e)
-          {
-            e.printStackTrace();
-            return false;
-          }
-        }
-      }
-    }
-    return true;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/manager/zk/PathBasedZkSerializer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/manager/zk/PathBasedZkSerializer.java b/helix-core/src/main/java/com/linkedin/helix/manager/zk/PathBasedZkSerializer.java
deleted file mode 100644
index f52047d..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/manager/zk/PathBasedZkSerializer.java
+++ /dev/null
@@ -1,28 +0,0 @@
-package com.linkedin.helix.manager.zk;
-
-import org.I0Itec.zkclient.exception.ZkMarshallingError;
-
-public interface PathBasedZkSerializer
-{
-
-  /**
-   * Serialize data differently according to different paths
-   * 
-   * @param data
-   * @param path
-   * @return
-   * @throws ZkMarshallingError
-   */
-  public byte[] serialize(Object data, String path) throws ZkMarshallingError;
-
-  /**
-   * Deserialize data differently according to different paths
-   * 
-   * @param bytes
-   * @param path
-   * @return
-   * @throws ZkMarshallingError
-   */
-  public Object deserialize(byte[] bytes, String path) throws ZkMarshallingError;
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/manager/zk/WriteThroughCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/manager/zk/WriteThroughCache.java b/helix-core/src/main/java/com/linkedin/helix/manager/zk/WriteThroughCache.java
deleted file mode 100644
index 410c23c..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/manager/zk/WriteThroughCache.java
+++ /dev/null
@@ -1,114 +0,0 @@
-package com.linkedin.helix.manager.zk;
-
-import java.io.File;
-import java.util.List;
-
-import org.I0Itec.zkclient.exception.ZkNoNodeException;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.data.Stat;
-
-import com.linkedin.helix.AccessOption;
-import com.linkedin.helix.BaseDataAccessor;
-import com.linkedin.helix.store.zk.ZNode;
-
-public class WriteThroughCache<T> extends Cache<T>
-{
-  private static Logger     LOG = Logger.getLogger(WriteThroughCache.class);
-
-  final BaseDataAccessor<T> _accessor;
-
-  public WriteThroughCache(BaseDataAccessor<T> accessor, List<String> paths)
-  {
-    super();
-    _accessor = accessor;
-
-    // init cache
-    if (paths != null && !paths.isEmpty())
-    {
-      for (String path : paths)
-      {
-        updateRecursive(path);
-      }
-    }
-  }
-
-  @Override
-  public void update(String path, T data, Stat stat)
-  {
-    String parentPath = new File(path).getParent();
-    String childName = new File(path).getName();
-    addToParentChildSet(parentPath, childName);
-    
-    ZNode znode = _cache.get(path);
-    if (znode == null)
-    {
-      _cache.put(path, new ZNode(path, data, stat));
-    }
-    else
-    {
-      znode.setData(data);
-      znode.setStat(stat);
-    }
-  }
-  
-  @Override
-  public void updateRecursive(String path)
-  {
-    if (path == null)
-    {
-      return;
-    }
-
-    try
-    {
-      _lock.writeLock().lock();
-
-//      // update parent's childSet
-//      String parentPath = new File(path).getParent();
-//      String name = new File(path).getName();
-//      addToParentChildSet(parentPath, name);
-
-      // update this node
-      Stat stat = new Stat();
-      T readData = _accessor.get(path, stat, AccessOption.THROW_EXCEPTION_IFNOTEXIST);
-
-      update(path, readData, stat);
-      
-//      ZNode znode = _cache.get(path);
-//      if (znode == null)
-//      {
-//        znode = new ZNode(path, readData, stat);
-//        _cache.put(path, znode);
-//      }
-//      else
-//      {
-//        znode.setData(readData);
-//        znode.setStat(stat);
-//      }
-
-      // recursively update children nodes if not exists
-      ZNode znode = _cache.get(path);
-      List<String> childNames = _accessor.getChildNames(path, 0);
-      if (childNames != null && childNames.size() > 0)
-      {
-        for (String childName : childNames)
-        {
-          String childPath = path + "/" + childName;
-          if (!znode.hasChild(childName))
-          {
-            znode.addChild(childName);
-            updateRecursive(childPath);
-          }
-        }
-      }
-    }
-    catch (ZkNoNodeException e)
-    {
-      // OK. someone delete znode while we are updating cache
-    }
-    finally
-    {
-      _lock.writeLock().unlock();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZKDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZKDataAccessor.java b/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZKDataAccessor.java
deleted file mode 100644
index 14a7f6d..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZKDataAccessor.java
+++ /dev/null
@@ -1,329 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.manager.zk;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.data.Stat;
-
-import com.linkedin.helix.DataAccessor;
-import com.linkedin.helix.HelixException;
-import com.linkedin.helix.HelixProperty;
-import com.linkedin.helix.PropertyPathConfig;
-import com.linkedin.helix.PropertyType;
-import com.linkedin.helix.ZNRecord;
-
-@Deprecated
-public class ZKDataAccessor implements DataAccessor
-{
-  private static Logger logger = Logger.getLogger(ZKDataAccessor.class);
-
-  protected final String _clusterName;
-  protected final ZkClient _zkClient;
-
-  /**
-   * If a PropertyType has children (e.g. CONFIGS), then the parent path is the
-   * first key and child path is the second key; If a PropertyType has no child
-   * (e.g. LEADER), then no cache
-   */
-  private final Map<String, Map<String, ZNRecord>> _cache = new ConcurrentHashMap<String, Map<String, ZNRecord>>();
-
-  public ZKDataAccessor(String clusterName, ZkClient zkClient)
-  {
-    _clusterName = clusterName;
-    _zkClient = zkClient;
-  }
-
-  @Override
-  public boolean setProperty(PropertyType type, HelixProperty value, String... keys)
-  {
-    if (!value.isValid())
-    {
-      throw new HelixException("The ZNRecord for " + type + " is not valid.");
-    }
-    return setProperty(type, value.getRecord(), keys);
-  }
-
-  @Override
-  public boolean setProperty(PropertyType type, ZNRecord value, String... keys)
-  {
-    String path = PropertyPathConfig.getPath(type, _clusterName, keys);
-
-    String parent = new File(path).getParent();
-    if (!_zkClient.exists(parent))
-    {
-      _zkClient.createPersistent(parent, true);
-    }
-
-    if (_zkClient.exists(path))
-    {
-      if (type.isCreateOnlyIfAbsent())
-      {
-        return false;
-      } else
-      {
-        ZKUtil.createOrUpdate(_zkClient, path, value, type.isPersistent(), false);
-      }
-    } else
-    {
-      try
-      {
-        if (type.isPersistent())
-        {
-          _zkClient.createPersistent(path, value);
-        } else
-        {
-          _zkClient.createEphemeral(path, value);
-        }
-      } catch (Exception e)
-      {
-        logger.warn("Exception while creating path:" + path
-            + " Most likely due to race condition(Ignorable).", e);
-        return false;
-      }
-    }
-    return true;
-  }
-
-  @Override
-  public boolean updateProperty(PropertyType type, HelixProperty value, String... keys)
-  {
-    return updateProperty(type, value.getRecord(), keys);
-  }
-
-  @Override
-  public boolean updateProperty(PropertyType type, ZNRecord value, String... keys)
-  {
-    String path = PropertyPathConfig.getPath(type, _clusterName, keys);
-    if (type.isUpdateOnlyOnExists())
-    {
-      ZKUtil.updateIfExists(_zkClient, path, value, type.isMergeOnUpdate());
-    } else
-    {
-      String parent = new File(path).getParent();
-
-      if (!_zkClient.exists(parent))
-      {
-        _zkClient.createPersistent(parent, true);
-      }
-
-      if (!type.usePropertyTransferServer())
-      {
-        ZKUtil.createOrUpdate(_zkClient, path, value, type.isPersistent(), type.isMergeOnUpdate());
-      } else
-      {
-        ZKUtil.asyncCreateOrUpdate(_zkClient, path, value, type.isPersistent(), type.isMergeOnUpdate());
-      }
-    }
-
-    return true;
-  }
-
-  @Override
-  public <T extends HelixProperty>
-    T getProperty(Class<T> clazz, PropertyType type, String... keys)
-  {
-    return HelixProperty.convertToTypedInstance(clazz, getProperty(type, keys));
-  }
-
-  @Override
-  public ZNRecord getProperty(PropertyType type, String... keys)
-  {
-    String path = PropertyPathConfig.getPath(type, _clusterName, keys);
-
-    if (!type.isCached())
-    {
-      return _zkClient.readData(path, true);
-    } else
-    {
-      int len = keys.length;
-      if (len == 0)
-      {
-        return _zkClient.readData(path, true);
-      } else
-      {
-        String[] subkeys = Arrays.copyOfRange(keys, 0, len - 1);
-        Map<String, ZNRecord> newChilds = refreshChildValuesCache(type, subkeys);
-        return newChilds.get(keys[len - 1]);
-      }
-    }
-
-  }
-
-  @Override
-  public boolean removeProperty(PropertyType type, String... keys)
-  {
-    String path = PropertyPathConfig.getPath(type, _clusterName, keys);
-    return _zkClient.delete(path);
-  }
-
-  @Override
-  public List<String> getChildNames(PropertyType type, String... keys)
-  {
-    String path = PropertyPathConfig.getPath(type, _clusterName, keys);
-    if (_zkClient.exists(path))
-    {
-      return _zkClient.getChildren(path);
-    } else
-    {
-      return Collections.emptyList();
-    }
-  }
-
-  @Override
-  public <T extends HelixProperty>
-    List<T> getChildValues(Class<T> clazz, PropertyType type, String... keys)
-  {
-    List<ZNRecord> newChilds = getChildValues(type, keys);
-    if (newChilds.size() > 0)
-    {
-      return HelixProperty.convertToTypedList(clazz, newChilds);
-    }
-    return Collections.emptyList();
-  }
-
-  @Override
-  public List<ZNRecord> getChildValues(PropertyType type, String... keys)
-
-  {
-    String path = PropertyPathConfig.getPath(type, _clusterName, keys);
-    // if (path == null)
-    // {
-    // System.err.println("path is null");
-    // }
-
-    if (_zkClient.exists(path))
-    {
-      if (!type.isCached())
-      {
-        return ZKUtil.getChildren(_zkClient, path);
-      } else
-      {
-        Map<String, ZNRecord> newChilds = refreshChildValuesCache(type, keys);
-        return new ArrayList<ZNRecord>(newChilds.values());
-      }
-    }
-
-    return Collections.emptyList();
-  }
-
-  public void reset()
-  {
-    _cache.clear();
-  }
-
-  private Map<String, ZNRecord> refreshChildValuesCache(PropertyType type, String... keys)
-  {
-    if (!type.isCached())
-    {
-      throw new IllegalArgumentException("Type:" + type + " is NOT cached");
-    }
-
-    String path = PropertyPathConfig.getPath(type, _clusterName, keys);
-
-    Map<String, ZNRecord> newChilds = refreshChildValues(path, _cache.get(path));
-    if (newChilds != null && newChilds.size() > 0)
-    {
-      _cache.put(path, newChilds);
-      return newChilds;
-    } else
-    {
-      _cache.remove(path);
-      return Collections.emptyMap();
-    }
-  }
-
-  /**
-   * Read a zookeeper node only if it's data has been changed since last read
-   *
-   * @param parentPath
-   * @param oldChildRecords
-   * @return newChildRecords
-   */
-  private Map<String, ZNRecord> refreshChildValues(String parentPath,
-      Map<String, ZNRecord> oldChildRecords)
-  {
-    List<String> childs = _zkClient.getChildren(parentPath);
-    if (childs == null || childs.size() == 0)
-    {
-      return Collections.emptyMap();
-    }
-
-    Stat newStat = new Stat();
-    Map<String, ZNRecord> newChildRecords = new HashMap<String, ZNRecord>();
-    for (String child : childs)
-    {
-      String childPath = parentPath + "/" + child;
-
-      // assume record.id should be the last part of zookeeper path
-      if (oldChildRecords == null || !oldChildRecords.containsKey(child))
-      {
-        ZNRecord record = _zkClient.readDataAndStat(childPath, newStat, true);
-        if (record != null)
-        {
-          record.setVersion(newStat.getVersion());
-          newChildRecords.put(child, record);
-        }
-      } else
-      {
-        ZNRecord oldChild = oldChildRecords.get(child);
-
-        int oldVersion = oldChild.getVersion();
-        long oldCtime = oldChild.getCreationTime();
-        newStat = _zkClient.getStat(childPath);
-        if (newStat != null)
-        {
-          // System.out.print(child + " oldStat:" + oldStat);
-          // System.out.print(child + " newStat:" + newStat);
-
-          if (oldCtime < newStat.getCtime() ||
-              oldVersion < newStat.getVersion())
-          {
-            ZNRecord record = _zkClient.readDataAndStat(childPath, newStat, true);
-            if (record != null)
-            {
-              record.setVersion(newStat.getVersion());
-              record.setCreationTime(newStat.getCtime());
-              record.setModifiedTime(newStat.getMtime());
-              newChildRecords.put(child, record);
-            }
-          } else
-          {
-            newChildRecords.put(child, oldChild);
-          }
-        }
-      }
-    }
-
-    return Collections.unmodifiableMap(newChildRecords);
-  }
-
-  @Override
-  public <T extends HelixProperty>
-    Map<String, T> getChildValuesMap(Class<T> clazz, PropertyType type, String... keys)
-  {
-    List<T> list = getChildValues(clazz, type, keys);
-    return Collections.unmodifiableMap(HelixProperty.convertListToMap(list));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZKExceptionHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZKExceptionHandler.java b/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZKExceptionHandler.java
deleted file mode 100644
index 0a5b52b..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZKExceptionHandler.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.manager.zk;
-
-import org.I0Itec.zkclient.exception.ZkInterruptedException;
-import org.apache.log4j.Logger;
-
-public class ZKExceptionHandler
-{
-  private static ZKExceptionHandler instance = new ZKExceptionHandler();
-  private static Logger logger = Logger.getLogger(ZKExceptionHandler.class);
-  private ZKExceptionHandler()
-  {
-
-  }
-
-  void handle(Exception e)
-  {
-    logger.error(Thread.currentThread().getName() + ". isThreadInterruped: " + Thread.currentThread().isInterrupted());
-
-    if (e instanceof ZkInterruptedException)
-    {
-      logger.error("zk connection is interrupted.", e);
-    }
-    else
-    {
-      logger.error(e.getMessage(), e);
-      // e.printStackTrace();
-    }
-  }
-
-  public static ZKExceptionHandler getInstance()
-  {
-    return instance;
-  }
-}