You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 01:14:57 UTC

[12/42] Refactoring the package names and removing jsql parser

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/manager/file/StaticFileHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/file/StaticFileHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/file/StaticFileHelixManager.java
new file mode 100644
index 0000000..9867d13
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/file/StaticFileHelixManager.java
@@ -0,0 +1,569 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.manager.file;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.ClusterMessagingService;
+import org.apache.helix.ClusterView;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.ConfigChangeListener;
+import org.apache.helix.ControllerChangeListener;
+import org.apache.helix.CurrentStateChangeListener;
+import org.apache.helix.DataAccessor;
+import org.apache.helix.ExternalViewChangeListener;
+import org.apache.helix.HealthStateChangeListener;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.IdealStateChangeListener;
+import org.apache.helix.InstanceType;
+import org.apache.helix.LiveInstanceChangeListener;
+import org.apache.helix.MessageListener;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PreConnectCallback;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.InstanceConfig.InstanceConfigProperty;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.store.PropertyStore;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.tools.ClusterViewSerializer;
+import org.apache.helix.tools.IdealStateCalculatorByShuffling;
+import org.apache.log4j.Logger;
+
+
+@Deprecated
+public class StaticFileHelixManager implements HelixManager
+{
+  private static final Logger LOG = Logger.getLogger(StaticFileHelixManager.class.getName());
+  // for backward compatibility; the view below is deserialized once, in the constructor
+  // TODO remove it later
+  private final ClusterView _clusterView;
+  private final String _clusterName;
+  private final InstanceType _instanceType;
+  private final String _instanceName;
+  private boolean _isConnected; // set by connect(), cleared by disconnect()
+  public static final String _sessionId = "12345"; // constant value returned by getSessionId()
+  public static final String configFile = "configFile"; // not referenced inside this class
+
+  public StaticFileHelixManager(String clusterName, String instanceName, InstanceType instanceType,
+      String clusterViewFile)
+  { // stores the identity fields and loads the cluster view from clusterViewFile
+    _clusterName = clusterName;
+    _instanceName = instanceName;
+    _instanceType = instanceType;
+    _clusterView = ClusterViewSerializer.deserialize(new File(clusterViewFile)); // read once, here
+  }
+
+  // FIXIT
+  // reorder the messages to reduce the possibility that a S->M message for a
+  // given
+  // db partition gets executed before a O->S message
+  private static void addMessageInOrder(List<ZNRecord> msgList, Message newMsg)
+  {
+    String toState = newMsg.getToState();
+    if (toState.equals("MASTER"))
+    {
+      msgList.add(newMsg.getRecord());
+    }
+    if (toState.equals("SLAVE"))
+    {
+      msgList.add(0, newMsg.getRecord());
+    }
+  }
+
+  private static List<Message> computeMessagesForSimpleTransition(ZNRecord idealStateRecord)
+  {
+    List<Message> msgList = new ArrayList<Message>();
+
+    IdealState idealState = new IdealState(idealStateRecord);
+    for (String stateUnitKey : idealState.getPartitionSet())
+    {
+      Map<String, String> instanceStateMap;
+      instanceStateMap = idealState.getInstanceStateMap(stateUnitKey);
+    }
+
+    return msgList;
+  }
+
+  /** Name and partition count of one database, as read by generateStaticConfigClusterView. */
+  public static class DBParam
+  {
+    public String name;
+    public int partitions;
+    public DBParam(String n, int p)
+    {
+      this.name = n;
+      this.partitions = p;
+    }
+  }
+
+  /**
+   * Builds an in-memory {@link ClusterView} for a statically configured cluster:
+   * one CONFIGS record per node, one IDEALSTATES record per database, and the
+   * MESSAGES lists computed from those ideal states.
+   *
+   * MemberInstance entries are created only for instances that have a message
+   * list, and are stored sorted by instance name.
+   *
+   * @param nodesInfo nodes in "host:port" form; the array is not modified (a
+   *          sorted copy is used internally)
+   * @param dbParams name and partition count of every database
+   * @param replica replica count passed to the ideal-state calculator
+   * @return the populated cluster view
+   * @throws IllegalArgumentException if a node entry contains no ':'
+   */
+  public static ClusterView generateStaticConfigClusterView(String[] nodesInfo,
+      List<DBParam> dbParams, int replica)
+  {
+    // create mock cluster view
+    ClusterView view = new ClusterView();
+
+    // add nodes
+    List<ZNRecord> nodeConfigList = new ArrayList<ZNRecord>();
+    List<String> instanceNames = new ArrayList<String>();
+
+    // Sort a private copy: sorting nodesInfo in place would reorder the
+    // caller's array as a hidden side effect.
+    String[] sortedNodes = nodesInfo.clone();
+    Arrays.sort(sortedNodes);
+
+    // set CONFIGS
+    for (String nodeInfo : sortedNodes)
+    {
+      // split on the last ':' so everything before it is treated as the host
+      int lastPos = nodeInfo.lastIndexOf(":");
+      if (lastPos == -1)
+      {
+        throw new IllegalArgumentException("nodeInfo should be in format of host:port, " + nodeInfo);
+      }
+
+      String host = nodeInfo.substring(0, lastPos);
+      String port = nodeInfo.substring(lastPos + 1);
+      String nodeId = host + "_" + port;
+      ZNRecord nodeConfig = new ZNRecord(nodeId);
+
+      // every node is recorded as enabled
+      nodeConfig.setSimpleField(InstanceConfigProperty.HELIX_ENABLED.toString(),
+          Boolean.toString(true));
+      nodeConfig.setSimpleField(InstanceConfigProperty.HELIX_HOST.toString(), host);
+      nodeConfig.setSimpleField(InstanceConfigProperty.HELIX_PORT.toString(), port);
+
+      instanceNames.add(nodeId);
+
+      nodeConfigList.add(nodeConfig);
+    }
+    view.setClusterPropertyList(PropertyType.CONFIGS, nodeConfigList);
+
+    // set IDEALSTATES
+    // compute ideal states for each db
+    List<ZNRecord> idealStates = new ArrayList<ZNRecord>();
+    for (DBParam dbParam : dbParams)
+    {
+      ZNRecord result = IdealStateCalculatorByShuffling.calculateIdealState(instanceNames,
+          dbParam.partitions, replica, dbParam.name);
+
+      idealStates.add(result);
+    }
+    view.setClusterPropertyList(PropertyType.IDEALSTATES, idealStates);
+
+    // calculate messages for transition using naive algorithm
+    Map<String, List<ZNRecord>> msgListForInstance = new HashMap<String, List<ZNRecord>>();
+    // read the ideal states back from the view
+    List<ZNRecord> idealStatesArray = view.getPropertyList(PropertyType.IDEALSTATES);
+    for (ZNRecord idealStateRecord : idealStatesArray)
+    {
+      List<Message> messages = computeMessagesForSimpleTransition(idealStateRecord);
+
+      for (Message message : messages)
+      {
+        // group the message records by target instance
+        String instance = message.getTgtName();
+        List<ZNRecord> msgList = msgListForInstance.get(instance);
+        if (msgList == null)
+        {
+          msgList = new ArrayList<ZNRecord>();
+          msgListForInstance.put(instance, msgList);
+        }
+        addMessageInOrder(msgList, message);
+      }
+    }
+
+    // set INSTANCES
+    // put message lists into cluster view
+    List<ClusterView.MemberInstance> insList = new ArrayList<ClusterView.MemberInstance>();
+    for (Map.Entry<String, List<ZNRecord>> entry : msgListForInstance.entrySet())
+    {
+      String instance = entry.getKey();
+      List<ZNRecord> msgList = entry.getValue();
+
+      ClusterView.MemberInstance ins = view.getMemberInstance(instance, true);
+      ins.setInstanceProperty(PropertyType.MESSAGES, msgList);
+      insList.add(ins);
+    }
+
+    // sort it
+    ClusterView.MemberInstance[] insArray = new ClusterView.MemberInstance[insList.size()];
+    insArray = insList.toArray(insArray);
+    Arrays.sort(insArray, new Comparator<ClusterView.MemberInstance>() {
+
+      @Override
+      public int compare(ClusterView.MemberInstance ins1, ClusterView.MemberInstance ins2)
+      {
+        return ins1.getInstanceName().compareTo(ins2.getInstanceName());
+      }
+
+    });
+
+    // fixed-size list view over the sorted array
+    insList = Arrays.asList(insArray);
+    view.setInstances(insList);
+
+    return view;
+  }
+
+  @Override
+  public void disconnect()
+  {
+    _isConnected = false; // only clears the in-memory flag reported by isConnected()
+  }
+
+  /** Calls the listener once, right away, with the view's IDEALSTATES and an INIT context. */
+  @Override
+  public void addIdealStateChangeListener(IdealStateChangeListener listener)
+  {
+    NotificationContext initContext = new NotificationContext(this);
+    initContext.setType(NotificationContext.Type.INIT);
+    List<ZNRecord> records = _clusterView.getPropertyList(PropertyType.IDEALSTATES);
+    listener.onIdealStateChange(HelixProperty.convertToTypedList(IdealState.class, records),
+        initContext);
+  }
+
+  @Override
+  public void addLiveInstanceChangeListener(LiveInstanceChangeListener listener)
+  { // unsupported here: always throws, the listener is never used
+    throw new UnsupportedOperationException(
+        "addLiveInstanceChangeListener is not supported by File Based cluster manager");
+  }
+
+  @Override
+  public void addConfigChangeListener(ConfigChangeListener listener)
+  { // unsupported here: always throws, the listener is never used
+    throw new UnsupportedOperationException(
+        "addConfigChangeListener() is NOT supported by File Based cluster manager");
+  }
+
+  /** Calls the listener once, right away, with the instance's MESSAGES and an INIT context. */
+  @Override
+  public void addMessageListener(MessageListener listener, String instanceName)
+  {
+    NotificationContext initContext = new NotificationContext(this);
+    initContext.setType(NotificationContext.Type.INIT);
+    ClusterView.MemberInstance member = _clusterView.getMemberInstance(instanceName, true);
+    List<ZNRecord> pending = member.getInstanceProperty(PropertyType.MESSAGES);
+    listener.onMessage(instanceName, HelixProperty.convertToTypedList(Message.class, pending),
+        initContext);
+  }
+
+  @Override
+  public void addCurrentStateChangeListener(CurrentStateChangeListener listener,
+      String instanceName, String sessionId)
+  { // unsupported here: always throws, no argument is used
+    throw new UnsupportedOperationException(
+        "addCurrentStateChangeListener is not supported by File Based cluster manager");
+  }
+
+  @Override
+  public void addExternalViewChangeListener(ExternalViewChangeListener listener)
+  { // unsupported here: always throws, the listener is never used
+    throw new UnsupportedOperationException(
+        "addExternalViewChangeListener() is NOT supported by File Based cluster manager");
+  }
+
+  @Override
+  public DataAccessor getDataAccessor()
+  {
+    return null; // this manager provides no data accessor; callers must handle null
+  }
+
+  @Override
+  public String getClusterName()
+  {
+    return _clusterName; // value passed to the constructor
+  }
+
+  @Override
+  public String getInstanceName()
+  {
+    return _instanceName; // value passed to the constructor
+  }
+
+  @Override
+  public void connect()
+  {
+    _isConnected = true; // only sets the in-memory flag reported by isConnected()
+  }
+
+  @Override
+  public String getSessionId()
+  {
+    return _sessionId; // the fixed class constant, identical for every instance
+  }
+
+  /**
+   * Captures the current state of every state model of the factory in a new
+   * {@link ClusterView} under instanceName; the view is also written to
+   * outFile unless outFile is null.
+   * @return the newly built view
+   */
+  public static ClusterView convertStateModelMapToClusterView(String outFile, String instanceName,
+      StateModelFactory<StateModel> stateModelFactory)
+  {
+    Map<String, StateModel> modelsByKey = stateModelFactory.getStateModelMap();
+    ClusterView snapshot = new ClusterView();
+    ClusterView.MemberInstance member = snapshot.getMemberInstance(instanceName, true);
+
+    // one record per state unit; its id and its only simple-field key are both the unit key
+    List<ZNRecord> stateRecords = new ArrayList<ZNRecord>();
+    for (Map.Entry<String, StateModel> model : modelsByKey.entrySet())
+    {
+      String unitKey = model.getKey();
+      String state = model.getValue().getCurrentState();
+      ZNRecord stateRecord = new ZNRecord(unitKey);
+      stateRecord.setSimpleField(unitKey, state);
+      stateRecords.add(stateRecord);
+    }
+    member.setInstanceProperty(PropertyType.CURRENTSTATES, stateRecords);
+
+    if (outFile != null)
+    {
+      ClusterViewSerializer.serialize(snapshot, new File(outFile));
+    }
+
+    return snapshot;
+  }
+
+  public static boolean verifyFileBasedClusterStates(String instanceName, String expectedFile,
+      String curFile)
+  {
+    boolean ret = true;
+    ClusterView expectedView = ClusterViewSerializer.deserialize(new File(expectedFile));
+    ClusterView curView = ClusterViewSerializer.deserialize(new File(curFile));
+
+    // ideal_state for instance with the given instanceName
+    Map<String, String> idealStates = new HashMap<String, String>();
+    for (ZNRecord idealStateItem : expectedView.getPropertyList(PropertyType.IDEALSTATES))
+    {
+      Map<String, Map<String, String>> allIdealStates = idealStateItem.getMapFields();
+
+      for (Map.Entry<String, Map<String, String>> entry : allIdealStates.entrySet())
+      {
+        if (entry.getValue().containsKey(instanceName))
+        {
+          String state = entry.getValue().get(instanceName);
+          idealStates.put(entry.getKey(), state);
+        }
+      }
+    }
+
+    ClusterView.MemberInstance memberInstance = curView.getMemberInstance(instanceName, false);
+    List<ZNRecord> curStateList = memberInstance.getInstanceProperty(PropertyType.CURRENTSTATES);
+
+    if (curStateList == null && idealStates.size() > 0)
+    {
+      LOG.info("current state is null");
+      return false;
+    } else if (curStateList == null && idealStates.size() == 0)
+    {
+      LOG.info("empty current state and ideal state");
+      return true;
+    } else if (curStateList.size() != idealStates.size())
+    {
+      LOG.info("Number of current states (" + curStateList.size() + ") mismatch "
+          + "number of ideal states (" + idealStates.size() + ")");
+      return false;
+    }
+
+    for (ZNRecord record : curStateList)
+    {
+      String stateUnitKey = record.getId();
+      String curState = record.getSimpleField(stateUnitKey);
+
+      // if (!curState.equalsIgnoreCase("offline"))
+      // nonOfflineNr++;
+
+      if (!idealStates.containsKey(stateUnitKey))
+      {
+        LOG.error("Current state does not contain " + stateUnitKey);
+        ret = false;
+        continue;
+      }
+
+      String idealState = idealStates.get(stateUnitKey);
+      if (!curState.equalsIgnoreCase(idealState))
+      {
+        LOG.error("State mismatch--unit_key:" + stateUnitKey + " cur:" + curState + " ideal:"
+            + idealState + " instance_name:" + instanceName);
+        ret = false;
+        continue;
+      }
+
+    }
+
+    return ret;
+  }
+
+  @Override
+  public boolean isConnected()
+  {
+    return _isConnected; // flag toggled by connect() and disconnect()
+  }
+
+  @Override
+  public long getLastNotificationTime()
+  {
+    return 0; // no notification time is tracked; the result is always 0
+  }
+
+  @Override
+  public void addControllerListener(ControllerChangeListener listener)
+  { // unsupported here: always throws, the listener is never used
+    throw new UnsupportedOperationException(
+        "addControllerListener() is NOT supported by File Based cluster manager");
+  }
+
+  @Override
+  public boolean removeListener(Object listener)
+  {
+    // TODO not implemented: the argument is ignored and false is always returned
+    return false;
+  }
+
+  @Override
+  public HelixAdmin getClusterManagmentTool()
+  {
+    // TODO not implemented: always returns null, callers must handle it
+    return null;
+  }
+
+  @Override
+  public PropertyStore<ZNRecord> getPropertyStore()
+  {
+    // TODO not implemented: always returns null, callers must handle it
+    return null;
+  }
+
+  @Override
+  public ClusterMessagingService getMessagingService()
+  {
+    // TODO not implemented: always returns null, callers must handle it
+    return null;
+  }
+
+  @Override
+  public ParticipantHealthReportCollector getHealthReportCollector()
+  {
+    // TODO not implemented: always returns null, callers must handle it
+    return null;
+  }
+
+  @Override
+  public InstanceType getInstanceType()
+  {
+    return _instanceType; // value passed to the constructor
+  }
+
+  @Override
+  public void addHealthStateChangeListener(HealthStateChangeListener listener, String instanceName)
+      throws Exception
+  {
+    // TODO not implemented: both arguments are ignored and nothing is registered
+
+  }
+
+  @Override
+  public String getVersion()
+  { // unsupported: always throws; the message names this class, StaticFileHelixManager
+    throw new UnsupportedOperationException("getVersion() not implemented in StaticFileHelixManager");
+  }
+
+  @Override
+  public StateMachineEngine getStateMachineEngine()
+  { // unsupported: always throws; the message names this class, StaticFileHelixManager
+    throw new UnsupportedOperationException("getStateMachineEngine() not implemented in StaticFileHelixManager");
+  }
+
+  @Override
+  public boolean isLeader()
+  { // unsupported: always throws; the message names this class, StaticFileHelixManager
+    throw new UnsupportedOperationException("isLeader() not implemented in StaticFileHelixManager");
+  }
+
+  @Override
+  public ConfigAccessor getConfigAccessor()
+  {
+    // TODO not implemented: always returns null, callers must handle it
+    return null;
+  }
+
+  @Override
+  public void startTimerTasks()
+  {
+    // TODO not implemented: intentionally a no-op, no timer is started
+
+  }
+
+  @Override
+  public void stopTimerTasks()
+  {
+    // TODO not implemented: intentionally a no-op, there is no timer to stop
+
+  }
+
+  @Override
+  public HelixDataAccessor getHelixDataAccessor()
+  {
+    // TODO not implemented: always returns null, callers must handle it
+    return null;
+  }
+
+  @Override
+  public void addPreConnectCallback(PreConnectCallback callback)
+  {
+    // TODO not implemented: the callback is ignored and never invoked by this class
+    
+  }
+
+  @Override
+  public ZkHelixPropertyStore<ZNRecord> getHelixPropertyStore()
+  {
+    // TODO not implemented: always returns null, callers must handle it
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/manager/file/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/file/package-info.java b/helix-core/src/main/java/org/apache/helix/manager/file/package-info.java
new file mode 100644
index 0000000..d490d98
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/file/package-info.java
@@ -0,0 +1,5 @@
+/**
+ * A file-based implementation of Helix cluster manager (Deprecated)
+ * 
+ */
+package org.apache.helix.manager.file;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/manager/zk/BasicZkSerializer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/BasicZkSerializer.java b/helix-core/src/main/java/org/apache/helix/manager/zk/BasicZkSerializer.java
new file mode 100644
index 0000000..006fd3f
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/BasicZkSerializer.java
@@ -0,0 +1,43 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.manager.zk;
+
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+
+/**
+ * Path based serializer which ignores the path argument and delegates both
+ * serialization and deserialization to a regular {@link ZkSerializer}.
+ */
+public class BasicZkSerializer implements PathBasedZkSerializer
+{
+  private final ZkSerializer _delegate; // performs the actual work; stored as given
+
+  public BasicZkSerializer(ZkSerializer delegate)
+  {
+    _delegate = delegate;
+  }
+
+  @Override
+  public byte[] serialize(Object data, String path)
+  {
+    return _delegate.serialize(data); // path is intentionally unused
+  }
+
+  @Override
+  public Object deserialize(byte[] bytes, String path)
+  {
+    return _delegate.deserialize(bytes); // path is intentionally unused
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/manager/zk/ByteArraySerializer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ByteArraySerializer.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ByteArraySerializer.java
new file mode 100644
index 0000000..a22d6be
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ByteArraySerializer.java
@@ -0,0 +1,20 @@
+package org.apache.helix.manager.zk;
+
+import org.I0Itec.zkclient.exception.ZkMarshallingError;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+
+public class ByteArraySerializer implements ZkSerializer // pass-through for byte[] payloads
+{
+  @Override
+  public byte[] serialize(Object data) throws ZkMarshallingError
+  {
+    return (byte[])data; // plain cast: a non-byte[] argument fails with ClassCastException
+  }
+
+  @Override
+  public Object deserialize(byte[] bytes) throws ZkMarshallingError
+  {
+    return bytes; // the same array instance is handed back, no copy is made
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/manager/zk/Cache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/Cache.java b/helix-core/src/main/java/org/apache/helix/manager/zk/Cache.java
new file mode 100644
index 0000000..0906685
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/Cache.java
@@ -0,0 +1,146 @@
+package org.apache.helix.manager.zk;
+
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.helix.store.zk.ZNode;
+import org.apache.zookeeper.data.Stat;
+
+
+/** Path-keyed cache of {@link ZNode}s with a read/write lock; subclasses implement the update methods. */
+public abstract class Cache<T>
+{
+  final ReadWriteLock                    _lock;
+  final ConcurrentHashMap<String, ZNode> _cache;
+
+  public Cache()
+  {
+    _lock = new ReentrantReadWriteLock();
+    _cache = new ConcurrentHashMap<String, ZNode>();
+  }
+
+  public void addToParentChildSet(String parentPath, String childName)
+  {
+    ZNode znode = _cache.get(parentPath);
+    if (znode != null) // a parent that is not cached is silently skipped
+    {
+      znode.addChild(childName);
+    }
+  }
+
+  public void addToParentChildSet(String parentPath, List<String> childNames)
+  {
+    if (childNames != null && !childNames.isEmpty())
+    {
+      ZNode znode = _cache.get(parentPath);
+      if (znode != null) // a parent that is not cached is silently skipped
+      {
+        znode.addChildren(childNames);
+      }
+    }
+  }
+
+  public void removeFromParentChildSet(String parentPath, String name)
+  {
+    ZNode zNode = _cache.get(parentPath);
+    if (zNode != null) // a parent that is not cached is silently skipped
+    {
+      zNode.removeChild(name);
+    }
+  }
+
+  public boolean exists(String path)
+  {
+    return _cache.containsKey(path); // reads the map directly, without taking _lock
+  }
+
+  public ZNode get(String path)
+  {
+    _lock.readLock().lock(); // acquired before try so finally never unlocks an unheld lock
+    try
+    {
+      return _cache.get(path);
+    }
+    finally
+    {
+      _lock.readLock().unlock();
+    }
+  }
+
+  public void lockWrite()
+  {
+    _lock.writeLock().lock();
+  }
+
+  public void unlockWrite()
+  {
+    _lock.writeLock().unlock();
+  }
+
+  public void lockRead()
+  {
+    _lock.readLock().lock();
+  }
+
+  public void unlockRead()
+  {
+    _lock.readLock().unlock();
+  }
+
+  public void purgeRecursive(String path) // removes path and, recursively, its cached children
+  {
+    _lock.writeLock().lock(); // acquired before try so finally never unlocks an unheld lock
+    try
+    {
+      String parentPath = new File(path).getParent();
+      // getParent() is null for "/" or a parent-less name; the map rejects null keys
+      if (parentPath != null)
+      {
+        removeFromParentChildSet(parentPath, new File(path).getName());
+      }
+      ZNode znode = _cache.remove(path);
+      if (znode != null)
+      {
+        // recursively remove children nodes
+        Set<String> childNames = znode.getChildSet();
+        for (String childName : childNames)
+        {
+          String childPath = path + "/" + childName;
+          purgeRecursive(childPath);
+        }
+      }
+    }
+    finally
+    {
+      _lock.writeLock().unlock();
+    }
+  }
+
+  public void reset() // empties the whole cache under the write lock
+  {
+    _lock.writeLock().lock(); // acquired before try so finally never unlocks an unheld lock
+    try
+    {
+      _cache.clear();
+    }
+    finally
+    {
+      _lock.writeLock().unlock();
+    }
+  }
+
+  public abstract void update(String path, T data, Stat stat);
+
+  public abstract void updateRecursive(String path);
+
+  // debug: exposes the live backing map, not a copy
+  public Map<String, ZNode> getCache()
+  {
+    return _cache;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
new file mode 100644
index 0000000..a67a79e
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
@@ -0,0 +1,399 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.manager.zk;
+
+import static org.apache.helix.HelixConstants.ChangeType.CONFIG;
+import static org.apache.helix.HelixConstants.ChangeType.CURRENT_STATE;
+import static org.apache.helix.HelixConstants.ChangeType.EXTERNAL_VIEW;
+import static org.apache.helix.HelixConstants.ChangeType.IDEAL_STATE;
+import static org.apache.helix.HelixConstants.ChangeType.LIVE_INSTANCE;
+import static org.apache.helix.HelixConstants.ChangeType.MESSAGE;
+import static org.apache.helix.HelixConstants.ChangeType.MESSAGES_CONTROLLER;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.IZkDataListener;
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
+import org.apache.helix.ConfigChangeListener;
+import org.apache.helix.ControllerChangeListener;
+import org.apache.helix.CurrentStateChangeListener;
+import org.apache.helix.ExternalViewChangeListener;
+import org.apache.helix.HealthStateChangeListener;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.IdealStateChangeListener;
+import org.apache.helix.LiveInstanceChangeListener;
+import org.apache.helix.MessageListener;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.HelixConstants.ChangeType;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.HealthStat;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.Watcher.Event.EventType;
+
+
+public class CallbackHandler implements IZkChildListener, IZkDataListener
+
+{
+
+  private static Logger logger = Logger.getLogger(CallbackHandler.class);
+
+  private final String _path;
+  private final Object _listener;
+  private final EventType[] _eventTypes;
+  private final HelixDataAccessor _accessor;
+  private final ChangeType _changeType;
+  private final ZkClient _zkClient;
+  private final AtomicLong lastNotificationTimeStamp;
+  private final HelixManager _manager;
+
+  /**
+   * Creates a handler that relays ZooKeeper notifications for {@code path} to
+   * {@code listener}.
+   * <p>
+   * Besides storing its arguments, the constructor obtains the data accessor from
+   * {@code manager}, seeds the last-notification timestamp with the current
+   * {@link System#nanoTime()} and finally calls {@link #init()}, which invokes the
+   * listener once with an INIT notification context.
+   *
+   * @param manager    manager this handler belongs to; also the monitor used to
+   *                   serialize listener invocations
+   * @param client     ZooKeeper client used to (un)subscribe watches
+   * @param path       path this handler watches
+   * @param listener   listener object; it is cast to the listener interface matching
+   *                   {@code changeType} when invoked
+   * @param eventTypes event types associated with this handler (only stored here)
+   * @param changeType kind of change that selects which listener callback is used
+   */
+  public CallbackHandler(HelixManager manager, ZkClient client, String path,
+                         Object listener, EventType[] eventTypes, ChangeType changeType)
+  {
+    this._manager = manager;
+    this._accessor = manager.getHelixDataAccessor();
+    this._zkClient = client;
+    this._path = path;
+    this._listener = listener;
+    this._eventTypes = eventTypes;
+    this._changeType = changeType;
+    lastNotificationTimeStamp = new AtomicLong(System.nanoTime());
+    // fires the listener immediately with an INIT context
+    init();
+  }
+
+  /**
+   * @return the listener object passed to the constructor
+   */
+  public Object getListener()
+  {
+    return _listener;
+  }
+
+  /**
+   * @return the path passed to the constructor, i.e. the path this handler watches
+   */
+  public String getPath()
+  {
+    return _path;
+  }
+
+  /**
+   * Dispatches one notification to the listener.
+   * <p>
+   * Depending on the handler's change type the listener is cast to the matching
+   * listener interface, watches are (un)subscribed through
+   * {@code subscribeForChanges} according to the type of {@code changeContext}, the
+   * current values are read through the data accessor where the callback takes them,
+   * and the corresponding listener callback is called. A change type not covered by
+   * the chain below results in no callback at all.
+   * <p>
+   * The whole body is synchronized on the manager, so invocations that share the same
+   * manager run one at a time.
+   *
+   * @param changeContext context describing why the listener is invoked (its type
+   *                      decides whether watches are added or removed)
+   * @throws Exception whatever the accessor, the ZooKeeper client or the listener throws
+   */
+  public void invoke(NotificationContext changeContext) throws Exception
+  {
+    // This allows the listener to work with one change at a time
+    synchronized (_manager)
+    {
+      Builder keyBuilder = _accessor.keyBuilder();
+      long start = System.currentTimeMillis();
+      if (logger.isInfoEnabled())
+      {
+        logger.info(Thread.currentThread().getId() + " START:INVOKE "
+        // + changeContext.getPathChanged()
+            + _path + " listener:" + _listener.getClass().getCanonicalName());
+      }
+
+      if (_changeType == IDEAL_STATE)
+      {
+
+        IdealStateChangeListener idealStateChangeListener =
+            (IdealStateChangeListener) _listener;
+        subscribeForChanges(changeContext, _path, true, true);
+        List<IdealState> idealStates = _accessor.getChildValues(keyBuilder.idealStates());
+
+        idealStateChangeListener.onIdealStateChange(idealStates, changeContext);
+
+      }
+      else if (_changeType == CONFIG)
+      {
+
+        ConfigChangeListener configChangeListener = (ConfigChangeListener) _listener;
+        subscribeForChanges(changeContext, _path, true, true);
+        List<InstanceConfig> configs =
+            _accessor.getChildValues(keyBuilder.instanceConfigs());
+
+        configChangeListener.onConfigChange(configs, changeContext);
+
+      }
+      else if (_changeType == LIVE_INSTANCE)
+      {
+        LiveInstanceChangeListener liveInstanceChangeListener =
+            (LiveInstanceChangeListener) _listener;
+        subscribeForChanges(changeContext, _path, true, true);
+        List<LiveInstance> liveInstances =
+            _accessor.getChildValues(keyBuilder.liveInstances());
+
+        liveInstanceChangeListener.onLiveInstanceChange(liveInstances, changeContext);
+
+      }
+      else if (_changeType == CURRENT_STATE)
+      {
+        CurrentStateChangeListener currentStateChangeListener;
+        currentStateChangeListener = (CurrentStateChangeListener) _listener;
+        subscribeForChanges(changeContext, _path, true, true);
+        String instanceName = PropertyPathConfig.getInstanceNameFromPath(_path);
+        String[] pathParts = _path.split("/");
+
+        // the last path segment is handed to currentStates() as its second argument
+        // NOTE(review): presumably the session id - confirm against the key builder
+        // TODO: fix this
+        List<CurrentState> currentStates =
+            _accessor.getChildValues(keyBuilder.currentStates(instanceName,
+                                                              pathParts[pathParts.length - 1]));
+
+        currentStateChangeListener.onStateChange(instanceName,
+                                                 currentStates,
+                                                 changeContext);
+
+      }
+      else if (_changeType == MESSAGE)
+      {
+        MessageListener messageListener = (MessageListener) _listener;
+        // only child-list changes are watched here (watchChild == false)
+        subscribeForChanges(changeContext, _path, true, false);
+        String instanceName = PropertyPathConfig.getInstanceNameFromPath(_path);
+        List<Message> messages =
+            _accessor.getChildValues(keyBuilder.messages(instanceName));
+
+        messageListener.onMessage(instanceName, messages, changeContext);
+
+      }
+      else if (_changeType == MESSAGES_CONTROLLER)
+      {
+        MessageListener messageListener = (MessageListener) _listener;
+        // only child-list changes are watched here (watchChild == false)
+        subscribeForChanges(changeContext, _path, true, false);
+        List<Message> messages =
+            _accessor.getChildValues(keyBuilder.controllerMessages());
+
+        messageListener.onMessage(_manager.getInstanceName(), messages, changeContext);
+
+      }
+      else if (_changeType == EXTERNAL_VIEW)
+      {
+        ExternalViewChangeListener externalViewListener =
+            (ExternalViewChangeListener) _listener;
+        subscribeForChanges(changeContext, _path, true, true);
+        List<ExternalView> externalViewList =
+            _accessor.getChildValues(keyBuilder.externalViews());
+
+        externalViewListener.onExternalViewChange(externalViewList, changeContext);
+      }
+      else if (_changeType == ChangeType.CONTROLLER)
+      {
+        ControllerChangeListener controllerChangelistener =
+            (ControllerChangeListener) _listener;
+        // no values are read for this change type; the listener only gets the context
+        subscribeForChanges(changeContext, _path, true, false);
+        controllerChangelistener.onControllerChange(changeContext);
+      }
+      else if (_changeType == ChangeType.HEALTH)
+      {
+        HealthStateChangeListener healthStateChangeListener =
+            (HealthStateChangeListener) _listener;
+        subscribeForChanges(changeContext, _path, true, true); // TODO: figure out
+        // settings here
+        String instanceName = PropertyPathConfig.getInstanceNameFromPath(_path);
+
+        List<HealthStat> healthReportList =
+            _accessor.getChildValues(keyBuilder.healthReports(instanceName));
+
+        healthStateChangeListener.onHealthChange(instanceName,
+                                                 healthReportList,
+                                                 changeContext);
+      }
+      long end = System.currentTimeMillis();
+      if (logger.isInfoEnabled())
+      {
+        logger.info(Thread.currentThread().getId() + " END:INVOKE " + _path
+            + " listener:" + _listener.getClass().getCanonicalName() + " Took: "
+            + (end - start));
+      }
+    }
+  }
+
+  /**
+   * Adds or removes ZooKeeper watches for {@code path} and, optionally, for the whole
+   * subtree below it, depending on the type of {@code context}.
+   * <ul>
+   * <li>Child-list watch on {@code path}: subscribed on INIT and unsubscribed on
+   * FINALIZE, but only when {@code watchParent} is set.</li>
+   * <li>Data watches on every child: subscribed on INIT and CALLBACK, unsubscribed on
+   * FINALIZE, but only when {@code watchChild} is set. Each child is then processed
+   * recursively with the same flags.</li>
+   * </ul>
+   * A node that disappears while its children are being listed is logged and skipped.
+   *
+   * @param context     notification context whose type selects subscribe/unsubscribe
+   * @param path        path to (un)watch
+   * @param watchParent whether to manage the child-list watch on {@code path}
+   * @param watchChild  whether to manage data watches on the children of {@code path}
+   */
+  private void subscribeForChanges(NotificationContext context,
+                                   String path,
+                                   boolean watchParent,
+                                   boolean watchChild)
+  {
+    NotificationContext.Type type = context.getType();
+    if (watchParent && type == NotificationContext.Type.INIT)
+    {
+      logger.info(_manager.getInstanceName() + " subscribe child change@" + path);
+      _zkClient.subscribeChildChanges(path, this);
+    }
+    else if (watchParent && type == NotificationContext.Type.FINALIZE)
+    {
+      logger.info(_manager.getInstanceName() + " UNsubscribe child change@" + path);
+      _zkClient.unsubscribeChildChanges(path, this);
+    }
+
+    if (!watchChild)
+    {
+      return;
+    }
+
+    try
+    {
+      List<String> childNames = _zkClient.getChildren(path);
+      if (childNames == null || childNames.isEmpty())
+      {
+        return;
+      }
+
+      for (String childName : childNames)
+      {
+        String childPath = path + "/" + childName;
+        if (type == NotificationContext.Type.INIT
+            || type == NotificationContext.Type.CALLBACK)
+        {
+          if (logger.isDebugEnabled())
+          {
+            // log the path the watch is actually placed on, not its parent
+            logger.debug(_manager.getInstanceName() + " subscribe data change@"
+                + childPath);
+          }
+          _zkClient.subscribeDataChanges(childPath, this);
+        }
+        else if (type == NotificationContext.Type.FINALIZE)
+        {
+          // log the path the watch is actually removed from, not its parent
+          logger.info(_manager.getInstanceName() + " UNsubscribe data change@"
+              + childPath);
+          _zkClient.unsubscribeDataChanges(childPath, this);
+        }
+
+        subscribeForChanges(context, childPath, watchParent, watchChild);
+      }
+    }
+    catch (ZkNoNodeException e)
+    {
+      // the node vanished between notification and listing its children
+      logger.warn("fail to subscribe data change@" + path);
+    }
+  }
+
+  /**
+   * @return the event-type array passed to the constructor (the array itself, not a
+   *         copy)
+   */
+  public EventType[] getEventTypes()
+  {
+    return _eventTypes;
+  }
+
+  /**
+   * Performs the initial listener invocation: records the notification time and calls
+   * {@link #invoke(NotificationContext)} with an INIT context, so that the listener
+   * picks up whatever already exists in ZooKeeper. Any exception is passed to
+   * {@code ZKExceptionHandler}.
+   */
+  public void init()
+  {
+    updateNotificationTime(System.nanoTime());
+    try
+    {
+      NotificationContext initContext = new NotificationContext(_manager);
+      initContext.setType(NotificationContext.Type.INIT);
+      invoke(initContext);
+    }
+    catch (Exception ex)
+    {
+      ZKExceptionHandler.getInstance().handle(ex);
+    }
+  }
+
+  /**
+   * Data-change callback: records the notification time and, when {@code dataPath}
+   * lies under the watched path, invokes the listener with a CALLBACK context.
+   * Exceptions are passed to {@code ZKExceptionHandler}.
+   */
+  @Override
+  public void handleDataChange(String dataPath, Object data)
+  {
+    try
+    {
+      updateNotificationTime(System.nanoTime());
+      if (dataPath == null || !dataPath.startsWith(_path))
+      {
+        return;
+      }
+
+      NotificationContext callbackContext = new NotificationContext(_manager);
+      callbackContext.setType(NotificationContext.Type.CALLBACK);
+      invoke(callbackContext);
+    }
+    catch (Exception ex)
+    {
+      ZKExceptionHandler.getInstance().handle(ex);
+    }
+  }
+
+  /**
+   * Data-deleted callback: records the notification time and, when {@code dataPath}
+   * lies under the watched path, drops the child-change subscription on the deleted
+   * path before invoking the listener with a CALLBACK context. Exceptions are passed
+   * to {@code ZKExceptionHandler}.
+   */
+  @Override
+  public void handleDataDeleted(String dataPath)
+  {
+    try
+    {
+      updateNotificationTime(System.nanoTime());
+      if (dataPath == null || !dataPath.startsWith(_path))
+      {
+        return;
+      }
+
+      NotificationContext callbackContext = new NotificationContext(_manager);
+      callbackContext.setType(NotificationContext.Type.CALLBACK);
+      _zkClient.unsubscribeChildChanges(dataPath, this);
+      invoke(callbackContext);
+    }
+    catch (Exception ex)
+    {
+      ZKExceptionHandler.getInstance().handle(ex);
+    }
+  }
+
+  /**
+   * Child-change callback: records the notification time and, when {@code parentPath}
+   * lies under the watched path, invokes the listener with a CALLBACK context. The
+   * reported child list itself is not used. Exceptions are passed to
+   * {@code ZKExceptionHandler}.
+   */
+  @Override
+  public void handleChildChange(String parentPath, List<String> currentChilds)
+  {
+    try
+    {
+      updateNotificationTime(System.nanoTime());
+      if (parentPath == null || !parentPath.startsWith(_path))
+      {
+        return;
+      }
+
+      NotificationContext callbackContext = new NotificationContext(_manager);
+      callbackContext.setType(NotificationContext.Type.CALLBACK);
+      invoke(callbackContext);
+    }
+    catch (Exception ex)
+    {
+      ZKExceptionHandler.getInstance().handle(ex);
+    }
+  }
+
+  /**
+   * Performs the final listener invocation: calls {@link #invoke(NotificationContext)}
+   * with a FINALIZE context so the listener can release its resources. Any exception
+   * is passed to {@code ZKExceptionHandler}.
+   */
+  public void reset()
+  {
+    try
+    {
+      NotificationContext finalizeContext = new NotificationContext(_manager);
+      finalizeContext.setType(NotificationContext.Type.FINALIZE);
+      invoke(finalizeContext);
+    }
+    catch (Exception ex)
+    {
+      ZKExceptionHandler.getInstance().handle(ex);
+    }
+  }
+
+  /**
+   * Advances the last-notification timestamp to {@code nanoTime} unless the stored
+   * value is already at least that large. The compare-and-set is retried until it
+   * succeeds or the stored value has caught up.
+   *
+   * @param nanoTime candidate timestamp
+   */
+  private void updateNotificationTime(long nanoTime)
+  {
+    long current = lastNotificationTimeStamp.get();
+    while (nanoTime > current
+        && !lastNotificationTimeStamp.compareAndSet(current, nanoTime))
+    {
+      // lost the race against another update; re-read and re-check
+      current = lastNotificationTimeStamp.get();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/manager/zk/ChainedPathZkSerializer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ChainedPathZkSerializer.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ChainedPathZkSerializer.java
new file mode 100644
index 0000000..cfd0be0
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ChainedPathZkSerializer.java
@@ -0,0 +1,131 @@
+package org.apache.helix.manager.zk;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.I0Itec.zkclient.exception.ZkMarshallingError;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+
+/**
+ * Path-based serializer that picks a {@link ZkSerializer} by path prefix.
+ * <p>
+ * Strategies are registered per path prefix through the {@link Builder}. For a given
+ * path the strategy with the longest matching prefix is used; if none matches, the
+ * default serializer supplied to {@link #builder(ZkSerializer)} handles the data.
+ */
+public class ChainedPathZkSerializer implements PathBasedZkSerializer
+{
+
+  public static class Builder
+  {
+    private final ZkSerializer _defaultSerializer;
+    private final List<ChainItem> _items = new ArrayList<ChainItem>();
+
+    private Builder(ZkSerializer defaultSerializer)
+    {
+      _defaultSerializer = defaultSerializer;
+    }
+
+    /**
+     * Add a serializing strategy for the given path prefix
+     * The most specific path will triumph over a more generic (shorter)
+     * one regardless of the ordering of the calls.
+     */
+    public Builder serialize(String path, ZkSerializer withSerializer)
+    {
+      _items.add(new ChainItem(normalize(path), withSerializer));
+      return this;
+    }
+    
+    /**
+     * Builds the serializer with the given strategies and default serializer.
+     * The result is a snapshot: strategies added to this builder afterwards do not
+     * affect serializers that were already built.
+     */
+    public ChainedPathZkSerializer build() {
+      return new ChainedPathZkSerializer(_defaultSerializer, _items);
+    }
+  }
+  
+  /**
+   * Create a builder that will use the given serializer by default
+   * if no other strategy is given to solve the path in question.
+   */
+  public static Builder builder(ZkSerializer defaultSerializer) 
+  {
+    return new Builder(defaultSerializer);
+  }
+
+  private final List<ChainItem> _items;
+  private final ZkSerializer _defaultSerializer;
+
+  private ChainedPathZkSerializer(ZkSerializer defaultSerializer, List<ChainItem> items)
+  {
+    // defensive copy: the builder keeps its own list and may still be modified after
+    // build(); sharing it would let later additions leak into this instance unsorted
+    _items = new ArrayList<ChainItem>(items);
+    // sort by longest paths first
+    // if two items would match one would be prefix of the other
+    // and the longest must be more specific
+    Collections.sort(_items);
+    _defaultSerializer = defaultSerializer;
+  }
+
+  @Override
+  public byte[] serialize(Object data, String path) throws ZkMarshallingError
+  {
+    return serializerFor(path).serialize(data);
+  }
+
+  @Override
+  public Object deserialize(byte[] bytes, String path)
+      throws ZkMarshallingError
+  {
+    return serializerFor(path).deserialize(bytes);
+  }
+
+  /**
+   * Returns the serializer of the first (i.e. most specific) item matching the path,
+   * or the default serializer when no item matches.
+   */
+  private ZkSerializer serializerFor(String path)
+  {
+    for (ChainItem item : _items)
+    {
+      if (item.matches(path))
+      {
+        return item._serializer;
+      }
+    }
+    return _defaultSerializer;
+  }
+
+  /**
+   * A path prefix together with the serializer to use below it. Items order by
+   * descending prefix length.
+   */
+  private static class ChainItem implements Comparable<ChainItem>
+  {
+    final String _path;
+    final ZkSerializer _serializer;
+
+    ChainItem(String path, ZkSerializer serializer)
+    {
+      _path = path;
+      _serializer = serializer;
+    }
+
+    /**
+     * A path matches when it equals the prefix or continues it with a '/' separator,
+     * so "/a/b" matches prefix "/a" but "/ab" does not.
+     */
+    boolean matches(String path)
+    {
+      if (_path.equals(path))
+      {
+        return true;
+      }
+      return path.length() > _path.length()
+          && path.startsWith(_path)
+          && path.charAt(_path.length()) == '/';
+    }
+
+    @Override
+    public int compareTo(ChainItem o)
+    {
+      // longer prefixes sort first; lengths are non-negative so the subtraction
+      // cannot overflow
+      return o._path.length() - _path.length();
+    }
+  }
+  
+  /**
+   * Brings a prefix into canonical form: exactly one leading slash is ensured and a
+   * single trailing slash is removed.
+   */
+  private static String normalize(String path) {
+    if (!path.startsWith("/")) {
+      // ensure leading slash
+      path = "/" + path;
+    }
+    if (path.endsWith("/")) {
+      // remove trailing slash
+      path = path.substring(0, path.length()-1);
+    }
+    return path;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultControllerMessageHandlerFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultControllerMessageHandlerFactory.java b/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultControllerMessageHandlerFactory.java
new file mode 100644
index 0000000..7e2a48f
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultControllerMessageHandlerFactory.java
@@ -0,0 +1,88 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.manager.zk;
+
+import org.apache.helix.HelixException;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.messaging.handling.HelixTaskResult;
+import org.apache.helix.messaging.handling.MessageHandler;
+import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Handler factory for messages of type {@code CONTROLLER_MSG}. The handlers it creates
+ * simply acknowledge the message by recording a "processed" entry in the task result.
+ */
+public class DefaultControllerMessageHandlerFactory implements
+    MessageHandlerFactory
+{
+  private static final Logger _logger = Logger.getLogger(DefaultControllerMessageHandlerFactory.class);
+
+  /**
+   * Creates a handler for the given controller message.
+   *
+   * @throws HelixException if the message type is not the one this factory serves
+   *                        (including a message without any type)
+   */
+  @Override
+  public MessageHandler createHandler(Message message,
+      NotificationContext context)
+  {
+    String type = message.getMsgType();
+    
+    // compare from the constant side so that a missing type is reported as an
+    // unexpected type instead of failing with a NullPointerException
+    if(!getMessageType().equals(type))
+    {
+      throw new HelixException("Unexpected msg type for message "+message.getMsgId()
+          +" type:" + message.getMsgType());
+    }
+    
+    return new DefaultControllerMessageHandler(message, context);
+  }
+
+  @Override
+  public String getMessageType()
+  {
+    return MessageType.CONTROLLER_MSG.toString();
+  }
+
+  /**
+   * Nothing to reset; the factory holds no state.
+   */
+  @Override
+  public void reset()
+  {
+
+  }
+  
+  public static class DefaultControllerMessageHandler extends MessageHandler
+  {
+    public DefaultControllerMessageHandler(Message message,
+        NotificationContext context)
+    {
+      super(message, context);
+    }
+
+    /**
+     * Marks the message as processed: stores a "ControllerResult" entry naming the
+     * message id and source in the result map and reports success.
+     *
+     * @throws HelixException if the message is not of type {@code CONTROLLER_MSG}
+     */
+    @Override
+    public HelixTaskResult handleMessage() throws InterruptedException
+    {
+      String type = _message.getMsgType();
+      HelixTaskResult result = new HelixTaskResult();
+      // null-safe for the same reason as in createHandler
+      if(!MessageType.CONTROLLER_MSG.toString().equals(type))
+      {
+        throw new HelixException("Unexpected msg type for message "+_message.getMsgId()
+            +" type:" + _message.getMsgType());
+      }
+      result.getTaskResultMap().put("ControllerResult", "msg "+ _message.getMsgId() + " from "+_message.getMsgSrc() + " processed");
+      result.setSuccess(true);
+      return result;
+    }
+
+    @Override
+    public void onError(Exception e, ErrorCode code, ErrorType type)
+    {
+      _logger.error("Message handling pipeline get an exception. MsgId:" + _message.getMsgId(), e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultParticipantErrorMessageHandlerFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultParticipantErrorMessageHandlerFactory.java b/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultParticipantErrorMessageHandlerFactory.java
new file mode 100644
index 0000000..307f219
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultParticipantErrorMessageHandlerFactory.java
@@ -0,0 +1,128 @@
+package org.apache.helix.manager.zk;
+
+
+import java.util.Arrays;
+
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.messaging.handling.HelixTaskResult;
+import org.apache.helix.messaging.handling.MessageHandler;
+import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.model.Message;
+import org.apache.log4j.Logger;
+
+
+/**
+ * DefaultParticipantErrorMessageHandlerFactory works on the controller side.
+ * When a participant detects a critical error, it sends a PARTICIPANT_ERROR_REPORT
+ * message to the controller, specifying whether it wants to disable the instance or
+ * disable the partition. The controller has a chance to do whatever makes sense at that
+ * point, and then disables the corresponding partition or the instance. More configs per
+ * resource will be added to customize the controller behavior.
+ */
+public class DefaultParticipantErrorMessageHandlerFactory implements
+  MessageHandlerFactory
+{
+  public enum ActionOnError
+  {
+    DISABLE_PARTITION, DISABLE_RESOURCE, DISABLE_INSTANCE
+  }
+
+  /** Simple-field key of the message record that carries the {@link ActionOnError} name. */
+  public static final String ACTIONKEY = "ActionOnError";
+
+  private static final Logger _logger = Logger
+    .getLogger(DefaultParticipantErrorMessageHandlerFactory.class);
+  final HelixManager _manager;
+
+  public DefaultParticipantErrorMessageHandlerFactory(HelixManager manager)
+  {
+    _manager = manager;
+  }
+
+  public static class DefaultParticipantErrorMessageHandler extends MessageHandler
+  {
+    final HelixManager _manager;
+    public DefaultParticipantErrorMessageHandler(Message message,
+        NotificationContext context,  HelixManager manager)
+    {
+       super(message, context);
+       _manager = manager;
+    }
+
+    /**
+     * Executes the action requested in the message's {@code ActionOnError} field:
+     * disables the sending instance or the reported partition through the cluster
+     * management tool. {@code DISABLE_RESOURCE} is accepted but not implemented.
+     * <p>
+     * Any failure (including a missing or unknown action name) is logged and reported
+     * through an unsuccessful result carrying the exception.
+     */
+    @Override
+    public HelixTaskResult handleMessage() throws InterruptedException
+    {
+      HelixTaskResult result = new HelixTaskResult();
+      result.setSuccess(true);
+      // TODO : consider unify this with StatsAggregationStage.executeAlertActions()
+      try
+      {
+        ActionOnError actionOnError
+          = ActionOnError.valueOf(_message.getRecord().getSimpleField(ACTIONKEY));
+
+        if(actionOnError == ActionOnError.DISABLE_INSTANCE)
+        {
+          _manager.getClusterManagmentTool().enableInstance(_manager.getClusterName(), _message.getMsgSrc(), false);
+          _logger.info("Instance " + _message.getMsgSrc() + " disabled");
+        }
+        else if(actionOnError == ActionOnError.DISABLE_PARTITION)
+        {
+          _manager.getClusterManagmentTool().enablePartition(false, _manager.getClusterName(), _message.getMsgSrc(),
+              _message.getResourceName(), Arrays.asList( _message.getPartitionName()));
+          _logger.info("partition " + _message.getPartitionName() + " disabled");
+        }
+        else if (actionOnError == ActionOnError.DISABLE_RESOURCE)
+        {
+          // TODO: not implemented yet (could be done by disabling all partitions of
+          // the resource). Nothing is changed here, so do not claim the resource was
+          // disabled.
+          _logger.warn("resource " + _message.getResourceName()
+              + " NOT disabled: DISABLE_RESOURCE is not implemented");
+        }
+      }
+      catch(Exception e)
+      {
+        _logger.error("Failed to handle participant error report. MsgId:"
+            + _message.getMsgId(), e);
+        result.setSuccess(false);
+        result.setException(e);
+      }
+      return result;
+    }
+
+    @Override
+    public void onError(Exception e, ErrorCode code, ErrorType type)
+    {
+      _logger.error("Message handling pipeline get an exception. MsgId:"
+          + _message.getMsgId(), e);
+    }
+
+  }
+
+  /**
+   * Creates a handler for the given participant error report.
+   *
+   * @throws HelixException if the message type is not the one this factory serves
+   *                        (including a message without any type)
+   */
+  @Override
+  public MessageHandler createHandler(Message message,
+      NotificationContext context)
+  {
+    String type = message.getMsgType();
+
+    // compare from the constant side so that a missing type is reported as an
+    // unexpected type instead of failing with a NullPointerException
+    if(!getMessageType().equals(type))
+    {
+      throw new HelixException("Unexpected msg type for message "+message.getMsgId()
+          +" type:" + message.getMsgType());
+    }
+
+    return new DefaultParticipantErrorMessageHandler(message, context, _manager);
+  }
+
+  @Override
+  public String getMessageType()
+  {
+    return Message.MessageType.PARTICIPANT_ERROR_REPORT.toString();
+  }
+
+  /**
+   * Nothing to reset; the factory only holds the manager reference.
+   */
+  @Override
+  public void reset()
+  {
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java b/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
new file mode 100644
index 0000000..e964f44
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
@@ -0,0 +1,277 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.manager.zk;
+
+import java.io.StringReader;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.helix.Criteria;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.messaging.AsyncCallback;
+import org.apache.helix.messaging.handling.HelixTaskResult;
+import org.apache.helix.messaging.handling.MessageHandler;
+import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.StatusUpdate;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.util.StatusUpdateUtil;
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.map.ObjectMapper;
+
+
+/**
+ * Factory for handlers of {@code SCHEDULER_MSG} messages. The handler reads a
+ * message template, a recipient criteria and the optional TIMEOUT / WAIT_ALL
+ * settings from the scheduler message, sends the template through the
+ * manager's messaging service, and records send/reply summaries in the
+ * controller task status of the scheduler message.
+ *
+ * TODO: The current implementation is temporary for backup handler testing only
+ * and it does not do any throttling.
+ */
+public class DefaultSchedulerMessageHandlerFactory implements
+    MessageHandlerFactory
+{
+  public static final String WAIT_ALL = "WAIT_ALL";
+  public static final String SCHEDULER_MSG_ID = "SchedulerMessageId";
+
+  private static final Logger _logger = Logger
+      .getLogger(DefaultSchedulerMessageHandlerFactory.class);
+
+  /**
+   * Callback attached to the messages sent on behalf of a scheduler message.
+   * It collects the result map of every reply and, on completion or timeout,
+   * merges the collected results plus a summary entry into the controller
+   * task status of the original scheduler message.
+   */
+  public static class SchedulerAsyncCallback extends AsyncCallback
+  {
+    StatusUpdateUtil _statusUpdateUtil = new StatusUpdateUtil();
+    Message _originalMessage;
+    HelixManager _manager;
+    final Map<String, Map<String, String>> _resultSummaryMap = new ConcurrentHashMap<String, Map<String, String>>();
+
+    /**
+     * @param originalMessage the scheduler message the replies belong to
+     * @param manager manager used to reach the data accessor
+     */
+    public SchedulerAsyncCallback(Message originalMessage, HelixManager manager)
+    {
+      _originalMessage = originalMessage;
+      _manager = manager;
+    }
+
+    /**
+     * Logs the timeout as an error status update and writes the summary with
+     * {@code Timeout=true}.
+     */
+    @Override
+    public void onTimeOut()
+    {
+      _logger.info("Scheduler msg timeout " + _originalMessage.getMsgId()
+          + " timout with " + _timeout + " Ms");
+
+      _statusUpdateUtil.logError(_originalMessage,
+          SchedulerAsyncCallback.class, "Task timeout",
+          _manager.getHelixDataAccessor());
+      addSummary(_resultSummaryMap, _originalMessage, _manager, true);
+    }
+
+    /**
+     * Stores the reply's result map under a unique key and, once
+     * {@code isDone()} reports completion, writes the summary with
+     * {@code Timeout=false}.
+     *
+     * @param message the reply message
+     */
+    @Override
+    public void onReplyMessage(Message message)
+    {
+      _logger.info("Update for scheduler msg " + _originalMessage.getMsgId()
+          + " Message " + message.getMsgSrc() + " id "
+          + message.getCorrelationId() + " completed");
+      // the random UUID keeps several replies from the same source apart
+      String key = "MessageResult " + message.getMsgSrc() + " "
+          + UUID.randomUUID();
+      _resultSummaryMap.put(key, message.getResultMap());
+
+      if (this.isDone())
+      {
+        _logger.info("Scheduler msg " + _originalMessage.getMsgId()
+            + " completed");
+        _statusUpdateUtil.logInfo(_originalMessage,
+            SchedulerAsyncCallback.class, "Scheduler task completed",
+            _manager.getHelixDataAccessor());
+        addSummary(_resultSummaryMap, _originalMessage, _manager, false);
+      }
+    }
+
+    /**
+     * Adds a "Summary" entry (total message count and timeout flag) to the
+     * given result map and merges the whole map into the map fields of the
+     * scheduler message's controller task status.
+     *
+     * @param resultSummaryMap collected per-reply results; a "Summary" entry
+     *          is added to it
+     * @param originalMessage the scheduler message whose status is updated
+     * @param manager manager used to reach the data accessor
+     * @param timeOut whether the summary is written because of a timeout
+     */
+    private void addSummary(Map<String, Map<String, String>> resultSummaryMap,
+        Message originalMessage, HelixManager manager, boolean timeOut)
+    {
+      Map<String, String> summary = new TreeMap<String, String>();
+      summary.put("TotalMessages:", "" + resultSummaryMap.size());
+      summary.put("Timeout", "" + timeOut);
+      resultSummaryMap.put("Summary", summary);
+
+      HelixDataAccessor accessor = manager.getHelixDataAccessor();
+      Builder keyBuilder = accessor.keyBuilder();
+      // NOTE(review): presumably getProperty() returns null when the status
+      // update node does not exist, which would fail here - confirm.
+      ZNRecord statusUpdate = accessor.getProperty(
+          keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(),
+              originalMessage.getMsgId())).getRecord();
+
+      statusUpdate.getMapFields().putAll(resultSummaryMap);
+      accessor.setProperty(keyBuilder.controllerTaskStatus(
+          MessageType.SCHEDULER_MSG.toString(), originalMessage.getMsgId()),
+          new StatusUpdate(statusUpdate));
+
+    }
+  }
+
+  HelixManager _manager;
+
+  /**
+   * @param manager manager handed to every handler created by this factory
+   */
+  public DefaultSchedulerMessageHandlerFactory(HelixManager manager)
+  {
+    _manager = manager;
+  }
+
+  /**
+   * Creates a scheduler message handler.
+   *
+   * @param message the message to handle; its type must equal
+   *          {@link #getMessageType()}
+   * @param context the notification context handed to the new handler
+   * @return a new {@link DefaultSchedulerMessageHandler}
+   * @throws HelixException if the message type does not match
+   */
+  @Override
+  public MessageHandler createHandler(Message message,
+      NotificationContext context)
+  {
+    String type = message.getMsgType();
+
+    if (!type.equals(getMessageType()))
+    {
+      throw new HelixException("Unexpected msg type for message "
+          + message.getMsgId() + " type:" + message.getMsgType());
+    }
+
+    return new DefaultSchedulerMessageHandler(message, context, _manager);
+  }
+
+  /**
+   * @return the string form of {@code MessageType.SCHEDULER_MSG}
+   */
+  @Override
+  public String getMessageType()
+  {
+    return MessageType.SCHEDULER_MSG.toString();
+  }
+
+  /**
+   * No-op: this implementation does nothing on reset.
+   */
+  @Override
+  public void reset()
+  {
+  }
+
+  /**
+   * Handles one scheduler message: parses its settings, sends the embedded
+   * message template to the recipients selected by the criteria and records
+   * the number of messages sent in the controller task status.
+   */
+  public static class DefaultSchedulerMessageHandler extends MessageHandler
+  {
+    HelixManager _manager;
+
+    /**
+     * @param message the scheduler message
+     * @param context the notification context
+     * @param manager manager providing the messaging service and data accessor
+     */
+    public DefaultSchedulerMessageHandler(Message message,
+        NotificationContext context, HelixManager manager)
+    {
+      super(message, context);
+      _manager = manager;
+    }
+
+    /**
+     * Processes the scheduler message.
+     *
+     * @return a successful result carrying the scheduler message id, or a
+     *         failed result holding the exception when the message template
+     *         or the criteria is missing or the criteria cannot be parsed
+     * @throws HelixException if the message is not a scheduler message
+     * @throws InterruptedException declared by the handler contract
+     */
+    @Override
+    public HelixTaskResult handleMessage() throws InterruptedException
+    {
+      String type = _message.getMsgType();
+      HelixTaskResult result = new HelixTaskResult();
+      if (!type.equals(MessageType.SCHEDULER_MSG.toString()))
+      {
+        throw new HelixException("Unexpected msg type for message "
+            + _message.getMsgId() + " type:" + _message.getMsgType());
+      }
+      ZNRecord schedulerRecord = _message.getRecord();
+
+      // Parse timeout value; -1 is kept when the field is absent or malformed
+      int timeOut = -1;
+      if (schedulerRecord.getSimpleFields().containsKey("TIMEOUT"))
+      {
+        String timeOutStr = schedulerRecord.getSimpleFields().get("TIMEOUT");
+        try
+        {
+          timeOut = Integer.parseInt(timeOutStr);
+        }
+        catch (NumberFormatException e)
+        {
+          // keep going with the default instead of failing the whole message
+          _logger.warn("Invalid TIMEOUT value: " + timeOutStr
+              + " in scheduler msg " + _message.getMsgId() + ", using "
+              + timeOut, e);
+        }
+      }
+
+      // Parse the message template
+      Map<String, String> templateFields = schedulerRecord
+          .getMapField("MessageTemplate");
+      if (templateFields == null)
+      {
+        return fail(result, new HelixException("Scheduler msg "
+            + _message.getMsgId() + " has no MessageTemplate map field"));
+      }
+      ZNRecord record = new ZNRecord("templateMessage");
+      record.getSimpleFields().putAll(templateFields);
+      Message messageTemplate = new Message(record);
+
+      // Parse the criteria
+      String criteriaStr = schedulerRecord.getSimpleField("Criteria");
+      if (criteriaStr == null)
+      {
+        return fail(result, new HelixException("Scheduler msg "
+            + _message.getMsgId() + " has no Criteria simple field"));
+      }
+      ObjectMapper mapper = new ObjectMapper();
+      Criteria recipientCriteria;
+      try
+      {
+        recipientCriteria = mapper.readValue(new StringReader(criteriaStr),
+            Criteria.class);
+      }
+      catch (Exception e)
+      {
+        return fail(result, e);
+      }
+      _logger.info("Scheduler sending message, criteria:" + recipientCriteria);
+
+      // parseBoolean() never throws and yields false for null
+      boolean waitAll = Boolean.parseBoolean(schedulerRecord
+          .getSimpleField(DefaultSchedulerMessageHandlerFactory.WAIT_ALL));
+
+      // Send all messages.
+      int nMsgsSent = 0;
+      SchedulerAsyncCallback callback = new SchedulerAsyncCallback(_message, _manager);
+      if (waitAll)
+      {
+        nMsgsSent = _manager.getMessagingService().sendAndWait(recipientCriteria,
+            messageTemplate,
+            callback,
+            timeOut);
+      }
+      else
+      {
+        nMsgsSent = _manager.getMessagingService().send(recipientCriteria,
+            messageTemplate,
+            callback,
+            timeOut);
+      }
+      HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+      Builder keyBuilder = accessor.keyBuilder();
+
+      // Record the number of messages sent into status updates
+      Map<String, String> sendSummary = new HashMap<String, String>();
+      sendSummary.put("MessageCount", "" + nMsgsSent);
+
+      ZNRecord statusUpdate = accessor.getProperty(
+          keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(),
+              _message.getMsgId())).getRecord();
+
+      statusUpdate.getMapFields().put("SentMessageCount", sendSummary);
+
+      accessor.setProperty(keyBuilder.controllerTaskStatus(
+          MessageType.SCHEDULER_MSG.toString(), _message.getMsgId()),
+          new StatusUpdate(statusUpdate));
+
+      result.getTaskResultMap().put(
+          "ControllerResult",
+          "msg " + _message.getMsgId() + " from " + _message.getMsgSrc()
+              + " processed");
+      result.getTaskResultMap().put(SCHEDULER_MSG_ID, _message.getMsgId());
+      result.setSuccess(true);
+      return result;
+    }
+
+    /**
+     * Logs the exception and marks the result as failed with it.
+     *
+     * @param result the result to mark as failed
+     * @param e the cause of the failure
+     * @return the same {@code result} instance
+     */
+    private static HelixTaskResult fail(HelixTaskResult result, Exception e)
+    {
+      _logger.error("", e);
+      result.setException(e);
+      result.setSuccess(false);
+      return result;
+    }
+
+    /**
+     * Logs the exception reported for this handler together with the id of
+     * the message being handled. {@code code} and {@code type} are not used.
+     */
+    @Override
+    public void onError(Exception e, ErrorCode code, ErrorType type)
+    {
+      _logger.error("Message handling pipeline get an exception. MsgId:"
+          + _message.getMsgId(), e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/manager/zk/HelixGroupCommit.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/HelixGroupCommit.java b/helix-core/src/main/java/org/apache/helix/manager/zk/HelixGroupCommit.java
new file mode 100644
index 0000000..d1e9402
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/HelixGroupCommit.java
@@ -0,0 +1,174 @@
+package org.apache.helix.manager.zk;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.I0Itec.zkclient.DataUpdater;
+import org.I0Itec.zkclient.exception.ZkBadVersionException;
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * Merges concurrent updates to the same key into a single read-modify-write.
+ * Callers enqueue their updater; whichever caller wins the per-queue
+ * "running" slot reads the current value, applies every pending updater for
+ * the key at the head of the queue, and writes the merged value back with the
+ * version it read, retrying on {@link ZkBadVersionException}.
+ */
+public class HelixGroupCommit<T>
+{
+  private static final Logger LOG = Logger.getLogger(HelixGroupCommit.class);
+
+  /** One commit queue: the thread currently committing and the pending entries. */
+  private static class Queue<T>
+  {
+    final AtomicReference<Thread>      _running = new AtomicReference<Thread>();
+    final ConcurrentLinkedQueue<Entry<T>> _pending = new ConcurrentLinkedQueue<Entry<T>>();
+  }
+
+  /** One pending update; the entry object doubles as the monitor its caller waits on. */
+  private static class Entry<T>
+  {
+    final String         _key;
+    final DataUpdater<T> _updater;
+    // set once the entry was part of a finished batch (successful or not)
+    final AtomicBoolean  _sent = new AtomicBoolean(false);
+    // set, before _sent, when the batch containing this entry failed
+    final AtomicBoolean  _failed = new AtomicBoolean(false);
+
+    Entry(String key, DataUpdater<T> updater)
+    {
+      _key = key;
+      _updater = updater;
+    }
+  }
+
+  // generic array creation is unchecked; every slot is filled in the constructor
+  @SuppressWarnings("unchecked")
+  private final Queue<T>[] _queues = new Queue[100];
+
+  public HelixGroupCommit()
+  {
+    // Don't use Arrays.fill(): it would store the same Queue instance in
+    // every slot
+    for (int i = 0; i < _queues.length; ++i)
+    {
+      _queues[i] = new Queue<T>();
+    }
+  }
+
+  /** Picks the queue for a key from the key's (non-negative) hash code. */
+  private Queue<T> getQueue(String key)
+  {
+    return _queues[(key.hashCode() & Integer.MAX_VALUE) % _queues.length];
+  }
+
+  /**
+   * Applies {@code updater} to the value stored under {@code key}, possibly
+   * merged with other concurrent updates to the same key.
+   *
+   * If the calling thread performs the write itself and the write (or an
+   * updater) throws, the exception propagates to this caller. NOTE(review):
+   * that also holds when the failing batch belongs to a different key of the
+   * same queue, in which case this caller's entry stays queued - confirm
+   * whether callers rely on that.
+   *
+   * @param accessor accessor used to read and write the value
+   * @param options access options passed through to the accessor
+   * @param key key (path) of the value to update
+   * @param updater update to apply; it must handle a {@code null} current
+   *          value
+   * @return {@code true} if the batch containing this update was written;
+   *         {@code false} if that batch failed on another thread, or if this
+   *         thread was interrupted while waiting (the queued update may still
+   *         be applied later by another thread)
+   */
+  public boolean commit(ZkBaseDataAccessor<T> accessor,
+                        int options,
+                        String key,
+                        DataUpdater<T> updater)
+  {
+    Queue<T> queue = getQueue(key);
+    Entry<T> entry = new Entry<T>(key, updater);
+
+    queue._pending.add(entry);
+
+    while (!entry._sent.get())
+    {
+      if (queue._running.compareAndSet(null, Thread.currentThread()))
+      {
+        ArrayList<Entry<T>> processed = new ArrayList<Entry<T>>();
+        boolean succeeded = false;
+        try
+        {
+          Entry<T> first = queue._pending.peek();
+          // an empty queue means another thread already took our entry and
+          // is about to mark it; loop until it does instead of guessing the
+          // outcome
+          if (first != null)
+          {
+            commitBatch(accessor, options, queue, first._key, processed);
+          }
+          succeeded = true;
+        }
+        finally
+        {
+          queue._running.set(null);
+          for (Entry<T> done : processed)
+          {
+            synchronized (done)
+            {
+              // publish the outcome before waking the waiter
+              done._failed.set(!succeeded);
+              done._sent.set(true);
+              done.notify();
+            }
+          }
+        }
+      }
+      else
+      {
+        synchronized (entry)
+        {
+          try
+          {
+            // _sent is set under this monitor, so re-checking here avoids
+            // sleeping through a notification that already happened
+            if (!entry._sent.get())
+            {
+              entry.wait(10);
+            }
+          }
+          catch (InterruptedException e)
+          {
+            // restore the flag so callers can observe the interruption
+            Thread.currentThread().interrupt();
+            LOG.error("Interrupted while waiting for group commit of key: "
+                + key, e);
+            return false;
+          }
+        }
+      }
+    }
+    return !entry._failed.get();
+  }
+
+  /**
+   * Reads the value of {@code mergedKey}, applies all pending updaters for
+   * that key and writes the result back, retrying from the read when the
+   * version check fails.
+   *
+   * @param accessor accessor used to read and write the value
+   * @param options access options passed through to the accessor
+   * @param queue queue whose pending entries are drained for the key
+   * @param mergedKey key of the batch
+   * @param processed receives every entry taken from the queue
+   */
+  private void commitBatch(ZkBaseDataAccessor<T> accessor,
+                           int options,
+                           Queue<T> queue,
+                           String mergedKey,
+                           ArrayList<Entry<T>> processed)
+  {
+    boolean retry;
+    do
+    {
+      retry = false;
+
+      try
+      {
+        T merged = null;
+
+        Stat readStat = new Stat();
+        try
+        {
+          // accessor will fallback to zk if not found in cache
+          merged = accessor.get(mergedKey, readStat, options);
+        }
+        catch (ZkNoNodeException e)
+        {
+          // OK. updaters handle merged == null
+        }
+
+        // when retrying, re-apply the entries already taken from the queue;
+        // processed only ever holds entries for mergedKey
+        for (Entry<T> ent : processed)
+        {
+          merged = ent._updater.update(merged);
+        }
+
+        // pick up newly arrived requests for the same key
+        Iterator<Entry<T>> it = queue._pending.iterator();
+        while (it.hasNext())
+        {
+          Entry<T> ent = it.next();
+          if (!ent._key.equals(mergedKey))
+          {
+            continue;
+          }
+          // take the entry out of the queue before running its updater so a
+          // throwing updater cannot stay queued and break later commits
+          processed.add(ent);
+          it.remove();
+          merged = ent._updater.update(merged);
+        }
+        accessor.set(mergedKey, merged, null, null, readStat.getVersion(), options);
+      }
+      catch (ZkBadVersionException e)
+      {
+        retry = true;
+      }
+    }
+    while (retry);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/manager/zk/PathBasedZkSerializer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/PathBasedZkSerializer.java b/helix-core/src/main/java/org/apache/helix/manager/zk/PathBasedZkSerializer.java
new file mode 100644
index 0000000..5a0aec7
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/PathBasedZkSerializer.java
@@ -0,0 +1,28 @@
+package org.apache.helix.manager.zk;
+
+import org.I0Itec.zkclient.exception.ZkMarshallingError;
+
+/**
+ * Serializer whose wire format may depend on the path the data belongs to.
+ */
+public interface PathBasedZkSerializer
+{
+
+  /**
+   * Serialize data differently according to different paths.
+   *
+   * @param data the object to serialize
+   * @param path the path the data is associated with
+   * @return the serialized form of {@code data}
+   * @throws ZkMarshallingError as declared; implementations may throw it
+   */
+  byte[] serialize(Object data, String path) throws ZkMarshallingError;
+
+  /**
+   * Deserialize data differently according to different paths.
+   *
+   * @param bytes the serialized form
+   * @param path the path the bytes are associated with
+   * @return the deserialized object
+   * @throws ZkMarshallingError as declared; implementations may throw it
+   */
+  Object deserialize(byte[] bytes, String path) throws ZkMarshallingError;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/manager/zk/WriteThroughCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/WriteThroughCache.java b/helix-core/src/main/java/org/apache/helix/manager/zk/WriteThroughCache.java
new file mode 100644
index 0000000..9b7974e
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/WriteThroughCache.java
@@ -0,0 +1,114 @@
+package org.apache.helix.manager.zk;
+
+import java.io.File;
+import java.util.List;
+
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
+import org.apache.helix.AccessOption;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.store.zk.ZNode;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.data.Stat;
+
+
+/**
+ * Cache that fills itself by reading nodes, and recursively their children,
+ * through a {@link BaseDataAccessor}.
+ */
+public class WriteThroughCache<T> extends Cache<T>
+{
+  private static final Logger LOG = Logger.getLogger(WriteThroughCache.class);
+
+  final BaseDataAccessor<T> _accessor;
+
+  /**
+   * @param accessor accessor used to read node data and child names
+   * @param paths paths whose subtrees are loaded into the cache up front; may
+   *          be {@code null} or empty
+   */
+  public WriteThroughCache(BaseDataAccessor<T> accessor, List<String> paths)
+  {
+    super();
+    _accessor = accessor;
+
+    // init cache
+    if (paths != null && !paths.isEmpty())
+    {
+      for (String path : paths)
+      {
+        updateRecursive(path);
+      }
+    }
+  }
+
+  /**
+   * Registers {@code path} in its parent's child set and stores the given
+   * data and stat for it, creating the cache entry if there is none yet.
+   * This method does not take the cache lock itself.
+   *
+   * @param path path of the node
+   * @param data data to cache for the node
+   * @param stat stat to cache for the node
+   */
+  @Override
+  public void update(String path, T data, Stat stat)
+  {
+    // NOTE(review): java.io.File uses the platform separator, so on Windows
+    // getParent() would yield a backslash-separated path - confirm whether
+    // that platform matters here.
+    String parentPath = new File(path).getParent();
+    String childName = new File(path).getName();
+    addToParentChildSet(parentPath, childName);
+
+    ZNode znode = _cache.get(path);
+    if (znode == null)
+    {
+      _cache.put(path, new ZNode(path, data, stat));
+    }
+    else
+    {
+      znode.setData(data);
+      znode.setStat(stat);
+    }
+  }
+
+  /**
+   * Reads {@code path} through the accessor, caches it, and does the same
+   * for every child that is not yet registered on the cached node. A node
+   * that disappears while it is being read is skipped silently.
+   *
+   * @param path root of the subtree to load; {@code null} is ignored
+   */
+  @Override
+  public void updateRecursive(String path)
+  {
+    if (path == null)
+    {
+      return;
+    }
+
+    // acquire before the try block so the finally clause only ever releases
+    // a lock that was actually taken
+    _lock.writeLock().lock();
+    try
+    {
+      // update this node
+      Stat stat = new Stat();
+      T readData = _accessor.get(path, stat, AccessOption.THROW_EXCEPTION_IFNOTEXIST);
+
+      update(path, readData, stat);
+
+      // recursively update children nodes if not exists
+      ZNode znode = _cache.get(path);
+      List<String> childNames = _accessor.getChildNames(path, 0);
+      if (childNames != null)
+      {
+        for (String childName : childNames)
+        {
+          String childPath = path + "/" + childName;
+          if (!znode.hasChild(childName))
+          {
+            znode.addChild(childName);
+            updateRecursive(childPath);
+          }
+        }
+      }
+    }
+    catch (ZkNoNodeException e)
+    {
+      // OK. someone delete znode while we are updating cache
+    }
+    finally
+    {
+      _lock.writeLock().unlock();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/manager/zk/ZKDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKDataAccessor.java
new file mode 100644
index 0000000..c4cdf78
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKDataAccessor.java
@@ -0,0 +1,329 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.manager.zk;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.helix.DataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.data.Stat;
+
+
+/**
+ * {@link DataAccessor} backed by a {@link ZkClient}. Property paths are
+ * resolved with {@link PropertyPathConfig}; child records of cached property
+ * types are kept in an in-memory map keyed by parent path.
+ */
+@Deprecated
+public class ZKDataAccessor implements DataAccessor
+{
+  private static final Logger logger = Logger.getLogger(ZKDataAccessor.class);
+
+  protected final String _clusterName;
+  protected final ZkClient _zkClient;
+
+  /**
+   * If a PropertyType has children (e.g. CONFIGS), then the parent path is the
+   * first key and child path is the second key; If a PropertyType has no child
+   * (e.g. LEADER), then no cache
+   */
+  private final Map<String, Map<String, ZNRecord>> _cache = new ConcurrentHashMap<String, Map<String, ZNRecord>>();
+
+  /**
+   * @param clusterName name of the cluster used when building property paths
+   * @param zkClient client used for all reads and writes
+   */
+  public ZKDataAccessor(String clusterName, ZkClient zkClient)
+  {
+    _clusterName = clusterName;
+    _zkClient = zkClient;
+  }
+
+  /**
+   * Validates the property and stores its record.
+   *
+   * @throws HelixException if {@code value.isValid()} is false
+   */
+  @Override
+  public boolean setProperty(PropertyType type, HelixProperty value, String... keys)
+  {
+    if (!value.isValid())
+    {
+      throw new HelixException("The ZNRecord for " + type + " is not valid.");
+    }
+    return setProperty(type, value.getRecord(), keys);
+  }
+
+  /**
+   * Writes the record to the path of the property, creating missing parent
+   * nodes first.
+   *
+   * @return {@code false} if the node exists and the type is
+   *         create-only-if-absent, or if creating the node fails;
+   *         {@code true} otherwise
+   */
+  @Override
+  public boolean setProperty(PropertyType type, ZNRecord value, String... keys)
+  {
+    String path = PropertyPathConfig.getPath(type, _clusterName, keys);
+
+    String parent = new File(path).getParent();
+    if (!_zkClient.exists(parent))
+    {
+      _zkClient.createPersistent(parent, true);
+    }
+
+    if (_zkClient.exists(path))
+    {
+      if (type.isCreateOnlyIfAbsent())
+      {
+        return false;
+      }
+      ZKUtil.createOrUpdate(_zkClient, path, value, type.isPersistent(), false);
+      return true;
+    }
+
+    try
+    {
+      if (type.isPersistent())
+      {
+        _zkClient.createPersistent(path, value);
+      }
+      else
+      {
+        _zkClient.createEphemeral(path, value);
+      }
+    }
+    catch (Exception e)
+    {
+      logger.warn("Exception while creating path:" + path
+          + " Most likely due to race condition(Ignorable).", e);
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Updates the property with the record of {@code value}.
+   */
+  @Override
+  public boolean updateProperty(PropertyType type, HelixProperty value, String... keys)
+  {
+    return updateProperty(type, value.getRecord(), keys);
+  }
+
+  /**
+   * Updates the node of the property according to the flags of its type:
+   * update-only-on-exists types are updated only if present, other types are
+   * created or updated, asynchronously when the type uses the property
+   * transfer server.
+   *
+   * @return always {@code true}
+   */
+  @Override
+  public boolean updateProperty(PropertyType type, ZNRecord value, String... keys)
+  {
+    String path = PropertyPathConfig.getPath(type, _clusterName, keys);
+    if (type.isUpdateOnlyOnExists())
+    {
+      ZKUtil.updateIfExists(_zkClient, path, value, type.isMergeOnUpdate());
+      return true;
+    }
+
+    String parent = new File(path).getParent();
+    if (!_zkClient.exists(parent))
+    {
+      _zkClient.createPersistent(parent, true);
+    }
+
+    if (!type.usePropertyTransferServer())
+    {
+      ZKUtil.createOrUpdate(_zkClient, path, value, type.isPersistent(), type.isMergeOnUpdate());
+    }
+    else
+    {
+      ZKUtil.asyncCreateOrUpdate(_zkClient, path, value, type.isPersistent(), type.isMergeOnUpdate());
+    }
+
+    return true;
+  }
+
+  /**
+   * Reads the property and converts its record to an instance of
+   * {@code clazz}.
+   */
+  @Override
+  public <T extends HelixProperty>
+    T getProperty(Class<T> clazz, PropertyType type, String... keys)
+  {
+    return HelixProperty.convertToTypedInstance(clazz, getProperty(type, keys));
+  }
+
+  /**
+   * Reads the record of the property. Non-cached types, and cached types
+   * addressed without keys, are read directly; otherwise the children of the
+   * parent path are refreshed through the cache and the record named by the
+   * last key is returned.
+   *
+   * @return the record, or {@code null} if the refreshed children do not
+   *         contain the last key
+   */
+  @Override
+  public ZNRecord getProperty(PropertyType type, String... keys)
+  {
+    String path = PropertyPathConfig.getPath(type, _clusterName, keys);
+
+    int len = keys.length;
+    if (!type.isCached() || len == 0)
+    {
+      return _zkClient.readData(path, true);
+    }
+
+    // all keys but the last address the parent whose children are cached
+    String[] subkeys = Arrays.copyOfRange(keys, 0, len - 1);
+    Map<String, ZNRecord> newChilds = refreshChildValuesCache(type, subkeys);
+    return newChilds.get(keys[len - 1]);
+  }
+
+  /**
+   * Deletes the node of the property.
+   *
+   * @return the result of {@code ZkClient.delete} for the path
+   */
+  @Override
+  public boolean removeProperty(PropertyType type, String... keys)
+  {
+    String path = PropertyPathConfig.getPath(type, _clusterName, keys);
+    return _zkClient.delete(path);
+  }
+
+  /**
+   * @return the child names of the property path, or an empty list if the
+   *         path does not exist
+   */
+  @Override
+  public List<String> getChildNames(PropertyType type, String... keys)
+  {
+    String path = PropertyPathConfig.getPath(type, _clusterName, keys);
+    if (!_zkClient.exists(path))
+    {
+      return Collections.emptyList();
+    }
+    return _zkClient.getChildren(path);
+  }
+
+  /**
+   * @return the child records converted to {@code clazz}, or an empty list
+   *         if there are none
+   */
+  @Override
+  public <T extends HelixProperty>
+    List<T> getChildValues(Class<T> clazz, PropertyType type, String... keys)
+  {
+    List<ZNRecord> newChilds = getChildValues(type, keys);
+    if (newChilds.size() > 0)
+    {
+      return HelixProperty.convertToTypedList(clazz, newChilds);
+    }
+    return Collections.emptyList();
+  }
+
+  /**
+   * @return the records of all children of the property path (through the
+   *         cache for cached types), or an empty list if the path does not
+   *         exist
+   */
+  @Override
+  public List<ZNRecord> getChildValues(PropertyType type, String... keys)
+  {
+    String path = PropertyPathConfig.getPath(type, _clusterName, keys);
+
+    if (!_zkClient.exists(path))
+    {
+      return Collections.emptyList();
+    }
+
+    if (!type.isCached())
+    {
+      return ZKUtil.getChildren(_zkClient, path);
+    }
+
+    Map<String, ZNRecord> newChilds = refreshChildValuesCache(type, keys);
+    return new ArrayList<ZNRecord>(newChilds.values());
+  }
+
+  /**
+   * Drops all cached child records.
+   */
+  public void reset()
+  {
+    _cache.clear();
+  }
+
+  /**
+   * Refreshes the cached children of the property path and stores the result
+   * in the cache; the cache entry is removed when there are no children.
+   *
+   * @return the refreshed child records by child name, possibly empty
+   * @throws IllegalArgumentException if the type is not cached
+   */
+  private Map<String, ZNRecord> refreshChildValuesCache(PropertyType type, String... keys)
+  {
+    if (!type.isCached())
+    {
+      throw new IllegalArgumentException("Type:" + type + " is NOT cached");
+    }
+
+    String path = PropertyPathConfig.getPath(type, _clusterName, keys);
+
+    Map<String, ZNRecord> newChilds = refreshChildValues(path, _cache.get(path));
+    if (newChilds != null && newChilds.size() > 0)
+    {
+      _cache.put(path, newChilds);
+      return newChilds;
+    }
+
+    _cache.remove(path);
+    return Collections.emptyMap();
+  }
+
+  /**
+   * Read a zookeeper node only if it's data has been changed since last read
+   *
+   * @param parentPath path whose children are read
+   * @param oldChildRecords previously read child records by child name, may
+   *          be {@code null}
+   * @return newChildRecords an unmodifiable map of child name to record;
+   *         children that no longer exist are left out
+   */
+  private Map<String, ZNRecord> refreshChildValues(String parentPath,
+      Map<String, ZNRecord> oldChildRecords)
+  {
+    List<String> childs = _zkClient.getChildren(parentPath);
+    if (childs == null || childs.size() == 0)
+    {
+      return Collections.emptyMap();
+    }
+
+    Map<String, ZNRecord> newChildRecords = new HashMap<String, ZNRecord>();
+    for (String child : childs)
+    {
+      String childPath = parentPath + "/" + child;
+
+      // assume record.id should be the last part of zookeeper path
+      if (oldChildRecords == null || !oldChildRecords.containsKey(child))
+      {
+        ZNRecord record = readChildRecord(childPath);
+        if (record != null)
+        {
+          newChildRecords.put(child, record);
+        }
+        continue;
+      }
+
+      ZNRecord oldChild = oldChildRecords.get(child);
+
+      // a stat local to this child: a null result must not leak into the
+      // handling of the next child
+      Stat curStat = _zkClient.getStat(childPath);
+      if (curStat == null)
+      {
+        // child disappeared after the listing
+        continue;
+      }
+
+      boolean changed = oldChild.getCreationTime() < curStat.getCtime()
+          || oldChild.getVersion() < curStat.getVersion();
+      if (!changed)
+      {
+        newChildRecords.put(child, oldChild);
+        continue;
+      }
+
+      ZNRecord record = readChildRecord(childPath);
+      if (record != null)
+      {
+        newChildRecords.put(child, record);
+      }
+    }
+
+    return Collections.unmodifiableMap(newChildRecords);
+  }
+
+  /**
+   * Reads a child record and copies version, creation time and modified time
+   * from the stat of the read onto it, so later refreshes have values to
+   * compare against.
+   *
+   * @param childPath path of the child node
+   * @return the record, or {@code null} if the read returned none
+   */
+  private ZNRecord readChildRecord(String childPath)
+  {
+    Stat stat = new Stat();
+    ZNRecord record = _zkClient.readDataAndStat(childPath, stat, true);
+    if (record != null)
+    {
+      record.setVersion(stat.getVersion());
+      record.setCreationTime(stat.getCtime());
+      record.setModifiedTime(stat.getMtime());
+    }
+    return record;
+  }
+
+  /**
+   * @return an unmodifiable map built by
+   *         {@code HelixProperty.convertListToMap} from the typed child
+   *         values
+   */
+  @Override
+  public <T extends HelixProperty>
+    Map<String, T> getChildValuesMap(Class<T> clazz, PropertyType type, String... keys)
+  {
+    List<T> list = getChildValues(clazz, type, keys);
+    return Collections.unmodifiableMap(HelixProperty.convertListToMap(list));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/manager/zk/ZKExceptionHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKExceptionHandler.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKExceptionHandler.java
new file mode 100644
index 0000000..340c3e5
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKExceptionHandler.java
@@ -0,0 +1,49 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.manager.zk;
+
+import org.I0Itec.zkclient.exception.ZkInterruptedException;
+import org.apache.log4j.Logger;
+
+/**
+ * Singleton that logs exceptions, using a dedicated message for
+ * {@link ZkInterruptedException}.
+ */
+public class ZKExceptionHandler
+{
+  private static ZKExceptionHandler instance = new ZKExceptionHandler();
+  private static Logger logger = Logger.getLogger(ZKExceptionHandler.class);
+
+  private ZKExceptionHandler()
+  {
+    // instances are only handed out through getInstance()
+  }
+
+  /**
+   * Logs the current thread's name and interrupt status, then the exception.
+   *
+   * @param e the exception to log
+   */
+  void handle(Exception e)
+  {
+    final Thread current = Thread.currentThread();
+    logger.error(current.getName() + ". isThreadInterruped: " + current.isInterrupted());
+
+    if (!(e instanceof ZkInterruptedException))
+    {
+      logger.error(e.getMessage(), e);
+      return;
+    }
+    logger.error("zk connection is interrupted.", e);
+  }
+
+  /**
+   * @return the shared handler instance
+   */
+  public static ZKExceptionHandler getInstance()
+  {
+    return instance;
+  }
+}