You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 00:26:41 UTC
[16/47] Refactoring from com.linkedin.helix to org.apache.helix
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZKHelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZKHelixAdmin.java
deleted file mode 100644
index d3192b5..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZKHelixAdmin.java
+++ /dev/null
@@ -1,1272 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.manager.zk;
-
-import java.io.DataInputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.UUID;
-
-import org.I0Itec.zkclient.DataUpdater;
-import org.I0Itec.zkclient.exception.ZkNoNodeException;
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.AccessOption;
-import com.linkedin.helix.ConfigAccessor;
-import com.linkedin.helix.ConfigScope;
-import com.linkedin.helix.ConfigScope.ConfigScopeProperty;
-import com.linkedin.helix.HelixAdmin;
-import com.linkedin.helix.HelixConstants;
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.HelixException;
-import com.linkedin.helix.PropertyKey;
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.PropertyPathConfig;
-import com.linkedin.helix.PropertyType;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.alerts.AlertsHolder;
-import com.linkedin.helix.alerts.StatsHolder;
-import com.linkedin.helix.model.Alerts;
-import com.linkedin.helix.model.CurrentState;
-import com.linkedin.helix.model.ExternalView;
-import com.linkedin.helix.model.IdealState;
-import com.linkedin.helix.model.ClusterConstraints.ConstraintAttribute;
-import com.linkedin.helix.model.ClusterConstraints.ConstraintType;
-import com.linkedin.helix.model.IdealState.IdealStateModeProperty;
-import com.linkedin.helix.model.InstanceConfig;
-import com.linkedin.helix.model.InstanceConfig.InstanceConfigProperty;
-import com.linkedin.helix.model.LiveInstance;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.model.Message.MessageState;
-import com.linkedin.helix.model.Message.MessageType;
-import com.linkedin.helix.model.PauseSignal;
-import com.linkedin.helix.model.PersistentStats;
-import com.linkedin.helix.model.StateModelDefinition;
-import com.linkedin.helix.tools.IdealStateCalculatorForStorageNode;
-import com.linkedin.helix.util.HelixUtil;
-
-public class ZKHelixAdmin implements HelixAdmin
-{
-
- private final ZkClient _zkClient;
- private final ConfigAccessor _configAccessor;
-
- private static Logger logger = Logger.getLogger(ZKHelixAdmin.class);
-
- public ZKHelixAdmin(ZkClient zkClient)
- {
- _zkClient = zkClient;
- _configAccessor = new ConfigAccessor(zkClient);
- }
-
- @Override
- public void addInstance(String clusterName, InstanceConfig instanceConfig)
- {
- if (!ZKUtil.isClusterSetup(clusterName, _zkClient))
- {
- throw new HelixException("cluster " + clusterName + " is not setup yet");
- }
- String instanceConfigsPath =
- PropertyPathConfig.getPath(PropertyType.CONFIGS,
- clusterName,
- ConfigScopeProperty.PARTICIPANT.toString());
- String nodeId = instanceConfig.getId();
- String instanceConfigPath = instanceConfigsPath + "/" + nodeId;
-
- if (_zkClient.exists(instanceConfigPath))
- {
- throw new HelixException("Node " + nodeId + " already exists in cluster "
- + clusterName);
- }
-
- ZKUtil.createChildren(_zkClient, instanceConfigsPath, instanceConfig.getRecord());
-
- _zkClient.createPersistent(HelixUtil.getMessagePath(clusterName, nodeId), true);
- _zkClient.createPersistent(HelixUtil.getCurrentStateBasePath(clusterName, nodeId),
- true);
- _zkClient.createPersistent(HelixUtil.getErrorsPath(clusterName, nodeId), true);
- _zkClient.createPersistent(HelixUtil.getStatusUpdatesPath(clusterName, nodeId), true);
- }
-
- @Override
- public void dropInstance(String clusterName, InstanceConfig instanceConfig)
- {
- // String instanceConfigsPath = HelixUtil.getConfigPath(clusterName);
- String instanceConfigsPath =
- PropertyPathConfig.getPath(PropertyType.CONFIGS,
- clusterName,
- ConfigScopeProperty.PARTICIPANT.toString());
- String nodeId = instanceConfig.getId();
- String instanceConfigPath = instanceConfigsPath + "/" + nodeId;
- String instancePath = HelixUtil.getInstancePath(clusterName, nodeId);
-
- if (!_zkClient.exists(instanceConfigPath))
- {
- throw new HelixException("Node " + nodeId
- + " does not exist in config for cluster " + clusterName);
- }
-
- if (!_zkClient.exists(instancePath))
- {
- throw new HelixException("Node " + nodeId
- + " does not exist in instances for cluster " + clusterName);
- }
-
- // delete config path
- ZKUtil.dropChildren(_zkClient, instanceConfigsPath, instanceConfig.getRecord());
-
- // delete instance path
- _zkClient.deleteRecursive(instancePath);
- }
-
- @Override
- public InstanceConfig getInstanceConfig(String clusterName, String instanceName)
- {
- // String instanceConfigsPath = HelixUtil.getConfigPath(clusterName);
-
- // String instanceConfigPath = instanceConfigsPath + "/" + instanceName;
- String instanceConfigPath =
- PropertyPathConfig.getPath(PropertyType.CONFIGS,
- clusterName,
- ConfigScopeProperty.PARTICIPANT.toString(),
- instanceName);
- if (!_zkClient.exists(instanceConfigPath))
- {
- throw new HelixException("instance" + instanceName + " does not exist in cluster "
- + clusterName);
- }
-
- ZKHelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
- Builder keyBuilder = accessor.keyBuilder();
-
- return accessor.getProperty(keyBuilder.instanceConfig(instanceName));
- }
-
- @Override
- public void enableInstance(final String clusterName,
- final String instanceName,
- final boolean enabled)
- {
- String path =
- PropertyPathConfig.getPath(PropertyType.CONFIGS,
- clusterName,
- ConfigScopeProperty.PARTICIPANT.toString(),
- instanceName);
-
- ZkBaseDataAccessor<ZNRecord> baseAccessor =
- new ZkBaseDataAccessor<ZNRecord>(_zkClient);
- if (!baseAccessor.exists(path, 0))
- {
- throw new HelixException("Cluster " + clusterName + ", instance: " + instanceName
- + ", instance config does not exist");
- }
-
- baseAccessor.update(path, new DataUpdater<ZNRecord>()
- {
- @Override
- public ZNRecord update(ZNRecord currentData)
- {
- if (currentData == null)
- {
- throw new HelixException("Cluster: " + clusterName + ", instance: "
- + instanceName + ", participant config is null");
- }
-
- InstanceConfig config = new InstanceConfig(currentData);
- config.setInstanceEnabled(enabled);
- return config.getRecord();
- }
- }, AccessOption.PERSISTENT);
- }
-
- @Override
- public void enablePartition(final boolean enabled,
- final String clusterName,
- final String instanceName,
- final String resourceName,
- final List<String> partitionNames)
- {
- String path =
- PropertyPathConfig.getPath(PropertyType.CONFIGS,
- clusterName,
- ConfigScopeProperty.PARTICIPANT.toString(),
- instanceName);
-
- ZkBaseDataAccessor<ZNRecord> baseAccessor =
- new ZkBaseDataAccessor<ZNRecord>(_zkClient);
-
- // check instanceConfig exists
- if (!baseAccessor.exists(path, 0))
- {
- throw new HelixException("Cluster: " + clusterName + ", instance: " + instanceName
- + ", instance config does not exist");
- }
-
- // check resource exists
- String idealStatePath =
- PropertyPathConfig.getPath(PropertyType.IDEALSTATES, clusterName, resourceName);
-
- ZNRecord idealStateRecord = null;
- try
- {
- idealStateRecord = baseAccessor.get(idealStatePath, null, 0);
- }
- catch (ZkNoNodeException e)
- {
- // OK.
- }
-
- if (idealStateRecord == null)
- {
- throw new HelixException("Cluster: " + clusterName + ", resource: " + resourceName
- + ", ideal state does not exist");
- }
-
- // check partitions exist. warn if not
- IdealState idealState = new IdealState(idealStateRecord);
- for (String partitionName : partitionNames)
- {
- if ((idealState.getIdealStateMode() == IdealStateModeProperty.AUTO && idealState.getPreferenceList(partitionName) == null)
- || (idealState.getIdealStateMode() == IdealStateModeProperty.CUSTOMIZED && idealState.getInstanceStateMap(partitionName) == null))
- {
- logger.warn("Cluster: " + clusterName + ", resource: " + resourceName
- + ", partition: " + partitionName
- + ", partition does not exist in ideal state");
- }
- }
-
- // update participantConfig
- // could not use ZNRecordUpdater since it doesn't do listField merge/subtract
- baseAccessor.update(path, new DataUpdater<ZNRecord>()
- {
- @Override
- public ZNRecord update(ZNRecord currentData)
- {
- if (currentData == null)
- {
- throw new HelixException("Cluster: " + clusterName + ", instance: "
- + instanceName + ", participant config is null");
- }
-
- // TODO: merge with InstanceConfig.setInstanceEnabledForPartition
- List<String> list =
- currentData.getListField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.toString());
- Set<String> disabledPartitions = new HashSet<String>();
- if (list != null)
- {
- disabledPartitions.addAll(list);
- }
-
- if (enabled)
- {
- disabledPartitions.removeAll(partitionNames);
- }
- else
- {
- disabledPartitions.addAll(partitionNames);
- }
-
- list = new ArrayList<String>(disabledPartitions);
- Collections.sort(list);
- currentData.setListField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.toString(),
- list);
- return currentData;
- }
- },
- AccessOption.PERSISTENT);
- }
-
- @Override
- public void enableCluster(String clusterName, boolean enabled)
- {
- HelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
- Builder keyBuilder = accessor.keyBuilder();
-
- if (enabled)
- {
- accessor.removeProperty(keyBuilder.pause());
- }
- else
- {
- accessor.createProperty(keyBuilder.pause(), new PauseSignal("pause"));
- }
- }
-
- @Override
- public void resetPartition(String clusterName,
- String instanceName,
- String resourceName,
- List<String> partitionNames)
- {
- ZKHelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
- Builder keyBuilder = accessor.keyBuilder();
-
- // check the instance is alive
- LiveInstance liveInstance =
- accessor.getProperty(keyBuilder.liveInstance(instanceName));
- if (liveInstance == null)
- {
- throw new HelixException("Can't reset state for " + resourceName + "/"
- + partitionNames + " on " + instanceName + ", because " + instanceName
- + " is not alive");
- }
-
- // check resource group exists
- IdealState idealState = accessor.getProperty(keyBuilder.idealStates(resourceName));
- if (idealState == null)
- {
- throw new HelixException("Can't reset state for " + resourceName + "/"
- + partitionNames + " on " + instanceName + ", because " + resourceName
- + " is not added");
- }
-
- // check partition exists in resource group
- Set<String> resetPartitionNames = new HashSet<String>(partitionNames);
- if (idealState.getIdealStateMode() == IdealStateModeProperty.CUSTOMIZED)
- {
- Set<String> partitions =
- new HashSet<String>(idealState.getRecord().getMapFields().keySet());
- if (!partitions.containsAll(resetPartitionNames))
- {
- throw new HelixException("Can't reset state for " + resourceName + "/"
- + partitionNames + " on " + instanceName + ", because not all "
- + partitionNames + " exist");
- }
- }
- else
- {
- Set<String> partitions =
- new HashSet<String>(idealState.getRecord().getListFields().keySet());
- if (!partitions.containsAll(resetPartitionNames))
- {
- throw new HelixException("Can't reset state for " + resourceName + "/"
- + partitionNames + " on " + instanceName + ", because not all "
- + partitionNames + " exist");
- }
- }
-
- // check partition is in ERROR state
- String sessionId = liveInstance.getSessionId();
- CurrentState curState =
- accessor.getProperty(keyBuilder.currentState(instanceName,
- sessionId,
- resourceName));
- for (String partitionName : resetPartitionNames)
- {
- if (!curState.getState(partitionName).equals("ERROR"))
- {
- throw new HelixException("Can't reset state for " + resourceName + "/"
- + partitionNames + " on " + instanceName + ", because not all "
- + partitionNames + " are in ERROR state");
- }
- }
-
- // check stateModelDef exists and get initial state
- String stateModelDef = idealState.getStateModelDefRef();
- StateModelDefinition stateModel =
- accessor.getProperty(keyBuilder.stateModelDef(stateModelDef));
- if (stateModel == null)
- {
- throw new HelixException("Can't reset state for " + resourceName + "/"
- + partitionNames + " on " + instanceName + ", because " + stateModelDef
- + " is NOT found");
- }
-
- // check there is no pending messages for the partitions exist
- List<Message> messages = accessor.getChildValues(keyBuilder.messages(instanceName));
- for (Message message : messages)
- {
- if (!MessageType.STATE_TRANSITION.toString().equalsIgnoreCase(message.getMsgType())
- || !sessionId.equals(message.getTgtSessionId())
- || !resourceName.equals(message.getResourceName())
- || !resetPartitionNames.contains(message.getPartitionName()))
- {
- continue;
- }
-
- throw new HelixException("Can't reset state for " + resourceName + "/"
- + partitionNames + " on " + instanceName
- + ", because a pending message exists: " + message);
- }
-
- String adminName = null;
- try
- {
- adminName = InetAddress.getLocalHost().getCanonicalHostName() + "-ADMIN";
- }
- catch (UnknownHostException e)
- {
- // can ignore it
- logger.info("Unable to get host name. Will set it to UNKNOWN, mostly ignorable", e);
- adminName = "UNKNOWN";
- }
-
- List<Message> resetMessages = new ArrayList<Message>();
- List<PropertyKey> messageKeys = new ArrayList<PropertyKey>();
- for (String partitionName : resetPartitionNames)
- {
- // send ERROR to initialState message
- String msgId = UUID.randomUUID().toString();
- Message message = new Message(MessageType.STATE_TRANSITION, msgId);
- message.setSrcName(adminName);
- message.setTgtName(instanceName);
- message.setMsgState(MessageState.NEW);
- message.setPartitionName(partitionName);
- message.setResourceName(resourceName);
- message.setTgtSessionId(sessionId);
- message.setStateModelDef(stateModelDef);
- message.setFromState("ERROR");
- message.setToState(stateModel.getInitialState());
- message.setStateModelFactoryName(idealState.getStateModelFactoryName());
-
- resetMessages.add(message);
- messageKeys.add(keyBuilder.message(instanceName, message.getId()));
- }
-
- accessor.setChildren(messageKeys, resetMessages);
- }
-
- @Override
- public void resetInstance(String clusterName, List<String> instanceNames)
- {
- // TODO: not mp-safe
- ZKHelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
- Builder keyBuilder = accessor.keyBuilder();
- List<ExternalView> extViews = accessor.getChildValues(keyBuilder.externalViews());
-
- Set<String> resetInstanceNames = new HashSet<String>(instanceNames);
- for (String instanceName : resetInstanceNames)
- {
- List<String> resetPartitionNames = new ArrayList<String>();
- for (ExternalView extView : extViews)
- {
- Map<String, Map<String, String>> stateMap = extView.getRecord().getMapFields();
- for (String partitionName : stateMap.keySet())
- {
- Map<String, String> instanceStateMap = stateMap.get(partitionName);
-
- if (instanceStateMap.containsKey(instanceName)
- && instanceStateMap.get(instanceName).equals("ERROR"))
- {
- resetPartitionNames.add(partitionName);
- }
- }
- resetPartition(clusterName,
- instanceName,
- extView.getResourceName(),
- resetPartitionNames);
- }
- }
- }
-
- @Override
- public void resetResource(String clusterName, List<String> resourceNames)
- {
- // TODO: not mp-safe
- ZKHelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
- Builder keyBuilder = accessor.keyBuilder();
- List<ExternalView> extViews = accessor.getChildValues(keyBuilder.externalViews());
-
- Set<String> resetResourceNames = new HashSet<String>(resourceNames);
- for (ExternalView extView : extViews)
- {
- if (!resetResourceNames.contains(extView.getResourceName()))
- {
- continue;
- }
-
- // instanceName -> list of resetPartitionNames
- Map<String, List<String>> resetPartitionNames = new HashMap<String, List<String>>();
-
- Map<String, Map<String, String>> stateMap = extView.getRecord().getMapFields();
- for (String partitionName : stateMap.keySet())
- {
- Map<String, String> instanceStateMap = stateMap.get(partitionName);
- for (String instanceName : instanceStateMap.keySet())
- {
- if (instanceStateMap.get(instanceName).equals("ERROR"))
- {
- if (!resetPartitionNames.containsKey(instanceName))
- {
- resetPartitionNames.put(instanceName, new ArrayList<String>());
- }
- resetPartitionNames.get(instanceName).add(partitionName);
- }
- }
- }
-
- for (String instanceName : resetPartitionNames.keySet())
- {
- resetPartition(clusterName,
- instanceName,
- extView.getResourceName(),
- resetPartitionNames.get(instanceName));
- }
- }
- }
-
- @Override
- public void addCluster(String clusterName, boolean overwritePrevRecord)
- {
- String root = "/" + clusterName;
- String path;
-
- // TODO For ease of testing only, should remove later
- if (_zkClient.exists(root))
- {
- logger.warn("Root directory exists.Cleaning the root directory:" + root
- + " overwritePrevRecord: " + overwritePrevRecord);
- if (overwritePrevRecord)
- {
- _zkClient.deleteRecursive(root);
- }
- else
- {
- throw new HelixException("Cluster " + clusterName + " already exists");
- }
- }
-
- _zkClient.createPersistent(root);
-
- // IDEAL STATE
- _zkClient.createPersistent(HelixUtil.getIdealStatePath(clusterName));
- // CONFIGURATIONS
- // _zkClient.createPersistent(HelixUtil.getConfigPath(clusterName));
- path =
- PropertyPathConfig.getPath(PropertyType.CONFIGS,
- clusterName,
- ConfigScopeProperty.CLUSTER.toString(),
- clusterName);
- _zkClient.createPersistent(path, true);
- _zkClient.writeData(path, new ZNRecord(clusterName));
- path =
- PropertyPathConfig.getPath(PropertyType.CONFIGS,
- clusterName,
- ConfigScopeProperty.PARTICIPANT.toString());
- _zkClient.createPersistent(path);
- path =
- PropertyPathConfig.getPath(PropertyType.CONFIGS,
- clusterName,
- ConfigScopeProperty.RESOURCE.toString());
- _zkClient.createPersistent(path);
- // PROPERTY STORE
- path = PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, clusterName);
- _zkClient.createPersistent(path);
- // LIVE INSTANCES
- _zkClient.createPersistent(HelixUtil.getLiveInstancesPath(clusterName));
- // MEMBER INSTANCES
- _zkClient.createPersistent(HelixUtil.getMemberInstancesPath(clusterName));
- // External view
- _zkClient.createPersistent(HelixUtil.getExternalViewPath(clusterName));
- // State model definition
- _zkClient.createPersistent(HelixUtil.getStateModelDefinitionPath(clusterName));
-
- // controller
- _zkClient.createPersistent(HelixUtil.getControllerPath(clusterName));
- path = PropertyPathConfig.getPath(PropertyType.HISTORY, clusterName);
- final ZNRecord emptyHistory = new ZNRecord(PropertyType.HISTORY.toString());
- final List<String> emptyList = new ArrayList<String>();
- emptyHistory.setListField(clusterName, emptyList);
- _zkClient.createPersistent(path, emptyHistory);
-
- path = PropertyPathConfig.getPath(PropertyType.MESSAGES_CONTROLLER, clusterName);
- _zkClient.createPersistent(path);
-
- path = PropertyPathConfig.getPath(PropertyType.STATUSUPDATES_CONTROLLER, clusterName);
- _zkClient.createPersistent(path);
-
- path = PropertyPathConfig.getPath(PropertyType.ERRORS_CONTROLLER, clusterName);
- _zkClient.createPersistent(path);
- }
-
- @Override
- public List<String> getInstancesInCluster(String clusterName)
- {
- String memberInstancesPath = HelixUtil.getMemberInstancesPath(clusterName);
- return _zkClient.getChildren(memberInstancesPath);
- }
-
- @Override
- public void addResource(String clusterName,
- String resourceName,
- int partitions,
- String stateModelRef)
- {
- addResource(clusterName,
- resourceName,
- partitions,
- stateModelRef,
- IdealStateModeProperty.AUTO.toString(),
- 0);
- }
-
- @Override
- public void addResource(String clusterName,
- String resourceName,
- int partitions,
- String stateModelRef,
- String idealStateMode)
- {
- addResource(clusterName, resourceName, partitions, stateModelRef, idealStateMode, 0);
- }
-
- @Override
- public void addResource(String clusterName,
- String resourceName,
- int partitions,
- String stateModelRef,
- String idealStateMode,
- int bucketSize)
- {
- if (!ZKUtil.isClusterSetup(clusterName, _zkClient))
- {
- throw new HelixException("cluster " + clusterName + " is not setup yet");
- }
-
- IdealStateModeProperty mode = IdealStateModeProperty.AUTO;
- try
- {
- mode = IdealStateModeProperty.valueOf(idealStateMode);
- }
- catch (Exception e)
- {
- logger.error("", e);
- }
- IdealState idealState = new IdealState(resourceName);
- idealState.setNumPartitions(partitions);
- idealState.setStateModelDefRef(stateModelRef);
- idealState.setIdealStateMode(mode.toString());
- idealState.setReplicas("" + 0);
- idealState.setStateModelFactoryName(HelixConstants.DEFAULT_STATE_MODEL_FACTORY);
-
- if (bucketSize > 0)
- {
- idealState.setBucketSize(bucketSize);
- }
-
- String stateModelDefPath =
- PropertyPathConfig.getPath(PropertyType.STATEMODELDEFS,
- clusterName,
- stateModelRef);
- if (!_zkClient.exists(stateModelDefPath))
- {
- throw new HelixException("State model " + stateModelRef
- + " not found in the cluster STATEMODELDEFS path");
- }
-
- String idealStatePath = HelixUtil.getIdealStatePath(clusterName);
- String dbIdealStatePath = idealStatePath + "/" + resourceName;
- if (_zkClient.exists(dbIdealStatePath))
- {
- throw new HelixException("Skip the operation. DB ideal state directory exists:"
- + dbIdealStatePath);
- }
-
- ZKUtil.createChildren(_zkClient, idealStatePath, idealState.getRecord());
- }
-
- @Override
- public List<String> getClusters()
- {
- List<String> zkToplevelPathes = _zkClient.getChildren("/");
- List<String> result = new ArrayList<String>();
- for (String pathName : zkToplevelPathes)
- {
- if (ZKUtil.isClusterSetup(pathName, _zkClient))
- {
- result.add(pathName);
- }
- }
- return result;
- }
-
- @Override
- public List<String> getResourcesInCluster(String clusterName)
- {
- return _zkClient.getChildren(HelixUtil.getIdealStatePath(clusterName));
- }
-
- @Override
- public IdealState getResourceIdealState(String clusterName, String dbName)
- {
- ZKHelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
- Builder keyBuilder = accessor.keyBuilder();
-
- return accessor.getProperty(keyBuilder.idealStates(dbName));
- }
-
- @Override
- public void setResourceIdealState(String clusterName,
- String dbName,
- IdealState idealState)
- {
- ZKHelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
- Builder keyBuilder = accessor.keyBuilder();
-
- accessor.setProperty(keyBuilder.idealStates(dbName), idealState);
- }
-
- @Override
- public ExternalView getResourceExternalView(String clusterName, String resourceName)
- {
- ZKHelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
- Builder keyBuilder = accessor.keyBuilder();
- return accessor.getProperty(keyBuilder.externalView(resourceName));
- }
-
- @Override
- public void addStateModelDef(String clusterName,
- String stateModelDef,
- StateModelDefinition stateModel)
- {
- if (!ZKUtil.isClusterSetup(clusterName, _zkClient))
- {
- throw new HelixException("cluster " + clusterName + " is not setup yet");
- }
- String stateModelDefPath = HelixUtil.getStateModelDefinitionPath(clusterName);
- String stateModelPath = stateModelDefPath + "/" + stateModelDef;
- if (_zkClient.exists(stateModelPath))
- {
- logger.warn("Skip the operation.State Model directory exists:" + stateModelPath);
- throw new HelixException("State model path " + stateModelPath + " already exists.");
- }
-
- ZKHelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
- Builder keyBuilder = accessor.keyBuilder();
- accessor.setProperty(keyBuilder.stateModelDef(stateModel.getId()), stateModel);
- }
-
- @Override
- public void dropResource(String clusterName, String resourceName)
- {
- ZKHelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
- Builder keyBuilder = accessor.keyBuilder();
-
- accessor.removeProperty(keyBuilder.idealStates(resourceName));
- }
-
- @Override
- public List<String> getStateModelDefs(String clusterName)
- {
- return _zkClient.getChildren(HelixUtil.getStateModelDefinitionPath(clusterName));
- }
-
- @Override
- public StateModelDefinition getStateModelDef(String clusterName, String stateModelName)
- {
- ZKHelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
- Builder keyBuilder = accessor.keyBuilder();
-
- return accessor.getProperty(keyBuilder.stateModelDef(stateModelName));
- }
-
- @Override
- public void addStat(String clusterName, final String statName)
- {
- if (!ZKUtil.isClusterSetup(clusterName, _zkClient))
- {
- throw new HelixException("cluster " + clusterName + " is not setup yet");
- }
-
- String persistentStatsPath =
- PropertyPathConfig.getPath(PropertyType.PERSISTENTSTATS, clusterName);
- ZkBaseDataAccessor<ZNRecord> baseAccessor =
- new ZkBaseDataAccessor<ZNRecord>(_zkClient);
-
- baseAccessor.update(persistentStatsPath, new DataUpdater<ZNRecord>()
- {
-
- @Override
- public ZNRecord update(ZNRecord statsRec)
- {
- if (statsRec == null)
- {
- // TODO: fix naming of this record, if it matters
- statsRec = new ZNRecord(PersistentStats.nodeName);
- }
-
- Map<String, Map<String, String>> currStatMap = statsRec.getMapFields();
- Map<String, Map<String, String>> newStatMap = StatsHolder.parseStat(statName);
- for (String newStat : newStatMap.keySet())
- {
- if (!currStatMap.containsKey(newStat))
- {
- currStatMap.put(newStat, newStatMap.get(newStat));
- }
- }
- statsRec.setMapFields(currStatMap);
-
- return statsRec;
- }
- }, AccessOption.PERSISTENT);
- }
-
- @Override
- public void addAlert(final String clusterName, final String alertName)
- {
- if (!ZKUtil.isClusterSetup(clusterName, _zkClient))
- {
- throw new HelixException("cluster " + clusterName + " is not setup yet");
- }
-
- ZkBaseDataAccessor<ZNRecord> baseAccessor =
- new ZkBaseDataAccessor<ZNRecord>(_zkClient);
-
- String alertsPath = PropertyPathConfig.getPath(PropertyType.ALERTS, clusterName);
-
- baseAccessor.update(alertsPath, new DataUpdater<ZNRecord>()
- {
-
- @Override
- public ZNRecord update(ZNRecord alertsRec)
- {
- if (alertsRec == null)
- {
- // TODO: fix naming of this record, if it matters
- alertsRec = new ZNRecord(Alerts.nodeName);
-
- }
-
- Map<String, Map<String, String>> currAlertMap = alertsRec.getMapFields();
- StringBuilder newStatName = new StringBuilder();
- Map<String, String> newAlertMap = new HashMap<String, String>();
-
- // use AlertsHolder to get map of new stats and map for this alert
- AlertsHolder.parseAlert(alertName, newStatName, newAlertMap);
-
- // add stat
- addStat(clusterName, newStatName.toString());
-
- // add alert
- currAlertMap.put(alertName, newAlertMap);
-
- alertsRec.setMapFields(currAlertMap);
-
- return alertsRec;
- }
- }, AccessOption.PERSISTENT);
- }
-
- @Override
- public void dropCluster(String clusterName)
- {
- logger.info("Deleting cluster " + clusterName);
- ZKHelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
- Builder keyBuilder = accessor.keyBuilder();
-
- String root = "/" + clusterName;
- if (accessor.getChildNames(keyBuilder.liveInstances()).size() > 0)
- {
- throw new HelixException("There are still live instances in the cluster, shut them down first.");
- }
-
- if (accessor.getProperty(keyBuilder.controllerLeader()) != null)
- {
- throw new HelixException("There are still LEADER in the cluster, shut them down first.");
- }
-
- _zkClient.deleteRecursive(root);
- }
-
- @Override
- public void dropStat(String clusterName, final String statName)
- {
- if (!ZKUtil.isClusterSetup(clusterName, _zkClient))
- {
- throw new HelixException("cluster " + clusterName + " is not setup yet");
- }
-
- String persistentStatsPath =
- PropertyPathConfig.getPath(PropertyType.PERSISTENTSTATS, clusterName);
- ZkBaseDataAccessor<ZNRecord> baseAccessor =
- new ZkBaseDataAccessor<ZNRecord>(_zkClient);
-
- baseAccessor.update(persistentStatsPath, new DataUpdater<ZNRecord>()
- {
-
- @Override
- public ZNRecord update(ZNRecord statsRec)
- {
- if (statsRec == null)
- {
- throw new HelixException("No stats record in ZK, nothing to drop");
- }
-
- Map<String, Map<String, String>> currStatMap = statsRec.getMapFields();
- Map<String, Map<String, String>> newStatMap = StatsHolder.parseStat(statName);
-
- // delete each stat from stat map
- for (String newStat : newStatMap.keySet())
- {
- if (currStatMap.containsKey(newStat))
- {
- currStatMap.remove(newStat);
- }
- }
- statsRec.setMapFields(currStatMap);
-
- return statsRec;
- }
- }, AccessOption.PERSISTENT);
- }
-
- @Override
- public void dropAlert(String clusterName, final String alertName)
- {
- if (!ZKUtil.isClusterSetup(clusterName, _zkClient))
- {
- throw new HelixException("cluster " + clusterName + " is not setup yet");
- }
-
- String alertsPath = PropertyPathConfig.getPath(PropertyType.ALERTS, clusterName);
-
- ZkBaseDataAccessor<ZNRecord> baseAccessor =
- new ZkBaseDataAccessor<ZNRecord>(_zkClient);
-
- if (!baseAccessor.exists(alertsPath, 0))
- {
- throw new HelixException("No alerts node in ZK, nothing to drop");
- }
-
- baseAccessor.update(alertsPath, new DataUpdater<ZNRecord>()
- {
- @Override
- public ZNRecord update(ZNRecord alertsRec)
- {
- if (alertsRec == null)
- {
- throw new HelixException("No alerts record in ZK, nothing to drop");
- }
-
- Map<String, Map<String, String>> currAlertMap = alertsRec.getMapFields();
- currAlertMap.remove(alertName);
- alertsRec.setMapFields(currAlertMap);
-
- return alertsRec;
- }
- }, AccessOption.PERSISTENT);
- }
-
- @Override
- public void addClusterToGrandCluster(String clusterName, String grandCluster)
- {
- if (!ZKUtil.isClusterSetup(grandCluster, _zkClient))
- {
- throw new HelixException("Grand cluster " + grandCluster + " is not setup yet");
- }
-
- if (!ZKUtil.isClusterSetup(clusterName, _zkClient))
- {
- throw new HelixException("Cluster " + clusterName + " is not setup yet");
- }
-
- IdealState idealState = new IdealState(clusterName);
-
- idealState.setNumPartitions(1);
- idealState.setStateModelDefRef("LeaderStandby");
-
- List<String> controllers = getInstancesInCluster(grandCluster);
- if (controllers.size() == 0)
- {
- throw new HelixException("Grand cluster " + grandCluster + " has no instances");
- }
- idealState.setReplicas(Integer.toString(controllers.size()));
- Collections.shuffle(controllers);
- idealState.getRecord().setListField(clusterName, controllers);
- idealState.setPartitionState(clusterName, controllers.get(0), "LEADER");
- for (int i = 1; i < controllers.size(); i++)
- {
- idealState.setPartitionState(clusterName, controllers.get(i), "STANDBY");
- }
-
- ZKHelixDataAccessor accessor =
- new ZKHelixDataAccessor(grandCluster, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
- Builder keyBuilder = accessor.keyBuilder();
-
- accessor.setProperty(keyBuilder.idealStates(idealState.getResourceName()), idealState);
- }
-
- @Override
- public void setConfig(ConfigScope scope, Map<String, String> properties)
- {
- for (String key : properties.keySet())
- {
- _configAccessor.set(scope, key, properties.get(key));
- }
- }
-
- @Override
- public Map<String, String> getConfig(ConfigScope scope, Set<String> keys)
- {
- Map<String, String> properties = new TreeMap<String, String>();
-
- if (keys == null)
- {
- // read all simple fields
-
- }
- else
- {
- for (String key : keys)
- {
- String value = _configAccessor.get(scope, key);
- if (value == null)
- {
- logger.error("Config doesn't exist for key: " + key);
- continue;
- }
- properties.put(key, value);
- }
- }
-
- return properties;
- }
-
- @Override
- public List<String> getConfigKeys(ConfigScopeProperty scope,
- String clusterName,
- String... keys)
- {
- return _configAccessor.getKeys(scope, clusterName, keys);
- }
-
- @Override
- public void removeConfig(ConfigScope scope, Set<String> keys)
- {
- for (String key : keys)
- {
- _configAccessor.remove(scope, key);
- }
- }
-
- @Override
- public void rebalance(String clusterName, String resourceName, int replica)
- {
- rebalance(clusterName, resourceName, replica, resourceName);
- }
-
- void rebalance(String clusterName, String resourceName, int replica, String keyPrefix)
- {
- List<String> InstanceNames = getInstancesInCluster(clusterName);
-
- // ensure we get the same idealState with the same set of instances
- Collections.sort(InstanceNames);
-
- IdealState idealState = getResourceIdealState(clusterName, resourceName);
- if (idealState == null)
- {
- throw new HelixException("Resource: " + resourceName + " has NOT been added yet");
- }
-
- idealState.setReplicas(Integer.toString(replica));
- int partitions = idealState.getNumPartitions();
- String stateModelName = idealState.getStateModelDefRef();
- StateModelDefinition stateModDef = getStateModelDef(clusterName, stateModelName);
-
- if (stateModDef == null)
- {
- throw new HelixException("cannot find state model: " + stateModelName);
- }
- // StateModelDefinition def = new StateModelDefinition(stateModDef);
-
- List<String> statePriorityList = stateModDef.getStatesPriorityList();
-
- String masterStateValue = null;
- String slaveStateValue = null;
- replica--;
-
- for (String state : statePriorityList)
- {
- String count = stateModDef.getNumInstancesPerState(state);
- if (count.equals("1"))
- {
- if (masterStateValue != null)
- {
- throw new HelixException("Invalid or unsupported state model definition");
- }
- masterStateValue = state;
- }
- else if (count.equalsIgnoreCase("R"))
- {
- if (slaveStateValue != null)
- {
- throw new HelixException("Invalid or unsupported state model definition");
- }
- slaveStateValue = state;
- }
- else if (count.equalsIgnoreCase("N"))
- {
- if (!(masterStateValue == null && slaveStateValue == null))
- {
- throw new HelixException("Invalid or unsupported state model definition");
- }
- replica = InstanceNames.size() - 1;
- masterStateValue = slaveStateValue = state;
- }
- }
- if (masterStateValue == null && slaveStateValue == null)
- {
- throw new HelixException("Invalid or unsupported state model definition");
- }
-
- if (masterStateValue == null)
- {
- masterStateValue = slaveStateValue;
- }
- if (idealState.getIdealStateMode() != IdealStateModeProperty.AUTO_REBALANCE)
- {
- ZNRecord newIdealState =
- IdealStateCalculatorForStorageNode.calculateIdealState(InstanceNames,
- partitions,
- replica,
- keyPrefix,
- masterStateValue,
- slaveStateValue);
-
- // for now keep mapField in AUTO mode and remove listField in CUSTOMIZED mode
- if (idealState.getIdealStateMode() == IdealStateModeProperty.AUTO)
- {
- idealState.getRecord().setListFields(newIdealState.getListFields());
- idealState.getRecord().setMapFields(newIdealState.getMapFields());
- }
- if (idealState.getIdealStateMode() == IdealStateModeProperty.CUSTOMIZED)
- {
- idealState.getRecord().setMapFields(newIdealState.getMapFields());
- }
- }
- else
- {
- for (int i = 0; i < partitions; i++)
- {
- String partitionName = keyPrefix + "_" + i;
- idealState.getRecord().setMapField(partitionName, new HashMap<String, String>());
- idealState.getRecord().setListField(partitionName, new ArrayList<String>());
- }
- }
- setResourceIdealState(clusterName, resourceName, idealState);
- }
-
- @Override
- public void addIdealState(String clusterName, String resourceName, String idealStateFile) throws IOException
- {
- ZNRecord idealStateRecord =
- (ZNRecord) (new ZNRecordSerializer().deserialize(readFile(idealStateFile)));
- if (idealStateRecord.getId() == null
- || !idealStateRecord.getId().equals(resourceName))
- {
- throw new IllegalArgumentException("ideal state must have same id as resource name");
- }
- setResourceIdealState(clusterName, resourceName, new IdealState(idealStateRecord));
- }
-
- private static byte[] readFile(String filePath) throws IOException
- {
- File file = new File(filePath);
-
- int size = (int) file.length();
- byte[] bytes = new byte[size];
- DataInputStream dis = new DataInputStream(new FileInputStream(file));
- int read = 0;
- int numRead = 0;
- while (read < bytes.length
- && (numRead = dis.read(bytes, read, bytes.length - read)) >= 0)
- {
- read = read + numRead;
- }
- return bytes;
- }
-
- public void addStateModelDef(String clusterName,
- String stateModelDefName,
- String stateModelDefFile) throws IOException
- {
- ZNRecord record =
- (ZNRecord) (new ZNRecordSerializer().deserialize(readFile(stateModelDefFile)));
- if (record == null || record.getId() == null
- || !record.getId().equals(stateModelDefName))
- {
- throw new IllegalArgumentException("state model definition must have same id as state model def name");
- }
- addStateModelDef(clusterName, stateModelDefName, new StateModelDefinition(record));
-
- }
-
- public void addMessageConstraint(String clusterName,
- final String constraintId,
- final Map<String, String> constraints)
- {
- ZkBaseDataAccessor<ZNRecord> baseAccessor =
- new ZkBaseDataAccessor<ZNRecord>(_zkClient);
-
- Builder keyBuilder = new Builder(clusterName);
- String path = keyBuilder.constraint(ConstraintType.MESSAGE_CONSTRAINT.toString()).getPath();
-
- baseAccessor.update(path, new DataUpdater<ZNRecord>()
- {
- @Override
- public ZNRecord update(ZNRecord currentData)
- {
- if (currentData == null)
- {
- currentData = new ZNRecord(ConstraintType.MESSAGE_CONSTRAINT.toString());
- }
-
- Map<String, String> map = currentData.getMapField(constraintId);
- if (map == null)
- {
- map = new TreeMap<String, String>();
- currentData.setMapField(constraintId, map);
- } else
- {
- logger.warn("Overwrite existing constraint " + constraintId + ": " + map);
- }
-
- for (String key : constraints.keySet())
- {
- // make sure contraint attribute is valid
- ConstraintAttribute attr = ConstraintAttribute.valueOf(key.toUpperCase());
-
- map.put(attr.toString(), constraints.get(key));
- }
-
- return currentData;
- }
- }, AccessOption.PERSISTENT);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZKHelixDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZKHelixDataAccessor.java b/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZKHelixDataAccessor.java
deleted file mode 100644
index 9df0638..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZKHelixDataAccessor.java
+++ /dev/null
@@ -1,590 +0,0 @@
-package com.linkedin.helix.manager.zk;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.I0Itec.zkclient.DataUpdater;
-import org.I0Itec.zkclient.exception.ZkNoNodeException;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.data.Stat;
-
-import com.linkedin.helix.AccessOption;
-import com.linkedin.helix.BaseDataAccessor;
-import com.linkedin.helix.ControllerChangeListener;
-import com.linkedin.helix.GroupCommit;
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.HelixException;
-import com.linkedin.helix.HelixProperty;
-import com.linkedin.helix.InstanceType;
-import com.linkedin.helix.NotificationContext;
-import com.linkedin.helix.PropertyKey;
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.PropertyType;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.ZNRecordAssembler;
-import com.linkedin.helix.ZNRecordBucketizer;
-import com.linkedin.helix.ZNRecordUpdater;
-import com.linkedin.helix.controller.restlet.ZNRecordUpdate;
-import com.linkedin.helix.controller.restlet.ZNRecordUpdate.OpCode;
-import com.linkedin.helix.controller.restlet.ZkPropertyTransferClient;
-import com.linkedin.helix.model.LiveInstance;
-
-public class ZKHelixDataAccessor implements HelixDataAccessor, ControllerChangeListener
-{
- private static Logger LOG =
- Logger.getLogger(ZKHelixDataAccessor.class);
- private final BaseDataAccessor<ZNRecord> _baseDataAccessor;
- final InstanceType _instanceType;
- private final String _clusterName;
- private final Builder _propertyKeyBuilder;
- ZkPropertyTransferClient _zkPropertyTransferClient = null;
- private final GroupCommit _groupCommit = new GroupCommit();
- String _zkPropertyTransferSvcUrl = null;
-
- public ZKHelixDataAccessor(String clusterName,
- BaseDataAccessor<ZNRecord> baseDataAccessor)
- {
- this(clusterName, null, baseDataAccessor);
- }
-
- public ZKHelixDataAccessor(String clusterName,
- InstanceType instanceType,
- BaseDataAccessor<ZNRecord> baseDataAccessor)
- {
- _clusterName = clusterName;
- _instanceType = instanceType;
- _baseDataAccessor = baseDataAccessor;
- _propertyKeyBuilder = new PropertyKey.Builder(_clusterName);
- }
-
- @Override
- public <T extends HelixProperty> boolean createProperty(PropertyKey key, T value)
- {
- PropertyType type = key.getType();
- String path = key.getPath();
- int options = constructOptions(type);
- return _baseDataAccessor.create(path, value.getRecord(), options);
- }
-
- @Override
- public <T extends HelixProperty> boolean setProperty(PropertyKey key, T value)
- {
- PropertyType type = key.getType();
- if (!value.isValid())
- {
- throw new HelixException("The ZNRecord for " + type + " is not valid.");
- }
-
- String path = key.getPath();
- int options = constructOptions(type);
-
- if (type.usePropertyTransferServer())
- {
- if (_zkPropertyTransferSvcUrl != null && _zkPropertyTransferClient != null)
- {
- ZNRecordUpdate update = new ZNRecordUpdate(path, OpCode.SET, value.getRecord());
- _zkPropertyTransferClient.enqueueZNRecordUpdate(update, _zkPropertyTransferSvcUrl);
- return true;
- }
- }
-
- boolean success = false;
- switch (type)
- {
- case IDEALSTATES:
- case EXTERNALVIEW:
- // check if bucketized
- if (value.getBucketSize() > 0)
- {
- // set parent node
- ZNRecord metaRecord = new ZNRecord(value.getId());
- metaRecord.setSimpleFields(value.getRecord().getSimpleFields());
- success = _baseDataAccessor.set(path, metaRecord, options);
- if (success)
- {
- ZNRecordBucketizer bucketizer = new ZNRecordBucketizer(value.getBucketSize());
-
- Map<String, ZNRecord> map = bucketizer.bucketize(value.getRecord());
- List<String> paths = new ArrayList<String>();
- List<ZNRecord> bucketizedRecords = new ArrayList<ZNRecord>();
- for (String bucketName : map.keySet())
- {
- paths.add(path + "/" + bucketName);
- bucketizedRecords.add(map.get(bucketName));
- }
-
- // TODO: set success accordingly
- _baseDataAccessor.setChildren(paths, bucketizedRecords, options);
- }
- }
- else
- {
- success = _baseDataAccessor.set(path, value.getRecord(), options);
- }
- break;
- default:
- success = _baseDataAccessor.set(path, value.getRecord(), options);
- break;
- }
- return success;
- }
-
- @Override
- public <T extends HelixProperty> boolean updateProperty(PropertyKey key, T value)
- {
- PropertyType type = key.getType();
- String path = key.getPath();
- int options = constructOptions(type);
-
- boolean success = false;
- switch (type)
- {
- case CURRENTSTATES:
- success = _groupCommit.commit(_baseDataAccessor, options, path, value.getRecord());
- break;
- default:
- if (type.usePropertyTransferServer())
- {
- if (_zkPropertyTransferSvcUrl != null && _zkPropertyTransferClient != null)
- {
- ZNRecordUpdate update =
- new ZNRecordUpdate(path, OpCode.UPDATE, value.getRecord());
- _zkPropertyTransferClient.enqueueZNRecordUpdate(update,
- _zkPropertyTransferSvcUrl);
-
- return true;
- }
- else
- {
- LOG.debug("getPropertyTransferUrl is null, skip updating the value");
- // TODO: consider skip the write operation
- return true;
- }
- }
- success =
- _baseDataAccessor.update(path, new ZNRecordUpdater(value.getRecord()), options);
- break;
- }
- return success;
- }
-
- @Override
- public <T extends HelixProperty> List<T> getProperty(List<PropertyKey> keys)
- {
- if (keys == null || keys.size() == 0)
- {
- return Collections.emptyList();
- }
-
- List<T> childValues = new ArrayList<T>();
-
- // read all records
- List<String> paths = new ArrayList<String>();
- for (PropertyKey key : keys)
- {
- paths.add(key.getPath());
- }
- List<ZNRecord> children = _baseDataAccessor.get(paths, null, 0);
-
- // check if bucketized
- for (int i = 0; i < keys.size(); i++)
- {
- PropertyKey key = keys.get(i);
- ZNRecord record = children.get(i);
-
- PropertyType type = key.getType();
- String path = key.getPath();
- int options = constructOptions(type);
- // ZNRecord record = null;
-
- switch (type)
- {
- case CURRENTSTATES:
- case IDEALSTATES:
- case EXTERNALVIEW:
- // check if bucketized
- if (record != null)
- {
- HelixProperty property = new HelixProperty(record);
-
- int bucketSize = property.getBucketSize();
- if (bucketSize > 0)
- {
- List<ZNRecord> childRecords =
- _baseDataAccessor.getChildren(path, null, options);
- ZNRecord assembledRecord = new ZNRecordAssembler().assemble(childRecords);
-
- // merge with parent node value
- if (assembledRecord != null)
- {
- record.getSimpleFields().putAll(assembledRecord.getSimpleFields());
- record.getListFields().putAll(assembledRecord.getListFields());
- record.getMapFields().putAll(assembledRecord.getMapFields());
- }
- }
- }
- break;
- default:
- break;
- }
-
- @SuppressWarnings("unchecked")
- T t = (T) HelixProperty.convertToTypedInstance(key.getTypeClass(), record);
- childValues.add(t);
- }
-
- return childValues;
- }
-
- @Override
- public <T extends HelixProperty> T getProperty(PropertyKey key)
- {
- PropertyType type = key.getType();
- String path = key.getPath();
- int options = constructOptions(type);
- ZNRecord record = null;
- try
- {
- Stat stat = new Stat();
- record = _baseDataAccessor.get(path, stat, options);
- if (record != null)
- {
- record.setCreationTime(stat.getCtime());
- record.setModifiedTime(stat.getMtime());
- }
- }
- catch (ZkNoNodeException e)
- {
- // OK
- }
-
- switch (type)
- {
- case CURRENTSTATES:
- case IDEALSTATES:
- case EXTERNALVIEW:
- // check if bucketized
- if (record != null)
- {
- HelixProperty property = new HelixProperty(record);
-
- int bucketSize = property.getBucketSize();
- if (bucketSize > 0)
- {
- List<ZNRecord> childRecords =
- _baseDataAccessor.getChildren(path, null, options);
- ZNRecord assembledRecord = new ZNRecordAssembler().assemble(childRecords);
-
- // merge with parent node value
- if (assembledRecord != null)
- {
- record.getSimpleFields().putAll(assembledRecord.getSimpleFields());
- record.getListFields().putAll(assembledRecord.getListFields());
- record.getMapFields().putAll(assembledRecord.getMapFields());
- }
- }
- }
- break;
- default:
- break;
- }
-
- @SuppressWarnings("unchecked")
- T t = (T) HelixProperty.convertToTypedInstance(key.getTypeClass(), record);
- return t;
- }
-
- @Override
- public boolean removeProperty(PropertyKey key)
- {
- PropertyType type = key.getType();
- String path = key.getPath();
- int options = constructOptions(type);
-
- return _baseDataAccessor.remove(path, options);
- }
-
- @Override
- public List<String> getChildNames(PropertyKey key)
- {
- PropertyType type = key.getType();
- String parentPath = key.getPath();
- int options = constructOptions(type);
- List<String> childNames = _baseDataAccessor.getChildNames(parentPath, options);
- if (childNames == null)
- {
- childNames = Collections.emptyList();
- }
- return childNames;
- }
-
- @Override
- public <T extends HelixProperty> List<T> getChildValues(PropertyKey key)
- {
- PropertyType type = key.getType();
- String parentPath = key.getPath();
- int options = constructOptions(type);
- List<T> childValues = new ArrayList<T>();
-
- List<ZNRecord> children = _baseDataAccessor.getChildren(parentPath, null, options);
- if (children != null)
- {
- for (ZNRecord record : children)
- {
- switch (type)
- {
- case CURRENTSTATES:
- case IDEALSTATES:
- case EXTERNALVIEW:
- if (record != null)
- {
- HelixProperty property = new HelixProperty(record);
-
- int bucketSize = property.getBucketSize();
- if (bucketSize > 0)
- {
- // TODO: fix this if record.id != pathName
- String childPath = parentPath + "/" + record.getId();
- List<ZNRecord> childRecords =
- _baseDataAccessor.getChildren(childPath, null, options);
- ZNRecord assembledRecord = new ZNRecordAssembler().assemble(childRecords);
-
- // merge with parent node value
- if (assembledRecord != null)
- {
- record.getSimpleFields().putAll(assembledRecord.getSimpleFields());
- record.getListFields().putAll(assembledRecord.getListFields());
- record.getMapFields().putAll(assembledRecord.getMapFields());
- }
- }
- }
-
- break;
- default:
- break;
- }
-
- if (record != null)
- {
- @SuppressWarnings("unchecked")
- T t = (T) HelixProperty.convertToTypedInstance(key.getTypeClass(), record);
- childValues.add(t);
- }
- }
- }
- return childValues;
- }
-
- @Override
- public <T extends HelixProperty> Map<String, T> getChildValuesMap(PropertyKey key)
- {
- PropertyType type = key.getType();
- String parentPath = key.getPath();
- int options = constructOptions(type);
- List<T> children = getChildValues(key);
- Map<String, T> childValuesMap = new HashMap<String, T>();
- for (T t : children)
- {
- childValuesMap.put(t.getRecord().getId(), t);
- }
- return childValuesMap;
-
- }
-
- @Override
- public Builder keyBuilder()
- {
- return _propertyKeyBuilder;
- }
-
- private int constructOptions(PropertyType type)
- {
- int options = 0;
- if (type.isPersistent())
- {
- options = options | AccessOption.PERSISTENT;
- }
- else
- {
- options = options | AccessOption.EPHEMERAL;
- }
-
- // if (type == PropertyType.CURRENTSTATES && _instanceType ==
- // InstanceType.PARTICIPANT)
- // {
- // options = options | BaseDataAccessor.Option.WRITE_THROUGH;
- // }
- // else if (type == PropertyType.EXTERNALVIEW
- // && _instanceType == InstanceType.CONTROLLER)
- // {
- // options = options | BaseDataAccessor.Option.WRITE_THROUGH;
- // }
-
- return options;
- }
-
- @Override
- public <T extends HelixProperty> boolean[] createChildren(List<PropertyKey> keys,
- List<T> children)
- {
- // TODO: add validation
- int options = -1;
- List<String> paths = new ArrayList<String>();
- List<ZNRecord> records = new ArrayList<ZNRecord>();
- for (int i = 0; i < keys.size(); i++)
- {
- PropertyKey key = keys.get(i);
- PropertyType type = key.getType();
- String path = key.getPath();
- paths.add(path);
- HelixProperty value = children.get(i);
- records.add(value.getRecord());
- options = constructOptions(type);
- }
- return _baseDataAccessor.createChildren(paths, records, options);
- }
-
- @Override
- public <T extends HelixProperty> boolean[] setChildren(List<PropertyKey> keys,
- List<T> children)
- {
- int options = -1;
- List<String> paths = new ArrayList<String>();
- List<ZNRecord> records = new ArrayList<ZNRecord>();
-
- List<List<String>> bucketizedPaths =
- new ArrayList<List<String>>(Collections.<List<String>> nCopies(keys.size(), null));
- List<List<ZNRecord>> bucketizedRecords =
- new ArrayList<List<ZNRecord>>(Collections.<List<ZNRecord>> nCopies(keys.size(),
- null));
-
- for (int i = 0; i < keys.size(); i++)
- {
- PropertyKey key = keys.get(i);
- PropertyType type = key.getType();
- String path = key.getPath();
- paths.add(path);
- options = constructOptions(type);
-
- HelixProperty value = children.get(i);
-
- switch (type)
- {
- case EXTERNALVIEW:
- if (value.getBucketSize() == 0)
- {
- records.add(value.getRecord());
- }
- else
- {
- ZNRecord metaRecord = new ZNRecord(value.getId());
- metaRecord.setSimpleFields(value.getRecord().getSimpleFields());
- records.add(metaRecord);
-
- ZNRecordBucketizer bucketizer = new ZNRecordBucketizer(value.getBucketSize());
-
- Map<String, ZNRecord> map = bucketizer.bucketize(value.getRecord());
- List<String> childBucketizedPaths = new ArrayList<String>();
- List<ZNRecord> childBucketizedRecords = new ArrayList<ZNRecord>();
- for (String bucketName : map.keySet())
- {
- childBucketizedPaths.add(path + "/" + bucketName);
- childBucketizedRecords.add(map.get(bucketName));
- }
- bucketizedPaths.set(i, childBucketizedPaths);
- bucketizedRecords.set(i, childBucketizedRecords);
- }
- break;
- default:
- records.add(value.getRecord());
- break;
- }
- }
-
- // set non-bucketized nodes or parent nodes of bucketized nodes
- boolean success[] = _baseDataAccessor.setChildren(paths, records, options);
-
- // set bucketized nodes
- List<String> allBucketizedPaths = new ArrayList<String>();
- List<ZNRecord> allBucketizedRecords = new ArrayList<ZNRecord>();
-
- for (int i = 0; i < keys.size(); i++)
- {
- if (success[i] && bucketizedPaths.get(i) != null)
- {
- allBucketizedPaths.addAll(bucketizedPaths.get(i));
- allBucketizedRecords.addAll(bucketizedRecords.get(i));
- }
- }
-
- // TODO: set success accordingly
- _baseDataAccessor.setChildren(allBucketizedPaths, allBucketizedRecords, options);
-
- return success;
- }
-
- @Override
- public BaseDataAccessor<ZNRecord> getBaseDataAccessor()
- {
- return _baseDataAccessor;
- }
-
- @Override
- public <T extends HelixProperty> boolean[] updateChildren(List<String> paths,
- List<DataUpdater<ZNRecord>> updaters,
- int options)
- {
- return _baseDataAccessor.updateChildren(paths, updaters, options);
- }
-
- public void shutdown()
- {
- if (_zkPropertyTransferClient != null)
- {
- _zkPropertyTransferClient.shutdown();
- }
- }
-
- @Override
- public void onControllerChange(NotificationContext changeContext)
- {
- LOG.info("Controller has changed");
- refreshZkPropertyTransferUrl();
- if (_zkPropertyTransferClient == null)
- {
- if (_zkPropertyTransferSvcUrl != null && _zkPropertyTransferSvcUrl.length() > 0)
- {
- LOG.info("Creating ZkPropertyTransferClient as we get url "
- + _zkPropertyTransferSvcUrl);
- _zkPropertyTransferClient =
- new ZkPropertyTransferClient(ZkPropertyTransferClient.DEFAULT_MAX_CONCURRENTTASKS);
- }
- }
- }
-
- void refreshZkPropertyTransferUrl()
- {
- try
- {
- LiveInstance leader = getProperty(keyBuilder().controllerLeader());
- if (leader != null)
- {
- _zkPropertyTransferSvcUrl = leader.getWebserviceUrl();
- LOG.info("_zkPropertyTransferSvcUrl : " + _zkPropertyTransferSvcUrl
- + " Controller " + leader.getInstanceName());
- }
- else
- {
- _zkPropertyTransferSvcUrl = null;
- }
- }
- catch (Exception e)
- {
- // LOG.error("", e);
- _zkPropertyTransferSvcUrl = null;
- }
- }
-}