You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 00:26:41 UTC

[16/47] Refactoring from com.linkedin.helix to org.apache.helix

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZKHelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZKHelixAdmin.java
deleted file mode 100644
index d3192b5..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZKHelixAdmin.java
+++ /dev/null
@@ -1,1272 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.manager.zk;
-
-import java.io.DataInputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.UUID;
-
-import org.I0Itec.zkclient.DataUpdater;
-import org.I0Itec.zkclient.exception.ZkNoNodeException;
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.AccessOption;
-import com.linkedin.helix.ConfigAccessor;
-import com.linkedin.helix.ConfigScope;
-import com.linkedin.helix.ConfigScope.ConfigScopeProperty;
-import com.linkedin.helix.HelixAdmin;
-import com.linkedin.helix.HelixConstants;
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.HelixException;
-import com.linkedin.helix.PropertyKey;
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.PropertyPathConfig;
-import com.linkedin.helix.PropertyType;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.alerts.AlertsHolder;
-import com.linkedin.helix.alerts.StatsHolder;
-import com.linkedin.helix.model.Alerts;
-import com.linkedin.helix.model.CurrentState;
-import com.linkedin.helix.model.ExternalView;
-import com.linkedin.helix.model.IdealState;
-import com.linkedin.helix.model.ClusterConstraints.ConstraintAttribute;
-import com.linkedin.helix.model.ClusterConstraints.ConstraintType;
-import com.linkedin.helix.model.IdealState.IdealStateModeProperty;
-import com.linkedin.helix.model.InstanceConfig;
-import com.linkedin.helix.model.InstanceConfig.InstanceConfigProperty;
-import com.linkedin.helix.model.LiveInstance;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.model.Message.MessageState;
-import com.linkedin.helix.model.Message.MessageType;
-import com.linkedin.helix.model.PauseSignal;
-import com.linkedin.helix.model.PersistentStats;
-import com.linkedin.helix.model.StateModelDefinition;
-import com.linkedin.helix.tools.IdealStateCalculatorForStorageNode;
-import com.linkedin.helix.util.HelixUtil;
-
-public class ZKHelixAdmin implements HelixAdmin
-{
-
-  /** ZooKeeper client used for every direct znode read and write issued by this admin. */
-  private final ZkClient _zkClient;
-  /** Config accessor built on top of the same ZooKeeper client. */
-  private final ConfigAccessor _configAccessor;
-
-  // The logger is never reassigned, so it is declared as a constant.
-  private static final Logger logger = Logger.getLogger(ZKHelixAdmin.class);
-
-  /**
-   * Creates an admin that operates through the given ZooKeeper client.
-   *
-   * @param zkClient client shared by this admin and its {@link ConfigAccessor}
-   */
-  public ZKHelixAdmin(ZkClient zkClient)
-  {
-    _zkClient = zkClient;
-    _configAccessor = new ConfigAccessor(zkClient);
-  }
-
-  /**
-   * Registers a participant in a cluster: writes its config record under the
-   * participant-config path and then creates the per-instance message, current-state,
-   * errors and status-update paths.
-   *
-   * @param clusterName    cluster the instance joins
-   * @param instanceConfig configuration of the new instance; its id names the znodes
-   * @throws HelixException if the cluster is not set up, or a config for the id exists
-   */
-  @Override
-  public void addInstance(String clusterName, InstanceConfig instanceConfig)
-  {
-    final boolean clusterReady = ZKUtil.isClusterSetup(clusterName, _zkClient);
-    if (!clusterReady)
-    {
-      throw new HelixException("cluster " + clusterName + " is not setup yet");
-    }
-
-    final String participantConfigsPath =
-        PropertyPathConfig.getPath(PropertyType.CONFIGS,
-                                   clusterName,
-                                   ConfigScopeProperty.PARTICIPANT.toString());
-    final String id = instanceConfig.getId();
-
-    // refuse to overwrite an already registered participant
-    final boolean alreadyRegistered = _zkClient.exists(participantConfigsPath + "/" + id);
-    if (alreadyRegistered)
-    {
-      throw new HelixException("Node " + id + " already exists in cluster "
-          + clusterName);
-    }
-
-    ZKUtil.createChildren(_zkClient, participantConfigsPath, instanceConfig.getRecord());
-
-    // per-instance working paths, created in the same order as before
-    final String messagesPath = HelixUtil.getMessagePath(clusterName, id);
-    _zkClient.createPersistent(messagesPath, true);
-
-    final String currentStatesPath = HelixUtil.getCurrentStateBasePath(clusterName, id);
-    _zkClient.createPersistent(currentStatesPath, true);
-
-    final String errorsPath = HelixUtil.getErrorsPath(clusterName, id);
-    _zkClient.createPersistent(errorsPath, true);
-
-    final String statusUpdatesPath = HelixUtil.getStatusUpdatesPath(clusterName, id);
-    _zkClient.createPersistent(statusUpdatesPath, true);
-  }
-
-  /**
-   * Removes a participant from a cluster by deleting its config record and its whole
-   * instance subtree.
-   *
-   * @param clusterName    cluster the instance belongs to
-   * @param instanceConfig configuration identifying the instance to drop
-   * @throws HelixException if either the config znode or the instance znode is missing
-   */
-  @Override
-  public void dropInstance(String clusterName, InstanceConfig instanceConfig)
-  {
-    final String participantConfigsPath =
-        PropertyPathConfig.getPath(PropertyType.CONFIGS,
-                                   clusterName,
-                                   ConfigScopeProperty.PARTICIPANT.toString());
-    final String id = instanceConfig.getId();
-    final String configPath = participantConfigsPath + "/" + id;
-    final String instanceRoot = HelixUtil.getInstancePath(clusterName, id);
-
-    // both znodes must be present before anything is deleted
-    final boolean hasConfig = _zkClient.exists(configPath);
-    if (!hasConfig)
-    {
-      throw new HelixException("Node " + id
-          + " does not exist in config for cluster " + clusterName);
-    }
-
-    final boolean hasInstanceRoot = _zkClient.exists(instanceRoot);
-    if (!hasInstanceRoot)
-    {
-      throw new HelixException("Node " + id
-          + " does not exist in instances for cluster " + clusterName);
-    }
-
-    // config record first, then the instance subtree
-    ZKUtil.dropChildren(_zkClient, participantConfigsPath, instanceConfig.getRecord());
-    _zkClient.deleteRecursive(instanceRoot);
-  }
-
-  /**
-   * Reads the configuration of one participant.
-   *
-   * @param clusterName  cluster the instance belongs to
-   * @param instanceName name of the instance whose config is requested
-   * @return the instance config stored for {@code instanceName}
-   * @throws HelixException if no config znode exists for the instance
-   */
-  @Override
-  public InstanceConfig getInstanceConfig(String clusterName, String instanceName)
-  {
-    String instanceConfigPath =
-        PropertyPathConfig.getPath(PropertyType.CONFIGS,
-                                   clusterName,
-                                   ConfigScopeProperty.PARTICIPANT.toString(),
-                                   instanceName);
-    if (!_zkClient.exists(instanceConfigPath))
-    {
-      // a space after "instance" keeps the name readable in the message
-      throw new HelixException("instance " + instanceName + " does not exist in cluster "
-          + clusterName);
-    }
-
-    ZKHelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
-    Builder keyBuilder = accessor.keyBuilder();
-
-    return accessor.getProperty(keyBuilder.instanceConfig(instanceName));
-  }
-
-  /**
-   * Sets the enabled flag in the config record of one participant.
-   *
-   * @param clusterName  cluster the instance belongs to
-   * @param instanceName instance whose flag is changed
-   * @param enabled      new value of the flag
-   * @throws HelixException if the instance config does not exist, or is read as null
-   *                        while the update is applied
-   */
-  @Override
-  public void enableInstance(final String clusterName,
-                             final String instanceName,
-                             final boolean enabled)
-  {
-    final String participantConfigPath =
-        PropertyPathConfig.getPath(PropertyType.CONFIGS,
-                                   clusterName,
-                                   ConfigScopeProperty.PARTICIPANT.toString(),
-                                   instanceName);
-    final ZkBaseDataAccessor<ZNRecord> zkAccessor =
-        new ZkBaseDataAccessor<ZNRecord>(_zkClient);
-
-    final boolean configPresent = zkAccessor.exists(participantConfigPath, 0);
-    if (!configPresent)
-    {
-      throw new HelixException("Cluster " + clusterName + ", instance: " + instanceName
-          + ", instance config does not exist");
-    }
-
-    // rewrites the stored record with the requested enabled flag
-    final DataUpdater<ZNRecord> enabledFlagUpdater = new DataUpdater<ZNRecord>()
-    {
-      @Override
-      public ZNRecord update(ZNRecord storedRecord)
-      {
-        if (storedRecord == null)
-        {
-          throw new HelixException("Cluster: " + clusterName + ", instance: "
-              + instanceName + ", participant config is null");
-        }
-
-        InstanceConfig updated = new InstanceConfig(storedRecord);
-        updated.setInstanceEnabled(enabled);
-        return updated.getRecord();
-      }
-    };
-
-    zkAccessor.update(participantConfigPath, enabledFlagUpdater, AccessOption.PERSISTENT);
-  }
-
-  @Override
-  public void enablePartition(final boolean enabled,
-                              final String clusterName,
-                              final String instanceName,
-                              final String resourceName,
-                              final List<String> partitionNames)
-  {
-    String path =
-        PropertyPathConfig.getPath(PropertyType.CONFIGS,
-                                   clusterName,
-                                   ConfigScopeProperty.PARTICIPANT.toString(),
-                                   instanceName);
-
-    ZkBaseDataAccessor<ZNRecord> baseAccessor =
-        new ZkBaseDataAccessor<ZNRecord>(_zkClient);
-
-    // check instanceConfig exists
-    if (!baseAccessor.exists(path, 0))
-    {
-      throw new HelixException("Cluster: " + clusterName + ", instance: " + instanceName
-          + ", instance config does not exist");
-    }
-
-    // check resource exists
-    String idealStatePath =
-        PropertyPathConfig.getPath(PropertyType.IDEALSTATES, clusterName, resourceName);
-
-    ZNRecord idealStateRecord = null;
-    try
-    {
-      idealStateRecord = baseAccessor.get(idealStatePath, null, 0);
-    }
-    catch (ZkNoNodeException e)
-    {
-      // OK.
-    }
-
-    if (idealStateRecord == null)
-    {
-      throw new HelixException("Cluster: " + clusterName + ", resource: " + resourceName
-          + ", ideal state does not exist");
-    }
-
-    // check partitions exist. warn if not
-    IdealState idealState = new IdealState(idealStateRecord);
-    for (String partitionName : partitionNames)
-    {
-      if ((idealState.getIdealStateMode() == IdealStateModeProperty.AUTO && idealState.getPreferenceList(partitionName) == null)
-          || (idealState.getIdealStateMode() == IdealStateModeProperty.CUSTOMIZED && idealState.getInstanceStateMap(partitionName) == null))
-      {
-        logger.warn("Cluster: " + clusterName + ", resource: " + resourceName
-            + ", partition: " + partitionName
-            + ", partition does not exist in ideal state");
-      }
-    }
-
-    // update participantConfig
-    // could not use ZNRecordUpdater since it doesn't do listField merge/subtract
-    baseAccessor.update(path, new DataUpdater<ZNRecord>()
-    {
-      @Override
-      public ZNRecord update(ZNRecord currentData)
-      {
-        if (currentData == null)
-        {
-          throw new HelixException("Cluster: " + clusterName + ", instance: "
-              + instanceName + ", participant config is null");
-        }
-
-        // TODO: merge with InstanceConfig.setInstanceEnabledForPartition
-        List<String> list =
-            currentData.getListField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.toString());
-        Set<String> disabledPartitions = new HashSet<String>();
-        if (list != null)
-        {
-          disabledPartitions.addAll(list);
-        }
-
-        if (enabled)
-        {
-          disabledPartitions.removeAll(partitionNames);
-        }
-        else
-        {
-          disabledPartitions.addAll(partitionNames);
-        }
-
-        list = new ArrayList<String>(disabledPartitions);
-        Collections.sort(list);
-        currentData.setListField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.toString(),
-                                 list);
-        return currentData;
-      }
-    },
-                        AccessOption.PERSISTENT);
-  }
-
-  @Override
-  public void enableCluster(String clusterName, boolean enabled)
-  {
-    HelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
-    Builder keyBuilder = accessor.keyBuilder();
-
-    if (enabled)
-    {
-      accessor.removeProperty(keyBuilder.pause());
-    }
-    else
-    {
-      accessor.createProperty(keyBuilder.pause(), new PauseSignal("pause"));
-    }
-  }
-
-  @Override
-  public void resetPartition(String clusterName,
-                             String instanceName,
-                             String resourceName,
-                             List<String> partitionNames)
-  {
-    ZKHelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
-    Builder keyBuilder = accessor.keyBuilder();
-
-    // check the instance is alive
-    LiveInstance liveInstance =
-        accessor.getProperty(keyBuilder.liveInstance(instanceName));
-    if (liveInstance == null)
-    {
-      throw new HelixException("Can't reset state for " + resourceName + "/"
-          + partitionNames + " on " + instanceName + ", because " + instanceName
-          + " is not alive");
-    }
-
-    // check resource group exists
-    IdealState idealState = accessor.getProperty(keyBuilder.idealStates(resourceName));
-    if (idealState == null)
-    {
-      throw new HelixException("Can't reset state for " + resourceName + "/"
-          + partitionNames + " on " + instanceName + ", because " + resourceName
-          + " is not added");
-    }
-
-    // check partition exists in resource group
-    Set<String> resetPartitionNames = new HashSet<String>(partitionNames);
-    if (idealState.getIdealStateMode() == IdealStateModeProperty.CUSTOMIZED)
-    {
-      Set<String> partitions =
-          new HashSet<String>(idealState.getRecord().getMapFields().keySet());
-      if (!partitions.containsAll(resetPartitionNames))
-      {
-        throw new HelixException("Can't reset state for " + resourceName + "/"
-            + partitionNames + " on " + instanceName + ", because not all "
-            + partitionNames + " exist");
-      }
-    }
-    else
-    {
-      Set<String> partitions =
-          new HashSet<String>(idealState.getRecord().getListFields().keySet());
-      if (!partitions.containsAll(resetPartitionNames))
-      {
-        throw new HelixException("Can't reset state for " + resourceName + "/"
-            + partitionNames + " on " + instanceName + ", because not all "
-            + partitionNames + " exist");
-      }
-    }
-
-    // check partition is in ERROR state
-    String sessionId = liveInstance.getSessionId();
-    CurrentState curState =
-        accessor.getProperty(keyBuilder.currentState(instanceName,
-                                                     sessionId,
-                                                     resourceName));
-    for (String partitionName : resetPartitionNames)
-    {
-      if (!curState.getState(partitionName).equals("ERROR"))
-      {
-        throw new HelixException("Can't reset state for " + resourceName + "/"
-            + partitionNames + " on " + instanceName + ", because not all "
-            + partitionNames + " are in ERROR state");
-      }
-    }
-
-    // check stateModelDef exists and get initial state
-    String stateModelDef = idealState.getStateModelDefRef();
-    StateModelDefinition stateModel =
-        accessor.getProperty(keyBuilder.stateModelDef(stateModelDef));
-    if (stateModel == null)
-    {
-      throw new HelixException("Can't reset state for " + resourceName + "/"
-          + partitionNames + " on " + instanceName + ", because " + stateModelDef
-          + " is NOT found");
-    }
-
-    // check there is no pending messages for the partitions exist
-    List<Message> messages = accessor.getChildValues(keyBuilder.messages(instanceName));
-    for (Message message : messages)
-    {
-      if (!MessageType.STATE_TRANSITION.toString().equalsIgnoreCase(message.getMsgType())
-          || !sessionId.equals(message.getTgtSessionId())
-          || !resourceName.equals(message.getResourceName())
-          || !resetPartitionNames.contains(message.getPartitionName()))
-      {
-        continue;
-      }
-
-      throw new HelixException("Can't reset state for " + resourceName + "/"
-          + partitionNames + " on " + instanceName
-          + ", because a pending message exists: " + message);
-    }
-
-    String adminName = null;
-    try
-    {
-      adminName = InetAddress.getLocalHost().getCanonicalHostName() + "-ADMIN";
-    }
-    catch (UnknownHostException e)
-    {
-      // can ignore it
-      logger.info("Unable to get host name. Will set it to UNKNOWN, mostly ignorable", e);
-      adminName = "UNKNOWN";
-    }
-
-    List<Message> resetMessages = new ArrayList<Message>();
-    List<PropertyKey> messageKeys = new ArrayList<PropertyKey>();
-    for (String partitionName : resetPartitionNames)
-    {
-      // send ERROR to initialState message
-      String msgId = UUID.randomUUID().toString();
-      Message message = new Message(MessageType.STATE_TRANSITION, msgId);
-      message.setSrcName(adminName);
-      message.setTgtName(instanceName);
-      message.setMsgState(MessageState.NEW);
-      message.setPartitionName(partitionName);
-      message.setResourceName(resourceName);
-      message.setTgtSessionId(sessionId);
-      message.setStateModelDef(stateModelDef);
-      message.setFromState("ERROR");
-      message.setToState(stateModel.getInitialState());
-      message.setStateModelFactoryName(idealState.getStateModelFactoryName());
-
-      resetMessages.add(message);
-      messageKeys.add(keyBuilder.message(instanceName, message.getId()));
-    }
-
-    accessor.setChildren(messageKeys, resetMessages);
-  }
-
-  @Override
-  public void resetInstance(String clusterName, List<String> instanceNames)
-  {
-    // TODO: not mp-safe
-    ZKHelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
-    Builder keyBuilder = accessor.keyBuilder();
-    List<ExternalView> extViews = accessor.getChildValues(keyBuilder.externalViews());
-
-    Set<String> resetInstanceNames = new HashSet<String>(instanceNames);
-    for (String instanceName : resetInstanceNames)
-    {
-      List<String> resetPartitionNames = new ArrayList<String>();
-      for (ExternalView extView : extViews)
-      {
-        Map<String, Map<String, String>> stateMap = extView.getRecord().getMapFields();
-        for (String partitionName : stateMap.keySet())
-        {
-          Map<String, String> instanceStateMap = stateMap.get(partitionName);
-
-          if (instanceStateMap.containsKey(instanceName)
-              && instanceStateMap.get(instanceName).equals("ERROR"))
-          {
-            resetPartitionNames.add(partitionName);
-          }
-        }
-        resetPartition(clusterName,
-                       instanceName,
-                       extView.getResourceName(),
-                       resetPartitionNames);
-      }
-    }
-  }
-
-  @Override
-  public void resetResource(String clusterName, List<String> resourceNames)
-  {
-    // TODO: not mp-safe
-    ZKHelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
-    Builder keyBuilder = accessor.keyBuilder();
-    List<ExternalView> extViews = accessor.getChildValues(keyBuilder.externalViews());
-
-    Set<String> resetResourceNames = new HashSet<String>(resourceNames);
-    for (ExternalView extView : extViews)
-    {
-      if (!resetResourceNames.contains(extView.getResourceName()))
-      {
-        continue;
-      }
-
-      // instanceName -> list of resetPartitionNames
-      Map<String, List<String>> resetPartitionNames = new HashMap<String, List<String>>();
-
-      Map<String, Map<String, String>> stateMap = extView.getRecord().getMapFields();
-      for (String partitionName : stateMap.keySet())
-      {
-        Map<String, String> instanceStateMap = stateMap.get(partitionName);
-        for (String instanceName : instanceStateMap.keySet())
-        {
-          if (instanceStateMap.get(instanceName).equals("ERROR"))
-          {
-            if (!resetPartitionNames.containsKey(instanceName))
-            {
-              resetPartitionNames.put(instanceName, new ArrayList<String>());
-            }
-            resetPartitionNames.get(instanceName).add(partitionName);
-          }
-        }
-      }
-
-      for (String instanceName : resetPartitionNames.keySet())
-      {
-        resetPartition(clusterName,
-                       instanceName,
-                       extView.getResourceName(),
-                       resetPartitionNames.get(instanceName));
-      }
-    }
-  }
-
-  @Override
-  public void addCluster(String clusterName, boolean overwritePrevRecord)
-  {
-    String root = "/" + clusterName;
-    String path;
-
-    // TODO For ease of testing only, should remove later
-    if (_zkClient.exists(root))
-    {
-      logger.warn("Root directory exists.Cleaning the root directory:" + root
-          + " overwritePrevRecord: " + overwritePrevRecord);
-      if (overwritePrevRecord)
-      {
-        _zkClient.deleteRecursive(root);
-      }
-      else
-      {
-        throw new HelixException("Cluster " + clusterName + " already exists");
-      }
-    }
-
-    _zkClient.createPersistent(root);
-
-    // IDEAL STATE
-    _zkClient.createPersistent(HelixUtil.getIdealStatePath(clusterName));
-    // CONFIGURATIONS
-    // _zkClient.createPersistent(HelixUtil.getConfigPath(clusterName));
-    path =
-        PropertyPathConfig.getPath(PropertyType.CONFIGS,
-                                   clusterName,
-                                   ConfigScopeProperty.CLUSTER.toString(),
-                                   clusterName);
-    _zkClient.createPersistent(path, true);
-    _zkClient.writeData(path, new ZNRecord(clusterName));
-    path =
-        PropertyPathConfig.getPath(PropertyType.CONFIGS,
-                                   clusterName,
-                                   ConfigScopeProperty.PARTICIPANT.toString());
-    _zkClient.createPersistent(path);
-    path =
-        PropertyPathConfig.getPath(PropertyType.CONFIGS,
-                                   clusterName,
-                                   ConfigScopeProperty.RESOURCE.toString());
-    _zkClient.createPersistent(path);
-    // PROPERTY STORE
-    path = PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, clusterName);
-    _zkClient.createPersistent(path);
-    // LIVE INSTANCES
-    _zkClient.createPersistent(HelixUtil.getLiveInstancesPath(clusterName));
-    // MEMBER INSTANCES
-    _zkClient.createPersistent(HelixUtil.getMemberInstancesPath(clusterName));
-    // External view
-    _zkClient.createPersistent(HelixUtil.getExternalViewPath(clusterName));
-    // State model definition
-    _zkClient.createPersistent(HelixUtil.getStateModelDefinitionPath(clusterName));
-
-    // controller
-    _zkClient.createPersistent(HelixUtil.getControllerPath(clusterName));
-    path = PropertyPathConfig.getPath(PropertyType.HISTORY, clusterName);
-    final ZNRecord emptyHistory = new ZNRecord(PropertyType.HISTORY.toString());
-    final List<String> emptyList = new ArrayList<String>();
-    emptyHistory.setListField(clusterName, emptyList);
-    _zkClient.createPersistent(path, emptyHistory);
-
-    path = PropertyPathConfig.getPath(PropertyType.MESSAGES_CONTROLLER, clusterName);
-    _zkClient.createPersistent(path);
-
-    path = PropertyPathConfig.getPath(PropertyType.STATUSUPDATES_CONTROLLER, clusterName);
-    _zkClient.createPersistent(path);
-
-    path = PropertyPathConfig.getPath(PropertyType.ERRORS_CONTROLLER, clusterName);
-    _zkClient.createPersistent(path);
-  }
-
-  /**
-   * Lists the children of the cluster's member-instances path.
-   *
-   * @param clusterName cluster to inspect
-   * @return child names found under the member-instances path
-   */
-  @Override
-  public List<String> getInstancesInCluster(String clusterName)
-  {
-    return _zkClient.getChildren(HelixUtil.getMemberInstancesPath(clusterName));
-  }
-
-  /**
-   * Adds a resource in AUTO ideal-state mode with a bucket size of 0.
-   *
-   * @param clusterName   cluster receiving the resource
-   * @param resourceName  name of the new resource
-   * @param partitions    number of partitions
-   * @param stateModelRef name of the state model definition to reference
-   */
-  @Override
-  public void addResource(String clusterName,
-                          String resourceName,
-                          int partitions,
-                          String stateModelRef)
-  {
-    final String autoMode = IdealStateModeProperty.AUTO.toString();
-    final int bucketSize = 0;
-    addResource(clusterName, resourceName, partitions, stateModelRef, autoMode, bucketSize);
-  }
-
-  /**
-   * Adds a resource with the given ideal-state mode and a bucket size of 0.
-   *
-   * @param clusterName    cluster receiving the resource
-   * @param resourceName   name of the new resource
-   * @param partitions     number of partitions
-   * @param stateModelRef  name of the state model definition to reference
-   * @param idealStateMode name of an {@code IdealStateModeProperty} constant
-   */
-  @Override
-  public void addResource(String clusterName,
-                          String resourceName,
-                          int partitions,
-                          String stateModelRef,
-                          String idealStateMode)
-  {
-    final int bucketSize = 0;
-    addResource(clusterName,
-                resourceName,
-                partitions,
-                stateModelRef,
-                idealStateMode,
-                bucketSize);
-  }
-
-  /**
-   * Adds a resource to a cluster by writing an initial ideal-state record for it.
-   *
-   * The record is created with 0 replicas and the default state model factory. An
-   * {@code idealStateMode} that does not name an {@code IdealStateModeProperty}
-   * constant is logged and replaced by AUTO.
-   *
-   * @param clusterName    cluster receiving the resource
-   * @param resourceName   name of the new resource
-   * @param partitions     number of partitions
-   * @param stateModelRef  name of the state model definition to reference
-   * @param idealStateMode name of an {@code IdealStateModeProperty} constant
-   * @param bucketSize     bucket size to store on the ideal state; values of 0 or less
-   *                       leave it unset
-   * @throws HelixException if the cluster is not set up, the state model definition is
-   *                        missing, or an ideal state for the resource already exists
-   */
-  @Override
-  public void addResource(String clusterName,
-                          String resourceName,
-                          int partitions,
-                          String stateModelRef,
-                          String idealStateMode,
-                          int bucketSize)
-  {
-    if (!ZKUtil.isClusterSetup(clusterName, _zkClient))
-    {
-      throw new HelixException("cluster " + clusterName + " is not setup yet");
-    }
-
-    IdealStateModeProperty mode = IdealStateModeProperty.AUTO;
-    try
-    {
-      mode = IdealStateModeProperty.valueOf(idealStateMode);
-    }
-    catch (Exception e)
-    {
-      // Deliberate best effort: keep the AUTO default, but say which value was
-      // rejected instead of logging an empty message.
-      logger.error("Invalid ideal state mode: " + idealStateMode + " for resource: "
-          + resourceName + ", falling back to " + mode, e);
-    }
-    IdealState idealState = new IdealState(resourceName);
-    idealState.setNumPartitions(partitions);
-    idealState.setStateModelDefRef(stateModelRef);
-    idealState.setIdealStateMode(mode.toString());
-    idealState.setReplicas("" + 0);
-    idealState.setStateModelFactoryName(HelixConstants.DEFAULT_STATE_MODEL_FACTORY);
-
-    if (bucketSize > 0)
-    {
-      idealState.setBucketSize(bucketSize);
-    }
-
-    // the referenced state model definition must already be registered
-    String stateModelDefPath =
-        PropertyPathConfig.getPath(PropertyType.STATEMODELDEFS,
-                                   clusterName,
-                                   stateModelRef);
-    if (!_zkClient.exists(stateModelDefPath))
-    {
-      throw new HelixException("State model " + stateModelRef
-          + " not found in the cluster STATEMODELDEFS path");
-    }
-
-    // never overwrite an existing ideal state
-    String idealStatePath = HelixUtil.getIdealStatePath(clusterName);
-    String dbIdealStatePath = idealStatePath + "/" + resourceName;
-    if (_zkClient.exists(dbIdealStatePath))
-    {
-      throw new HelixException("Skip the operation. DB ideal state directory exists:"
-          + dbIdealStatePath);
-    }
-
-    ZKUtil.createChildren(_zkClient, idealStatePath, idealState.getRecord());
-  }
-
-  /**
-   * Lists the top-level znodes that pass {@code ZKUtil.isClusterSetup}.
-   *
-   * @return names of the top-level znodes recognised as set-up clusters
-   */
-  @Override
-  public List<String> getClusters()
-  {
-    final List<String> clusters = new ArrayList<String>();
-    for (String candidate : _zkClient.getChildren("/"))
-    {
-      if (!ZKUtil.isClusterSetup(candidate, _zkClient))
-      {
-        continue;
-      }
-      clusters.add(candidate);
-    }
-    return clusters;
-  }
-
-  /**
-   * Lists the children of the cluster's ideal-state path.
-   *
-   * @param clusterName cluster to inspect
-   * @return child names found under the ideal-state path
-   */
-  @Override
-  public List<String> getResourcesInCluster(String clusterName)
-  {
-    final String idealStatesPath = HelixUtil.getIdealStatePath(clusterName);
-    return _zkClient.getChildren(idealStatesPath);
-  }
-
-  /**
-   * Reads the ideal state stored for a resource.
-   *
-   * @param clusterName cluster the resource belongs to
-   * @param dbName      resource name
-   * @return the property stored under the ideal-state key of {@code dbName}
-   */
-  @Override
-  public IdealState getResourceIdealState(String clusterName, String dbName)
-  {
-    final ZKHelixDataAccessor dataAccessor =
-        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
-    final PropertyKey idealStateKey = dataAccessor.keyBuilder().idealStates(dbName);
-    return dataAccessor.getProperty(idealStateKey);
-  }
-
-  /**
-   * Stores the given ideal state under the ideal-state key of a resource.
-   *
-   * @param clusterName cluster the resource belongs to
-   * @param dbName      resource name
-   * @param idealState  ideal state to store
-   */
-  @Override
-  public void setResourceIdealState(String clusterName,
-                                    String dbName,
-                                    IdealState idealState)
-  {
-    final ZKHelixDataAccessor dataAccessor =
-        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
-    final PropertyKey idealStateKey = dataAccessor.keyBuilder().idealStates(dbName);
-    dataAccessor.setProperty(idealStateKey, idealState);
-  }
-
-  /**
-   * Reads the external view stored for a resource.
-   *
-   * @param clusterName  cluster the resource belongs to
-   * @param resourceName resource name
-   * @return the property stored under the external-view key of {@code resourceName}
-   */
-  @Override
-  public ExternalView getResourceExternalView(String clusterName, String resourceName)
-  {
-    final ZKHelixDataAccessor dataAccessor =
-        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
-    final PropertyKey viewKey = dataAccessor.keyBuilder().externalView(resourceName);
-    return dataAccessor.getProperty(viewKey);
-  }
-
-  /**
-   * Registers a state model definition in a cluster.
-   *
-   * @param clusterName   cluster receiving the definition
-   * @param stateModelDef name used for the "already exists" check
-   * @param stateModel    definition to store; it is written under its own id
-   * @throws HelixException if the cluster is not set up, or a znode named
-   *                        {@code stateModelDef} already exists
-   */
-  @Override
-  public void addStateModelDef(String clusterName,
-                               String stateModelDef,
-                               StateModelDefinition stateModel)
-  {
-    final boolean clusterReady = ZKUtil.isClusterSetup(clusterName, _zkClient);
-    if (!clusterReady)
-    {
-      throw new HelixException("cluster " + clusterName + " is not setup yet");
-    }
-
-    final String stateModelPath =
-        HelixUtil.getStateModelDefinitionPath(clusterName) + "/" + stateModelDef;
-    if (_zkClient.exists(stateModelPath))
-    {
-      logger.warn("Skip the operation.State Model directory exists:" + stateModelPath);
-      throw new HelixException("State model path " + stateModelPath + " already exists.");
-    }
-
-    // NOTE(review): the check above uses stateModelDef while the write below uses
-    // stateModel.getId() - presumably callers pass matching values; confirm.
-    final ZKHelixDataAccessor dataAccessor =
-        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
-    final Builder keys = dataAccessor.keyBuilder();
-    dataAccessor.setProperty(keys.stateModelDef(stateModel.getId()), stateModel);
-  }
-
-  /**
-   * Removes the ideal-state property of a resource.
-   *
-   * @param clusterName  cluster the resource belongs to
-   * @param resourceName resource whose ideal state is removed
-   */
-  @Override
-  public void dropResource(String clusterName, String resourceName)
-  {
-    final ZKHelixDataAccessor dataAccessor =
-        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
-    final PropertyKey idealStateKey = dataAccessor.keyBuilder().idealStates(resourceName);
-    dataAccessor.removeProperty(idealStateKey);
-  }
-
-  /**
-   * Lists the children of the cluster's state-model-definition path.
-   *
-   * @param clusterName cluster to inspect
-   * @return child names found under the state-model-definition path
-   */
-  @Override
-  public List<String> getStateModelDefs(String clusterName)
-  {
-    final String stateModelDefsPath = HelixUtil.getStateModelDefinitionPath(clusterName);
-    return _zkClient.getChildren(stateModelDefsPath);
-  }
-
-  /**
-   * Reads one state model definition.
-   *
-   * @param clusterName    cluster holding the definition
-   * @param stateModelName name of the definition
-   * @return the property stored under the state-model-definition key of the name
-   */
-  @Override
-  public StateModelDefinition getStateModelDef(String clusterName, String stateModelName)
-  {
-    final ZKHelixDataAccessor dataAccessor =
-        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
-    final PropertyKey defKey = dataAccessor.keyBuilder().stateModelDef(stateModelName);
-    return dataAccessor.getProperty(defKey);
-  }
-
-  /**
-   * Adds the stats parsed from {@code statName} to the cluster's persistent-stats
-   * record. Stats whose key is already present are left untouched.
-   *
-   * @param clusterName cluster whose persistent stats are updated
-   * @param statName    stat expression handed to {@code StatsHolder.parseStat}
-   * @throws HelixException if the cluster is not set up
-   */
-  @Override
-  public void addStat(String clusterName, final String statName)
-  {
-    final boolean clusterReady = ZKUtil.isClusterSetup(clusterName, _zkClient);
-    if (!clusterReady)
-    {
-      throw new HelixException("cluster " + clusterName + " is not setup yet");
-    }
-
-    final String statsPath =
-        PropertyPathConfig.getPath(PropertyType.PERSISTENTSTATS, clusterName);
-    final ZkBaseDataAccessor<ZNRecord> zkAccessor =
-        new ZkBaseDataAccessor<ZNRecord>(_zkClient);
-
-    final DataUpdater<ZNRecord> statsUpdater = new DataUpdater<ZNRecord>()
-    {
-      @Override
-      public ZNRecord update(ZNRecord statsRec)
-      {
-        // start from an empty record when nothing has been stored yet
-        // TODO: fix naming of this record, if it matters
-        final ZNRecord record =
-            (statsRec != null) ? statsRec : new ZNRecord(PersistentStats.nodeName);
-
-        final Map<String, Map<String, String>> stored = record.getMapFields();
-        final Map<String, Map<String, String>> parsed = StatsHolder.parseStat(statName);
-        for (Map.Entry<String, Map<String, String>> stat : parsed.entrySet())
-        {
-          if (stored.containsKey(stat.getKey()))
-          {
-            continue;
-          }
-          stored.put(stat.getKey(), stat.getValue());
-        }
-        record.setMapFields(stored);
-
-        return record;
-      }
-    };
-
-    zkAccessor.update(statsPath, statsUpdater, AccessOption.PERSISTENT);
-  }
-
-  /**
-   * Adds an alert to the cluster's alerts record and registers the stat the alert
-   * refers to through {@code addStat}.
-   *
-   * @param clusterName cluster whose alerts are updated
-   * @param alertName   alert expression handed to {@code AlertsHolder.parseAlert}
-   * @throws HelixException if the cluster is not set up
-   */
-  @Override
-  public void addAlert(final String clusterName, final String alertName)
-  {
-    final boolean clusterReady = ZKUtil.isClusterSetup(clusterName, _zkClient);
-    if (!clusterReady)
-    {
-      throw new HelixException("cluster " + clusterName + " is not setup yet");
-    }
-
-    final ZkBaseDataAccessor<ZNRecord> zkAccessor =
-        new ZkBaseDataAccessor<ZNRecord>(_zkClient);
-    final String alertsPath = PropertyPathConfig.getPath(PropertyType.ALERTS, clusterName);
-
-    final DataUpdater<ZNRecord> alertsUpdater = new DataUpdater<ZNRecord>()
-    {
-      @Override
-      public ZNRecord update(ZNRecord alertsRec)
-      {
-        // start from an empty record when nothing has been stored yet
-        // TODO: fix naming of this record, if it matters
-        final ZNRecord record =
-            (alertsRec != null) ? alertsRec : new ZNRecord(Alerts.nodeName);
-
-        final Map<String, Map<String, String>> stored = record.getMapFields();
-
-        // parseAlert fills in both the stat name and the attribute map of the alert
-        final StringBuilder statName = new StringBuilder();
-        final Map<String, String> alertAttributes = new HashMap<String, String>();
-        AlertsHolder.parseAlert(alertName, statName, alertAttributes);
-
-        // NOTE(review): this is a second write issued from inside the updater;
-        // presumably acceptable because addStat skips stats that already exist - confirm.
-        addStat(clusterName, statName.toString());
-
-        stored.put(alertName, alertAttributes);
-        record.setMapFields(stored);
-
-        return record;
-      }
-    };
-
-    zkAccessor.update(alertsPath, alertsUpdater, AccessOption.PERSISTENT);
-  }
-
-  @Override
-  public void dropCluster(String clusterName)
-  {
-    logger.info("Deleting cluster " + clusterName);
-    ZKHelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
-    Builder keyBuilder = accessor.keyBuilder();
-
-    String root = "/" + clusterName;
-    if (accessor.getChildNames(keyBuilder.liveInstances()).size() > 0)
-    {
-      throw new HelixException("There are still live instances in the cluster, shut them down first.");
-    }
-
-    if (accessor.getProperty(keyBuilder.controllerLeader()) != null)
-    {
-      throw new HelixException("There are still LEADER in the cluster, shut them down first.");
-    }
-
-    _zkClient.deleteRecursive(root);
-  }
-
-  @Override
-  public void dropStat(String clusterName, final String statName)
-  {
-    if (!ZKUtil.isClusterSetup(clusterName, _zkClient))
-    {
-      throw new HelixException("cluster " + clusterName + " is not setup yet");
-    }
-
-    String persistentStatsPath =
-        PropertyPathConfig.getPath(PropertyType.PERSISTENTSTATS, clusterName);
-    ZkBaseDataAccessor<ZNRecord> baseAccessor =
-        new ZkBaseDataAccessor<ZNRecord>(_zkClient);
-
-    baseAccessor.update(persistentStatsPath, new DataUpdater<ZNRecord>()
-    {
-
-      @Override
-      public ZNRecord update(ZNRecord statsRec)
-      {
-        if (statsRec == null)
-        {
-          throw new HelixException("No stats record in ZK, nothing to drop");
-        }
-
-        Map<String, Map<String, String>> currStatMap = statsRec.getMapFields();
-        Map<String, Map<String, String>> newStatMap = StatsHolder.parseStat(statName);
-
-        // delete each stat from stat map
-        for (String newStat : newStatMap.keySet())
-        {
-          if (currStatMap.containsKey(newStat))
-          {
-            currStatMap.remove(newStat);
-          }
-        }
-        statsRec.setMapFields(currStatMap);
-
-        return statsRec;
-      }
-    }, AccessOption.PERSISTENT);
-  }
-
-  @Override
-  public void dropAlert(String clusterName, final String alertName)
-  {
-    if (!ZKUtil.isClusterSetup(clusterName, _zkClient))
-    {
-      throw new HelixException("cluster " + clusterName + " is not setup yet");
-    }
-
-    String alertsPath = PropertyPathConfig.getPath(PropertyType.ALERTS, clusterName);
-
-    ZkBaseDataAccessor<ZNRecord> baseAccessor =
-        new ZkBaseDataAccessor<ZNRecord>(_zkClient);
-
-    if (!baseAccessor.exists(alertsPath, 0))
-    {
-      throw new HelixException("No alerts node in ZK, nothing to drop");
-    }
-
-    baseAccessor.update(alertsPath, new DataUpdater<ZNRecord>()
-    {
-      @Override
-      public ZNRecord update(ZNRecord alertsRec)
-      {
-        if (alertsRec == null)
-        {
-          throw new HelixException("No alerts record in ZK, nothing to drop");
-        }
-
-        Map<String, Map<String, String>> currAlertMap = alertsRec.getMapFields();
-        currAlertMap.remove(alertName);
-        alertsRec.setMapFields(currAlertMap);
-
-        return alertsRec;
-      }
-    }, AccessOption.PERSISTENT);
-  }
-
-  @Override
-  public void addClusterToGrandCluster(String clusterName, String grandCluster)
-  {
-    if (!ZKUtil.isClusterSetup(grandCluster, _zkClient))
-    {
-      throw new HelixException("Grand cluster " + grandCluster + " is not setup yet");
-    }
-
-    if (!ZKUtil.isClusterSetup(clusterName, _zkClient))
-    {
-      throw new HelixException("Cluster " + clusterName + " is not setup yet");
-    }
-
-    IdealState idealState = new IdealState(clusterName);
-
-    idealState.setNumPartitions(1);
-    idealState.setStateModelDefRef("LeaderStandby");
-
-    List<String> controllers = getInstancesInCluster(grandCluster);
-    if (controllers.size() == 0)
-    {
-      throw new HelixException("Grand cluster " + grandCluster + " has no instances");
-    }
-    idealState.setReplicas(Integer.toString(controllers.size()));
-    Collections.shuffle(controllers);
-    idealState.getRecord().setListField(clusterName, controllers);
-    idealState.setPartitionState(clusterName, controllers.get(0), "LEADER");
-    for (int i = 1; i < controllers.size(); i++)
-    {
-      idealState.setPartitionState(clusterName, controllers.get(i), "STANDBY");
-    }
-
-    ZKHelixDataAccessor accessor =
-        new ZKHelixDataAccessor(grandCluster, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
-    Builder keyBuilder = accessor.keyBuilder();
-
-    accessor.setProperty(keyBuilder.idealStates(idealState.getResourceName()), idealState);
-  }
-
-  @Override
-  public void setConfig(ConfigScope scope, Map<String, String> properties)
-  {
-    for (String key : properties.keySet())
-    {
-      _configAccessor.set(scope, key, properties.get(key));
-    }
-  }
-
-  @Override
-  public Map<String, String> getConfig(ConfigScope scope, Set<String> keys)
-  {
-    Map<String, String> properties = new TreeMap<String, String>();
-
-    if (keys == null)
-    {
-      // read all simple fields
-
-    }
-    else
-    {
-      for (String key : keys)
-      {
-        String value = _configAccessor.get(scope, key);
-        if (value == null)
-        {
-          logger.error("Config doesn't exist for key: " + key);
-          continue;
-        }
-        properties.put(key, value);
-      }
-    }
-
-    return properties;
-  }
-
-  @Override
-  public List<String> getConfigKeys(ConfigScopeProperty scope,
-                                    String clusterName,
-                                    String... keys)
-  {
-    return _configAccessor.getKeys(scope, clusterName, keys);
-  }
-
-  @Override
-  public void removeConfig(ConfigScope scope, Set<String> keys)
-  {
-    for (String key : keys)
-    {
-      _configAccessor.remove(scope, key);
-    }
-  }
-
-  @Override
-  public void rebalance(String clusterName, String resourceName, int replica)
-  {
-    rebalance(clusterName, resourceName, replica, resourceName);
-  }
-
-  void rebalance(String clusterName, String resourceName, int replica, String keyPrefix)
-  {
-    List<String> InstanceNames = getInstancesInCluster(clusterName);
-
-    // ensure we get the same idealState with the same set of instances
-    Collections.sort(InstanceNames);
-
-    IdealState idealState = getResourceIdealState(clusterName, resourceName);
-    if (idealState == null)
-    {
-      throw new HelixException("Resource: " + resourceName + " has NOT been added yet");
-    }
-
-    idealState.setReplicas(Integer.toString(replica));
-    int partitions = idealState.getNumPartitions();
-    String stateModelName = idealState.getStateModelDefRef();
-    StateModelDefinition stateModDef = getStateModelDef(clusterName, stateModelName);
-
-    if (stateModDef == null)
-    {
-      throw new HelixException("cannot find state model: " + stateModelName);
-    }
-    // StateModelDefinition def = new StateModelDefinition(stateModDef);
-
-    List<String> statePriorityList = stateModDef.getStatesPriorityList();
-
-    String masterStateValue = null;
-    String slaveStateValue = null;
-    replica--;
-
-    for (String state : statePriorityList)
-    {
-      String count = stateModDef.getNumInstancesPerState(state);
-      if (count.equals("1"))
-      {
-        if (masterStateValue != null)
-        {
-          throw new HelixException("Invalid or unsupported state model definition");
-        }
-        masterStateValue = state;
-      }
-      else if (count.equalsIgnoreCase("R"))
-      {
-        if (slaveStateValue != null)
-        {
-          throw new HelixException("Invalid or unsupported state model definition");
-        }
-        slaveStateValue = state;
-      }
-      else if (count.equalsIgnoreCase("N"))
-      {
-        if (!(masterStateValue == null && slaveStateValue == null))
-        {
-          throw new HelixException("Invalid or unsupported state model definition");
-        }
-        replica = InstanceNames.size() - 1;
-        masterStateValue = slaveStateValue = state;
-      }
-    }
-    if (masterStateValue == null && slaveStateValue == null)
-    {
-      throw new HelixException("Invalid or unsupported state model definition");
-    }
-
-    if (masterStateValue == null)
-    {
-      masterStateValue = slaveStateValue;
-    }
-    if (idealState.getIdealStateMode() != IdealStateModeProperty.AUTO_REBALANCE)
-    {
-      ZNRecord newIdealState =
-          IdealStateCalculatorForStorageNode.calculateIdealState(InstanceNames,
-                                                                 partitions,
-                                                                 replica,
-                                                                 keyPrefix,
-                                                                 masterStateValue,
-                                                                 slaveStateValue);
-
-      // for now keep mapField in AUTO mode and remove listField in CUSTOMIZED mode
-      if (idealState.getIdealStateMode() == IdealStateModeProperty.AUTO)
-      {
-        idealState.getRecord().setListFields(newIdealState.getListFields());
-        idealState.getRecord().setMapFields(newIdealState.getMapFields());
-      }
-      if (idealState.getIdealStateMode() == IdealStateModeProperty.CUSTOMIZED)
-      {
-        idealState.getRecord().setMapFields(newIdealState.getMapFields());
-      }
-    }
-    else
-    {
-      for (int i = 0; i < partitions; i++)
-      {
-        String partitionName = keyPrefix + "_" + i;
-        idealState.getRecord().setMapField(partitionName, new HashMap<String, String>());
-        idealState.getRecord().setListField(partitionName, new ArrayList<String>());
-      }
-    }
-    setResourceIdealState(clusterName, resourceName, idealState);
-  }
-
-  @Override
-  public void addIdealState(String clusterName, String resourceName, String idealStateFile) throws IOException
-  {
-    ZNRecord idealStateRecord =
-        (ZNRecord) (new ZNRecordSerializer().deserialize(readFile(idealStateFile)));
-    if (idealStateRecord.getId() == null
-        || !idealStateRecord.getId().equals(resourceName))
-    {
-      throw new IllegalArgumentException("ideal state must have same id as resource name");
-    }
-    setResourceIdealState(clusterName, resourceName, new IdealState(idealStateRecord));
-  }
-
-  private static byte[] readFile(String filePath) throws IOException
-  {
-    File file = new File(filePath);
-
-    int size = (int) file.length();
-    byte[] bytes = new byte[size];
-    DataInputStream dis = new DataInputStream(new FileInputStream(file));
-    int read = 0;
-    int numRead = 0;
-    while (read < bytes.length
-        && (numRead = dis.read(bytes, read, bytes.length - read)) >= 0)
-    {
-      read = read + numRead;
-    }
-    return bytes;
-  }
-
-  public void addStateModelDef(String clusterName,
-                               String stateModelDefName,
-                               String stateModelDefFile) throws IOException
-  {
-    ZNRecord record =
-        (ZNRecord) (new ZNRecordSerializer().deserialize(readFile(stateModelDefFile)));
-    if (record == null || record.getId() == null
-        || !record.getId().equals(stateModelDefName))
-    {
-      throw new IllegalArgumentException("state model definition must have same id as state model def name");
-    }
-    addStateModelDef(clusterName, stateModelDefName, new StateModelDefinition(record));
-
-  }
-
-  public void addMessageConstraint(String clusterName,
-                                   final String constraintId,
-                                   final Map<String, String> constraints)
-  {
-    ZkBaseDataAccessor<ZNRecord> baseAccessor =
-        new ZkBaseDataAccessor<ZNRecord>(_zkClient);
-
-    Builder keyBuilder = new Builder(clusterName);
-    String path = keyBuilder.constraint(ConstraintType.MESSAGE_CONSTRAINT.toString()).getPath();
-
-    baseAccessor.update(path, new DataUpdater<ZNRecord>()
-    {
-      @Override
-      public ZNRecord update(ZNRecord currentData)
-      {
-        if (currentData == null)
-        {
-          currentData = new ZNRecord(ConstraintType.MESSAGE_CONSTRAINT.toString());
-        }
-
-        Map<String, String> map = currentData.getMapField(constraintId);
-        if (map == null)
-        {
-          map = new TreeMap<String, String>();
-          currentData.setMapField(constraintId, map);
-        } else
-        {
-          logger.warn("Overwrite existing constraint " + constraintId + ": " + map);
-        }
-
-        for (String key : constraints.keySet())
-        {
-          // make sure contraint attribute is valid
-          ConstraintAttribute attr = ConstraintAttribute.valueOf(key.toUpperCase());
-
-          map.put(attr.toString(), constraints.get(key));
-        }
-        
-        return currentData;
-      }
-    }, AccessOption.PERSISTENT);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZKHelixDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZKHelixDataAccessor.java b/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZKHelixDataAccessor.java
deleted file mode 100644
index 9df0638..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZKHelixDataAccessor.java
+++ /dev/null
@@ -1,590 +0,0 @@
-package com.linkedin.helix.manager.zk;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.I0Itec.zkclient.DataUpdater;
-import org.I0Itec.zkclient.exception.ZkNoNodeException;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.data.Stat;
-
-import com.linkedin.helix.AccessOption;
-import com.linkedin.helix.BaseDataAccessor;
-import com.linkedin.helix.ControllerChangeListener;
-import com.linkedin.helix.GroupCommit;
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.HelixException;
-import com.linkedin.helix.HelixProperty;
-import com.linkedin.helix.InstanceType;
-import com.linkedin.helix.NotificationContext;
-import com.linkedin.helix.PropertyKey;
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.PropertyType;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.ZNRecordAssembler;
-import com.linkedin.helix.ZNRecordBucketizer;
-import com.linkedin.helix.ZNRecordUpdater;
-import com.linkedin.helix.controller.restlet.ZNRecordUpdate;
-import com.linkedin.helix.controller.restlet.ZNRecordUpdate.OpCode;
-import com.linkedin.helix.controller.restlet.ZkPropertyTransferClient;
-import com.linkedin.helix.model.LiveInstance;
-
-public class ZKHelixDataAccessor implements HelixDataAccessor, ControllerChangeListener
-{
-  private static Logger                    LOG                       =
-                                                                         Logger.getLogger(ZKHelixDataAccessor.class);
-  private final BaseDataAccessor<ZNRecord> _baseDataAccessor;
-  final InstanceType                       _instanceType;
-  private final String                     _clusterName;
-  private final Builder                    _propertyKeyBuilder;
-  ZkPropertyTransferClient                 _zkPropertyTransferClient = null;
-  private final GroupCommit                _groupCommit              = new GroupCommit();
-  String                                   _zkPropertyTransferSvcUrl = null;
-
-  public ZKHelixDataAccessor(String clusterName,
-                             BaseDataAccessor<ZNRecord> baseDataAccessor)
-  {
-    this(clusterName, null, baseDataAccessor);
-  }
-
-  public ZKHelixDataAccessor(String clusterName,
-                             InstanceType instanceType,
-                             BaseDataAccessor<ZNRecord> baseDataAccessor)
-  {
-    _clusterName = clusterName;
-    _instanceType = instanceType;
-    _baseDataAccessor = baseDataAccessor;
-    _propertyKeyBuilder = new PropertyKey.Builder(_clusterName);
-  }
-
-  @Override
-  public <T extends HelixProperty> boolean createProperty(PropertyKey key, T value)
-  {
-    PropertyType type = key.getType();
-    String path = key.getPath();
-    int options = constructOptions(type);
-    return _baseDataAccessor.create(path, value.getRecord(), options);
-  }
-
-  @Override
-  public <T extends HelixProperty> boolean setProperty(PropertyKey key, T value)
-  {
-    PropertyType type = key.getType();
-    if (!value.isValid())
-    {
-      throw new HelixException("The ZNRecord for " + type + " is not valid.");
-    }
-
-    String path = key.getPath();
-    int options = constructOptions(type);
-
-    if (type.usePropertyTransferServer())
-    {
-      if (_zkPropertyTransferSvcUrl != null && _zkPropertyTransferClient != null)
-      {
-        ZNRecordUpdate update = new ZNRecordUpdate(path, OpCode.SET, value.getRecord());
-        _zkPropertyTransferClient.enqueueZNRecordUpdate(update, _zkPropertyTransferSvcUrl);
-        return true;
-      }
-    }
-
-    boolean success = false;
-    switch (type)
-    {
-    case IDEALSTATES:
-    case EXTERNALVIEW:
-      // check if bucketized
-      if (value.getBucketSize() > 0)
-      {
-        // set parent node
-        ZNRecord metaRecord = new ZNRecord(value.getId());
-        metaRecord.setSimpleFields(value.getRecord().getSimpleFields());
-        success = _baseDataAccessor.set(path, metaRecord, options);
-        if (success)
-        {
-          ZNRecordBucketizer bucketizer = new ZNRecordBucketizer(value.getBucketSize());
-
-          Map<String, ZNRecord> map = bucketizer.bucketize(value.getRecord());
-          List<String> paths = new ArrayList<String>();
-          List<ZNRecord> bucketizedRecords = new ArrayList<ZNRecord>();
-          for (String bucketName : map.keySet())
-          {
-            paths.add(path + "/" + bucketName);
-            bucketizedRecords.add(map.get(bucketName));
-          }
-
-          // TODO: set success accordingly
-          _baseDataAccessor.setChildren(paths, bucketizedRecords, options);
-        }
-      }
-      else
-      {
-        success = _baseDataAccessor.set(path, value.getRecord(), options);
-      }
-      break;
-    default:
-      success = _baseDataAccessor.set(path, value.getRecord(), options);
-      break;
-    }
-    return success;
-  }
-
-  @Override
-  public <T extends HelixProperty> boolean updateProperty(PropertyKey key, T value)
-  {
-    PropertyType type = key.getType();
-    String path = key.getPath();
-    int options = constructOptions(type);
-
-    boolean success = false;
-    switch (type)
-    {
-    case CURRENTSTATES:
-      success = _groupCommit.commit(_baseDataAccessor, options, path, value.getRecord());
-      break;
-    default:
-      if (type.usePropertyTransferServer())
-      {
-        if (_zkPropertyTransferSvcUrl != null && _zkPropertyTransferClient != null)
-        {
-          ZNRecordUpdate update =
-              new ZNRecordUpdate(path, OpCode.UPDATE, value.getRecord());
-          _zkPropertyTransferClient.enqueueZNRecordUpdate(update,
-                                                          _zkPropertyTransferSvcUrl);
-
-          return true;
-        }
-        else
-        {
-          LOG.debug("getPropertyTransferUrl is null, skip updating the value");
-          // TODO: consider skip the write operation
-          return true;
-        }
-      }
-      success =
-          _baseDataAccessor.update(path, new ZNRecordUpdater(value.getRecord()), options);
-      break;
-    }
-    return success;
-  }
-
-  @Override
-  public <T extends HelixProperty> List<T> getProperty(List<PropertyKey> keys)
-  {
-    if (keys == null || keys.size() == 0)
-    {
-      return Collections.emptyList();
-    }
-
-    List<T> childValues = new ArrayList<T>();
-
-    // read all records
-    List<String> paths = new ArrayList<String>();
-    for (PropertyKey key : keys)
-    {
-      paths.add(key.getPath());
-    }
-    List<ZNRecord> children = _baseDataAccessor.get(paths, null, 0);
-
-    // check if bucketized
-    for (int i = 0; i < keys.size(); i++)
-    {
-      PropertyKey key = keys.get(i);
-      ZNRecord record = children.get(i);
-
-      PropertyType type = key.getType();
-      String path = key.getPath();
-      int options = constructOptions(type);
-      // ZNRecord record = null;
-
-      switch (type)
-      {
-      case CURRENTSTATES:
-      case IDEALSTATES:
-      case EXTERNALVIEW:
-        // check if bucketized
-        if (record != null)
-        {
-          HelixProperty property = new HelixProperty(record);
-
-          int bucketSize = property.getBucketSize();
-          if (bucketSize > 0)
-          {
-            List<ZNRecord> childRecords =
-                _baseDataAccessor.getChildren(path, null, options);
-            ZNRecord assembledRecord = new ZNRecordAssembler().assemble(childRecords);
-
-            // merge with parent node value
-            if (assembledRecord != null)
-            {
-              record.getSimpleFields().putAll(assembledRecord.getSimpleFields());
-              record.getListFields().putAll(assembledRecord.getListFields());
-              record.getMapFields().putAll(assembledRecord.getMapFields());
-            }
-          }
-        }
-        break;
-      default:
-        break;
-      }
-
-      @SuppressWarnings("unchecked")
-      T t = (T) HelixProperty.convertToTypedInstance(key.getTypeClass(), record);
-      childValues.add(t);
-    }
-
-    return childValues;
-  }
-
-  @Override
-  public <T extends HelixProperty> T getProperty(PropertyKey key)
-  {
-    PropertyType type = key.getType();
-    String path = key.getPath();
-    int options = constructOptions(type);
-    ZNRecord record = null;
-    try
-    {
-      Stat stat = new Stat();
-      record = _baseDataAccessor.get(path, stat, options);
-      if (record != null)
-      {
-        record.setCreationTime(stat.getCtime());
-        record.setModifiedTime(stat.getMtime());
-      }
-    }
-    catch (ZkNoNodeException e)
-    {
-      // OK
-    }
-
-    switch (type)
-    {
-    case CURRENTSTATES:
-    case IDEALSTATES:
-    case EXTERNALVIEW:
-      // check if bucketized
-      if (record != null)
-      {
-        HelixProperty property = new HelixProperty(record);
-
-        int bucketSize = property.getBucketSize();
-        if (bucketSize > 0)
-        {
-          List<ZNRecord> childRecords =
-              _baseDataAccessor.getChildren(path, null, options);
-          ZNRecord assembledRecord = new ZNRecordAssembler().assemble(childRecords);
-
-          // merge with parent node value
-          if (assembledRecord != null)
-          {
-            record.getSimpleFields().putAll(assembledRecord.getSimpleFields());
-            record.getListFields().putAll(assembledRecord.getListFields());
-            record.getMapFields().putAll(assembledRecord.getMapFields());
-          }
-        }
-      }
-      break;
-    default:
-      break;
-    }
-
-    @SuppressWarnings("unchecked")
-    T t = (T) HelixProperty.convertToTypedInstance(key.getTypeClass(), record);
-    return t;
-  }
-
-  @Override
-  public boolean removeProperty(PropertyKey key)
-  {
-    PropertyType type = key.getType();
-    String path = key.getPath();
-    int options = constructOptions(type);
-
-    return _baseDataAccessor.remove(path, options);
-  }
-
-  @Override
-  public List<String> getChildNames(PropertyKey key)
-  {
-    PropertyType type = key.getType();
-    String parentPath = key.getPath();
-    int options = constructOptions(type);
-    List<String> childNames = _baseDataAccessor.getChildNames(parentPath, options);
-    if (childNames == null)
-    {
-      childNames = Collections.emptyList();
-    }
-    return childNames;
-  }
-
-  @Override
-  public <T extends HelixProperty> List<T> getChildValues(PropertyKey key)
-  {
-    PropertyType type = key.getType();
-    String parentPath = key.getPath();
-    int options = constructOptions(type);
-    List<T> childValues = new ArrayList<T>();
-
-    List<ZNRecord> children = _baseDataAccessor.getChildren(parentPath, null, options);
-    if (children != null)
-    {
-      for (ZNRecord record : children)
-      {
-        switch (type)
-        {
-        case CURRENTSTATES:
-        case IDEALSTATES:
-        case EXTERNALVIEW:
-          if (record != null)
-          {
-            HelixProperty property = new HelixProperty(record);
-
-            int bucketSize = property.getBucketSize();
-            if (bucketSize > 0)
-            {
-              // TODO: fix this if record.id != pathName
-              String childPath = parentPath + "/" + record.getId();
-              List<ZNRecord> childRecords =
-                  _baseDataAccessor.getChildren(childPath, null, options);
-              ZNRecord assembledRecord = new ZNRecordAssembler().assemble(childRecords);
-
-              // merge with parent node value
-              if (assembledRecord != null)
-              {
-                record.getSimpleFields().putAll(assembledRecord.getSimpleFields());
-                record.getListFields().putAll(assembledRecord.getListFields());
-                record.getMapFields().putAll(assembledRecord.getMapFields());
-              }
-            }
-          }
-
-          break;
-        default:
-          break;
-        }
-
-        if (record != null)
-        {
-          @SuppressWarnings("unchecked")
-          T t = (T) HelixProperty.convertToTypedInstance(key.getTypeClass(), record);
-          childValues.add(t);
-        }
-      }
-    }
-    return childValues;
-  }
-
-  @Override
-  public <T extends HelixProperty> Map<String, T> getChildValuesMap(PropertyKey key)
-  {
-    PropertyType type = key.getType();
-    String parentPath = key.getPath();
-    int options = constructOptions(type);
-    List<T> children = getChildValues(key);
-    Map<String, T> childValuesMap = new HashMap<String, T>();
-    for (T t : children)
-    {
-      childValuesMap.put(t.getRecord().getId(), t);
-    }
-    return childValuesMap;
-
-  }
-
-  @Override
-  public Builder keyBuilder()
-  {
-    return _propertyKeyBuilder;
-  }
-
-  private int constructOptions(PropertyType type)
-  {
-    int options = 0;
-    if (type.isPersistent())
-    {
-      options = options | AccessOption.PERSISTENT;
-    }
-    else
-    {
-      options = options | AccessOption.EPHEMERAL;
-    }
-
-    // if (type == PropertyType.CURRENTSTATES && _instanceType ==
-    // InstanceType.PARTICIPANT)
-    // {
-    // options = options | BaseDataAccessor.Option.WRITE_THROUGH;
-    // }
-    // else if (type == PropertyType.EXTERNALVIEW
-    // && _instanceType == InstanceType.CONTROLLER)
-    // {
-    // options = options | BaseDataAccessor.Option.WRITE_THROUGH;
-    // }
-
-    return options;
-  }
-
-  @Override
-  public <T extends HelixProperty> boolean[] createChildren(List<PropertyKey> keys,
-                                                            List<T> children)
-  {
-    // TODO: add validation
-    int options = -1;
-    List<String> paths = new ArrayList<String>();
-    List<ZNRecord> records = new ArrayList<ZNRecord>();
-    for (int i = 0; i < keys.size(); i++)
-    {
-      PropertyKey key = keys.get(i);
-      PropertyType type = key.getType();
-      String path = key.getPath();
-      paths.add(path);
-      HelixProperty value = children.get(i);
-      records.add(value.getRecord());
-      options = constructOptions(type);
-    }
-    return _baseDataAccessor.createChildren(paths, records, options);
-  }
-
-  @Override
-  public <T extends HelixProperty> boolean[] setChildren(List<PropertyKey> keys,
-                                                         List<T> children)
-  {
-    int options = -1;
-    List<String> paths = new ArrayList<String>();
-    List<ZNRecord> records = new ArrayList<ZNRecord>();
-
-    List<List<String>> bucketizedPaths =
-        new ArrayList<List<String>>(Collections.<List<String>> nCopies(keys.size(), null));
-    List<List<ZNRecord>> bucketizedRecords =
-        new ArrayList<List<ZNRecord>>(Collections.<List<ZNRecord>> nCopies(keys.size(),
-                                                                           null));
-
-    for (int i = 0; i < keys.size(); i++)
-    {
-      PropertyKey key = keys.get(i);
-      PropertyType type = key.getType();
-      String path = key.getPath();
-      paths.add(path);
-      options = constructOptions(type);
-
-      HelixProperty value = children.get(i);
-
-      switch (type)
-      {
-      case EXTERNALVIEW:
-        if (value.getBucketSize() == 0)
-        {
-          records.add(value.getRecord());
-        }
-        else
-        {
-          ZNRecord metaRecord = new ZNRecord(value.getId());
-          metaRecord.setSimpleFields(value.getRecord().getSimpleFields());
-          records.add(metaRecord);
-
-          ZNRecordBucketizer bucketizer = new ZNRecordBucketizer(value.getBucketSize());
-
-          Map<String, ZNRecord> map = bucketizer.bucketize(value.getRecord());
-          List<String> childBucketizedPaths = new ArrayList<String>();
-          List<ZNRecord> childBucketizedRecords = new ArrayList<ZNRecord>();
-          for (String bucketName : map.keySet())
-          {
-            childBucketizedPaths.add(path + "/" + bucketName);
-            childBucketizedRecords.add(map.get(bucketName));
-          }
-          bucketizedPaths.set(i, childBucketizedPaths);
-          bucketizedRecords.set(i, childBucketizedRecords);
-        }
-        break;
-      default:
-        records.add(value.getRecord());
-        break;
-      }
-    }
-
-    // set non-bucketized nodes or parent nodes of bucketized nodes
-    boolean success[] = _baseDataAccessor.setChildren(paths, records, options);
-
-    // set bucketized nodes
-    List<String> allBucketizedPaths = new ArrayList<String>();
-    List<ZNRecord> allBucketizedRecords = new ArrayList<ZNRecord>();
-
-    for (int i = 0; i < keys.size(); i++)
-    {
-      if (success[i] && bucketizedPaths.get(i) != null)
-      {
-        allBucketizedPaths.addAll(bucketizedPaths.get(i));
-        allBucketizedRecords.addAll(bucketizedRecords.get(i));
-      }
-    }
-
-    // TODO: set success accordingly
-    _baseDataAccessor.setChildren(allBucketizedPaths, allBucketizedRecords, options);
-
-    return success;
-  }
-
-  @Override
-  public BaseDataAccessor<ZNRecord> getBaseDataAccessor()
-  {
-    return _baseDataAccessor;
-  }
-
-  @Override
-  public <T extends HelixProperty> boolean[] updateChildren(List<String> paths,
-                                                            List<DataUpdater<ZNRecord>> updaters,
-                                                            int options)
-  {
-    return _baseDataAccessor.updateChildren(paths, updaters, options);
-  }
-
-  public void shutdown()
-  {
-    if (_zkPropertyTransferClient != null)
-    {
-      _zkPropertyTransferClient.shutdown();
-    }
-  }
-
-  @Override
-  public void onControllerChange(NotificationContext changeContext)
-  {
-    LOG.info("Controller has changed");
-    refreshZkPropertyTransferUrl();
-    if (_zkPropertyTransferClient == null)
-    {
-      if (_zkPropertyTransferSvcUrl != null && _zkPropertyTransferSvcUrl.length() > 0)
-      {
-        LOG.info("Creating ZkPropertyTransferClient as we get url "
-            + _zkPropertyTransferSvcUrl);
-        _zkPropertyTransferClient =
-            new ZkPropertyTransferClient(ZkPropertyTransferClient.DEFAULT_MAX_CONCURRENTTASKS);
-      }
-    }
-  }
-
-  void refreshZkPropertyTransferUrl()
-  {
-    try
-    {
-      LiveInstance leader = getProperty(keyBuilder().controllerLeader());
-      if (leader != null)
-      {
-        _zkPropertyTransferSvcUrl = leader.getWebserviceUrl();
-        LOG.info("_zkPropertyTransferSvcUrl : " + _zkPropertyTransferSvcUrl
-            + " Controller " + leader.getInstanceName());
-      }
-      else
-      {
-        _zkPropertyTransferSvcUrl = null;
-      }
-    }
-    catch (Exception e)
-    {
-      // LOG.error("", e);
-      _zkPropertyTransferSvcUrl = null;
-    }
-  }
-}