You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2013/11/07 02:20:00 UTC
[52/53] [abbrv] Merge branch 'helix-logical-model'
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
index 0000000,07fec9e..2721d91
mode 000000,100644..100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
@@@ -1,0 -1,774 +1,774 @@@
+ package org.apache.helix.api.accessor;
+
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ import java.net.InetAddress;
+ import java.net.UnknownHostException;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.UUID;
+
+ import org.I0Itec.zkclient.DataUpdater;
+ import org.apache.helix.AccessOption;
+ import org.apache.helix.BaseDataAccessor;
+ import org.apache.helix.HelixDataAccessor;
+ import org.apache.helix.HelixDefinedState;
+ import org.apache.helix.HelixException;
+ import org.apache.helix.PropertyKey;
+ import org.apache.helix.ZNRecord;
+ import org.apache.helix.api.Participant;
+ import org.apache.helix.api.Resource;
+ import org.apache.helix.api.RunningInstance;
+ import org.apache.helix.api.Scope;
+ import org.apache.helix.api.State;
+ import org.apache.helix.api.config.ParticipantConfig;
+ import org.apache.helix.api.config.UserConfig;
+ import org.apache.helix.api.id.MessageId;
+ import org.apache.helix.api.id.ParticipantId;
+ import org.apache.helix.api.id.PartitionId;
+ import org.apache.helix.api.id.ResourceId;
+ import org.apache.helix.api.id.SessionId;
+ import org.apache.helix.api.id.StateModelDefId;
+ import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
+ import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+ import org.apache.helix.model.CurrentState;
+ import org.apache.helix.model.ExternalView;
+ import org.apache.helix.model.IdealState;
+ import org.apache.helix.model.IdealState.RebalanceMode;
+ import org.apache.helix.model.InstanceConfig;
+ import org.apache.helix.model.InstanceConfig.InstanceConfigProperty;
+ import org.apache.helix.model.LiveInstance;
+ import org.apache.helix.model.Message;
+ import org.apache.helix.model.Message.MessageState;
+ import org.apache.helix.model.Message.MessageType;
+ import org.apache.helix.model.StateModelDefinition;
+ import org.apache.log4j.Logger;
+
+ import com.google.common.collect.ImmutableSet;
+ import com.google.common.collect.Lists;
+ import com.google.common.collect.Maps;
+ import com.google.common.collect.Sets;
+
+ public class ParticipantAccessor {
+ private static final Logger LOG = Logger.getLogger(ParticipantAccessor.class);
+
+ private final HelixDataAccessor _accessor;
+ private final PropertyKey.Builder _keyBuilder;
+
+ /**
+  * Instantiate a participant accessor on top of a Helix data accessor
+  * @param accessor the data accessor used for every read and write; its key builder is cached
+  */
+ public ParticipantAccessor(HelixDataAccessor accessor) {
+   this._keyBuilder = accessor.keyBuilder();
+   this._accessor = accessor;
+ }
+
+ /**
+ * enable/disable a participant
+ * @param participantId
+ * @param isEnabled
+ * @return true if enable state succeeded, false otherwise
+ */
+ boolean enableParticipant(ParticipantId participantId, boolean isEnabled) {
+ String participantName = participantId.stringify();
+ if (_accessor.getProperty(_keyBuilder.instanceConfig(participantName)) == null) {
+ LOG.error("Config for participant: " + participantId + " does NOT exist in cluster");
+ return false;
+ }
+
+ InstanceConfig config = new InstanceConfig(participantName);
+ config.setInstanceEnabled(isEnabled);
+ return _accessor.updateProperty(_keyBuilder.instanceConfig(participantName), config);
+ }
+
+ /**
+  * Disable a participant
+  * @param participantId the participant to disable
+  * @return true if the participant was disabled, false otherwise
+  */
+ public boolean disableParticipant(ParticipantId participantId) {
+   final boolean enable = false;
+   return enableParticipant(participantId, enable);
+ }
+
+ /**
+  * Enable a participant
+  * @param participantId the participant to enable
+  * @return true if the participant was enabled, false otherwise
+  */
+ public boolean enableParticipant(ParticipantId participantId) {
+   final boolean enable = true;
+   return enableParticipant(participantId, enable);
+ }
+
+ /**
+  * Create messages under a participant's message path
+  * @param participantId the participant that receives the messages
+  * @param msgMap map of message-id to message; one child is created per entry
+  */
+ public void insertMessagesToParticipant(ParticipantId participantId,
+     Map<MessageId, Message> msgMap) {
+   final List<PropertyKey> keys = new ArrayList<PropertyKey>(msgMap.size());
+   final List<Message> messages = new ArrayList<Message>(msgMap.size());
+
+   // keys and messages are kept in matching positions of the two lists
+   for (Map.Entry<MessageId, Message> entry : msgMap.entrySet()) {
+     keys.add(_keyBuilder.message(participantId.stringify(), entry.getKey().stringify()));
+     messages.add(entry.getValue());
+   }
+
+   _accessor.createChildren(keys, messages);
+ }
+
+ /**
+  * Set (overwrite) messages of a participant
+  * @param participantId the participant that owns the messages
+  * @param msgMap map of message-id to the message content to write
+  */
+ public void updateMessageStatus(ParticipantId participantId, Map<MessageId, Message> msgMap) {
+   final String name = participantId.stringify();
+   final List<PropertyKey> keys = new ArrayList<PropertyKey>(msgMap.size());
+   final List<Message> messages = new ArrayList<Message>(msgMap.size());
+
+   // keys and messages are kept in matching positions of the two lists
+   for (Map.Entry<MessageId, Message> entry : msgMap.entrySet()) {
+     keys.add(_keyBuilder.message(name, entry.getKey().stringify()));
+     messages.add(entry.getValue());
+   }
+
+   _accessor.setChildren(keys, messages);
+ }
+
+ /**
+  * Delete messages from a participant, one removal per message
+  * @param participantId the participant that owns the messages
+  * @param msgIdSet ids of the messages to remove
+  */
+ public void deleteMessagesFromParticipant(ParticipantId participantId, Set<MessageId> msgIdSet) {
+   final String name = participantId.stringify();
+
+   // resolve every message key up front, before anything is removed
+   final List<PropertyKey> doomedKeys = new ArrayList<PropertyKey>(msgIdSet.size());
+   for (MessageId id : msgIdSet) {
+     doomedKeys.add(_keyBuilder.message(name, id.stringify()));
+   }
+
+   // TODO impl batch remove
+   for (int i = 0; i < doomedKeys.size(); i++) {
+     _accessor.removeProperty(doomedKeys.get(i));
+   }
+ }
+
+ /**
+ * Enable or disable a set of partitions on a participant by rewriting the disabled-partition
+ * list field of the participant's instance config
+ * @param enabled true to remove the partitions from the disabled list, false to add them to it
+ * @param participantId the participant whose instance config is updated
+ * @param resourceId the resource whose ideal state is consulted for the existence warnings
+ * @param partitionIdSet the partitions to enable or disable
+ * @return false if the participant has no instance config, otherwise the result of the update
+ */
+ boolean enablePartitionsForParticipant(final boolean enabled, final ParticipantId participantId,
+ final ResourceId resourceId, final Set<PartitionId> partitionIdSet) {
+ String participantName = participantId.stringify();
+ String resourceName = resourceId.stringify();
+
+ // the participant must already have an instance config; nothing is written otherwise
+ PropertyKey instanceConfigKey = _keyBuilder.instanceConfig(participantName);
+ if (_accessor.getProperty(instanceConfigKey) == null) {
+ LOG.error("Config for participant: " + participantId + " does NOT exist in cluster");
+ return false;
+ }
+
+ // look up the resource's ideal state; a missing resource only produces a warning
- IdealState idealState = _accessor.getProperty(_keyBuilder.idealState(resourceName));
++ IdealState idealState = _accessor.getProperty(_keyBuilder.idealStates(resourceName));
+ if (idealState == null) {
+ LOG.warn("Disable partitions: " + partitionIdSet + ", resource: " + resourceId
+ + " does NOT exist. probably disable it during ERROR->DROPPED transtition");
+
+ } else {
+ // warn about partitions missing from the ideal state (checked for SEMI_AUTO and CUSTOMIZED only)
+ for (PartitionId partitionId : partitionIdSet) {
+ if ((idealState.getRebalanceMode() == RebalanceMode.SEMI_AUTO && idealState
+ .getPreferenceList(partitionId) == null)
+ || (idealState.getRebalanceMode() == RebalanceMode.CUSTOMIZED && idealState
+ .getParticipantStateMap(partitionId) == null)) {
+ LOG.warn("Resource: " + resourceId + ", partition: " + partitionId
+ + ", partition does NOT exist in ideal state");
+ }
+ }
+ }
+
+ BaseDataAccessor<ZNRecord> baseAccessor = _accessor.getBaseDataAccessor();
+ final List<String> partitionNames = new ArrayList<String>();
+ for (PartitionId partitionId : partitionIdSet) {
+ partitionNames.add(partitionId.stringify());
+ }
+
+ return baseAccessor.update(instanceConfigKey.getPath(), new DataUpdater<ZNRecord>() {
+ @Override
+ public ZNRecord update(ZNRecord currentData) {
+ if (currentData == null) {
+ throw new HelixException("Instance: " + participantId + ", participant config is null");
+ }
+
+ // TODO: merge with InstanceConfig.setInstanceEnabledForPartition
+ List<String> list =
+ currentData.getListField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.toString());
+ Set<String> disabledPartitions = new HashSet<String>();
+ if (list != null) {
+ disabledPartitions.addAll(list);
+ }
+
+ if (enabled) {
+ disabledPartitions.removeAll(partitionNames);
+ } else {
+ disabledPartitions.addAll(partitionNames);
+ }
+
+ list = new ArrayList<String>(disabledPartitions);
+ Collections.sort(list); // HashSet iteration order is unspecified, so sort before persisting
+ currentData.setListField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.toString(), list);
+ return currentData;
+ }
+ }, AccessOption.PERSISTENT);
+ }
+
+ /**
+  * Disable partitions on a participant
+  * @param participantId the participant hosting the partitions
+  * @param resourceId the resource the partitions belong to
+  * @param disablePartitionIdSet the partitions to disable
+  * @return true if disabled successfully, false otherwise
+  */
+ public boolean disablePartitionsForParticipant(ParticipantId participantId,
+     ResourceId resourceId, Set<PartitionId> disablePartitionIdSet) {
+   final boolean enable = false;
+   return enablePartitionsForParticipant(enable, participantId, resourceId,
+       disablePartitionIdSet);
+ }
+
+ /**
+  * Enable partitions on a participant
+  * @param participantId the participant hosting the partitions
+  * @param resourceId the resource the partitions belong to
+  * @param enablePartitionIdSet the partitions to enable
+  * @return true if enabled successfully, false otherwise
+  */
+ public boolean enablePartitionsForParticipant(ParticipantId participantId, ResourceId resourceId,
+     Set<PartitionId> enablePartitionIdSet) {
+   final boolean enable = true;
+   return enablePartitionsForParticipant(enable, participantId, resourceId,
+       enablePartitionIdSet);
+ }
+
+ /**
+ * Reset partitions assigned to a set of participants
+ * @param resetParticipantIdSet the participants to reset
+ * @return true if reset, false otherwise
+ */
+ public boolean resetParticipants(Set<ParticipantId> resetParticipantIdSet) {
+ List<ExternalView> extViews = _accessor.getChildValues(_keyBuilder.externalViews());
+ for (ParticipantId participantId : resetParticipantIdSet) {
+ for (ExternalView extView : extViews) {
+ Set<PartitionId> resetPartitionIdSet = Sets.newHashSet();
+ for (PartitionId partitionId : extView.getPartitionIdSet()) {
+ Map<ParticipantId, State> stateMap = extView.getStateMap(partitionId);
+ if (stateMap.containsKey(participantId)
+ && stateMap.get(participantId).equals(State.from(HelixDefinedState.ERROR))) {
+ resetPartitionIdSet.add(partitionId);
+ }
+ }
+ resetPartitionsForParticipant(participantId, extView.getResourceId(), resetPartitionIdSet);
+ }
+ }
+ return true;
+ }
+
+ /**
+  * Reset partitions on a participant by sending it one ERROR to initial-state transition message
+  * per partition. Nothing is sent unless the participant exists and is running, the resource and
+  * its rebalancer context exist, every partition belongs to the resource and is currently in
+  * ERROR state on the participant, no state transition message for the running session is
+  * already pending for any of the partitions, and the state model definition can be read.
+  * @param participantId the participant hosting the partitions
+  * @param resourceId the resource the partitions belong to
+  * @param resetPartitionIdSet the partitions to reset
+  * @return true if the reset messages were submitted, false if any precondition failed
+  */
+ public boolean resetPartitionsForParticipant(ParticipantId participantId, ResourceId resourceId,
+     Set<PartitionId> resetPartitionIdSet) {
+   // make sure the participant exists and is running
+   Participant participant = readParticipant(participantId);
+   if (participant == null) {
+     // readParticipant returns null when the instance config is missing
+     LOG.error("Cannot reset partitions because the participant does not exist");
+     return false;
+   }
+   if (!participant.isAlive()) {
+     LOG.error("Cannot reset partitions because the participant is not running");
+     return false;
+   }
+   RunningInstance runningInstance = participant.getRunningInstance();
+
+   // check that the resource exists
+   Resource resource = resourceAccessor().readResource(resourceId);
+   if (resource == null || resource.getRebalancerConfig() == null) {
+     LOG.error("Cannot reset partitions because the resource is not present");
+     return false;
+   }
+
+   // need the rebalancer context for the resource
+   RebalancerContext context =
+       resource.getRebalancerConfig().getRebalancerContext(RebalancerContext.class);
+   if (context == null) {
+     LOG.error("Rebalancer context for resource does not exist");
+     return false;
+   }
+
+   // ensure that all partitions to reset exist
+   Set<PartitionId> partitionSet = ImmutableSet.copyOf(context.getSubUnitIdSet());
+   if (!partitionSet.containsAll(resetPartitionIdSet)) {
+     LOG.error("Not all of the specified partitions to reset exist for the resource");
+     return false;
+   }
+
+   // check for a valid current state that has all specified partitions in ERROR state
+   CurrentState currentState = participant.getCurrentStateMap().get(resourceId);
+   if (currentState == null) {
+     LOG.error("The participant does not have a current state for the resource");
+     return false;
+   }
+   final State errorState = State.from(HelixDefinedState.ERROR);
+   for (PartitionId partitionId : resetPartitionIdSet) {
+     // constant-first equals: a partition with no recorded state counts as "not in error"
+     if (!errorState.equals(currentState.getState(partitionId))) {
+       LOG.error("Partition " + partitionId + " is not in error state, aborting reset");
+       return false;
+     }
+   }
+
+   // make sure that there are no pending transition messages
+   for (Message message : participant.getMessageMap().values()) {
+     boolean isPendingTransition =
+         MessageType.STATE_TRANSITION.toString().equalsIgnoreCase(message.getMsgType())
+             && runningInstance.getSessionId().equals(message.getTypedTgtSessionId())
+             && resourceId.equals(message.getResourceId())
+             && resetPartitionIdSet.contains(message.getPartitionId());
+     if (isPendingTransition) {
+       LOG.error("Cannot reset partitions because of the following pending message: " + message);
+       return false;
+     }
+   }
+
+   // the state model definition supplies the initial state the partitions are sent back to
+   StateModelDefId stateModelDefId = context.getStateModelDefId();
+   StateModelDefinition stateModelDef =
+       _accessor.getProperty(_keyBuilder.stateModelDef(stateModelDefId.stringify()));
+   if (stateModelDef == null) {
+     LOG.error("Cannot reset partitions because state model definition " + stateModelDefId
+         + " does not exist");
+     return false;
+   }
+
+   // set up the source id
+   String adminName;
+   try {
+     adminName = InetAddress.getLocalHost().getCanonicalHostName() + "-ADMIN";
+   } catch (UnknownHostException e) {
+     // can ignore it
+     if (LOG.isInfoEnabled()) {
+       LOG.info("Unable to get host name. Will set it to UNKNOWN, mostly ignorable", e);
+     }
+     adminName = "UNKNOWN";
+   }
+
+   // build messages to signal the transition
+   Map<MessageId, Message> messageMap = Maps.newHashMap();
+   for (PartitionId partitionId : resetPartitionIdSet) {
+     // send ERROR to initialState message
+     MessageId msgId = MessageId.from(UUID.randomUUID().toString());
+     Message message = new Message(MessageType.STATE_TRANSITION, msgId);
+     message.setSrcName(adminName);
+     message.setTgtName(participantId.stringify());
+     message.setMsgState(MessageState.NEW);
+     message.setPartitionId(partitionId);
+     message.setResourceId(resourceId);
+     message.setTgtSessionId(runningInstance.getSessionId());
+     message.setStateModelDef(stateModelDefId);
+     message.setFromState(State.from(HelixDefinedState.ERROR.toString()));
+     message.setToState(stateModelDef.getTypedInitialState());
+     message.setStateModelFactoryId(context.getStateModelFactoryId());
+
+     messageMap.put(message.getMessageId(), message);
+   }
+
+   // send the messages
+   insertMessagesToParticipant(participantId, messageMap);
+   return true;
+ }
+
+ /**
+  * Read the user config of the participant
+  * @param participantId the participant to look up
+  * @return UserConfig, or null if the participant has no instance config
+  */
+ public UserConfig readUserConfig(ParticipantId participantId) {
+   InstanceConfig instanceConfig =
+       _accessor.getProperty(_keyBuilder.instanceConfig(participantId.stringify()));
+   if (instanceConfig == null) {
+     return null;
+   }
+   return instanceConfig.getUserConfig();
+ }
+
+ /**
+  * Set the user config of the participant, overwriting existing user configs
+  * @param participantId the participant to update
+  * @param userConfig the new user config
+  * @return true if the user config was set, false otherwise
+  */
+ public boolean setUserConfig(ParticipantId participantId, UserConfig userConfig) {
+   // express the change as a delta that touches nothing but the user config
+   ParticipantConfig.Delta userConfigDelta =
+       new ParticipantConfig.Delta(participantId).setUserConfig(userConfig);
+   ParticipantConfig updated = updateParticipant(participantId, userConfigDelta);
+   return updated != null;
+ }
+
+ /**
+  * Add user configuration to the existing participant user configuration. Overwrites properties
+  * with the same key
+  * @param participantId the participant to update
+  * @param userConfig the user config key-value pairs to add
+  * @return true if the user config was updated, false otherwise
+  */
+ public boolean updateUserConfig(ParticipantId participantId, UserConfig userConfig) {
+   // the record handed to updateProperty carries only the namespaced user config
+   InstanceConfig userConfigOnly = new InstanceConfig(participantId);
+   userConfigOnly.addNamespacedConfig(userConfig);
+   PropertyKey configKey = _keyBuilder.instanceConfig(participantId.stringify());
+   return _accessor.updateProperty(configKey, userConfigOnly);
+ }
+
+ /**
+ * Clear any user-specified configuration from the participant
+ * @param participantId the participant to update
+ * @return true if the config was cleared, false otherwise
+ */
+ public boolean dropUserConfig(ParticipantId participantId) {
+ return setUserConfig(participantId, new UserConfig(Scope.participant(participantId)));
+ }
+
+ /**
+  * Update a participant configuration by merging a delta into the persisted configuration and
+  * writing the result back
+  * @param participantId the participant to update
+  * @param participantDelta changes to the participant
+  * @return the merged ParticipantConfig, or null if the participant does not exist or the merged
+  *         configuration could not be persisted
+  */
+ public ParticipantConfig updateParticipant(ParticipantId participantId,
+     ParticipantConfig.Delta participantDelta) {
+   Participant participant = readParticipant(participantId);
+   if (participant == null) {
+     LOG.error("Participant " + participantId + " does not exist, cannot be updated");
+     return null;
+   }
+   ParticipantConfig config = participantDelta.mergeInto(participant.getConfig());
+   // honor the outcome of the write rather than reporting a config that was never stored
+   return setParticipant(config) ? config : null;
+ }
+
+ /**
+  * Set the configuration of an existing participant. The participant config is converted into
+  * an instance config (host, port, tags, disabled partitions, enabled flag, user config) which
+  * replaces the persisted one.
+  * @param participantConfig participant configuration
+  * @return true if config was set, false if the config is null or the write failed
+  */
+ public boolean setParticipant(ParticipantConfig participantConfig) {
+   if (participantConfig == null) {
+     LOG.error("Participant config not initialized");
+     return false;
+   }
+   InstanceConfig instanceConfig = new InstanceConfig(participantConfig.getId());
+   instanceConfig.setHostName(participantConfig.getHostName());
+   instanceConfig.setPort(Integer.toString(participantConfig.getPort()));
+   for (String tag : participantConfig.getTags()) {
+     instanceConfig.addTag(tag);
+   }
+   for (PartitionId partitionId : participantConfig.getDisabledPartitions()) {
+     instanceConfig.setParticipantEnabledForPartition(partitionId, false);
+   }
+   instanceConfig.setInstanceEnabled(participantConfig.isEnabled());
+   instanceConfig.addNamespacedConfig(participantConfig.getUserConfig());
+   // report the outcome of the write instead of unconditionally claiming success
+   return _accessor.setProperty(
+       _keyBuilder.instanceConfig(participantConfig.getId().stringify()), instanceConfig);
+ }
+
+ /**
+ * create a participant based on physical model
+ * @param participantId
+ * @param instanceConfig
+ * @param userConfig
+ * @param liveInstance
+ * @param instanceMsgMap map of message-id to message
+ * @param instanceCurStateMap map of resource-id to current-state
+ * @return participant
+ */
+ static Participant createParticipant(ParticipantId participantId, InstanceConfig instanceConfig,
+ UserConfig userConfig, LiveInstance liveInstance, Map<String, Message> instanceMsgMap,
+ Map<String, CurrentState> instanceCurStateMap) {
+
+ String hostName = instanceConfig.getHostName();
+
+ int port = -1;
+ try {
+ port = Integer.parseInt(instanceConfig.getPort());
+ } catch (IllegalArgumentException e) {
+ // keep as -1
+ }
+ if (port < 0 || port > 65535) {
+ port = -1;
+ }
+ boolean isEnabled = instanceConfig.getInstanceEnabled();
+
+ List<String> disabledPartitions = instanceConfig.getDisabledPartitions();
+ Set<PartitionId> disabledPartitionIdSet = Collections.emptySet();
+ if (disabledPartitions != null) {
+ disabledPartitionIdSet = new HashSet<PartitionId>();
+ for (String partitionId : disabledPartitions) {
+ disabledPartitionIdSet.add(PartitionId.from(PartitionId.extractResourceId(partitionId),
+ PartitionId.stripResourceId(partitionId)));
+ }
+ }
+
+ Set<String> tags = new HashSet<String>(instanceConfig.getTags());
+
+ RunningInstance runningInstance = null;
+ if (liveInstance != null) {
+ runningInstance =
+ new RunningInstance(liveInstance.getTypedSessionId(),
+ liveInstance.getTypedHelixVersion(), liveInstance.getProcessId());
+ }
+
+ Map<MessageId, Message> msgMap = new HashMap<MessageId, Message>();
+ if (instanceMsgMap != null) {
+ for (String msgId : instanceMsgMap.keySet()) {
+ Message message = instanceMsgMap.get(msgId);
+ msgMap.put(MessageId.from(msgId), message);
+ }
+ }
+
+ Map<ResourceId, CurrentState> curStateMap = new HashMap<ResourceId, CurrentState>();
+ if (instanceCurStateMap != null) {
+
+ for (String resourceName : instanceCurStateMap.keySet()) {
+ curStateMap.put(ResourceId.from(resourceName), instanceCurStateMap.get(resourceName));
+ }
+ }
+
+ return new Participant(participantId, hostName, port, isEnabled, disabledPartitionIdSet, tags,
+ runningInstance, curStateMap, msgMap, userConfig);
+ }
+
+ /**
+  * Read participant related data. Messages and current states are only read when the
+  * participant has a live instance record; otherwise both are empty.
+  * @param participantId the participant to read
+  * @return participant, or null if the participant has no instance config
+  */
+ public Participant readParticipant(ParticipantId participantId) {
+   // read physical model
+   final String name = participantId.stringify();
+   InstanceConfig instanceConfig = _accessor.getProperty(_keyBuilder.instanceConfig(name));
+   if (instanceConfig == null) {
+     LOG.error("Participant " + participantId + " is not present on the cluster");
+     return null;
+   }
+
+   UserConfig userConfig = instanceConfig.getUserConfig();
+   LiveInstance liveInstance = _accessor.getProperty(_keyBuilder.liveInstance(name));
+
+   final Map<String, Message> messages;
+   final Map<String, CurrentState> currentStates;
+   if (liveInstance == null) {
+     messages = Collections.emptyMap();
+     currentStates = Collections.emptyMap();
+   } else {
+     // current states live under the session of the running instance
+     SessionId sessionId = liveInstance.getTypedSessionId();
+     messages = _accessor.getChildValuesMap(_keyBuilder.messages(name));
+     currentStates =
+         _accessor.getChildValuesMap(_keyBuilder.currentStates(name, sessionId.stringify()));
+   }
+
+   return createParticipant(participantId, instanceConfig, userConfig, liveInstance, messages,
+       currentStates);
+ }
+
+ /**
+ * update resource current state of a participant
+ * @param resourceId resource id
+ * @param participantId participant id
+ * @param sessionId session id
+ * @param curStateUpdate current state change delta
+ */
+ public void updateCurrentState(ResourceId resourceId, ParticipantId participantId,
+ SessionId sessionId, CurrentState curStateUpdate) {
+ _accessor.updateProperty(
+ _keyBuilder.currentState(participantId.stringify(), sessionId.stringify(),
+ resourceId.stringify()), curStateUpdate);
+ }
+
+ /**
+ * drop resource current state of a participant
+ * @param resourceId resource id
+ * @param participantId participant id
+ * @param sessionId session id
+ * @return true if dropped, false otherwise
+ */
+ public boolean dropCurrentState(ResourceId resourceId, ParticipantId participantId,
+ SessionId sessionId) {
+ return _accessor.removeProperty(_keyBuilder.currentState(participantId.stringify(),
+ sessionId.stringify(), resourceId.stringify()));
+ }
+
+ /**
+ * drop a participant from cluster
+ * @param participantId
+ * @return true if participant dropped, false if there was an error
+ */
+ boolean dropParticipant(ParticipantId participantId) {
+ if (_accessor.getProperty(_keyBuilder.instanceConfig(participantId.stringify())) == null) {
+ LOG.error("Config for participant: " + participantId + " does NOT exist in cluster");
+ }
+
+ if (_accessor.getProperty(_keyBuilder.instance(participantId.stringify())) == null) {
+ LOG.error("Participant: " + participantId + " structure does NOT exist in cluster");
+ }
+
+ // delete participant config path
+ _accessor.removeProperty(_keyBuilder.instanceConfig(participantId.stringify()));
+
+ // delete participant path
+ _accessor.removeProperty(_keyBuilder.instance(participantId.stringify()));
+ return true;
+ }
+
+ /**
+ * Let a new participant take the place of an existing participant. The old participant must
+ * exist and be both disabled and not live, and the new participant must already exist. The old
+ * participant is then dropped, and every ideal state returned by the accessor is rewritten with
+ * the old participant replaced by the new one, together with a rebalancer context built from it.
+ * @param oldParticipantId the participant to drop
+ * @param newParticipantId the participant that takes its place
+ * @return true if swap successful, false if one of the preconditions above is not met
+ */
+ public boolean swapParticipants(ParticipantId oldParticipantId, ParticipantId newParticipantId) {
+ Participant oldParticipant = readParticipant(oldParticipantId);
+ if (oldParticipant == null) {
+ LOG.error("Could not swap participants because the old participant does not exist");
+ return false;
+ }
+ if (oldParticipant.isEnabled()) {
+ LOG.error("Could not swap participants because the old participant is still enabled");
+ return false;
+ }
+ if (oldParticipant.isAlive()) {
+ LOG.error("Could not swap participants because the old participant is still live");
+ return false;
+ }
+ Participant newParticipant = readParticipant(newParticipantId);
+ if (newParticipant == null) {
+ LOG.error("Could not swap participants because the new participant does not exist");
+ return false;
+ }
+ dropParticipant(oldParticipantId); // return value is not checked
+ ResourceAccessor resourceAccessor = resourceAccessor();
+ Map<String, IdealState> idealStateMap = _accessor.getChildValuesMap(_keyBuilder.idealStates());
+ for (String resourceName : idealStateMap.keySet()) {
+ IdealState idealState = idealStateMap.get(resourceName);
+ swapParticipantsInIdealState(idealState, oldParticipantId, newParticipantId); // edits idealState in place
+ PartitionedRebalancerContext context = PartitionedRebalancerContext.from(idealState);
+ resourceAccessor.setRebalancerContext(ResourceId.from(resourceName), context);
- _accessor.setProperty(_keyBuilder.idealState(resourceName), idealState);
++ _accessor.setProperty(_keyBuilder.idealStates(resourceName), idealState);
+ }
+ return true;
+ }
+
+ /**
+  * Replace occurrences of a participant in the preference lists and participant-state maps of
+  * an ideal state. In preference lists, existing entries for the new participant are dropped so
+  * that it only appears where the old participant was.
+  * @param idealState the current ideal state; modified in place
+  * @param oldParticipantId the participant to drop
+  * @param newParticipantId the participant that replaces it
+  */
+ protected void swapParticipantsInIdealState(IdealState idealState,
+     ParticipantId oldParticipantId, ParticipantId newParticipantId) {
+   for (PartitionId partitionId : idealState.getPartitionIdSet()) {
+     // preference list: substitute the old participant, skip pre-existing new-participant entries
+     List<ParticipantId> currentList = idealState.getPreferenceList(partitionId);
+     if (currentList != null) {
+       List<ParticipantId> swappedList = Lists.newArrayList();
+       for (ParticipantId candidate : currentList) {
+         if (candidate.equals(oldParticipantId)) {
+           swappedList.add(newParticipantId);
+         } else if (candidate.equals(newParticipantId)) {
+           continue;
+         } else {
+           swappedList.add(candidate);
+         }
+       }
+       idealState.setPreferenceList(partitionId, swappedList);
+     }
+
+     // participant-state map: move the old participant's state over to the new participant
+     Map<ParticipantId, State> stateMap = idealState.getParticipantStateMap(partitionId);
+     if (stateMap == null) {
+       continue;
+     }
+     if (stateMap.containsKey(oldParticipantId)) {
+       stateMap.put(newParticipantId, stateMap.remove(oldParticipantId));
+     }
+     idealState.setParticipantStateMap(partitionId, stateMap);
+   }
+ }
+
+ /**
+  * Create empty persistent properties to ensure that there is a valid participant structure
+  * @param participantId the participant whose required paths are created
+  */
+ public void initParticipantStructure(ParticipantId participantId) {
+   final List<String> requiredPaths = getRequiredPaths(_keyBuilder, participantId);
+   final BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor();
+   for (String path : requiredPaths) {
+     boolean created = baseAccessor.create(path, null, AccessOption.PERSISTENT);
+     if (created) {
+       continue;
+     }
+     if (LOG.isDebugEnabled()) {
+       LOG.debug(path + " already exists");
+     }
+   }
+ }
+
+ /**
+ * Clear properties for the participant
+ */
+ void clearParticipantStructure(ParticipantId participantId) {
+ List<String> paths = getRequiredPaths(_keyBuilder, participantId);
+ BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor();
+ baseAccessor.remove(paths, 0);
+ }
+
+ /**
+  * Check if the participant structure is valid, i.e. every required path exists
+  * @param participantId the participant to check
+  * @return false if any required path is missing; true otherwise, including when no base data
+  *         accessor is available to perform the check
+  */
+ public boolean isParticipantStructureValid(ParticipantId participantId) {
+   final List<String> requiredPaths = getRequiredPaths(_keyBuilder, participantId);
+   final BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor();
+   if (baseAccessor == null) {
+     return true;
+   }
+   final boolean[] existsResults = baseAccessor.exists(requiredPaths, 0);
+   for (int i = 0; i < existsResults.length; i++) {
+     if (!existsResults[i]) {
+       return false;
+     }
+   }
+   return true;
+ }
+
+ /**
+ * Get the paths that should be created if the participant exists
+ * @param keyBuilder PropertyKey.Builder for the cluster
+ * @param participantId the participant for which to generate paths
+ * @return list of required paths as strings
+ */
+ private static List<String> getRequiredPaths(PropertyKey.Builder keyBuilder,
+ ParticipantId participantId) {
+ List<String> paths = Lists.newArrayList();
+ paths.add(keyBuilder.instanceConfig(participantId.stringify()).getPath());
+ paths.add(keyBuilder.messages(participantId.stringify()).getPath());
+ paths.add(keyBuilder.currentStates(participantId.stringify()).getPath());
+ paths.add(keyBuilder.participantErrors(participantId.stringify()).getPath());
+ paths.add(keyBuilder.statusUpdates(participantId.stringify()).getPath());
+ return paths;
+ }
+
+ /**
+  * Get a ResourceAccessor instance backed by the same data accessor; a new instance is created
+  * on every call
+  * @return ResourceAccessor
+  */
+ protected ResourceAccessor resourceAccessor() {
+   ResourceAccessor accessor = new ResourceAccessor(_accessor);
+   return accessor;
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
index 0000000,f24b5b1..0dfceca
mode 000000,100644..100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
@@@ -1,0 -1,470 +1,470 @@@
+ package org.apache.helix.api.accessor;
+
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ import java.util.Collections;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+
+ import org.apache.helix.HelixConstants.StateModelToken;
+ import org.apache.helix.HelixDataAccessor;
+ import org.apache.helix.HelixDefinedState;
+ import org.apache.helix.PropertyKey;
+ import org.apache.helix.api.Resource;
+ import org.apache.helix.api.Scope;
+ import org.apache.helix.api.State;
+ import org.apache.helix.api.config.ResourceConfig;
+ import org.apache.helix.api.config.ResourceConfig.ResourceType;
+ import org.apache.helix.api.config.UserConfig;
+ import org.apache.helix.api.id.ParticipantId;
+ import org.apache.helix.api.id.PartitionId;
+ import org.apache.helix.api.id.ResourceId;
+ import org.apache.helix.controller.rebalancer.context.CustomRebalancerContext;
+ import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
+ import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+ import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+ import org.apache.helix.controller.rebalancer.context.SemiAutoRebalancerContext;
+ import org.apache.helix.model.ExternalView;
+ import org.apache.helix.model.IdealState;
+ import org.apache.helix.model.IdealState.RebalanceMode;
+ import org.apache.helix.model.InstanceConfig;
+ import org.apache.helix.model.ResourceAssignment;
+ import org.apache.helix.model.ResourceConfiguration;
+ import org.apache.helix.model.StateModelDefinition;
+ import org.apache.log4j.Logger;
+
+ import com.google.common.collect.Maps;
+ import com.google.common.collect.Sets;
+
+ public class ResourceAccessor {
+ private static final Logger LOG = Logger.getLogger(ResourceAccessor.class);
+ private final HelixDataAccessor _accessor;
+ private final PropertyKey.Builder _keyBuilder;
+
+ /**
+ * Instantiate a resource accessor on top of a Helix data accessor
+ * @param accessor data accessor used for every read and write; must not be null because its
+ * key builder is fetched and cached here
+ */
+ public ResourceAccessor(HelixDataAccessor accessor) {
+ _accessor = accessor;
+ _keyBuilder = accessor.keyBuilder();
+ }
+
+ /**
+ * Read a single snapshot of a resource, assembled from its stored configuration, ideal state,
+ * external view, and resource assignment
+ * @param resourceId the resource id to read
+ * @return Resource, or null if neither a configuration nor an ideal state is stored for it
+ */
+ public Resource readResource(ResourceId resourceId) {
+ ResourceConfiguration config =
+ _accessor.getProperty(_keyBuilder.resourceConfig(resourceId.stringify()));
- IdealState idealState = _accessor.getProperty(_keyBuilder.idealState(resourceId.stringify()));
++ IdealState idealState = _accessor.getProperty(_keyBuilder.idealStates(resourceId.stringify()));
+
+ // the resource counts as present when at least one of configuration / ideal state exists
+ if (config == null && idealState == null) {
+ LOG.error("Resource " + resourceId + " not present on the cluster");
+ return null;
+ }
+
+ // external view and assignment are passed through as read, even if they are null
+ ExternalView externalView =
+ _accessor.getProperty(_keyBuilder.externalView(resourceId.stringify()));
+ ResourceAssignment resourceAssignment =
+ _accessor.getProperty(_keyBuilder.resourceAssignment(resourceId.stringify()));
+ return createResource(resourceId, config, idealState, externalView, resourceAssignment);
+ }
+
+ /**
+ * Update a resource configuration
+ * @param resourceId the resource id to update
+ * @param resourceDelta changes to the resource
+ * @return ResourceConfig, or null if the resource is not persisted
+ */
+ public ResourceConfig updateResource(ResourceId resourceId, ResourceConfig.Delta resourceDelta) {
+ Resource resource = readResource(resourceId);
+ if (resource == null) {
+ LOG.error("Resource " + resourceId + " does not exist, cannot be updated");
+ return null;
+ }
+ ResourceConfig config = resourceDelta.mergeInto(resource.getConfig());
+ setResource(config);
+ return config;
+ }
+
+ /**
+ * Save a resource assignment under the resource's assignment key
+ * @param resourceId the resource the assignment belongs to
+ * @param resourceAssignment the assignment to persist
+ * @return true if set, false otherwise
+ */
+ public boolean setResourceAssignment(ResourceId resourceId, ResourceAssignment resourceAssignment) {
+ return _accessor.setProperty(_keyBuilder.resourceAssignment(resourceId.stringify()),
+ resourceAssignment);
+ }
+
+ /**
+ * Get the resource assignment stored under the resource's assignment key
+ * @param resourceId the resource to look up
+ * @return resource assignment or null
+ */
+ public ResourceAssignment getResourceAssignment(ResourceId resourceId) {
+ return _accessor.getProperty(_keyBuilder.resourceAssignment(resourceId.stringify()));
+ }
+
+ /**
+ * Set a physical resource configuration, which may include user-defined configuration, as well as
+ * rebalancer configuration. When the rebalancer configuration can be expressed as an ideal
+ * state, that ideal state is written as well.
+ * @param resourceId the resource to configure
+ * @param configuration the physical configuration to persist
+ * @return true if the resource configuration was set, false otherwise; the outcome of the ideal
+ * state write is not reflected in this value
+ */
+ private boolean setConfiguration(ResourceId resourceId, ResourceConfiguration configuration) {
+ boolean status =
+ _accessor.setProperty(_keyBuilder.resourceConfig(resourceId.stringify()), configuration);
+ // also set an ideal state if the resource supports it
+ RebalancerConfig rebalancerConfig = new RebalancerConfig(configuration);
+ IdealState idealState =
+ rebalancerConfigToIdealState(rebalancerConfig, configuration.getBucketSize(),
+ configuration.getBatchMessageMode());
+ // a null ideal state means the rebalancer config is not partitioned; nothing more to write
+ if (idealState != null) {
- _accessor.setProperty(_keyBuilder.idealState(resourceId.stringify()), idealState);
++ _accessor.setProperty(_keyBuilder.idealStates(resourceId.stringify()), idealState);
+ }
+ return status;
+ }
+
+ /**
+ * Set the context of the rebalancer. This includes all properties required for rebalancing this
+ * resource. If an ideal state is already stored for the resource, it is regenerated from the new
+ * context while keeping the stored bucket size and batch message mode.
+ * @param resourceId the resource to update
+ * @param context the new rebalancer context
+ * @return true if the context was set in the resource configuration, false otherwise; the
+ * outcome of the ideal state write is not reflected in this value
+ */
+ public boolean setRebalancerContext(ResourceId resourceId, RebalancerContext context) {
+ RebalancerConfig config = new RebalancerConfig(context);
+ ResourceConfiguration resourceConfig = new ResourceConfiguration(resourceId);
+ resourceConfig.addNamespacedConfig(config.toNamespacedConfig());
+
+ // update the ideal state if applicable
+ IdealState oldIdealState =
- _accessor.getProperty(_keyBuilder.idealState(resourceId.stringify()));
++ _accessor.getProperty(_keyBuilder.idealStates(resourceId.stringify()));
+ // an ideal state is only rewritten, never created, by this method
+ if (oldIdealState != null) {
+ IdealState idealState =
+ rebalancerConfigToIdealState(config, oldIdealState.getBucketSize(),
+ oldIdealState.getBatchMessageMode());
+ if (idealState != null) {
- _accessor.setProperty(_keyBuilder.idealState(resourceId.stringify()), idealState);
++ _accessor.setProperty(_keyBuilder.idealStates(resourceId.stringify()), idealState);
+ }
+ }
+
+ // the config holds only the rebalancer namespace and is applied via updateProperty, not setProperty
+ return _accessor.updateProperty(_keyBuilder.resourceConfig(resourceId.stringify()),
+ resourceConfig);
+ }
+
+ /**
+ * Read the user config of the resource
+ * @param resourceId the resource to look up
+ * @return UserConfig, or null if no resource configuration is stored
+ */
+ public UserConfig readUserConfig(ResourceId resourceId) {
+ ResourceConfiguration resourceConfig =
+ _accessor.getProperty(_keyBuilder.resourceConfig(resourceId.stringify()));
+ return resourceConfig != null ? UserConfig.from(resourceConfig) : null;
+ }
+
+ /**
+ * Read the rebalancer config of the resource
+ * @param resourceId the resource to look up
+ * @return RebalancerConfig, or null if no resource configuration is stored
+ */
+ public RebalancerConfig readRebalancerConfig(ResourceId resourceId) {
+ ResourceConfiguration resourceConfig =
+ _accessor.getProperty(_keyBuilder.resourceConfig(resourceId.stringify()));
+ return resourceConfig != null ? RebalancerConfig.from(resourceConfig) : null;
+ }
+
+ /**
+ * Set the user config of the resource, overwriting existing user configs. Implemented as a
+ * resource update whose delta carries only the user config.
+ * @param resourceId the resource to update
+ * @param userConfig the new user config
+ * @return true if the user config was set, false otherwise
+ */
+ public boolean setUserConfig(ResourceId resourceId, UserConfig userConfig) {
+ ResourceConfig.Delta delta = new ResourceConfig.Delta(resourceId).setUserConfig(userConfig);
+ // updateResource signals failure by returning null
+ return updateResource(resourceId, delta) != null;
+ }
+
+ /**
+ * Add user configuration to the existing resource user configuration. Overwrites properties with
+ * the same key
+ * @param resourceId the resource to update
+ * @param userConfig the user config key-value pairs to add
+ * @return true if the user config was updated, false otherwise
+ */
+ public boolean updateUserConfig(ResourceId resourceId, UserConfig userConfig) {
+ // the configuration sent to updateProperty carries only the user config namespace
+ ResourceConfiguration resourceConfig = new ResourceConfiguration(resourceId);
+ resourceConfig.addNamespacedConfig(userConfig);
+ return _accessor.updateProperty(_keyBuilder.resourceConfig(resourceId.stringify()),
+ resourceConfig);
+ }
+
+ /**
+ * Clear any user-specified configuration from the resource by replacing it with a newly
+ * constructed UserConfig scoped to the resource
+ * @param resourceId the resource to update
+ * @return true if the config was cleared, false otherwise
+ */
+ public boolean dropUserConfig(ResourceId resourceId) {
+ return setUserConfig(resourceId, new UserConfig(Scope.resource(resourceId)));
+ }
+
+ /**
+ * Persist an existing resource's logical configuration. The logical configuration is converted
+ * into a physical ResourceConfiguration (user config, rebalancer config, bucket size, and batch
+ * message mode) and written through setConfiguration.
+ * @param resourceConfig logical resource configuration; must carry a rebalancer config
+ * @return true if the resource configuration was written, false if the input is incomplete or
+ * the write failed
+ */
+ public boolean setResource(ResourceConfig resourceConfig) {
+   if (resourceConfig == null || resourceConfig.getRebalancerConfig() == null) {
+     LOG.error("Resource not fully defined with a rebalancer context");
+     return false;
+   }
+   ResourceId resourceId = resourceConfig.getId();
+   ResourceConfiguration config = new ResourceConfiguration(resourceId);
+   config.addNamespacedConfig(resourceConfig.getUserConfig());
+   config.addNamespacedConfig(resourceConfig.getRebalancerConfig().toNamespacedConfig());
+   config.setBucketSize(resourceConfig.getBucketSize());
+   config.setBatchMessageMode(resourceConfig.getBatchMessageMode());
+   // propagate the write status instead of unconditionally reporting success
+   return setConfiguration(resourceId, config);
+ }
+
+ /**
+ * Get a resource configuration, which may include user-defined configuration, as well as
+ * rebalancer configuration
+ * @param resourceId the resource to look up
+ * @return configuration or null
+ */
+ public ResourceConfiguration getConfiguration(ResourceId resourceId) {
+ return _accessor.getProperty(_keyBuilder.resourceConfig(resourceId.stringify()));
+ }
+
+ /**
+ * Set the external view of a resource
+ * @param resourceId the resource the external view belongs to
+ * @param extView the external view to persist
+ * @return true if set, false otherwise
+ */
+ public boolean setExternalView(ResourceId resourceId, ExternalView extView) {
+ return _accessor.setProperty(_keyBuilder.externalView(resourceId.stringify()), extView);
+ }
+
+ /**
+ * Get the external view of a resource
+ * @param resourceId the resource to look up
+ * @return external view or null
+ */
+ public ExternalView readExternalView(ResourceId resourceId) {
+ return _accessor.getProperty(_keyBuilder.externalView(resourceId.stringify()));
+ }
+
+ /**
+ * Drop the external view of a resource
+ * @param resourceId the resource whose external view is removed
+ * @return true if dropped, false otherwise
+ */
+ public boolean dropExternalView(ResourceId resourceId) {
+ return _accessor.removeProperty(_keyBuilder.externalView(resourceId.stringify()));
+ }
+
+ /**
+ * reset resources for all participants
+ * @param resetResourceIdSet the resources to reset
+ * @return true if they were reset, false otherwise
+ */
+ public boolean resetResources(Set<ResourceId> resetResourceIdSet) {
+ ParticipantAccessor accessor = participantAccessor();
+ List<ExternalView> extViews = _accessor.getChildValues(_keyBuilder.externalViews());
+ for (ExternalView extView : extViews) {
+ if (!resetResourceIdSet.contains(extView.getResourceId())) {
+ continue;
+ }
+
+ Map<ParticipantId, Set<PartitionId>> resetPartitionIds = Maps.newHashMap();
+ for (PartitionId partitionId : extView.getPartitionIdSet()) {
+ Map<ParticipantId, State> stateMap = extView.getStateMap(partitionId);
+ for (ParticipantId participantId : stateMap.keySet()) {
+ State state = stateMap.get(participantId);
+ if (state.equals(State.from(HelixDefinedState.ERROR))) {
+ if (!resetPartitionIds.containsKey(participantId)) {
+ resetPartitionIds.put(participantId, new HashSet<PartitionId>());
+ }
+ resetPartitionIds.get(participantId).add(partitionId);
+ }
+ }
+ }
+ for (ParticipantId participantId : resetPartitionIds.keySet()) {
+ accessor.resetPartitionsForParticipant(participantId, extView.getResourceId(),
+ resetPartitionIds.get(participantId));
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Generate a default assignment for partitioned resources. The resource's partitioned rebalancer
+ * context is optionally given a new replica count and participant group tag, asked to generate
+ * a default configuration over all configured participants, and then persisted.
+ * @param resourceId the resource to update
+ * @param replicaCount the new replica count (or -1 to use the existing one)
+ * @param participantGroupTag the new participant group tag (or null to use the existing one)
+ * @return true if the regenerated context was persisted; false if the resource does not exist,
+ * is not partitioned, or the write failed
+ */
+ public boolean generateDefaultAssignment(ResourceId resourceId, int replicaCount,
+     String participantGroupTag) {
+   Resource resource = readResource(resourceId);
+   if (resource == null) {
+     // readResource returns null for an unknown resource; fail cleanly instead of with an NPE
+     LOG.error("Resource " + resourceId + " does not exist, cannot generate an assignment");
+     return false;
+   }
+   RebalancerConfig config = resource.getRebalancerConfig();
+   PartitionedRebalancerContext context =
+       config.getRebalancerContext(PartitionedRebalancerContext.class);
+   if (context == null) {
+     LOG.error("Only partitioned resource types are supported");
+     return false;
+   }
+   if (replicaCount != -1) {
+     context.setReplicaCount(replicaCount);
+   }
+   if (participantGroupTag != null) {
+     context.setParticipantGroupTag(participantGroupTag);
+   }
+   StateModelDefinition stateModelDef =
+       _accessor.getProperty(_keyBuilder.stateModelDef(context.getStateModelDefId().stringify()));
+   // the default configuration is computed over every configured participant, live or not
+   List<InstanceConfig> participantConfigs =
+       _accessor.getChildValues(_keyBuilder.instanceConfigs());
+   Set<ParticipantId> participantSet = Sets.newHashSet();
+   for (InstanceConfig participantConfig : participantConfigs) {
+     participantSet.add(participantConfig.getParticipantId());
+   }
+   context.generateDefaultConfiguration(stateModelDef, participantSet);
+   // report the real write status rather than assuming success
+   return setRebalancerContext(resourceId, context);
+ }
+
+ /**
+ * Get an ideal state from a rebalancer config if the resource is partitioned. The ideal state
+ * copies the rebalance mode, rebalancer reference, replica setting, partition count, group tag,
+ * per-participant partition limit, and state model ids from the partitioned context, and then
+ * fills in per-partition preference lists or state maps according to the rebalance mode.
+ * @param config RebalancerConfig instance
+ * @param bucketSize bucket size to use
+ * @param batchMessageMode true if batch messaging allowed, false otherwise
+ * @return IdealState, or null if the config holds no PartitionedRebalancerContext
+ */
+ static IdealState rebalancerConfigToIdealState(RebalancerConfig config, int bucketSize,
+ boolean batchMessageMode) {
+ PartitionedRebalancerContext partitionedContext =
+ config.getRebalancerContext(PartitionedRebalancerContext.class);
+ if (partitionedContext != null) {
+ IdealState idealState = new IdealState(partitionedContext.getResourceId());
+ idealState.setRebalanceMode(partitionedContext.getRebalanceMode());
+ idealState.setRebalancerRef(partitionedContext.getRebalancerRef());
+ // replicas is either the ANY_LIVEINSTANCE token or the numeric replica count as a string
+ String replicas = null;
+ if (partitionedContext.anyLiveParticipant()) {
+ replicas = StateModelToken.ANY_LIVEINSTANCE.toString();
+ } else {
+ replicas = Integer.toString(partitionedContext.getReplicaCount());
+ }
+ idealState.setReplicas(replicas);
+ idealState.setNumPartitions(partitionedContext.getPartitionSet().size());
+ idealState.setInstanceGroupTag(partitionedContext.getParticipantGroupTag());
+ idealState.setMaxPartitionsPerInstance(partitionedContext.getMaxPartitionsPerParticipant());
+ idealState.setStateModelDefId(partitionedContext.getStateModelDefId());
+ idealState.setStateModelFactoryId(partitionedContext.getStateModelFactoryId());
+ idealState.setBucketSize(bucketSize);
+ idealState.setBatchMessageMode(batchMessageMode);
+ if (partitionedContext.getRebalanceMode() == RebalanceMode.SEMI_AUTO) {
+ // SEMI_AUTO: copy the preference list of every partition
+ // NOTE(review): semiAutoContext is dereferenced without a null check; presumably SEMI_AUTO
+ // mode always implies a SemiAutoRebalancerContext - confirm
+ SemiAutoRebalancerContext semiAutoContext =
+ config.getRebalancerContext(SemiAutoRebalancerContext.class);
+ for (PartitionId partitionId : semiAutoContext.getPartitionSet()) {
+ idealState.setPreferenceList(partitionId, semiAutoContext.getPreferenceList(partitionId));
+ }
+ } else if (partitionedContext.getRebalanceMode() == RebalanceMode.CUSTOMIZED) {
+ // CUSTOMIZED: copy the participant-to-state map of every partition
+ // NOTE(review): customContext is dereferenced without a null check; presumably CUSTOMIZED
+ // mode always implies a CustomRebalancerContext - confirm
+ CustomRebalancerContext customContext =
+ config.getRebalancerContext(CustomRebalancerContext.class);
+ for (PartitionId partitionId : customContext.getPartitionSet()) {
+ idealState.setParticipantStateMap(partitionId,
+ customContext.getPreferenceMap(partitionId));
+ }
+ } else {
+ // every other mode: each partition gets an empty preference list and an empty state map
+ for (PartitionId partitionId : partitionedContext.getPartitionSet()) {
+ List<ParticipantId> preferenceList = Collections.emptyList();
+ idealState.setPreferenceList(partitionId, preferenceList);
+ Map<ParticipantId, State> participantStateMap = Collections.emptyMap();
+ idealState.setParticipantStateMap(partitionId, participantStateMap);
+ }
+ }
+ return idealState;
+ }
+ return null;
+ }
+
+ /**
+ * Create a resource snapshot instance from the physical model. Any of the physical inputs may
+ * be null; missing pieces are replaced by defaults (DATA type, a new resource-scoped user
+ * config, bucket size 0, batch message mode off, a partitioned context with mode NONE).
+ * @param resourceId the resource id
+ * @param resourceConfiguration physical resource configuration
+ * @param idealState ideal state of the resource
+ * @param externalView external view of the resource
+ * @param resourceAssignment current resource assignment
+ * @return Resource
+ */
+ static Resource createResource(ResourceId resourceId,
+ ResourceConfiguration resourceConfiguration, IdealState idealState,
+ ExternalView externalView, ResourceAssignment resourceAssignment) {
+ UserConfig userConfig;
+ RebalancerContext rebalancerContext = null;
+ ResourceType type = ResourceType.DATA;
+ if (resourceConfiguration != null) {
+ userConfig = resourceConfiguration.getUserConfig();
+ type = resourceConfiguration.getType();
+ } else {
+ userConfig = new UserConfig(Scope.resource(resourceId));
+ }
+ int bucketSize = 0;
+ boolean batchMessageMode = false;
+ if (idealState != null) {
+ // when an ideal state exists it supplies bucket size and batch message mode, and the
+ // rebalancer context is looked up as a partitioned context
+ if (resourceConfiguration != null) {
+ rebalancerContext =
+ resourceConfiguration.getRebalancerContext(PartitionedRebalancerContext.class);
+ }
+ if (rebalancerContext == null) {
+ // fallback: get rebalancer context from ideal state
+ rebalancerContext = PartitionedRebalancerContext.from(idealState);
+ }
+ bucketSize = idealState.getBucketSize();
+ batchMessageMode = idealState.getBatchMessageMode();
+ idealState.updateUserConfig(userConfig);
+ } else if (resourceConfiguration != null) {
+ // no ideal state: everything comes from the resource configuration
+ bucketSize = resourceConfiguration.getBucketSize();
+ batchMessageMode = resourceConfiguration.getBatchMessageMode();
+ RebalancerConfig rebalancerConfig = new RebalancerConfig(resourceConfiguration);
+ rebalancerContext = rebalancerConfig.getRebalancerContext(RebalancerContext.class);
+ }
+ if (rebalancerContext == null) {
+ // last resort: a partitioned context with rebalance mode NONE
+ rebalancerContext = new PartitionedRebalancerContext(RebalanceMode.NONE);
+ }
+ return new Resource(resourceId, type, idealState, resourceAssignment, externalView,
+ rebalancerContext, userConfig, bucketSize, batchMessageMode);
+ }
+
+ /**
+ * Get a ParticipantAccessor instance. A new instance is constructed on every call, wrapping the
+ * same data accessor that this object uses.
+ * @return ParticipantAccessor
+ */
+ protected ParticipantAccessor participantAccessor() {
+ return new ParticipantAccessor(_accessor);
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
index 0000000,d0a96cc..6d7b0ef
mode 000000,100644..100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
@@@ -1,0 -1,196 +1,211 @@@
+ package org.apache.helix.controller.rebalancer;
+
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.LinkedHashMap;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+
+ import org.apache.helix.HelixManager;
+ import org.apache.helix.ZNRecord;
+ import org.apache.helix.api.Cluster;
+ import org.apache.helix.api.Participant;
+ import org.apache.helix.api.State;
+ import org.apache.helix.api.id.ParticipantId;
+ import org.apache.helix.api.id.PartitionId;
+ import org.apache.helix.controller.rebalancer.context.FullAutoRebalancerContext;
+ import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+ import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
+ import org.apache.helix.controller.stages.ResourceCurrentState;
+ import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
+ import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme;
+ import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
+ import org.apache.helix.model.ResourceAssignment;
+ import org.apache.helix.model.StateModelDefinition;
+ import org.apache.log4j.Logger;
+
+ import com.google.common.base.Function;
+ import com.google.common.collect.Lists;
+
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ public class FullAutoRebalancer implements HelixRebalancer {
+ // Not final: assigned on every computeResourceMapping call rather than in a constructor
+ private AutoRebalanceStrategy _algorithm;
+
+ private static Logger LOG = Logger.getLogger(FullAutoRebalancer.class);
+
+ @Override
+ public void init(HelixManager helixManager) {
+ // do nothing: the manager is not retained, and the rebalance strategy is created inside
+ // computeResourceMapping
+ }
+
+ @Override
+ public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
+ Cluster cluster, ResourceCurrentState currentState) {
+ FullAutoRebalancerContext config =
+ rebalancerConfig.getRebalancerContext(FullAutoRebalancerContext.class);
+ StateModelDefinition stateModelDef =
+ cluster.getStateModelMap().get(config.getStateModelDefId());
+ // Compute a preference list based on the current ideal state
+ List<PartitionId> partitions = new ArrayList<PartitionId>(config.getPartitionSet());
+ Map<ParticipantId, Participant> liveParticipants = cluster.getLiveParticipantMap();
+ Map<ParticipantId, Participant> allParticipants = cluster.getParticipantMap();
+ int replicas = -1;
+ if (config.anyLiveParticipant()) {
+ replicas = liveParticipants.size();
+ } else {
+ replicas = config.getReplicaCount();
+ }
+
+ // count how many replicas should be in each state
+ Map<State, String> upperBounds =
+ ConstraintBasedAssignment.stateConstraints(stateModelDef, config.getResourceId(),
+ cluster.getConfig());
+ LinkedHashMap<State, Integer> stateCountMap =
- ConstraintBasedAssignment.stateCount(upperBounds, stateModelDef,
- liveParticipants.size(), replicas);
++ ConstraintBasedAssignment.stateCount(upperBounds, stateModelDef, liveParticipants.size(),
++ replicas);
+
+ // get the participant lists
+ List<ParticipantId> liveParticipantList =
+ new ArrayList<ParticipantId>(liveParticipants.keySet());
+ List<ParticipantId> allParticipantList =
+ new ArrayList<ParticipantId>(cluster.getParticipantMap().keySet());
+
+ // compute the current mapping from the current state
+ Map<PartitionId, Map<ParticipantId, State>> currentMapping =
+ currentMapping(config, currentState, stateCountMap);
+
+ // If there are nodes tagged with resource, use only those nodes
++ // If there are nodes tagged with resource name, use only those nodes
+ Set<ParticipantId> taggedNodes = new HashSet<ParticipantId>();
++ Set<ParticipantId> taggedLiveNodes = new HashSet<ParticipantId>();
+ if (config.getParticipantGroupTag() != null) {
- for (ParticipantId participantId : liveParticipantList) {
- if (liveParticipants.get(participantId).hasTag(config.getParticipantGroupTag())) {
++ for (ParticipantId participantId : allParticipantList) {
++ if (cluster.getParticipantMap().get(participantId).hasTag(config.getParticipantGroupTag())) {
+ taggedNodes.add(participantId);
++ if (liveParticipants.containsKey(participantId)) {
++ taggedLiveNodes.add(participantId);
++ }
+ }
+ }
- }
- if (taggedNodes.size() > 0) {
- if (LOG.isInfoEnabled()) {
- LOG.info("found the following instances with tag " + config.getResourceId() + " "
- + taggedNodes);
++ if (!taggedLiveNodes.isEmpty()) {
++ // live nodes exist that have this tag
++ if (LOG.isInfoEnabled()) {
++ LOG.info("found the following participants with tag " + config.getParticipantGroupTag()
++ + " for " + config.getResourceId() + ": " + taggedLiveNodes);
++ }
++ } else if (taggedNodes.isEmpty()) {
++ // no live nodes and no configured nodes have this tag
++ LOG.warn("Resource " + config.getResourceId() + " has tag "
++ + config.getParticipantGroupTag() + " but no configured participants have this tag");
++ } else {
++ // configured nodes have this tag, but no live nodes have this tag
++ LOG.warn("Resource " + config.getResourceId() + " has tag "
++ + config.getParticipantGroupTag() + " but no live participants have this tag");
+ }
- liveParticipantList = new ArrayList<ParticipantId>(taggedNodes);
++ allParticipantList = new ArrayList<ParticipantId>(taggedNodes);
++ liveParticipantList = new ArrayList<ParticipantId>(taggedLiveNodes);
+ }
+
+ // determine which nodes the replicas should live on
+ int maxPartition = config.getMaxPartitionsPerParticipant();
+ if (LOG.isInfoEnabled()) {
+ LOG.info("currentMapping: " + currentMapping);
+ LOG.info("stateCountMap: " + stateCountMap);
+ LOG.info("liveNodes: " + liveParticipantList);
+ LOG.info("allNodes: " + allParticipantList);
+ LOG.info("maxPartition: " + maxPartition);
+ }
+ ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
+ _algorithm =
+ new AutoRebalanceStrategy(config.getResourceId(), partitions, stateCountMap, maxPartition,
+ placementScheme);
+ ZNRecord newMapping =
+ _algorithm.typedComputePartitionAssignment(liveParticipantList, currentMapping,
+ allParticipantList);
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("newMapping: " + newMapping);
+ }
+
+ // compute a full partition mapping for the resource
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Processing resource:" + config.getResourceId());
+ }
+ ResourceAssignment partitionMapping = new ResourceAssignment(config.getResourceId());
+ for (PartitionId partition : partitions) {
+ Set<ParticipantId> disabledParticipantsForPartition =
+ ConstraintBasedAssignment.getDisabledParticipants(allParticipants, partition);
+ List<String> rawPreferenceList = newMapping.getListField(partition.stringify());
+ if (rawPreferenceList == null) {
+ rawPreferenceList = Collections.emptyList();
+ }
+ List<ParticipantId> preferenceList =
+ Lists.transform(rawPreferenceList, new Function<String, ParticipantId>() {
+ @Override
+ public ParticipantId apply(String participantName) {
+ return ParticipantId.from(participantName);
+ }
+ });
+ preferenceList =
+ ConstraintBasedAssignment.getPreferenceList(cluster, partition, preferenceList);
+ Map<ParticipantId, State> bestStateForPartition =
+ ConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds,
+ liveParticipants.keySet(), stateModelDef, preferenceList,
+ currentState.getCurrentStateMap(config.getResourceId(), partition),
+ disabledParticipantsForPartition);
+ partitionMapping.addReplicaMap(partition, bestStateForPartition);
+ }
+ return partitionMapping;
+ }
+
+ private Map<PartitionId, Map<ParticipantId, State>> currentMapping(
+     FullAutoRebalancerContext config, ResourceCurrentState currentStateOutput,
+     Map<State, Integer> stateCountMap) {
+   // partition -> (participant -> state), restricted to states that appear in stateCountMap
+   Map<PartitionId, Map<ParticipantId, State>> result =
+       new HashMap<PartitionId, Map<ParticipantId, State>>();
+
+   for (PartitionId partitionId : config.getPartitionSet()) {
+     Map<ParticipantId, State> current =
+         currentStateOutput.getCurrentStateMap(config.getResourceId(), partitionId);
+     Map<ParticipantId, State> replicas = new HashMap<ParticipantId, State>();
+     result.put(partitionId, replicas);
+     for (Map.Entry<ParticipantId, State> entry : current.entrySet()) {
+       if (stateCountMap.containsKey(entry.getValue())) {
+         replicas.put(entry.getKey(), entry.getValue());
+       }
+     }
+
+     // pending states are applied second, so they override the current state of the same node
+     Map<ParticipantId, State> pending =
+         currentStateOutput.getPendingStateMap(config.getResourceId(), partitionId);
+     for (Map.Entry<ParticipantId, State> entry : pending.entrySet()) {
+       if (stateCountMap.containsKey(entry.getValue())) {
+         replicas.put(entry.getKey(), entry.getValue());
+       }
+     }
+   }
+   return result;
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/helix-core/src/main/java/org/apache/helix/controller/restlet/ZKPropertyTransferServer.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/helix-core/src/main/java/org/apache/helix/controller/restlet/ZNRecordUpdateResource.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/controller/restlet/ZNRecordUpdateResource.java
index 33593ae,d489378..3877686
--- a/helix-core/src/main/java/org/apache/helix/controller/restlet/ZNRecordUpdateResource.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/restlet/ZNRecordUpdateResource.java
@@@ -37,41 -35,55 +37,43 @@@ import org.restlet.resource.ServerResou
* REST resource for ZkPropertyTransfer server to receive PUT requests
* that submits ZNRecordUpdates
*/
-public class ZNRecordUpdateResource extends Resource {
+public class ZNRecordUpdateResource extends ServerResource {
- public static final String UPDATEKEY = "ZNRecordUpdate";
- private static Logger LOG = Logger.getLogger(ZNRecordUpdateResource.class);
+ public static final String UPDATEKEY = "ZNRecordUpdate";
+ private static Logger LOG = Logger.getLogger(ZNRecordUpdateResource.class);
- public ZNRecordUpdateResource() {
- getVariants().add(new Variant(MediaType.TEXT_PLAIN));
- getVariants().add(new Variant(MediaType.APPLICATION_JSON));
- setNegotiated(false);
- }
-
- @Override
- public Representation put(Representation entity) {
- try {
- ZKPropertyTransferServer server = ZKPropertyTransferServer.getInstance();
- @Override
- public boolean allowGet() {
- return false;
- }
-
- @Override
- public boolean allowPost() {
- return false;
++ public ZNRecordUpdateResource() {
++ getVariants().add(new Variant(MediaType.TEXT_PLAIN));
++ getVariants().add(new Variant(MediaType.APPLICATION_JSON));
++ setNegotiated(false);
+ }
+
+ @Override
- public boolean allowPut() {
- return true;
- }
-
- @Override
- public boolean allowDelete() {
- return false;
- }
-
- @Override
- public void storeRepresentation(Representation entity) {
++ public Representation put(Representation entity) {
+ try {
+ ZKPropertyTransferServer server = ZKPropertyTransferServer.getInstance();
- Form form = new Form(entity);
- String jsonPayload = form.getFirstValue(UPDATEKEY, true);
+ Form form = new Form(entity);
+ String jsonPayload = form.getFirstValue(UPDATEKEY, true);
- // Parse the map from zkPath --> ZNRecordUpdate from the payload
- StringReader sr = new StringReader(jsonPayload);
- ObjectMapper mapper = new ObjectMapper();
- TypeReference<TreeMap<String, ZNRecordUpdate>> typeRef = new TypeReference<TreeMap<String, ZNRecordUpdate>>() {
- };
- Map<String, ZNRecordUpdate> holderMap = mapper.readValue(sr, typeRef);
- // Enqueue the ZNRecordUpdate for sending
- for (ZNRecordUpdate holder : holderMap.values()) {
- server.enqueueData(holder);
- LOG.info("Received " + holder.getPath() + " from " + getRequest().getClientInfo().getAddress());
- }
- getResponse().setStatus(Status.SUCCESS_OK);
- } catch (Exception e) {
- LOG.error("", e);
- getResponse().setStatus(Status.SERVER_ERROR_INTERNAL);
- }
- return null;
+ // Parse the map from zkPath --> ZNRecordUpdate from the payload
+ StringReader sr = new StringReader(jsonPayload);
+ ObjectMapper mapper = new ObjectMapper();
+ TypeReference<TreeMap<String, ZNRecordUpdate>> typeRef =
+ new TypeReference<TreeMap<String, ZNRecordUpdate>>() {
+ };
+ Map<String, ZNRecordUpdate> holderMap = mapper.readValue(sr, typeRef);
+ // Enqueue the ZNRecordUpdate for sending
+ for (ZNRecordUpdate holder : holderMap.values()) {
+ server.enqueueData(holder);
+ LOG.info("Received " + holder.getPath() + " from "
+ + getRequest().getClientInfo().getAddress());
+ }
+ getResponse().setStatus(Status.SUCCESS_OK);
+ } catch (Exception e) {
+ LOG.error("", e);
+ getResponse().setStatus(Status.SERVER_ERROR_INTERNAL);
}
++ return null;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 11955f5,051a2f3..933bf78
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@@ -63,68 -70,73 +70,72 @@@ public class BestPossibleStateCalcStag
event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.toString(), bestPossibleStateOutput);
long endTime = System.currentTimeMillis();
- logger.info("END BestPossibleStateCalcStage.process(). took: " + (endTime - startTime) + " ms");
+ if (LOG.isInfoEnabled()) {
+ LOG.info("END BestPossibleStateCalcStage.process(). took: " + (endTime - startTime) + " ms");
+ }
}
- private BestPossibleStateOutput compute(ClusterEvent event, Map<String, Resource> resourceMap,
- CurrentStateOutput currentStateOutput) {
- // for each ideal state
- // read the state model def
- // for each resource
- // get the preference list
- // for each instanceName check if its alive then assign a state
- ClusterDataCache cache = event.getAttribute("ClusterDataCache");
+ /**
+ * Fallback for cases when the resource has been dropped, but current state exists
+ * @param cluster cluster snapshot
+ * @param resourceId the resource for which to generate an assignment
+ * @param currentStateOutput full snapshot of the current state
+ * @param stateModelDef state model the resource follows
+ * @return assignment for the dropped resource
+ */
+ private ResourceAssignment mapDroppedResource(Cluster cluster, ResourceId resourceId,
+ ResourceCurrentState currentStateOutput, StateModelDefinition stateModelDef) {
+ ResourceAssignment partitionMapping = new ResourceAssignment(resourceId);
+ Set<? extends PartitionId> mappedPartitions =
+ currentStateOutput.getCurrentStateMappedPartitions(resourceId);
+ if (mappedPartitions == null) {
+ return partitionMapping;
+ }
+ for (PartitionId partitionId : mappedPartitions) {
+ Set<ParticipantId> disabledParticipantsForPartition =
+ ConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(),
+ partitionId);
+ Map<State, String> upperBounds =
- ConstraintBasedAssignment.stateConstraints(stateModelDef, resourceId,
- cluster.getConfig());
++ ConstraintBasedAssignment
++ .stateConstraints(stateModelDef, resourceId, cluster.getConfig());
+ partitionMapping.addReplicaMap(partitionId, ConstraintBasedAssignment
+ .computeAutoBestStateForPartition(upperBounds, cluster.getLiveParticipantMap().keySet(),
+ stateModelDef, null, currentStateOutput.getCurrentStateMap(resourceId, partitionId),
+ disabledParticipantsForPartition));
+ }
+ return partitionMapping;
+ }
+ private BestPossibleStateOutput compute(Cluster cluster, ClusterEvent event,
+ Map<ResourceId, ResourceConfig> resourceMap, ResourceCurrentState currentStateOutput) {
BestPossibleStateOutput output = new BestPossibleStateOutput();
+ Map<StateModelDefId, StateModelDefinition> stateModelDefs = cluster.getStateModelMap();
- for (String resourceName : resourceMap.keySet()) {
- logger.debug("Processing resource:" + resourceName);
-
- Resource resource = resourceMap.get(resourceName);
- // Ideal state may be gone. In that case we need to get the state model name
- // from the current state
- IdealState idealState = cache.getIdealState(resourceName);
-
- if (idealState == null) {
- // if ideal state is deleted, use an empty one
- logger.info("resource:" + resourceName + " does not exist anymore");
- idealState = new IdealState(resourceName);
+ for (ResourceId resourceId : resourceMap.keySet()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Processing resource:" + resourceId);
}
-
- Rebalancer rebalancer = null;
- if (idealState.getRebalanceMode() == RebalanceMode.USER_DEFINED
- && idealState.getRebalancerClassName() != null) {
- String rebalancerClassName = idealState.getRebalancerClassName();
- logger
- .info("resource " + resourceName + " use idealStateRebalancer " + rebalancerClassName);
- try {
- rebalancer =
- (Rebalancer) (HelixUtil.loadClass(getClass(), rebalancerClassName).newInstance());
- } catch (Exception e) {
- logger.warn("Exception while invoking custom rebalancer class:" + rebalancerClassName, e);
- }
- }
- if (rebalancer == null) {
- if (idealState.getRebalanceMode() == RebalanceMode.FULL_AUTO) {
- rebalancer = new AutoRebalancer();
- } else if (idealState.getRebalanceMode() == RebalanceMode.SEMI_AUTO) {
- rebalancer = new SemiAutoRebalancer();
- } else {
- rebalancer = new CustomRebalancer();
+ ResourceConfig resourceConfig = resourceMap.get(resourceId);
+ RebalancerConfig rebalancerConfig = resourceConfig.getRebalancerConfig();
+ ResourceAssignment resourceAssignment = null;
+ if (rebalancerConfig != null) {
+ HelixRebalancer rebalancer = rebalancerConfig.getRebalancer();
+ if (rebalancer != null) {
+ HelixManager manager = event.getAttribute("helixmanager");
+ rebalancer.init(manager);
+ resourceAssignment =
+ rebalancer.computeResourceMapping(rebalancerConfig, cluster, currentStateOutput);
}
}
-
- HelixManager manager = event.getAttribute("helixmanager");
- rebalancer.init(manager);
- ResourceAssignment partitionStateAssignment =
- rebalancer.computeResourceMapping(resource, idealState, currentStateOutput, cache);
- if (partitionStateAssignment != null) {
- for (Partition partition : resource.getPartitions()) {
- Map<String, String> newStateMap = partitionStateAssignment.getReplicaMap(partition);
- output.setState(resourceName, partition, newStateMap);
- }
+ if (resourceAssignment == null) {
+ RebalancerContext context = rebalancerConfig.getRebalancerContext(RebalancerContext.class);
+ StateModelDefinition stateModelDef = stateModelDefs.get(context.getStateModelDefId());
+ resourceAssignment =
+ mapDroppedResource(cluster, resourceId, currentStateOutput, stateModelDef);
}
-
+ output.setResourceAssignment(resourceId, resourceAssignment);
}
+
return output;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
index 35ef177,55a5e54..edceed6
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
@@@ -260,7 -274,7 +274,7 @@@ public class ExternalViewComputeStage e
// Remove the finished (COMPLETED or ERROR) tasks from the SCHEDULER_TASK_RESOURCE idealstate
keyBuilder = accessor.keyBuilder();
- accessor.updateProperty(keyBuilder.idealStates(taskQueueIdealState.getResourceName()), delta);
- accessor.updateProperty(keyBuilder.idealState(resourceId.stringify()), delta);
++ accessor.updateProperty(keyBuilder.idealStates(resourceId.stringify()), delta);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationOutput.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationOutput.java
index 359a959,2069974..0000000
deleted file mode 100644,100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationOutput.java
+++ /dev/null
@@@ -1,65 -1,66 +1,0 @@@
--package org.apache.helix.controller.stages;
--
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements. See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership. The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License. You may obtain a copy of the License at
-- *
-- * http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing,
-- * software distributed under the License is distributed on an
-- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- * KIND, either express or implied. See the License for the
-- * specific language governing permissions and limitations
-- * under the License.
-- */
--
--import java.util.ArrayList;
--import java.util.Collections;
--import java.util.HashMap;
--import java.util.List;
--import java.util.Map;
--
--import org.apache.helix.model.Message;
--import org.apache.helix.model.Partition;
--
- public class MessageGenerationOutput {
-
- private final Map<String, Map<Partition, List<Message>>> _messagesMap;
-
- public MessageGenerationOutput() {
- _messagesMap = new HashMap<String, Map<Partition, List<Message>>>();
-
- }
-
- public void addMessage(String resourceName, Partition partition, Message message) {
- if (!_messagesMap.containsKey(resourceName)) {
- _messagesMap.put(resourceName, new HashMap<Partition, List<Message>>());
- }
- if (!_messagesMap.get(resourceName).containsKey(partition)) {
- _messagesMap.get(resourceName).put(partition, new ArrayList<Message>());
-
- }
- _messagesMap.get(resourceName).get(partition).add(message);
-
- }
-
- public List<Message> getMessages(String resourceName, Partition resource) {
- Map<Partition, List<Message>> map = _messagesMap.get(resourceName);
- if (map != null) {
- return map.get(resource);
- }
- return Collections.emptyList();
-
- }
-
- @Override
- public String toString() {
- return _messagesMap.toString();
- }
- }
-@Deprecated
-public class MessageGenerationOutput {
-
- private final Map<String, Map<Partition, List<Message>>> _messagesMap;
-
- public MessageGenerationOutput() {
- _messagesMap = new HashMap<String, Map<Partition, List<Message>>>();
-
- }
-
- public void addMessage(String resourceName, Partition partition, Message message) {
- if (!_messagesMap.containsKey(resourceName)) {
- _messagesMap.put(resourceName, new HashMap<Partition, List<Message>>());
- }
- if (!_messagesMap.get(resourceName).containsKey(partition)) {
- _messagesMap.get(resourceName).put(partition, new ArrayList<Message>());
-
- }
- _messagesMap.get(resourceName).get(partition).add(message);
-
- }
-
- public List<Message> getMessages(String resourceName, Partition resource) {
- Map<Partition, List<Message>> map = _messagesMap.get(resourceName);
- if (map != null) {
- return map.get(resource);
- }
- return Collections.emptyList();
-
- }
-
- @Override
- public String toString() {
- return _messagesMap.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index 99668d3,59bae9f..6e30074
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@@ -906,7 -917,7 +917,7 @@@ public class ZKHelixAdmin implements He
new ZKHelixDataAccessor(grandCluster, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
Builder keyBuilder = accessor.keyBuilder();
- accessor.setProperty(keyBuilder.idealStates(idealState.getResourceName()), idealState);
- accessor.setProperty(keyBuilder.idealState(idealState.getResourceId().stringify()), idealState);
++ accessor.setProperty(keyBuilder.idealStates(idealState.getResourceId().stringify()), idealState);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
index 94e8feb,6fbe690..471530c
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
@@@ -76,18 -76,7 +76,18 @@@ public class ZKHelixDataAccessor implem
PropertyType type = key.getType();
String path = key.getPath();
int options = constructOptions(type);
- return _baseDataAccessor.create(path, value == null ? null : value.getRecord(), options);
+ boolean success = false;
+ switch (type) {
+ case STATEMODELDEFS:
- if (value.isValid()) {
++ if (value != null && value.isValid()) {
+ success = _baseDataAccessor.create(path, value.getRecord(), options);
+ }
+ break;
+ default:
- success = _baseDataAccessor.create(path, value.getRecord(), options);
++ success = _baseDataAccessor.create(path, value == null ? null : value.getRecord(), options);
+ break;
+ }
+ return success;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java
index e99e173,2e759e6..a74d217
--- a/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java
+++ b/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java
@@@ -30,9 -30,13 +30,13 @@@ import java.util.TreeMap
import org.apache.helix.HelixDefinedState;
import org.apache.helix.HelixProperty;
import org.apache.helix.ZNRecord;
+ import org.apache.helix.api.State;
+ import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.model.builder.StateTransitionTableBuilder;
-import org.apache.log4j.Logger;
+import org.apache.helix.model.util.StateModelDefinitionValidator;
+ import com.google.common.collect.ImmutableList;
+
/**
* Describe the state model
*/
@@@ -185,9 -248,27 +247,18 @@@ public class StateModelDefinition exten
return _statesCountMap.get(state);
}
+ /**
+ * Number of participants that can be in each state
+ * @param state the state
+ * @return maximum instance count per state, can be "N" or "R"
+ */
+ public String getNumParticipantsPerState(State state) {
+ return _statesCountMap.get(state.toString());
+ }
+
@Override
public boolean isValid() {
- if (getInitialState() == null) {
- _logger.error("State model does not contain init state, statemodel:" + _record.getId());
- return false;
- }
- if (_record.getListField(StateModelDefinitionProperty.STATE_PRIORITY_LIST.toString()) == null) {
- _logger.error("CurrentState does not contain StatesPriorityList, state model : "
- + _record.getId());
- return false;
- }
- return true;
+ return StateModelDefinitionValidator.isStateModelDefinitionValid(this);
}
// TODO move this to model.builder package, refactor StateModelConfigGenerator to use this
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java
----------------------------------------------------------------------