Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 01:14:57 UTC

[13/42] Refactoring the package names and removing jsql parser

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
new file mode 100644
index 0000000..36dbf9f
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -0,0 +1,1272 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.manager.zk;
+
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.UUID;
+
+import org.I0Itec.zkclient.DataUpdater;
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
+import org.apache.helix.AccessOption;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.ConfigScope;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixConstants;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ConfigScope.ConfigScopeProperty;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.alerts.AlertsHolder;
+import org.apache.helix.alerts.StatsHolder;
+import org.apache.helix.model.Alerts;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.PauseSignal;
+import org.apache.helix.model.PersistentStats;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.model.ClusterConstraints.ConstraintAttribute;
+import org.apache.helix.model.ClusterConstraints.ConstraintType;
+import org.apache.helix.model.IdealState.IdealStateModeProperty;
+import org.apache.helix.model.InstanceConfig.InstanceConfigProperty;
+import org.apache.helix.model.Message.MessageState;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.tools.IdealStateCalculatorForStorageNode;
+import org.apache.helix.util.HelixUtil;
+import org.apache.log4j.Logger;
+
+
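+/**
+ * ZooKeeper-based implementation of {@link HelixAdmin}: cluster, instance,
+ * resource, state-model, stat/alert and constraint administration backed by a
+ * {@link ZkClient}.
+ *
+ * A minimal usage sketch (cluster, instance and resource names are
+ * hypothetical; the client setup follows the usual Helix quickstart pattern):
+ *
+ * <pre>
+ * ZkClient zkClient = new ZkClient("localhost:2181",
+ *                                  ZkClient.DEFAULT_SESSION_TIMEOUT,
+ *                                  ZkClient.DEFAULT_CONNECTION_TIMEOUT,
+ *                                  new ZNRecordSerializer());
+ * HelixAdmin admin = new ZKHelixAdmin(zkClient);
+ * admin.addCluster("MYCLUSTER", false);
+ * admin.addInstance("MYCLUSTER", new InstanceConfig("localhost_12913"));
+ * admin.addResource("MYCLUSTER", "MyDB", 4, "MasterSlave");
+ * admin.rebalance("MYCLUSTER", "MyDB", 2);
+ * </pre>
+ */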
+public class ZKHelixAdmin implements HelixAdmin
+{
+
+  private final ZkClient _zkClient;
+  private final ConfigAccessor _configAccessor;
+
+  private static Logger logger = Logger.getLogger(ZKHelixAdmin.class);
+
+  public ZKHelixAdmin(ZkClient zkClient)
+  {
+    _zkClient = zkClient;
+    _configAccessor = new ConfigAccessor(zkClient);
+  }
+
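+  /**
+   * Registers an instance with the cluster: writes its InstanceConfig under
+   * CONFIGS/PARTICIPANT and pre-creates the per-instance MESSAGES,
+   * CURRENTSTATES, ERRORS and STATUSUPDATES paths.
+   */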
+  @Override
+  public void addInstance(String clusterName, InstanceConfig instanceConfig)
+  {
+    if (!ZKUtil.isClusterSetup(clusterName, _zkClient))
+    {
+      throw new HelixException("cluster " + clusterName + " is not setup yet");
+    }
+    String instanceConfigsPath =
+        PropertyPathConfig.getPath(PropertyType.CONFIGS,
+                                   clusterName,
+                                   ConfigScopeProperty.PARTICIPANT.toString());
+    String nodeId = instanceConfig.getId();
+    String instanceConfigPath = instanceConfigsPath + "/" + nodeId;
+
+    if (_zkClient.exists(instanceConfigPath))
+    {
+      throw new HelixException("Node " + nodeId + " already exists in cluster "
+          + clusterName);
+    }
+
+    ZKUtil.createChildren(_zkClient, instanceConfigsPath, instanceConfig.getRecord());
+
+    _zkClient.createPersistent(HelixUtil.getMessagePath(clusterName, nodeId), true);
+    _zkClient.createPersistent(HelixUtil.getCurrentStateBasePath(clusterName, nodeId),
+                               true);
+    _zkClient.createPersistent(HelixUtil.getErrorsPath(clusterName, nodeId), true);
+    _zkClient.createPersistent(HelixUtil.getStatusUpdatesPath(clusterName, nodeId), true);
+  }
+
+  @Override
+  public void dropInstance(String clusterName, InstanceConfig instanceConfig)
+  {
+    // String instanceConfigsPath = HelixUtil.getConfigPath(clusterName);
+    String instanceConfigsPath =
+        PropertyPathConfig.getPath(PropertyType.CONFIGS,
+                                   clusterName,
+                                   ConfigScopeProperty.PARTICIPANT.toString());
+    String nodeId = instanceConfig.getId();
+    String instanceConfigPath = instanceConfigsPath + "/" + nodeId;
+    String instancePath = HelixUtil.getInstancePath(clusterName, nodeId);
+
+    if (!_zkClient.exists(instanceConfigPath))
+    {
+      throw new HelixException("Node " + nodeId
+          + " does not exist in config for cluster " + clusterName);
+    }
+
+    if (!_zkClient.exists(instancePath))
+    {
+      throw new HelixException("Node " + nodeId
+          + " does not exist in instances for cluster " + clusterName);
+    }
+
+    // delete config path
+    ZKUtil.dropChildren(_zkClient, instanceConfigsPath, instanceConfig.getRecord());
+
+    // delete instance path
+    _zkClient.deleteRecursive(instancePath);
+  }
+
+  @Override
+  public InstanceConfig getInstanceConfig(String clusterName, String instanceName)
+  {
+    // String instanceConfigsPath = HelixUtil.getConfigPath(clusterName);
+
+    // String instanceConfigPath = instanceConfigsPath + "/" + instanceName;
+    String instanceConfigPath =
+        PropertyPathConfig.getPath(PropertyType.CONFIGS,
+                                   clusterName,
+                                   ConfigScopeProperty.PARTICIPANT.toString(),
+                                   instanceName);
+    if (!_zkClient.exists(instanceConfigPath))
+    {
+      throw new HelixException("instance" + instanceName + " does not exist in cluster "
+          + clusterName);
+    }
+
+    ZKHelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
+    Builder keyBuilder = accessor.keyBuilder();
+
+    return accessor.getProperty(keyBuilder.instanceConfig(instanceName));
+  }
+
+  @Override
+  public void enableInstance(final String clusterName,
+                             final String instanceName,
+                             final boolean enabled)
+  {
+    String path =
+        PropertyPathConfig.getPath(PropertyType.CONFIGS,
+                                   clusterName,
+                                   ConfigScopeProperty.PARTICIPANT.toString(),
+                                   instanceName);
+
+    ZkBaseDataAccessor<ZNRecord> baseAccessor =
+        new ZkBaseDataAccessor<ZNRecord>(_zkClient);
+    if (!baseAccessor.exists(path, 0))
+    {
+      throw new HelixException("Cluster " + clusterName + ", instance: " + instanceName
+          + ", instance config does not exist");
+    }
+
+    baseAccessor.update(path, new DataUpdater<ZNRecord>()
+    {
+      @Override
+      public ZNRecord update(ZNRecord currentData)
+      {
+        if (currentData == null)
+        {
+          throw new HelixException("Cluster: " + clusterName + ", instance: "
+              + instanceName + ", participant config is null");
+        }
+
+        InstanceConfig config = new InstanceConfig(currentData);
+        config.setInstanceEnabled(enabled);
+        return config.getRecord();
+      }
+    }, AccessOption.PERSISTENT);
+  }
+
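+  /**
+   * Enables or disables the given partitions of a resource on one instance by
+   * rewriting the HELIX_DISABLED_PARTITION list in that instance's config.
+   * Validates that the instance config and the resource's ideal state exist,
+   * and only warns when a partition is not present in the ideal state. The
+   * change is applied through a DataUpdater (read-modify-write) rather than a
+   * blind set, so the list is merged instead of overwritten.
+   */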
+  @Override
+  public void enablePartition(final boolean enabled,
+                              final String clusterName,
+                              final String instanceName,
+                              final String resourceName,
+                              final List<String> partitionNames)
+  {
+    String path =
+        PropertyPathConfig.getPath(PropertyType.CONFIGS,
+                                   clusterName,
+                                   ConfigScopeProperty.PARTICIPANT.toString(),
+                                   instanceName);
+
+    ZkBaseDataAccessor<ZNRecord> baseAccessor =
+        new ZkBaseDataAccessor<ZNRecord>(_zkClient);
+
+    // check instanceConfig exists
+    if (!baseAccessor.exists(path, 0))
+    {
+      throw new HelixException("Cluster: " + clusterName + ", instance: " + instanceName
+          + ", instance config does not exist");
+    }
+
+    // check resource exists
+    String idealStatePath =
+        PropertyPathConfig.getPath(PropertyType.IDEALSTATES, clusterName, resourceName);
+
+    ZNRecord idealStateRecord = null;
+    try
+    {
+      idealStateRecord = baseAccessor.get(idealStatePath, null, 0);
+    }
+    catch (ZkNoNodeException e)
+    {
+      // OK.
+    }
+
+    if (idealStateRecord == null)
+    {
+      throw new HelixException("Cluster: " + clusterName + ", resource: " + resourceName
+          + ", ideal state does not exist");
+    }
+
+    // check partitions exist. warn if not
+    IdealState idealState = new IdealState(idealStateRecord);
+    for (String partitionName : partitionNames)
+    {
+      if ((idealState.getIdealStateMode() == IdealStateModeProperty.AUTO && idealState.getPreferenceList(partitionName) == null)
+          || (idealState.getIdealStateMode() == IdealStateModeProperty.CUSTOMIZED && idealState.getInstanceStateMap(partitionName) == null))
+      {
+        logger.warn("Cluster: " + clusterName + ", resource: " + resourceName
+            + ", partition: " + partitionName
+            + ", partition does not exist in ideal state");
+      }
+    }
+
+    // update participantConfig
+    // could not use ZNRecordUpdater since it doesn't do listField merge/subtract
+    baseAccessor.update(path, new DataUpdater<ZNRecord>()
+    {
+      @Override
+      public ZNRecord update(ZNRecord currentData)
+      {
+        if (currentData == null)
+        {
+          throw new HelixException("Cluster: " + clusterName + ", instance: "
+              + instanceName + ", participant config is null");
+        }
+
+        // TODO: merge with InstanceConfig.setInstanceEnabledForPartition
+        List<String> list =
+            currentData.getListField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.toString());
+        Set<String> disabledPartitions = new HashSet<String>();
+        if (list != null)
+        {
+          disabledPartitions.addAll(list);
+        }
+
+        if (enabled)
+        {
+          disabledPartitions.removeAll(partitionNames);
+        }
+        else
+        {
+          disabledPartitions.addAll(partitionNames);
+        }
+
+        list = new ArrayList<String>(disabledPartitions);
+        Collections.sort(list);
+        currentData.setListField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.toString(),
+                                 list);
+        return currentData;
+      }
+    },
+                        AccessOption.PERSISTENT);
+  }
+
+  @Override
+  public void enableCluster(String clusterName, boolean enabled)
+  {
+    HelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
+    Builder keyBuilder = accessor.keyBuilder();
+
+    if (enabled)
+    {
+      accessor.removeProperty(keyBuilder.pause());
+    }
+    else
+    {
+      accessor.createProperty(keyBuilder.pause(), new PauseSignal("pause"));
+    }
+  }
+
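+  /**
+   * Resets partitions that are stuck in ERROR on the given instance. Verifies
+   * that the instance is alive, the resource has an ideal state, every
+   * requested partition exists and is currently in ERROR, the state model
+   * definition is available and no state-transition message is still pending,
+   * then sends one ERROR-to-initial-state transition message per partition.
+   */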
+  @Override
+  public void resetPartition(String clusterName,
+                             String instanceName,
+                             String resourceName,
+                             List<String> partitionNames)
+  {
+    ZKHelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
+    Builder keyBuilder = accessor.keyBuilder();
+
+    // check the instance is alive
+    LiveInstance liveInstance =
+        accessor.getProperty(keyBuilder.liveInstance(instanceName));
+    if (liveInstance == null)
+    {
+      throw new HelixException("Can't reset state for " + resourceName + "/"
+          + partitionNames + " on " + instanceName + ", because " + instanceName
+          + " is not alive");
+    }
+
+    // check resource group exists
+    IdealState idealState = accessor.getProperty(keyBuilder.idealStates(resourceName));
+    if (idealState == null)
+    {
+      throw new HelixException("Can't reset state for " + resourceName + "/"
+          + partitionNames + " on " + instanceName + ", because " + resourceName
+          + " is not added");
+    }
+
+    // check partition exists in resource group
+    Set<String> resetPartitionNames = new HashSet<String>(partitionNames);
+    if (idealState.getIdealStateMode() == IdealStateModeProperty.CUSTOMIZED)
+    {
+      Set<String> partitions =
+          new HashSet<String>(idealState.getRecord().getMapFields().keySet());
+      if (!partitions.containsAll(resetPartitionNames))
+      {
+        throw new HelixException("Can't reset state for " + resourceName + "/"
+            + partitionNames + " on " + instanceName + ", because not all "
+            + partitionNames + " exist");
+      }
+    }
+    else
+    {
+      Set<String> partitions =
+          new HashSet<String>(idealState.getRecord().getListFields().keySet());
+      if (!partitions.containsAll(resetPartitionNames))
+      {
+        throw new HelixException("Can't reset state for " + resourceName + "/"
+            + partitionNames + " on " + instanceName + ", because not all "
+            + partitionNames + " exist");
+      }
+    }
+
+    // check partition is in ERROR state
+    String sessionId = liveInstance.getSessionId();
+    CurrentState curState =
+        accessor.getProperty(keyBuilder.currentState(instanceName,
+                                                     sessionId,
+                                                     resourceName));
+    for (String partitionName : resetPartitionNames)
+    {
+      if (!curState.getState(partitionName).equals("ERROR"))
+      {
+        throw new HelixException("Can't reset state for " + resourceName + "/"
+            + partitionNames + " on " + instanceName + ", because not all "
+            + partitionNames + " are in ERROR state");
+      }
+    }
+
+    // check stateModelDef exists and get initial state
+    String stateModelDef = idealState.getStateModelDefRef();
+    StateModelDefinition stateModel =
+        accessor.getProperty(keyBuilder.stateModelDef(stateModelDef));
+    if (stateModel == null)
+    {
+      throw new HelixException("Can't reset state for " + resourceName + "/"
+          + partitionNames + " on " + instanceName + ", because " + stateModelDef
+          + " is NOT found");
+    }
+
+    // check there are no pending messages for the partitions being reset
+    List<Message> messages = accessor.getChildValues(keyBuilder.messages(instanceName));
+    for (Message message : messages)
+    {
+      if (!MessageType.STATE_TRANSITION.toString().equalsIgnoreCase(message.getMsgType())
+          || !sessionId.equals(message.getTgtSessionId())
+          || !resourceName.equals(message.getResourceName())
+          || !resetPartitionNames.contains(message.getPartitionName()))
+      {
+        continue;
+      }
+
+      throw new HelixException("Can't reset state for " + resourceName + "/"
+          + partitionNames + " on " + instanceName
+          + ", because a pending message exists: " + message);
+    }
+
+    String adminName = null;
+    try
+    {
+      adminName = InetAddress.getLocalHost().getCanonicalHostName() + "-ADMIN";
+    }
+    catch (UnknownHostException e)
+    {
+      // can ignore it
+      logger.info("Unable to get host name. Will set it to UNKNOWN, mostly ignorable", e);
+      adminName = "UNKNOWN";
+    }
+
+    List<Message> resetMessages = new ArrayList<Message>();
+    List<PropertyKey> messageKeys = new ArrayList<PropertyKey>();
+    for (String partitionName : resetPartitionNames)
+    {
+      // send a state-transition message from ERROR to the initial state
+      String msgId = UUID.randomUUID().toString();
+      Message message = new Message(MessageType.STATE_TRANSITION, msgId);
+      message.setSrcName(adminName);
+      message.setTgtName(instanceName);
+      message.setMsgState(MessageState.NEW);
+      message.setPartitionName(partitionName);
+      message.setResourceName(resourceName);
+      message.setTgtSessionId(sessionId);
+      message.setStateModelDef(stateModelDef);
+      message.setFromState("ERROR");
+      message.setToState(stateModel.getInitialState());
+      message.setStateModelFactoryName(idealState.getStateModelFactoryName());
+
+      resetMessages.add(message);
+      messageKeys.add(keyBuilder.message(instanceName, message.getId()));
+    }
+
+    accessor.setChildren(messageKeys, resetMessages);
+  }
+
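+  /**
+   * Resets every partition that the external views report as ERROR on the
+   * given instances, delegating to {@link #resetPartition} per resource.
+   */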
+  @Override
+  public void resetInstance(String clusterName, List<String> instanceNames)
+  {
+    // TODO: not mp-safe
+    ZKHelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
+    Builder keyBuilder = accessor.keyBuilder();
+    List<ExternalView> extViews = accessor.getChildValues(keyBuilder.externalViews());
+
+    Set<String> resetInstanceNames = new HashSet<String>(instanceNames);
+    for (String instanceName : resetInstanceNames)
+    {
+      for (ExternalView extView : extViews)
+      {
+        // scope the list per external view so ERROR partitions of one resource
+        // are not carried into the reset call for the next resource
+        List<String> resetPartitionNames = new ArrayList<String>();
+        Map<String, Map<String, String>> stateMap = extView.getRecord().getMapFields();
+        for (String partitionName : stateMap.keySet())
+        {
+          Map<String, String> instanceStateMap = stateMap.get(partitionName);
+
+          if (instanceStateMap.containsKey(instanceName)
+              && instanceStateMap.get(instanceName).equals("ERROR"))
+          {
+            resetPartitionNames.add(partitionName);
+          }
+        }
+        resetPartition(clusterName,
+                       instanceName,
+                       extView.getResourceName(),
+                       resetPartitionNames);
+      }
+    }
+  }
+
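+  /**
+   * Resets every ERROR replica of the given resources, grouping the affected
+   * partitions by instance before delegating to {@link #resetPartition}.
+   */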
+  @Override
+  public void resetResource(String clusterName, List<String> resourceNames)
+  {
+    // TODO: not mp-safe
+    ZKHelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
+    Builder keyBuilder = accessor.keyBuilder();
+    List<ExternalView> extViews = accessor.getChildValues(keyBuilder.externalViews());
+
+    Set<String> resetResourceNames = new HashSet<String>(resourceNames);
+    for (ExternalView extView : extViews)
+    {
+      if (!resetResourceNames.contains(extView.getResourceName()))
+      {
+        continue;
+      }
+
+      // instanceName -> list of resetPartitionNames
+      Map<String, List<String>> resetPartitionNames = new HashMap<String, List<String>>();
+
+      Map<String, Map<String, String>> stateMap = extView.getRecord().getMapFields();
+      for (String partitionName : stateMap.keySet())
+      {
+        Map<String, String> instanceStateMap = stateMap.get(partitionName);
+        for (String instanceName : instanceStateMap.keySet())
+        {
+          if (instanceStateMap.get(instanceName).equals("ERROR"))
+          {
+            if (!resetPartitionNames.containsKey(instanceName))
+            {
+              resetPartitionNames.put(instanceName, new ArrayList<String>());
+            }
+            resetPartitionNames.get(instanceName).add(partitionName);
+          }
+        }
+      }
+
+      for (String instanceName : resetPartitionNames.keySet())
+      {
+        resetPartition(clusterName,
+                       instanceName,
+                       extView.getResourceName(),
+                       resetPartitionNames.get(instanceName));
+      }
+    }
+  }
+
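+  /**
+   * Creates the base ZooKeeper layout for a cluster: ideal states, cluster /
+   * participant / resource configs, property store, live and member instances,
+   * external views, state model definitions and the controller paths. If the
+   * cluster root already exists it is either wiped (overwritePrevRecord) or
+   * the call fails.
+   */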
+  @Override
+  public void addCluster(String clusterName, boolean overwritePrevRecord)
+  {
+    String root = "/" + clusterName;
+    String path;
+
+    // TODO For ease of testing only, should remove later
+    if (_zkClient.exists(root))
+    {
+      logger.warn("Root directory exists.Cleaning the root directory:" + root
+          + " overwritePrevRecord: " + overwritePrevRecord);
+      if (overwritePrevRecord)
+      {
+        _zkClient.deleteRecursive(root);
+      }
+      else
+      {
+        throw new HelixException("Cluster " + clusterName + " already exists");
+      }
+    }
+
+    _zkClient.createPersistent(root);
+
+    // IDEAL STATE
+    _zkClient.createPersistent(HelixUtil.getIdealStatePath(clusterName));
+    // CONFIGURATIONS
+    // _zkClient.createPersistent(HelixUtil.getConfigPath(clusterName));
+    path =
+        PropertyPathConfig.getPath(PropertyType.CONFIGS,
+                                   clusterName,
+                                   ConfigScopeProperty.CLUSTER.toString(),
+                                   clusterName);
+    _zkClient.createPersistent(path, true);
+    _zkClient.writeData(path, new ZNRecord(clusterName));
+    path =
+        PropertyPathConfig.getPath(PropertyType.CONFIGS,
+                                   clusterName,
+                                   ConfigScopeProperty.PARTICIPANT.toString());
+    _zkClient.createPersistent(path);
+    path =
+        PropertyPathConfig.getPath(PropertyType.CONFIGS,
+                                   clusterName,
+                                   ConfigScopeProperty.RESOURCE.toString());
+    _zkClient.createPersistent(path);
+    // PROPERTY STORE
+    path = PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, clusterName);
+    _zkClient.createPersistent(path);
+    // LIVE INSTANCES
+    _zkClient.createPersistent(HelixUtil.getLiveInstancesPath(clusterName));
+    // MEMBER INSTANCES
+    _zkClient.createPersistent(HelixUtil.getMemberInstancesPath(clusterName));
+    // External view
+    _zkClient.createPersistent(HelixUtil.getExternalViewPath(clusterName));
+    // State model definition
+    _zkClient.createPersistent(HelixUtil.getStateModelDefinitionPath(clusterName));
+
+    // controller
+    _zkClient.createPersistent(HelixUtil.getControllerPath(clusterName));
+    path = PropertyPathConfig.getPath(PropertyType.HISTORY, clusterName);
+    final ZNRecord emptyHistory = new ZNRecord(PropertyType.HISTORY.toString());
+    final List<String> emptyList = new ArrayList<String>();
+    emptyHistory.setListField(clusterName, emptyList);
+    _zkClient.createPersistent(path, emptyHistory);
+
+    path = PropertyPathConfig.getPath(PropertyType.MESSAGES_CONTROLLER, clusterName);
+    _zkClient.createPersistent(path);
+
+    path = PropertyPathConfig.getPath(PropertyType.STATUSUPDATES_CONTROLLER, clusterName);
+    _zkClient.createPersistent(path);
+
+    path = PropertyPathConfig.getPath(PropertyType.ERRORS_CONTROLLER, clusterName);
+    _zkClient.createPersistent(path);
+  }
+
+  @Override
+  public List<String> getInstancesInCluster(String clusterName)
+  {
+    String memberInstancesPath = HelixUtil.getMemberInstancesPath(clusterName);
+    return _zkClient.getChildren(memberInstancesPath);
+  }
+
+  @Override
+  public void addResource(String clusterName,
+                          String resourceName,
+                          int partitions,
+                          String stateModelRef)
+  {
+    addResource(clusterName,
+                resourceName,
+                partitions,
+                stateModelRef,
+                IdealStateModeProperty.AUTO.toString(),
+                0);
+  }
+
+  @Override
+  public void addResource(String clusterName,
+                          String resourceName,
+                          int partitions,
+                          String stateModelRef,
+                          String idealStateMode)
+  {
+    addResource(clusterName, resourceName, partitions, stateModelRef, idealStateMode, 0);
+  }
+
+  @Override
+  public void addResource(String clusterName,
+                          String resourceName,
+                          int partitions,
+                          String stateModelRef,
+                          String idealStateMode,
+                          int bucketSize)
+  {
+    if (!ZKUtil.isClusterSetup(clusterName, _zkClient))
+    {
+      throw new HelixException("cluster " + clusterName + " is not setup yet");
+    }
+
+    IdealStateModeProperty mode = IdealStateModeProperty.AUTO;
+    try
+    {
+      mode = IdealStateModeProperty.valueOf(idealStateMode);
+    }
+    catch (Exception e)
+    {
+      logger.error("", e);
+    }
+    IdealState idealState = new IdealState(resourceName);
+    idealState.setNumPartitions(partitions);
+    idealState.setStateModelDefRef(stateModelRef);
+    idealState.setIdealStateMode(mode.toString());
+    idealState.setReplicas("" + 0);
+    idealState.setStateModelFactoryName(HelixConstants.DEFAULT_STATE_MODEL_FACTORY);
+
+    if (bucketSize > 0)
+    {
+      idealState.setBucketSize(bucketSize);
+    }
+
+    String stateModelDefPath =
+        PropertyPathConfig.getPath(PropertyType.STATEMODELDEFS,
+                                   clusterName,
+                                   stateModelRef);
+    if (!_zkClient.exists(stateModelDefPath))
+    {
+      throw new HelixException("State model " + stateModelRef
+          + " not found in the cluster STATEMODELDEFS path");
+    }
+
+    String idealStatePath = HelixUtil.getIdealStatePath(clusterName);
+    String dbIdealStatePath = idealStatePath + "/" + resourceName;
+    if (_zkClient.exists(dbIdealStatePath))
+    {
+      throw new HelixException("Skip the operation. DB ideal state directory exists:"
+          + dbIdealStatePath);
+    }
+
+    ZKUtil.createChildren(_zkClient, idealStatePath, idealState.getRecord());
+  }
+
+  @Override
+  public List<String> getClusters()
+  {
+    List<String> zkToplevelPathes = _zkClient.getChildren("/");
+    List<String> result = new ArrayList<String>();
+    for (String pathName : zkToplevelPathes)
+    {
+      if (ZKUtil.isClusterSetup(pathName, _zkClient))
+      {
+        result.add(pathName);
+      }
+    }
+    return result;
+  }
+
+  @Override
+  public List<String> getResourcesInCluster(String clusterName)
+  {
+    return _zkClient.getChildren(HelixUtil.getIdealStatePath(clusterName));
+  }
+
+  @Override
+  public IdealState getResourceIdealState(String clusterName, String dbName)
+  {
+    ZKHelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
+    Builder keyBuilder = accessor.keyBuilder();
+
+    return accessor.getProperty(keyBuilder.idealStates(dbName));
+  }
+
+  @Override
+  public void setResourceIdealState(String clusterName,
+                                    String dbName,
+                                    IdealState idealState)
+  {
+    ZKHelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
+    Builder keyBuilder = accessor.keyBuilder();
+
+    accessor.setProperty(keyBuilder.idealStates(dbName), idealState);
+  }
+
+  @Override
+  public ExternalView getResourceExternalView(String clusterName, String resourceName)
+  {
+    ZKHelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
+    Builder keyBuilder = accessor.keyBuilder();
+    return accessor.getProperty(keyBuilder.externalView(resourceName));
+  }
+
+  @Override
+  public void addStateModelDef(String clusterName,
+                               String stateModelDef,
+                               StateModelDefinition stateModel)
+  {
+    if (!ZKUtil.isClusterSetup(clusterName, _zkClient))
+    {
+      throw new HelixException("cluster " + clusterName + " is not setup yet");
+    }
+    String stateModelDefPath = HelixUtil.getStateModelDefinitionPath(clusterName);
+    String stateModelPath = stateModelDefPath + "/" + stateModelDef;
+    if (_zkClient.exists(stateModelPath))
+    {
+      logger.warn("Skip the operation.State Model directory exists:" + stateModelPath);
+      throw new HelixException("State model path " + stateModelPath + " already exists.");
+    }
+
+    ZKHelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
+    Builder keyBuilder = accessor.keyBuilder();
+    accessor.setProperty(keyBuilder.stateModelDef(stateModel.getId()), stateModel);
+  }
+
+  @Override
+  public void dropResource(String clusterName, String resourceName)
+  {
+    ZKHelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
+    Builder keyBuilder = accessor.keyBuilder();
+
+    accessor.removeProperty(keyBuilder.idealStates(resourceName));
+  }
+
+  @Override
+  public List<String> getStateModelDefs(String clusterName)
+  {
+    return _zkClient.getChildren(HelixUtil.getStateModelDefinitionPath(clusterName));
+  }
+
+  @Override
+  public StateModelDefinition getStateModelDef(String clusterName, String stateModelName)
+  {
+    ZKHelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
+    Builder keyBuilder = accessor.keyBuilder();
+
+    return accessor.getProperty(keyBuilder.stateModelDef(stateModelName));
+  }
+
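+  /**
+   * Adds a persistent stat to the cluster by merging the parsed stat entries
+   * into the PERSISTENTSTATS record; existing entries are left untouched.
+   */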
+  @Override
+  public void addStat(String clusterName, final String statName)
+  {
+    if (!ZKUtil.isClusterSetup(clusterName, _zkClient))
+    {
+      throw new HelixException("cluster " + clusterName + " is not setup yet");
+    }
+
+    String persistentStatsPath =
+        PropertyPathConfig.getPath(PropertyType.PERSISTENTSTATS, clusterName);
+    ZkBaseDataAccessor<ZNRecord> baseAccessor =
+        new ZkBaseDataAccessor<ZNRecord>(_zkClient);
+
+    baseAccessor.update(persistentStatsPath, new DataUpdater<ZNRecord>()
+    {
+
+      @Override
+      public ZNRecord update(ZNRecord statsRec)
+      {
+        if (statsRec == null)
+        {
+          // TODO: fix naming of this record, if it matters
+          statsRec = new ZNRecord(PersistentStats.nodeName);
+        }
+
+        Map<String, Map<String, String>> currStatMap = statsRec.getMapFields();
+        Map<String, Map<String, String>> newStatMap = StatsHolder.parseStat(statName);
+        for (String newStat : newStatMap.keySet())
+        {
+          if (!currStatMap.containsKey(newStat))
+          {
+            currStatMap.put(newStat, newStatMap.get(newStat));
+          }
+        }
+        statsRec.setMapFields(currStatMap);
+
+        return statsRec;
+      }
+    }, AccessOption.PERSISTENT);
+  }
+
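+  /**
+   * Adds an alert to the ALERTS record. AlertsHolder derives the stat backing
+   * the alert, which is registered via addStat() from inside the updater.
+   */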
+  @Override
+  public void addAlert(final String clusterName, final String alertName)
+  {
+    if (!ZKUtil.isClusterSetup(clusterName, _zkClient))
+    {
+      throw new HelixException("cluster " + clusterName + " is not setup yet");
+    }
+
+    ZkBaseDataAccessor<ZNRecord> baseAccessor =
+        new ZkBaseDataAccessor<ZNRecord>(_zkClient);
+
+    String alertsPath = PropertyPathConfig.getPath(PropertyType.ALERTS, clusterName);
+
+    baseAccessor.update(alertsPath, new DataUpdater<ZNRecord>()
+    {
+
+      @Override
+      public ZNRecord update(ZNRecord alertsRec)
+      {
+        if (alertsRec == null)
+        {
+          // TODO: fix naming of this record, if it matters
+          alertsRec = new ZNRecord(Alerts.nodeName);
+
+        }
+
+        Map<String, Map<String, String>> currAlertMap = alertsRec.getMapFields();
+        StringBuilder newStatName = new StringBuilder();
+        Map<String, String> newAlertMap = new HashMap<String, String>();
+
+        // use AlertsHolder to get map of new stats and map for this alert
+        AlertsHolder.parseAlert(alertName, newStatName, newAlertMap);
+
+        // add stat
+        addStat(clusterName, newStatName.toString());
+
+        // add alert
+        currAlertMap.put(alertName, newAlertMap);
+
+        alertsRec.setMapFields(currAlertMap);
+
+        return alertsRec;
+      }
+    }, AccessOption.PERSISTENT);
+  }
+
+  @Override
+  public void dropCluster(String clusterName)
+  {
+    logger.info("Deleting cluster " + clusterName);
+    ZKHelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
+    Builder keyBuilder = accessor.keyBuilder();
+
+    String root = "/" + clusterName;
+    if (accessor.getChildNames(keyBuilder.liveInstances()).size() > 0)
+    {
+      throw new HelixException("There are still live instances in the cluster, shut them down first.");
+    }
+
+    if (accessor.getProperty(keyBuilder.controllerLeader()) != null)
+    {
+      throw new HelixException("There are still LEADER in the cluster, shut them down first.");
+    }
+
+    _zkClient.deleteRecursive(root);
+  }
+
+  @Override
+  public void dropStat(String clusterName, final String statName)
+  {
+    if (!ZKUtil.isClusterSetup(clusterName, _zkClient))
+    {
+      throw new HelixException("cluster " + clusterName + " is not setup yet");
+    }
+
+    String persistentStatsPath =
+        PropertyPathConfig.getPath(PropertyType.PERSISTENTSTATS, clusterName);
+    ZkBaseDataAccessor<ZNRecord> baseAccessor =
+        new ZkBaseDataAccessor<ZNRecord>(_zkClient);
+
+    baseAccessor.update(persistentStatsPath, new DataUpdater<ZNRecord>()
+    {
+
+      @Override
+      public ZNRecord update(ZNRecord statsRec)
+      {
+        if (statsRec == null)
+        {
+          throw new HelixException("No stats record in ZK, nothing to drop");
+        }
+
+        Map<String, Map<String, String>> currStatMap = statsRec.getMapFields();
+        Map<String, Map<String, String>> newStatMap = StatsHolder.parseStat(statName);
+
+        // delete each stat from stat map
+        for (String newStat : newStatMap.keySet())
+        {
+          if (currStatMap.containsKey(newStat))
+          {
+            currStatMap.remove(newStat);
+          }
+        }
+        statsRec.setMapFields(currStatMap);
+
+        return statsRec;
+      }
+    }, AccessOption.PERSISTENT);
+  }
+
+  @Override
+  public void dropAlert(String clusterName, final String alertName)
+  {
+    if (!ZKUtil.isClusterSetup(clusterName, _zkClient))
+    {
+      throw new HelixException("cluster " + clusterName + " is not setup yet");
+    }
+
+    String alertsPath = PropertyPathConfig.getPath(PropertyType.ALERTS, clusterName);
+
+    ZkBaseDataAccessor<ZNRecord> baseAccessor =
+        new ZkBaseDataAccessor<ZNRecord>(_zkClient);
+
+    if (!baseAccessor.exists(alertsPath, 0))
+    {
+      throw new HelixException("No alerts node in ZK, nothing to drop");
+    }
+
+    baseAccessor.update(alertsPath, new DataUpdater<ZNRecord>()
+    {
+      @Override
+      public ZNRecord update(ZNRecord alertsRec)
+      {
+        if (alertsRec == null)
+        {
+          throw new HelixException("No alerts record in ZK, nothing to drop");
+        }
+
+        Map<String, Map<String, String>> currAlertMap = alertsRec.getMapFields();
+        currAlertMap.remove(alertName);
+        alertsRec.setMapFields(currAlertMap);
+
+        return alertsRec;
+      }
+    }, AccessOption.PERSISTENT);
+  }
+
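+  /**
+   * Registers clusterName as a single-partition LeaderStandby resource in the
+   * grand cluster, assigning one grand-cluster instance as LEADER and the rest
+   * as STANDBY, so that the grand cluster's controllers can manage this cluster.
+   */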
+  @Override
+  public void addClusterToGrandCluster(String clusterName, String grandCluster)
+  {
+    if (!ZKUtil.isClusterSetup(grandCluster, _zkClient))
+    {
+      throw new HelixException("Grand cluster " + grandCluster + " is not setup yet");
+    }
+
+    if (!ZKUtil.isClusterSetup(clusterName, _zkClient))
+    {
+      throw new HelixException("Cluster " + clusterName + " is not setup yet");
+    }
+
+    IdealState idealState = new IdealState(clusterName);
+
+    idealState.setNumPartitions(1);
+    idealState.setStateModelDefRef("LeaderStandby");
+
+    List<String> controllers = getInstancesInCluster(grandCluster);
+    if (controllers.size() == 0)
+    {
+      throw new HelixException("Grand cluster " + grandCluster + " has no instances");
+    }
+    idealState.setReplicas(Integer.toString(controllers.size()));
+    Collections.shuffle(controllers);
+    idealState.getRecord().setListField(clusterName, controllers);
+    idealState.setPartitionState(clusterName, controllers.get(0), "LEADER");
+    for (int i = 1; i < controllers.size(); i++)
+    {
+      idealState.setPartitionState(clusterName, controllers.get(i), "STANDBY");
+    }
+
+    ZKHelixDataAccessor accessor =
+        new ZKHelixDataAccessor(grandCluster, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
+    Builder keyBuilder = accessor.keyBuilder();
+
+    accessor.setProperty(keyBuilder.idealStates(idealState.getResourceName()), idealState);
+  }
+
+  @Override
+  public void setConfig(ConfigScope scope, Map<String, String> properties)
+  {
+    for (String key : properties.keySet())
+    {
+      _configAccessor.set(scope, key, properties.get(key));
+    }
+  }
+
+  @Override
+  public Map<String, String> getConfig(ConfigScope scope, Set<String> keys)
+  {
+    Map<String, String> properties = new TreeMap<String, String>();
+
+    if (keys == null)
+    {
+      // TODO: read all simple fields in the scope (not yet implemented)
+    }
+    else
+    {
+      for (String key : keys)
+      {
+        String value = _configAccessor.get(scope, key);
+        if (value == null)
+        {
+          logger.error("Config doesn't exist for key: " + key);
+          continue;
+        }
+        properties.put(key, value);
+      }
+    }
+
+    return properties;
+  }
+
+  @Override
+  public List<String> getConfigKeys(ConfigScopeProperty scope,
+                                    String clusterName,
+                                    String... keys)
+  {
+    return _configAccessor.getKeys(scope, clusterName, keys);
+  }
+
+  @Override
+  public void removeConfig(ConfigScope scope, Set<String> keys)
+  {
+    for (String key : keys)
+    {
+      _configAccessor.remove(scope, key);
+    }
+  }
+
+  @Override
+  public void rebalance(String clusterName, String resourceName, int replica)
+  {
+    rebalance(clusterName, resourceName, replica, resourceName);
+  }
+
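+  // Computes and writes a full ideal state for the resource. The "master" and
+  // "slave" states are derived from the state model definition's per-state
+  // instance counts: "1" marks the single-master state, "R" a replica-count
+  // state, and "N" means every instance hosts the state. For AUTO and
+  // CUSTOMIZED modes the assignment comes from
+  // IdealStateCalculatorForStorageNode (AUTO keeps both list and map fields,
+  // CUSTOMIZED only the map fields); for AUTO_REBALANCE only empty partition
+  // entries are created, to be filled in by the controller.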
+  void rebalance(String clusterName, String resourceName, int replica, String keyPrefix)
+  {
+    List<String> instanceNames = getInstancesInCluster(clusterName);
+
+    // ensure we get the same idealState with the same set of instances
+    Collections.sort(instanceNames);
+
+    IdealState idealState = getResourceIdealState(clusterName, resourceName);
+    if (idealState == null)
+    {
+      throw new HelixException("Resource: " + resourceName + " has NOT been added yet");
+    }
+
+    idealState.setReplicas(Integer.toString(replica));
+    int partitions = idealState.getNumPartitions();
+    String stateModelName = idealState.getStateModelDefRef();
+    StateModelDefinition stateModDef = getStateModelDef(clusterName, stateModelName);
+
+    if (stateModDef == null)
+    {
+      throw new HelixException("cannot find state model: " + stateModelName);
+    }
+    // StateModelDefinition def = new StateModelDefinition(stateModDef);
+
+    List<String> statePriorityList = stateModDef.getStatesPriorityList();
+
+    String masterStateValue = null;
+    String slaveStateValue = null;
+    replica--;
+
+    for (String state : statePriorityList)
+    {
+      String count = stateModDef.getNumInstancesPerState(state);
+      if (count.equals("1"))
+      {
+        if (masterStateValue != null)
+        {
+          throw new HelixException("Invalid or unsupported state model definition");
+        }
+        masterStateValue = state;
+      }
+      else if (count.equalsIgnoreCase("R"))
+      {
+        if (slaveStateValue != null)
+        {
+          throw new HelixException("Invalid or unsupported state model definition");
+        }
+        slaveStateValue = state;
+      }
+      else if (count.equalsIgnoreCase("N"))
+      {
+        if (!(masterStateValue == null && slaveStateValue == null))
+        {
+          throw new HelixException("Invalid or unsupported state model definition");
+        }
+        replica = instanceNames.size() - 1;
+        masterStateValue = slaveStateValue = state;
+      }
+    }
+    if (masterStateValue == null && slaveStateValue == null)
+    {
+      throw new HelixException("Invalid or unsupported state model definition");
+    }
+
+    if (masterStateValue == null)
+    {
+      masterStateValue = slaveStateValue;
+    }
+    if (idealState.getIdealStateMode() != IdealStateModeProperty.AUTO_REBALANCE)
+    {
+      ZNRecord newIdealState =
+          IdealStateCalculatorForStorageNode.calculateIdealState(instanceNames,
+                                                                 partitions,
+                                                                 replica,
+                                                                 keyPrefix,
+                                                                 masterStateValue,
+                                                                 slaveStateValue);
+
+      // for now keep mapField in AUTO mode and remove listField in CUSTOMIZED mode
+      if (idealState.getIdealStateMode() == IdealStateModeProperty.AUTO)
+      {
+        idealState.getRecord().setListFields(newIdealState.getListFields());
+        idealState.getRecord().setMapFields(newIdealState.getMapFields());
+      }
+      if (idealState.getIdealStateMode() == IdealStateModeProperty.CUSTOMIZED)
+      {
+        idealState.getRecord().setMapFields(newIdealState.getMapFields());
+      }
+    }
+    else
+    {
+      for (int i = 0; i < partitions; i++)
+      {
+        String partitionName = keyPrefix + "_" + i;
+        idealState.getRecord().setMapField(partitionName, new HashMap<String, String>());
+        idealState.getRecord().setListField(partitionName, new ArrayList<String>());
+      }
+    }
+    setResourceIdealState(clusterName, resourceName, idealState);
+  }
+
+  @Override
+  public void addIdealState(String clusterName, String resourceName, String idealStateFile) throws IOException
+  {
+    ZNRecord idealStateRecord =
+        (ZNRecord) (new ZNRecordSerializer().deserialize(readFile(idealStateFile)));
+    if (idealStateRecord == null || idealStateRecord.getId() == null
+        || !idealStateRecord.getId().equals(resourceName))
+    {
+      throw new IllegalArgumentException("ideal state must have same id as resource name");
+    }
+    setResourceIdealState(clusterName, resourceName, new IdealState(idealStateRecord));
+  }
+
+  private static byte[] readFile(String filePath) throws IOException
+  {
+    File file = new File(filePath);
+
+    int size = (int) file.length();
+    byte[] bytes = new byte[size];
+    DataInputStream dis = new DataInputStream(new FileInputStream(file));
+    try
+    {
+      int read = 0;
+      int numRead = 0;
+      while (read < bytes.length
+          && (numRead = dis.read(bytes, read, bytes.length - read)) >= 0)
+      {
+        read = read + numRead;
+      }
+      return bytes;
+    }
+    finally
+    {
+      // close the stream even if the read fails
+      dis.close();
+    }
+  }
+
+  public void addStateModelDef(String clusterName,
+                               String stateModelDefName,
+                               String stateModelDefFile) throws IOException
+  {
+    ZNRecord record =
+        (ZNRecord) (new ZNRecordSerializer().deserialize(readFile(stateModelDefFile)));
+    if (record == null || record.getId() == null
+        || !record.getId().equals(stateModelDefName))
+    {
+      throw new IllegalArgumentException("state model definition must have same id as state model def name");
+    }
+    addStateModelDef(clusterName, stateModelDefName, new StateModelDefinition(record));
+
+  }
+
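+  /**
+   * Adds (or overwrites) a message constraint: validates each attribute name
+   * against ConstraintAttribute and merges the key/value pairs into the
+   * MESSAGE_CONSTRAINT record under the cluster's constraint path.
+   */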
+  public void addMessageConstraint(String clusterName,
+                                   final String constraintId,
+                                   final Map<String, String> constraints)
+  {
+    ZkBaseDataAccessor<ZNRecord> baseAccessor =
+        new ZkBaseDataAccessor<ZNRecord>(_zkClient);
+
+    Builder keyBuilder = new Builder(clusterName);
+    String path = keyBuilder.constraint(ConstraintType.MESSAGE_CONSTRAINT.toString()).getPath();
+
+    baseAccessor.update(path, new DataUpdater<ZNRecord>()
+    {
+      @Override
+      public ZNRecord update(ZNRecord currentData)
+      {
+        if (currentData == null)
+        {
+          currentData = new ZNRecord(ConstraintType.MESSAGE_CONSTRAINT.toString());
+        }
+
+        Map<String, String> map = currentData.getMapField(constraintId);
+        if (map == null)
+        {
+          map = new TreeMap<String, String>();
+          currentData.setMapField(constraintId, map);
+        }
+        else
+        {
+          logger.warn("Overwrite existing constraint " + constraintId + ": " + map);
+        }
+
+        for (String key : constraints.keySet())
+        {
+          // make sure the constraint attribute is valid
+          ConstraintAttribute attr = ConstraintAttribute.valueOf(key.toUpperCase());
+
+          map.put(attr.toString(), constraints.get(key));
+        }
+        
+        return currentData;
+      }
+    }, AccessOption.PERSISTENT);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
new file mode 100644
index 0000000..0e13126
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
@@ -0,0 +1,590 @@
+package org.apache.helix.manager.zk;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.I0Itec.zkclient.DataUpdater;
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
+import org.apache.helix.AccessOption;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.ControllerChangeListener;
+import org.apache.helix.GroupCommit;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZNRecordAssembler;
+import org.apache.helix.ZNRecordBucketizer;
+import org.apache.helix.ZNRecordUpdater;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.controller.restlet.ZNRecordUpdate;
+import org.apache.helix.controller.restlet.ZkPropertyTransferClient;
+import org.apache.helix.controller.restlet.ZNRecordUpdate.OpCode;
+import org.apache.helix.model.LiveInstance;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.data.Stat;
+
+
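+/**
+ * ZooKeeper-backed {@link HelixDataAccessor}: reads and writes typed Helix
+ * properties as ZNRecords, transparently reassembling bucketized records and
+ * optionally routing writes through the ZkPropertyTransfer service.
+ */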
+public class ZKHelixDataAccessor implements HelixDataAccessor, ControllerChangeListener
+{
+  private static Logger                    LOG                       =
+                                                                         Logger.getLogger(ZKHelixDataAccessor.class);
+  private final BaseDataAccessor<ZNRecord> _baseDataAccessor;
+  final InstanceType                       _instanceType;
+  private final String                     _clusterName;
+  private final Builder                    _propertyKeyBuilder;
+  ZkPropertyTransferClient                 _zkPropertyTransferClient = null;
+  private final GroupCommit                _groupCommit              = new GroupCommit();
+  String                                   _zkPropertyTransferSvcUrl = null;
+
+  public ZKHelixDataAccessor(String clusterName,
+                             BaseDataAccessor<ZNRecord> baseDataAccessor)
+  {
+    this(clusterName, null, baseDataAccessor);
+  }
+
+  public ZKHelixDataAccessor(String clusterName,
+                             InstanceType instanceType,
+                             BaseDataAccessor<ZNRecord> baseDataAccessor)
+  {
+    _clusterName = clusterName;
+    _instanceType = instanceType;
+    _baseDataAccessor = baseDataAccessor;
+    _propertyKeyBuilder = new PropertyKey.Builder(_clusterName);
+  }
+
+  @Override
+  public <T extends HelixProperty> boolean createProperty(PropertyKey key, T value)
+  {
+    PropertyType type = key.getType();
+    String path = key.getPath();
+    int options = constructOptions(type);
+    return _baseDataAccessor.create(path, value.getRecord(), options);
+  }
+
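+  // For bucketized IDEALSTATES/EXTERNALVIEW records the parent node only keeps
+  // the simple fields, while list/map fields are split into child bucket nodes.
+  // Property types that use the property-transfer server are enqueued to the
+  // transfer service instead of being written directly, when its URL is set.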
+  @Override
+  public <T extends HelixProperty> boolean setProperty(PropertyKey key, T value)
+  {
+    PropertyType type = key.getType();
+    if (!value.isValid())
+    {
+      throw new HelixException("The ZNRecord for " + type + " is not valid.");
+    }
+
+    String path = key.getPath();
+    int options = constructOptions(type);
+
+    if (type.usePropertyTransferServer())
+    {
+      if (_zkPropertyTransferSvcUrl != null && _zkPropertyTransferClient != null)
+      {
+        ZNRecordUpdate update = new ZNRecordUpdate(path, OpCode.SET, value.getRecord());
+        _zkPropertyTransferClient.enqueueZNRecordUpdate(update, _zkPropertyTransferSvcUrl);
+        return true;
+      }
+    }
+
+    boolean success = false;
+    switch (type)
+    {
+    case IDEALSTATES:
+    case EXTERNALVIEW:
+      // check if bucketized
+      if (value.getBucketSize() > 0)
+      {
+        // set parent node
+        ZNRecord metaRecord = new ZNRecord(value.getId());
+        metaRecord.setSimpleFields(value.getRecord().getSimpleFields());
+        success = _baseDataAccessor.set(path, metaRecord, options);
+        if (success)
+        {
+          ZNRecordBucketizer bucketizer = new ZNRecordBucketizer(value.getBucketSize());
+
+          Map<String, ZNRecord> map = bucketizer.bucketize(value.getRecord());
+          List<String> paths = new ArrayList<String>();
+          List<ZNRecord> bucketizedRecords = new ArrayList<ZNRecord>();
+          for (String bucketName : map.keySet())
+          {
+            paths.add(path + "/" + bucketName);
+            bucketizedRecords.add(map.get(bucketName));
+          }
+
+          // TODO: set success accordingly
+          _baseDataAccessor.setChildren(paths, bucketizedRecords, options);
+        }
+      }
+      else
+      {
+        success = _baseDataAccessor.set(path, value.getRecord(), options);
+      }
+      break;
+    default:
+      success = _baseDataAccessor.set(path, value.getRecord(), options);
+      break;
+    }
+    return success;
+  }
+
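+  // CURRENTSTATES updates are batched through GroupCommit; property-transfer
+  // types are enqueued to the transfer service (or skipped with a debug log if
+  // its URL is not yet known); everything else is a plain read-modify-write.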
+  @Override
+  public <T extends HelixProperty> boolean updateProperty(PropertyKey key, T value)
+  {
+    PropertyType type = key.getType();
+    String path = key.getPath();
+    int options = constructOptions(type);
+
+    boolean success = false;
+    switch (type)
+    {
+    case CURRENTSTATES:
+      success = _groupCommit.commit(_baseDataAccessor, options, path, value.getRecord());
+      break;
+    default:
+      if (type.usePropertyTransferServer())
+      {
+        if (_zkPropertyTransferSvcUrl != null && _zkPropertyTransferClient != null)
+        {
+          ZNRecordUpdate update =
+              new ZNRecordUpdate(path, OpCode.UPDATE, value.getRecord());
+          _zkPropertyTransferClient.enqueueZNRecordUpdate(update,
+                                                          _zkPropertyTransferSvcUrl);
+
+          return true;
+        }
+        else
+        {
+          LOG.debug("getPropertyTransferUrl is null, skip updating the value");
+          // TODO: consider skip the write operation
+          return true;
+        }
+      }
+      success =
+          _baseDataAccessor.update(path, new ZNRecordUpdater(value.getRecord()), options);
+      break;
+    }
+    return success;
+  }
+
+  @Override
+  public <T extends HelixProperty> List<T> getProperty(List<PropertyKey> keys)
+  {
+    if (keys == null || keys.size() == 0)
+    {
+      return Collections.emptyList();
+    }
+
+    List<T> childValues = new ArrayList<T>();
+
+    // read all records
+    List<String> paths = new ArrayList<String>();
+    for (PropertyKey key : keys)
+    {
+      paths.add(key.getPath());
+    }
+    List<ZNRecord> children = _baseDataAccessor.get(paths, null, 0);
+
+    // check if bucketized
+    for (int i = 0; i < keys.size(); i++)
+    {
+      PropertyKey key = keys.get(i);
+      ZNRecord record = children.get(i);
+
+      PropertyType type = key.getType();
+      String path = key.getPath();
+      int options = constructOptions(type);
+      // ZNRecord record = null;
+
+      switch (type)
+      {
+      case CURRENTSTATES:
+      case IDEALSTATES:
+      case EXTERNALVIEW:
+        // check if bucketized
+        if (record != null)
+        {
+          HelixProperty property = new HelixProperty(record);
+
+          int bucketSize = property.getBucketSize();
+          if (bucketSize > 0)
+          {
+            List<ZNRecord> childRecords =
+                _baseDataAccessor.getChildren(path, null, options);
+            ZNRecord assembledRecord = new ZNRecordAssembler().assemble(childRecords);
+
+            // merge with parent node value
+            if (assembledRecord != null)
+            {
+              record.getSimpleFields().putAll(assembledRecord.getSimpleFields());
+              record.getListFields().putAll(assembledRecord.getListFields());
+              record.getMapFields().putAll(assembledRecord.getMapFields());
+            }
+          }
+        }
+        break;
+      default:
+        break;
+      }
+
+      @SuppressWarnings("unchecked")
+      T t = (T) HelixProperty.convertToTypedInstance(key.getTypeClass(), record);
+      childValues.add(t);
+    }
+
+    return childValues;
+  }
+
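+  // Reads a single record, stamps creation/modification times from the ZK Stat,
+  // and for bucketized CURRENTSTATES/IDEALSTATES/EXTERNALVIEW records merges the
+  // child buckets back into the parent record before converting it to a typed
+  // HelixProperty.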
+  @Override
+  public <T extends HelixProperty> T getProperty(PropertyKey key)
+  {
+    PropertyType type = key.getType();
+    String path = key.getPath();
+    int options = constructOptions(type);
+    ZNRecord record = null;
+    try
+    {
+      Stat stat = new Stat();
+      record = _baseDataAccessor.get(path, stat, options);
+      if (record != null)
+      {
+        record.setCreationTime(stat.getCtime());
+        record.setModifiedTime(stat.getMtime());
+      }
+    }
+    catch (ZkNoNodeException e)
+    {
+      // OK
+    }
+
+    switch (type)
+    {
+    case CURRENTSTATES:
+    case IDEALSTATES:
+    case EXTERNALVIEW:
+      // check if bucketized
+      if (record != null)
+      {
+        HelixProperty property = new HelixProperty(record);
+
+        int bucketSize = property.getBucketSize();
+        if (bucketSize > 0)
+        {
+          List<ZNRecord> childRecords =
+              _baseDataAccessor.getChildren(path, null, options);
+          ZNRecord assembledRecord = new ZNRecordAssembler().assemble(childRecords);
+
+          // merge with parent node value
+          if (assembledRecord != null)
+          {
+            record.getSimpleFields().putAll(assembledRecord.getSimpleFields());
+            record.getListFields().putAll(assembledRecord.getListFields());
+            record.getMapFields().putAll(assembledRecord.getMapFields());
+          }
+        }
+      }
+      break;
+    default:
+      break;
+    }
+
+    @SuppressWarnings("unchecked")
+    T t = (T) HelixProperty.convertToTypedInstance(key.getTypeClass(), record);
+    return t;
+  }
+
+  @Override
+  public boolean removeProperty(PropertyKey key)
+  {
+    PropertyType type = key.getType();
+    String path = key.getPath();
+    int options = constructOptions(type);
+
+    return _baseDataAccessor.remove(path, options);
+  }
+
+  @Override
+  public List<String> getChildNames(PropertyKey key)
+  {
+    PropertyType type = key.getType();
+    String parentPath = key.getPath();
+    int options = constructOptions(type);
+    List<String> childNames = _baseDataAccessor.getChildNames(parentPath, options);
+    if (childNames == null)
+    {
+      childNames = Collections.emptyList();
+    }
+    return childNames;
+  }
+
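+  /**
+   * Read all child records under the key's path and convert them to typed
+   * properties; bucketized CURRENTSTATES/IDEALSTATES/EXTERNALVIEW children are
+   * assembled from their own bucket znodes first.
+   */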
+  @Override
+  public <T extends HelixProperty> List<T> getChildValues(PropertyKey key)
+  {
+    PropertyType type = key.getType();
+    String parentPath = key.getPath();
+    int options = constructOptions(type);
+    List<T> childValues = new ArrayList<T>();
+
+    List<ZNRecord> children = _baseDataAccessor.getChildren(parentPath, null, options);
+    if (children != null)
+    {
+      for (ZNRecord record : children)
+      {
+        switch (type)
+        {
+        case CURRENTSTATES:
+        case IDEALSTATES:
+        case EXTERNALVIEW:
+          if (record != null)
+          {
+            HelixProperty property = new HelixProperty(record);
+
+            int bucketSize = property.getBucketSize();
+            if (bucketSize > 0)
+            {
+              // TODO: fix this if record.id != pathName
+              String childPath = parentPath + "/" + record.getId();
+              List<ZNRecord> childRecords =
+                  _baseDataAccessor.getChildren(childPath, null, options);
+              ZNRecord assembledRecord = new ZNRecordAssembler().assemble(childRecords);
+
+              // merge with parent node value
+              if (assembledRecord != null)
+              {
+                record.getSimpleFields().putAll(assembledRecord.getSimpleFields());
+                record.getListFields().putAll(assembledRecord.getListFields());
+                record.getMapFields().putAll(assembledRecord.getMapFields());
+              }
+            }
+          }
+
+          break;
+        default:
+          break;
+        }
+
+        if (record != null)
+        {
+          @SuppressWarnings("unchecked")
+          T t = (T) HelixProperty.convertToTypedInstance(key.getTypeClass(), record);
+          childValues.add(t);
+        }
+      }
+    }
+    return childValues;
+  }
+
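+  /**
+   * Same as {@link #getChildValues(PropertyKey)}, but keyed by record id.
+   */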
+  @Override
+  public <T extends HelixProperty> Map<String, T> getChildValuesMap(PropertyKey key)
+  {
+    List<T> children = getChildValues(key);
+    Map<String, T> childValuesMap = new HashMap<String, T>();
+    for (T t : children)
+    {
+      childValuesMap.put(t.getRecord().getId(), t);
+    }
+    return childValuesMap;
+  }
+
+  @Override
+  public Builder keyBuilder()
+  {
+    return _propertyKeyBuilder;
+  }
+
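+  // Persistent property types map to persistent znodes; all others are ephemeral.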
+  private int constructOptions(PropertyType type)
+  {
+    int options = 0;
+    if (type.isPersistent())
+    {
+      options = options | AccessOption.PERSISTENT;
+    }
+    else
+    {
+      options = options | AccessOption.EPHEMERAL;
+    }
+
+    // Write-through caching for a participant's CURRENTSTATES or the
+    // controller's EXTERNALVIEW is not enabled here.
+
+    return options;
+  }
+
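+  /**
+   * Batch-create child znodes for the given keys; the access options derived
+   * from the last key's property type are used for all of them.
+   */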
+  @Override
+  public <T extends HelixProperty> boolean[] createChildren(List<PropertyKey> keys,
+                                                            List<T> children)
+  {
+    // TODO: add validation
+    int options = -1;
+    List<String> paths = new ArrayList<String>();
+    List<ZNRecord> records = new ArrayList<ZNRecord>();
+    for (int i = 0; i < keys.size(); i++)
+    {
+      PropertyKey key = keys.get(i);
+      PropertyType type = key.getType();
+      String path = key.getPath();
+      paths.add(path);
+      HelixProperty value = children.get(i);
+      records.add(value.getRecord());
+      options = constructOptions(type);
+    }
+    return _baseDataAccessor.createChildren(paths, records, options);
+  }
+
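+  /**
+   * Batch-set child znodes. A bucketized EXTERNALVIEW is written as a parent znode
+   * carrying the simple fields plus one child znode per bucket; buckets are written
+   * only for parents that were set successfully.
+   */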
+  @Override
+  public <T extends HelixProperty> boolean[] setChildren(List<PropertyKey> keys,
+                                                         List<T> children)
+  {
+    int options = -1;
+    List<String> paths = new ArrayList<String>();
+    List<ZNRecord> records = new ArrayList<ZNRecord>();
+
+    List<List<String>> bucketizedPaths =
+        new ArrayList<List<String>>(Collections.<List<String>> nCopies(keys.size(), null));
+    List<List<ZNRecord>> bucketizedRecords =
+        new ArrayList<List<ZNRecord>>(Collections.<List<ZNRecord>> nCopies(keys.size(),
+                                                                           null));
+
+    for (int i = 0; i < keys.size(); i++)
+    {
+      PropertyKey key = keys.get(i);
+      PropertyType type = key.getType();
+      String path = key.getPath();
+      paths.add(path);
+      options = constructOptions(type);
+
+      HelixProperty value = children.get(i);
+
+      switch (type)
+      {
+      case EXTERNALVIEW:
+        if (value.getBucketSize() == 0)
+        {
+          records.add(value.getRecord());
+        }
+        else
+        {
+          ZNRecord metaRecord = new ZNRecord(value.getId());
+          metaRecord.setSimpleFields(value.getRecord().getSimpleFields());
+          records.add(metaRecord);
+
+          ZNRecordBucketizer bucketizer = new ZNRecordBucketizer(value.getBucketSize());
+
+          Map<String, ZNRecord> map = bucketizer.bucketize(value.getRecord());
+          List<String> childBucketizedPaths = new ArrayList<String>();
+          List<ZNRecord> childBucketizedRecords = new ArrayList<ZNRecord>();
+          for (String bucketName : map.keySet())
+          {
+            childBucketizedPaths.add(path + "/" + bucketName);
+            childBucketizedRecords.add(map.get(bucketName));
+          }
+          bucketizedPaths.set(i, childBucketizedPaths);
+          bucketizedRecords.set(i, childBucketizedRecords);
+        }
+        break;
+      default:
+        records.add(value.getRecord());
+        break;
+      }
+    }
+
+    // set non-bucketized nodes or parent nodes of bucketized nodes
+    boolean[] success = _baseDataAccessor.setChildren(paths, records, options);
+
+    // set bucketized nodes
+    List<String> allBucketizedPaths = new ArrayList<String>();
+    List<ZNRecord> allBucketizedRecords = new ArrayList<ZNRecord>();
+
+    for (int i = 0; i < keys.size(); i++)
+    {
+      if (success[i] && bucketizedPaths.get(i) != null)
+      {
+        allBucketizedPaths.addAll(bucketizedPaths.get(i));
+        allBucketizedRecords.addAll(bucketizedRecords.get(i));
+      }
+    }
+
+    // TODO: fold the result of writing bucketized child nodes into success[]
+    _baseDataAccessor.setChildren(allBucketizedPaths, allBucketizedRecords, options);
+
+    return success;
+  }
+
+  @Override
+  public BaseDataAccessor<ZNRecord> getBaseDataAccessor()
+  {
+    return _baseDataAccessor;
+  }
+
+  @Override
+  public <T extends HelixProperty> boolean[] updateChildren(List<String> paths,
+                                                            List<DataUpdater<ZNRecord>> updaters,
+                                                            int options)
+  {
+    return _baseDataAccessor.updateChildren(paths, updaters, options);
+  }
+
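+  // Shut down the ZkPropertyTransferClient if one was created on controller change.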
+  public void shutdown()
+  {
+    if (_zkPropertyTransferClient != null)
+    {
+      _zkPropertyTransferClient.shutdown();
+    }
+  }
+
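+  /**
+   * On a controller change, refresh the property-transfer service url from the
+   * new leader and lazily create the ZkPropertyTransferClient.
+   */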
+  @Override
+  public void onControllerChange(NotificationContext changeContext)
+  {
+    LOG.info("Controller has changed");
+    refreshZkPropertyTransferUrl();
+    if (_zkPropertyTransferClient == null)
+    {
+      if (_zkPropertyTransferSvcUrl != null && _zkPropertyTransferSvcUrl.length() > 0)
+      {
+        LOG.info("Creating ZkPropertyTransferClient as we get url "
+            + _zkPropertyTransferSvcUrl);
+        _zkPropertyTransferClient =
+            new ZkPropertyTransferClient(ZkPropertyTransferClient.DEFAULT_MAX_CONCURRENTTASKS);
+      }
+    }
+  }
+
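+  // Look up the controller leader's LiveInstance and cache its webservice url;
+  // reset it to null if there is no leader or the lookup fails.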
+  void refreshZkPropertyTransferUrl()
+  {
+    try
+    {
+      LiveInstance leader = getProperty(keyBuilder().controllerLeader());
+      if (leader != null)
+      {
+        _zkPropertyTransferSvcUrl = leader.getWebserviceUrl();
+        LOG.info("_zkPropertyTransferSvcUrl : " + _zkPropertyTransferSvcUrl
+            + " Controller " + leader.getInstanceName());
+      }
+      else
+      {
+        _zkPropertyTransferSvcUrl = null;
+      }
+    }
+    catch (Exception e)
+    {
+      LOG.warn("Failed to refresh the ZkPropertyTransfer service url", e);
+      _zkPropertyTransferSvcUrl = null;
+    }
+  }
+}