Posted to commits@helix.apache.org by ka...@apache.org on 2013/11/07 02:20:00 UTC

[52/53] [abbrv] Merge branch 'helix-logical-model'

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
index 0000000,07fec9e..2721d91
mode 000000,100644..100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
@@@ -1,0 -1,774 +1,774 @@@
+ package org.apache.helix.api.accessor;
+ 
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ 
+ import java.net.InetAddress;
+ import java.net.UnknownHostException;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.UUID;
+ 
+ import org.I0Itec.zkclient.DataUpdater;
+ import org.apache.helix.AccessOption;
+ import org.apache.helix.BaseDataAccessor;
+ import org.apache.helix.HelixDataAccessor;
+ import org.apache.helix.HelixDefinedState;
+ import org.apache.helix.HelixException;
+ import org.apache.helix.PropertyKey;
+ import org.apache.helix.ZNRecord;
+ import org.apache.helix.api.Participant;
+ import org.apache.helix.api.Resource;
+ import org.apache.helix.api.RunningInstance;
+ import org.apache.helix.api.Scope;
+ import org.apache.helix.api.State;
+ import org.apache.helix.api.config.ParticipantConfig;
+ import org.apache.helix.api.config.UserConfig;
+ import org.apache.helix.api.id.MessageId;
+ import org.apache.helix.api.id.ParticipantId;
+ import org.apache.helix.api.id.PartitionId;
+ import org.apache.helix.api.id.ResourceId;
+ import org.apache.helix.api.id.SessionId;
+ import org.apache.helix.api.id.StateModelDefId;
+ import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
+ import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+ import org.apache.helix.model.CurrentState;
+ import org.apache.helix.model.ExternalView;
+ import org.apache.helix.model.IdealState;
+ import org.apache.helix.model.IdealState.RebalanceMode;
+ import org.apache.helix.model.InstanceConfig;
+ import org.apache.helix.model.InstanceConfig.InstanceConfigProperty;
+ import org.apache.helix.model.LiveInstance;
+ import org.apache.helix.model.Message;
+ import org.apache.helix.model.Message.MessageState;
+ import org.apache.helix.model.Message.MessageType;
+ import org.apache.helix.model.StateModelDefinition;
+ import org.apache.log4j.Logger;
+ 
+ import com.google.common.collect.ImmutableSet;
+ import com.google.common.collect.Lists;
+ import com.google.common.collect.Maps;
+ import com.google.common.collect.Sets;
+ 
+ public class ParticipantAccessor {
+   private static final Logger LOG = Logger.getLogger(ParticipantAccessor.class);
+ 
+   private final HelixDataAccessor _accessor;
+   private final PropertyKey.Builder _keyBuilder;
+ 
+   public ParticipantAccessor(HelixDataAccessor accessor) {
+     _accessor = accessor;
+     _keyBuilder = accessor.keyBuilder();
+   }
+ 
+   /**
+    * enable/disable a participant
+    * @param participantId
+    * @param isEnabled
+    * @return true if enable state succeeded, false otherwise
+    */
+   boolean enableParticipant(ParticipantId participantId, boolean isEnabled) {
+     String participantName = participantId.stringify();
+     if (_accessor.getProperty(_keyBuilder.instanceConfig(participantName)) == null) {
+       LOG.error("Config for participant: " + participantId + " does NOT exist in cluster");
+       return false;
+     }
+ 
+     InstanceConfig config = new InstanceConfig(participantName);
+     config.setInstanceEnabled(isEnabled);
+     return _accessor.updateProperty(_keyBuilder.instanceConfig(participantName), config);
+   }
+ 
+   /**
+    * disable participant
+    * @param participantId
+    * @return true if disabled successfully, false otherwise
+    */
+   public boolean disableParticipant(ParticipantId participantId) {
+     return enableParticipant(participantId, false);
+   }
+ 
+   /**
+    * enable participant
+    * @param participantId
+    * @return true if enabled successfully, false otherwise
+    */
+   public boolean enableParticipant(ParticipantId participantId) {
+     return enableParticipant(participantId, true);
+   }
+ 
+   /**
+    * create messages for participant
+    * @param participantId
+    * @param msgMap map of message-id to message
+    */
+   public void insertMessagesToParticipant(ParticipantId participantId,
+       Map<MessageId, Message> msgMap) {
+     List<PropertyKey> msgKeys = new ArrayList<PropertyKey>();
+     List<Message> msgs = new ArrayList<Message>();
+     for (MessageId msgId : msgMap.keySet()) {
+       msgKeys.add(_keyBuilder.message(participantId.stringify(), msgId.stringify()));
+       msgs.add(msgMap.get(msgId));
+     }
+ 
+     _accessor.createChildren(msgKeys, msgs);
+   }
+ 
+   /**
+    * update the status of messages on a participant
+    * @param participantId
+    * @param msgMap map of message-id to message
+    */
+   public void updateMessageStatus(ParticipantId participantId, Map<MessageId, Message> msgMap) {
+     String participantName = participantId.stringify();
+     List<PropertyKey> msgKeys = new ArrayList<PropertyKey>();
+     List<Message> msgs = new ArrayList<Message>();
+     for (MessageId msgId : msgMap.keySet()) {
+       msgKeys.add(_keyBuilder.message(participantName, msgId.stringify()));
+       msgs.add(msgMap.get(msgId));
+     }
+     _accessor.setChildren(msgKeys, msgs);
+   }
+ 
+   /**
+    * delete messages from participant
+    * @param participantId
+    * @param msgIdSet
+    */
+   public void deleteMessagesFromParticipant(ParticipantId participantId, Set<MessageId> msgIdSet) {
+     String participantName = participantId.stringify();
+     List<PropertyKey> msgKeys = new ArrayList<PropertyKey>();
+     for (MessageId msgId : msgIdSet) {
+       msgKeys.add(_keyBuilder.message(participantName, msgId.stringify()));
+     }
+ 
+     // TODO impl batch remove
+     for (PropertyKey msgKey : msgKeys) {
+       _accessor.removeProperty(msgKey);
+     }
+   }
+ 
+   /**
+    * enable/disable partitions on a participant
+    * @param enabled
+    * @param participantId
+    * @param resourceId
+    * @param partitionIdSet
+    * @return true if enable state changed successfully, false otherwise
+    */
+   boolean enablePartitionsForParticipant(final boolean enabled, final ParticipantId participantId,
+       final ResourceId resourceId, final Set<PartitionId> partitionIdSet) {
+     String participantName = participantId.stringify();
+     String resourceName = resourceId.stringify();
+ 
+     // check instanceConfig exists
+     PropertyKey instanceConfigKey = _keyBuilder.instanceConfig(participantName);
+     if (_accessor.getProperty(instanceConfigKey) == null) {
+       LOG.error("Config for participant: " + participantId + " does NOT exist in cluster");
+       return false;
+     }
+ 
+     // check that the resource exists; warn if not
 -    IdealState idealState = _accessor.getProperty(_keyBuilder.idealState(resourceName));
++    IdealState idealState = _accessor.getProperty(_keyBuilder.idealStates(resourceName));
+     if (idealState == null) {
+       LOG.warn("Disable partitions: " + partitionIdSet + ", resource: " + resourceId
+           + " does NOT exist. Probably disabled during an ERROR->DROPPED transition");
+ 
+     } else {
+       // check partitions exist. warn if not
+       for (PartitionId partitionId : partitionIdSet) {
+         if ((idealState.getRebalanceMode() == RebalanceMode.SEMI_AUTO && idealState
+             .getPreferenceList(partitionId) == null)
+             || (idealState.getRebalanceMode() == RebalanceMode.CUSTOMIZED && idealState
+                 .getParticipantStateMap(partitionId) == null)) {
+           LOG.warn("Resource: " + resourceId + ", partition: " + partitionId
+               + ", partition does NOT exist in ideal state");
+         }
+       }
+     }
+ 
+     BaseDataAccessor<ZNRecord> baseAccessor = _accessor.getBaseDataAccessor();
+     final List<String> partitionNames = new ArrayList<String>();
+     for (PartitionId partitionId : partitionIdSet) {
+       partitionNames.add(partitionId.stringify());
+     }
+ 
+     return baseAccessor.update(instanceConfigKey.getPath(), new DataUpdater<ZNRecord>() {
+       @Override
+       public ZNRecord update(ZNRecord currentData) {
+         if (currentData == null) {
+           throw new HelixException("Instance: " + participantId + ", participant config is null");
+         }
+ 
+         // TODO: merge with InstanceConfig.setInstanceEnabledForPartition
+         List<String> list =
+             currentData.getListField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.toString());
+         Set<String> disabledPartitions = new HashSet<String>();
+         if (list != null) {
+           disabledPartitions.addAll(list);
+         }
+ 
+         if (enabled) {
+           disabledPartitions.removeAll(partitionNames);
+         } else {
+           disabledPartitions.addAll(partitionNames);
+         }
+ 
+         list = new ArrayList<String>(disabledPartitions);
+         Collections.sort(list);
+         currentData.setListField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.toString(), list);
+         return currentData;
+       }
+     }, AccessOption.PERSISTENT);
+   }
+ 
+   /**
+    * disable partitions on a participant
+    * @param participantId
+    * @param resourceId
+    * @param disablePartitionIdSet
+    * @return true if disabled successfully, false otherwise
+    */
+   public boolean disablePartitionsForParticipant(ParticipantId participantId,
+       ResourceId resourceId, Set<PartitionId> disablePartitionIdSet) {
+     return enablePartitionsForParticipant(false, participantId, resourceId, disablePartitionIdSet);
+   }
+ 
+   /**
+    * enable partitions on a participant
+    * @param participantId
+    * @param resourceId
+    * @param enablePartitionIdSet
+    * @return true if enabled successfully, false otherwise
+    */
+   public boolean enablePartitionsForParticipant(ParticipantId participantId, ResourceId resourceId,
+       Set<PartitionId> enablePartitionIdSet) {
+     return enablePartitionsForParticipant(true, participantId, resourceId, enablePartitionIdSet);
+   }
+ 
+   /**
+    * Reset partitions assigned to a set of participants
+    * @param resetParticipantIdSet the participants to reset
+    * @return true if reset, false otherwise
+    */
+   public boolean resetParticipants(Set<ParticipantId> resetParticipantIdSet) {
+     List<ExternalView> extViews = _accessor.getChildValues(_keyBuilder.externalViews());
+     for (ParticipantId participantId : resetParticipantIdSet) {
+       for (ExternalView extView : extViews) {
+         Set<PartitionId> resetPartitionIdSet = Sets.newHashSet();
+         for (PartitionId partitionId : extView.getPartitionIdSet()) {
+           Map<ParticipantId, State> stateMap = extView.getStateMap(partitionId);
+           if (stateMap.containsKey(participantId)
+               && stateMap.get(participantId).equals(State.from(HelixDefinedState.ERROR))) {
+             resetPartitionIdSet.add(partitionId);
+           }
+         }
+         resetPartitionsForParticipant(participantId, extView.getResourceId(), resetPartitionIdSet);
+       }
+     }
+     return true;
+   }
+ 
+   /**
+    * reset partitions on a participant
+    * @param participantId
+    * @param resourceId
+    * @param resetPartitionIdSet
+    * @return true if partitions reset, false otherwise
+    */
+   public boolean resetPartitionsForParticipant(ParticipantId participantId, ResourceId resourceId,
+       Set<PartitionId> resetPartitionIdSet) {
+     // make sure the participant is running
+     Participant participant = readParticipant(participantId);
+     if (!participant.isAlive()) {
+       LOG.error("Cannot reset partitions because the participant is not running");
+       return false;
+     }
+     RunningInstance runningInstance = participant.getRunningInstance();
+ 
+     // check that the resource exists
+     ResourceAccessor resourceAccessor = resourceAccessor();
+     Resource resource = resourceAccessor.readResource(resourceId);
+     if (resource == null || resource.getRebalancerConfig() == null) {
+       LOG.error("Cannot reset partitions because the resource is not present");
+       return false;
+     }
+ 
+     // need the rebalancer context for the resource
+     RebalancerContext context =
+         resource.getRebalancerConfig().getRebalancerContext(RebalancerContext.class);
+     if (context == null) {
+       LOG.error("Rebalancer context for resource does not exist");
+       return false;
+     }
+ 
+     // ensure that all partitions to reset exist
+     Set<PartitionId> partitionSet = ImmutableSet.copyOf(context.getSubUnitIdSet());
+     if (!partitionSet.containsAll(resetPartitionIdSet)) {
+       LOG.error("Not all of the specified partitions to reset exist for the resource");
+       return false;
+     }
+ 
+     // check for a valid current state that has all specified partitions in ERROR state
+     CurrentState currentState = participant.getCurrentStateMap().get(resourceId);
+     if (currentState == null) {
+       LOG.error("The participant does not have a current state for the resource");
+       return false;
+     }
+     for (PartitionId partitionId : resetPartitionIdSet) {
+       if (!currentState.getState(partitionId).equals(State.from(HelixDefinedState.ERROR))) {
+         LOG.error("Partition " + partitionId + " is not in error state, aborting reset");
+         return false;
+       }
+     }
+ 
+     // make sure that there are no pending transition messages
+     for (Message message : participant.getMessageMap().values()) {
+       if (!MessageType.STATE_TRANSITION.toString().equalsIgnoreCase(message.getMsgType())
+           || !runningInstance.getSessionId().equals(message.getTypedTgtSessionId())
+           || !resourceId.equals(message.getResourceId())
+           || !resetPartitionIdSet.contains(message.getPartitionId())) {
+         continue;
+       }
+       LOG.error("Cannot reset partitions because of the following pending message: " + message);
+       return false;
+     }
+ 
+     // set up the source id
+     String adminName = null;
+     try {
+       adminName = InetAddress.getLocalHost().getCanonicalHostName() + "-ADMIN";
+     } catch (UnknownHostException e) {
+       // can ignore it
+       if (LOG.isInfoEnabled()) {
+         LOG.info("Unable to get host name. Will set it to UNKNOWN, mostly ignorable", e);
+       }
+       adminName = "UNKNOWN";
+     }
+ 
+     // build messages to signal the transition
+     StateModelDefId stateModelDefId = context.getStateModelDefId();
+     StateModelDefinition stateModelDef =
+         _accessor.getProperty(_keyBuilder.stateModelDef(stateModelDefId.stringify()));
+     Map<MessageId, Message> messageMap = Maps.newHashMap();
+     for (PartitionId partitionId : resetPartitionIdSet) {
+       // send ERROR to initialState message
+       MessageId msgId = MessageId.from(UUID.randomUUID().toString());
+       Message message = new Message(MessageType.STATE_TRANSITION, msgId);
+       message.setSrcName(adminName);
+       message.setTgtName(participantId.stringify());
+       message.setMsgState(MessageState.NEW);
+       message.setPartitionId(partitionId);
+       message.setResourceId(resourceId);
+       message.setTgtSessionId(runningInstance.getSessionId());
+       message.setStateModelDef(stateModelDefId);
+       message.setFromState(State.from(HelixDefinedState.ERROR.toString()));
+       message.setToState(stateModelDef.getTypedInitialState());
+       message.setStateModelFactoryId(context.getStateModelFactoryId());
+ 
+       messageMap.put(message.getMessageId(), message);
+     }
+ 
+     // send the messages
+     insertMessagesToParticipant(participantId, messageMap);
+     return true;
+   }
+ 
+   /**
+    * Read the user config of the participant
+    * @param participantId the participant to look up
+    * @return UserConfig, or null
+    */
+   public UserConfig readUserConfig(ParticipantId participantId) {
+     InstanceConfig instanceConfig =
+         _accessor.getProperty(_keyBuilder.instanceConfig(participantId.stringify()));
+     return instanceConfig != null ? instanceConfig.getUserConfig() : null;
+   }
+ 
+   /**
+    * Set the user config of the participant, overwriting existing user configs
+    * @param participantId the participant to update
+    * @param userConfig the new user config
+    * @return true if the user config was set, false otherwise
+    */
+   public boolean setUserConfig(ParticipantId participantId, UserConfig userConfig) {
+     ParticipantConfig.Delta delta =
+         new ParticipantConfig.Delta(participantId).setUserConfig(userConfig);
+     return updateParticipant(participantId, delta) != null;
+   }
+ 
+   /**
+    * Add user configuration to the existing participant user configuration. Overwrites properties
+    * with the same key
+    * @param participantId the participant to update
+    * @param userConfig the user config key-value pairs to add
+    * @return true if the user config was updated, false otherwise
+    */
+   public boolean updateUserConfig(ParticipantId participantId, UserConfig userConfig) {
+     InstanceConfig instanceConfig = new InstanceConfig(participantId);
+     instanceConfig.addNamespacedConfig(userConfig);
+     return _accessor.updateProperty(_keyBuilder.instanceConfig(participantId.stringify()),
+         instanceConfig);
+   }
+ 
+   /**
+    * Clear any user-specified configuration from the participant
+    * @param participantId the participant to update
+    * @return true if the config was cleared, false otherwise
+    */
+   public boolean dropUserConfig(ParticipantId participantId) {
+     return setUserConfig(participantId, new UserConfig(Scope.participant(participantId)));
+   }
+ 
+   /**
+    * Update a participant configuration
+    * @param participantId the participant to update
+    * @param participantDelta changes to the participant
+    * @return ParticipantConfig, or null if participant is not persisted
+    */
+   public ParticipantConfig updateParticipant(ParticipantId participantId,
+       ParticipantConfig.Delta participantDelta) {
+     Participant participant = readParticipant(participantId);
+     if (participant == null) {
+       LOG.error("Participant " + participantId + " does not exist, cannot be updated");
+       return null;
+     }
+     ParticipantConfig config = participantDelta.mergeInto(participant.getConfig());
+     setParticipant(config);
+     return config;
+   }
+ 
+   /**
+    * Set the configuration of an existing participant
+    * @param participantConfig participant configuration
+    * @return true if config was set, false if there was an error
+    */
+   public boolean setParticipant(ParticipantConfig participantConfig) {
+     if (participantConfig == null) {
+       LOG.error("Participant config not initialized");
+       return false;
+     }
+     InstanceConfig instanceConfig = new InstanceConfig(participantConfig.getId());
+     instanceConfig.setHostName(participantConfig.getHostName());
+     instanceConfig.setPort(Integer.toString(participantConfig.getPort()));
+     for (String tag : participantConfig.getTags()) {
+       instanceConfig.addTag(tag);
+     }
+     for (PartitionId partitionId : participantConfig.getDisabledPartitions()) {
+       instanceConfig.setParticipantEnabledForPartition(partitionId, false);
+     }
+     instanceConfig.setInstanceEnabled(participantConfig.isEnabled());
+     instanceConfig.addNamespacedConfig(participantConfig.getUserConfig());
+     _accessor.setProperty(_keyBuilder.instanceConfig(participantConfig.getId().stringify()),
+         instanceConfig);
+     return true;
+   }
+ 
+   /**
+    * create a participant based on physical model
+    * @param participantId
+    * @param instanceConfig
+    * @param userConfig
+    * @param liveInstance
+    * @param instanceMsgMap map of message-id to message
+    * @param instanceCurStateMap map of resource-id to current-state
+    * @return participant
+    */
+   static Participant createParticipant(ParticipantId participantId, InstanceConfig instanceConfig,
+       UserConfig userConfig, LiveInstance liveInstance, Map<String, Message> instanceMsgMap,
+       Map<String, CurrentState> instanceCurStateMap) {
+ 
+     String hostName = instanceConfig.getHostName();
+ 
+     int port = -1;
+     try {
+       port = Integer.parseInt(instanceConfig.getPort());
+     } catch (IllegalArgumentException e) {
+       // keep as -1
+     }
+     if (port < 0 || port > 65535) {
+       port = -1;
+     }
+     boolean isEnabled = instanceConfig.getInstanceEnabled();
+ 
+     List<String> disabledPartitions = instanceConfig.getDisabledPartitions();
+     Set<PartitionId> disabledPartitionIdSet = Collections.emptySet();
+     if (disabledPartitions != null) {
+       disabledPartitionIdSet = new HashSet<PartitionId>();
+       for (String partitionId : disabledPartitions) {
+         disabledPartitionIdSet.add(PartitionId.from(PartitionId.extractResourceId(partitionId),
+             PartitionId.stripResourceId(partitionId)));
+       }
+     }
+ 
+     Set<String> tags = new HashSet<String>(instanceConfig.getTags());
+ 
+     RunningInstance runningInstance = null;
+     if (liveInstance != null) {
+       runningInstance =
+           new RunningInstance(liveInstance.getTypedSessionId(),
+               liveInstance.getTypedHelixVersion(), liveInstance.getProcessId());
+     }
+ 
+     Map<MessageId, Message> msgMap = new HashMap<MessageId, Message>();
+     if (instanceMsgMap != null) {
+       for (String msgId : instanceMsgMap.keySet()) {
+         Message message = instanceMsgMap.get(msgId);
+         msgMap.put(MessageId.from(msgId), message);
+       }
+     }
+ 
+     Map<ResourceId, CurrentState> curStateMap = new HashMap<ResourceId, CurrentState>();
+     if (instanceCurStateMap != null) {
+ 
+       for (String resourceName : instanceCurStateMap.keySet()) {
+         curStateMap.put(ResourceId.from(resourceName), instanceCurStateMap.get(resourceName));
+       }
+     }
+ 
+     return new Participant(participantId, hostName, port, isEnabled, disabledPartitionIdSet, tags,
+         runningInstance, curStateMap, msgMap, userConfig);
+   }
+ 
+   /**
+    * read participant related data
+    * @param participantId
+    * @return participant, or null if participant not available
+    */
+   public Participant readParticipant(ParticipantId participantId) {
+     // read physical model
+     String participantName = participantId.stringify();
+     InstanceConfig instanceConfig =
+         _accessor.getProperty(_keyBuilder.instanceConfig(participantName));
+ 
+     if (instanceConfig == null) {
+       LOG.error("Participant " + participantId + " is not present on the cluster");
+       return null;
+     }
+ 
+     UserConfig userConfig = instanceConfig.getUserConfig();
+     LiveInstance liveInstance = _accessor.getProperty(_keyBuilder.liveInstance(participantName));
+ 
+     Map<String, Message> instanceMsgMap = Collections.emptyMap();
+     Map<String, CurrentState> instanceCurStateMap = Collections.emptyMap();
+     if (liveInstance != null) {
+       SessionId sessionId = liveInstance.getTypedSessionId();
+ 
+       instanceMsgMap = _accessor.getChildValuesMap(_keyBuilder.messages(participantName));
+       instanceCurStateMap =
+           _accessor.getChildValuesMap(_keyBuilder.currentStates(participantName,
+               sessionId.stringify()));
+     }
+ 
+     return createParticipant(participantId, instanceConfig, userConfig, liveInstance,
+         instanceMsgMap, instanceCurStateMap);
+   }
+ 
+   /**
+    * update resource current state of a participant
+    * @param resourceId resource id
+    * @param participantId participant id
+    * @param sessionId session id
+    * @param curStateUpdate current state change delta
+    */
+   public void updateCurrentState(ResourceId resourceId, ParticipantId participantId,
+       SessionId sessionId, CurrentState curStateUpdate) {
+     _accessor.updateProperty(
+         _keyBuilder.currentState(participantId.stringify(), sessionId.stringify(),
+             resourceId.stringify()), curStateUpdate);
+   }
+ 
+   /**
+    * drop resource current state of a participant
+    * @param resourceId resource id
+    * @param participantId participant id
+    * @param sessionId session id
+    * @return true if dropped, false otherwise
+    */
+   public boolean dropCurrentState(ResourceId resourceId, ParticipantId participantId,
+       SessionId sessionId) {
+     return _accessor.removeProperty(_keyBuilder.currentState(participantId.stringify(),
+         sessionId.stringify(), resourceId.stringify()));
+   }
+ 
+   /**
+    * drop a participant from cluster
+    * @param participantId
+    * @return true if participant dropped, false if there was an error
+    */
+   boolean dropParticipant(ParticipantId participantId) {
+     if (_accessor.getProperty(_keyBuilder.instanceConfig(participantId.stringify())) == null) {
+       LOG.error("Config for participant: " + participantId + " does NOT exist in cluster");
+     }
+ 
+     if (_accessor.getProperty(_keyBuilder.instance(participantId.stringify())) == null) {
+       LOG.error("Participant: " + participantId + " structure does NOT exist in cluster");
+     }
+ 
+     // delete participant config path
+     _accessor.removeProperty(_keyBuilder.instanceConfig(participantId.stringify()));
+ 
+     // delete participant path
+     _accessor.removeProperty(_keyBuilder.instance(participantId.stringify()));
+     return true;
+   }
+ 
+   /**
+    * Let a new participant take the place of an existing participant
+    * @param oldParticipantId the participant to drop
+    * @param newParticipantId the participant that takes its place
+    * @return true if swap successful, false otherwise
+    */
+   public boolean swapParticipants(ParticipantId oldParticipantId, ParticipantId newParticipantId) {
+     Participant oldParticipant = readParticipant(oldParticipantId);
+     if (oldParticipant == null) {
+       LOG.error("Could not swap participants because the old participant does not exist");
+       return false;
+     }
+     if (oldParticipant.isEnabled()) {
+       LOG.error("Could not swap participants because the old participant is still enabled");
+       return false;
+     }
+     if (oldParticipant.isAlive()) {
+       LOG.error("Could not swap participants because the old participant is still live");
+       return false;
+     }
+     Participant newParticipant = readParticipant(newParticipantId);
+     if (newParticipant == null) {
+       LOG.error("Could not swap participants because the new participant does not exist");
+       return false;
+     }
+     dropParticipant(oldParticipantId);
+     ResourceAccessor resourceAccessor = resourceAccessor();
+     Map<String, IdealState> idealStateMap = _accessor.getChildValuesMap(_keyBuilder.idealStates());
+     for (String resourceName : idealStateMap.keySet()) {
+       IdealState idealState = idealStateMap.get(resourceName);
+       swapParticipantsInIdealState(idealState, oldParticipantId, newParticipantId);
+       PartitionedRebalancerContext context = PartitionedRebalancerContext.from(idealState);
+       resourceAccessor.setRebalancerContext(ResourceId.from(resourceName), context);
 -      _accessor.setProperty(_keyBuilder.idealState(resourceName), idealState);
++      _accessor.setProperty(_keyBuilder.idealStates(resourceName), idealState);
+     }
+     return true;
+   }
+ 
+   /**
+    * Replace occurrences of participants in preference lists and maps
+    * @param idealState the current ideal state
+    * @param oldParticipantId the participant to drop
+    * @param newParticipantId the participant that replaces it
+    */
+   protected void swapParticipantsInIdealState(IdealState idealState,
+       ParticipantId oldParticipantId, ParticipantId newParticipantId) {
+     for (PartitionId partitionId : idealState.getPartitionIdSet()) {
+       List<ParticipantId> oldPreferenceList = idealState.getPreferenceList(partitionId);
+       if (oldPreferenceList != null) {
+         List<ParticipantId> newPreferenceList = Lists.newArrayList();
+         for (ParticipantId participantId : oldPreferenceList) {
+           if (participantId.equals(oldParticipantId)) {
+             newPreferenceList.add(newParticipantId);
+           } else if (!participantId.equals(newParticipantId)) {
+             newPreferenceList.add(participantId);
+           }
+         }
+         idealState.setPreferenceList(partitionId, newPreferenceList);
+       }
+       Map<ParticipantId, State> preferenceMap = idealState.getParticipantStateMap(partitionId);
+       if (preferenceMap != null) {
+         if (preferenceMap.containsKey(oldParticipantId)) {
+           State state = preferenceMap.get(oldParticipantId);
+           preferenceMap.remove(oldParticipantId);
+           preferenceMap.put(newParticipantId, state);
+         }
+         idealState.setParticipantStateMap(partitionId, preferenceMap);
+       }
+     }
+   }
+ 
+   /**
+    * Create empty persistent properties to ensure that there is a valid participant structure
+    */
+   public void initParticipantStructure(ParticipantId participantId) {
+     List<String> paths = getRequiredPaths(_keyBuilder, participantId);
+     BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor();
+     for (String path : paths) {
+       boolean status = baseAccessor.create(path, null, AccessOption.PERSISTENT);
+       if (!status && LOG.isDebugEnabled()) {
+         LOG.debug(path + " already exists");
+       }
+     }
+   }
+ 
+   /**
+    * Clear properties for the participant
+    */
+   void clearParticipantStructure(ParticipantId participantId) {
+     List<String> paths = getRequiredPaths(_keyBuilder, participantId);
+     BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor();
+     baseAccessor.remove(paths, 0);
+   }
+ 
+   /**
+    * check if participant structure is valid
+    * @return true if valid or false otherwise
+    */
+   public boolean isParticipantStructureValid(ParticipantId participantId) {
+     List<String> paths = getRequiredPaths(_keyBuilder, participantId);
+     BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor();
+     if (baseAccessor != null) {
+       boolean[] existsResults = baseAccessor.exists(paths, 0);
+       for (boolean exists : existsResults) {
+         if (!exists) {
+           return false;
+         }
+       }
+     }
+     return true;
+   }
+ 
+   /**
+    * Get the paths that must exist for a valid participant structure
+    * @param keyBuilder PropertyKey.Builder for the cluster
+    * @param participantId the participant for which to generate paths
+    * @return list of required paths as strings
+    */
+   private static List<String> getRequiredPaths(PropertyKey.Builder keyBuilder,
+       ParticipantId participantId) {
+     List<String> paths = Lists.newArrayList();
+     paths.add(keyBuilder.instanceConfig(participantId.stringify()).getPath());
+     paths.add(keyBuilder.messages(participantId.stringify()).getPath());
+     paths.add(keyBuilder.currentStates(participantId.stringify()).getPath());
+     paths.add(keyBuilder.participantErrors(participantId.stringify()).getPath());
+     paths.add(keyBuilder.statusUpdates(participantId.stringify()).getPath());
+     return paths;
+   }
+ 
+   /**
+    * Get a ResourceAccessor instance
+    * @return ResourceAccessor
+    */
+   protected ResourceAccessor resourceAccessor() {
+     return new ResourceAccessor(_accessor);
+   }
+ }
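
For orientation, here is a minimal usage sketch of the ParticipantAccessor API introduced above. It is illustrative only, not part of the commit: the participant, resource, and partition names are made up, ParticipantId.from(String) is assumed to exist alongside the ResourceId and two-argument PartitionId factories used in this diff, and the HelixDataAccessor is assumed to come from elsewhere (for example a HelixManager).

import java.util.Set;

import org.apache.helix.HelixDataAccessor;
import org.apache.helix.api.Participant;
import org.apache.helix.api.accessor.ParticipantAccessor;
import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;

import com.google.common.collect.ImmutableSet;

public class ParticipantAccessorSketch {
  // Reset a partition stuck in ERROR on one participant, if that participant is live.
  public static void resetErrorPartition(HelixDataAccessor dataAccessor) {
    ParticipantAccessor participantAccessor = new ParticipantAccessor(dataAccessor);
    ParticipantId participantId = ParticipantId.from("localhost_12918"); // hypothetical name
    ResourceId resourceId = ResourceId.from("MyDB");                     // hypothetical name

    // readParticipant returns null if the instance config is missing from the cluster
    Participant participant = participantAccessor.readParticipant(participantId);
    if (participant == null || !participant.isAlive()) {
      return; // nothing to reset; resetPartitionsForParticipant would log and refuse anyway
    }

    // Ask the accessor to send ERROR -> initial-state transition messages for the partition
    Set<PartitionId> partitions = ImmutableSet.of(PartitionId.from(resourceId, "0"));
    participantAccessor.resetPartitionsForParticipant(participantId, resourceId, partitions);
  }
}

The enable/disable and message helpers shown in the diff can be driven the same way from an application or admin tool.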

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
index 0000000,f24b5b1..0dfceca
mode 000000,100644..100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
@@@ -1,0 -1,470 +1,470 @@@
+ package org.apache.helix.api.accessor;
+ 
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ 
+ import java.util.Collections;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+ 
+ import org.apache.helix.HelixConstants.StateModelToken;
+ import org.apache.helix.HelixDataAccessor;
+ import org.apache.helix.HelixDefinedState;
+ import org.apache.helix.PropertyKey;
+ import org.apache.helix.api.Resource;
+ import org.apache.helix.api.Scope;
+ import org.apache.helix.api.State;
+ import org.apache.helix.api.config.ResourceConfig;
+ import org.apache.helix.api.config.ResourceConfig.ResourceType;
+ import org.apache.helix.api.config.UserConfig;
+ import org.apache.helix.api.id.ParticipantId;
+ import org.apache.helix.api.id.PartitionId;
+ import org.apache.helix.api.id.ResourceId;
+ import org.apache.helix.controller.rebalancer.context.CustomRebalancerContext;
+ import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
+ import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+ import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+ import org.apache.helix.controller.rebalancer.context.SemiAutoRebalancerContext;
+ import org.apache.helix.model.ExternalView;
+ import org.apache.helix.model.IdealState;
+ import org.apache.helix.model.IdealState.RebalanceMode;
+ import org.apache.helix.model.InstanceConfig;
+ import org.apache.helix.model.ResourceAssignment;
+ import org.apache.helix.model.ResourceConfiguration;
+ import org.apache.helix.model.StateModelDefinition;
+ import org.apache.log4j.Logger;
+ 
+ import com.google.common.collect.Maps;
+ import com.google.common.collect.Sets;
+ 
+ public class ResourceAccessor {
+   private static final Logger LOG = Logger.getLogger(ResourceAccessor.class);
+   private final HelixDataAccessor _accessor;
+   private final PropertyKey.Builder _keyBuilder;
+ 
+   public ResourceAccessor(HelixDataAccessor accessor) {
+     _accessor = accessor;
+     _keyBuilder = accessor.keyBuilder();
+   }
+ 
+   /**
+    * Read a single snapshot of a resource
+    * @param resourceId the resource id to read
+    * @return Resource or null if not present
+    */
+   public Resource readResource(ResourceId resourceId) {
+     ResourceConfiguration config =
+         _accessor.getProperty(_keyBuilder.resourceConfig(resourceId.stringify()));
 -    IdealState idealState = _accessor.getProperty(_keyBuilder.idealState(resourceId.stringify()));
++    IdealState idealState = _accessor.getProperty(_keyBuilder.idealStates(resourceId.stringify()));
+ 
+     if (config == null && idealState == null) {
+       LOG.error("Resource " + resourceId + " not present on the cluster");
+       return null;
+     }
+ 
+     ExternalView externalView =
+         _accessor.getProperty(_keyBuilder.externalView(resourceId.stringify()));
+     ResourceAssignment resourceAssignment =
+         _accessor.getProperty(_keyBuilder.resourceAssignment(resourceId.stringify()));
+     return createResource(resourceId, config, idealState, externalView, resourceAssignment);
+   }
+ 
+   /**
+    * Update a resource configuration
+    * @param resourceId the resource id to update
+    * @param resourceDelta changes to the resource
+    * @return ResourceConfig, or null if the resource is not persisted
+    */
+   public ResourceConfig updateResource(ResourceId resourceId, ResourceConfig.Delta resourceDelta) {
+     Resource resource = readResource(resourceId);
+     if (resource == null) {
+       LOG.error("Resource " + resourceId + " does not exist, cannot be updated");
+       return null;
+     }
+     ResourceConfig config = resourceDelta.mergeInto(resource.getConfig());
+     setResource(config);
+     return config;
+   }
+ 
+   /**
+    * save resource assignment
+    * @param resourceId
+    * @param resourceAssignment
+    * @return true if set, false otherwise
+    */
+   public boolean setResourceAssignment(ResourceId resourceId, ResourceAssignment resourceAssignment) {
+     return _accessor.setProperty(_keyBuilder.resourceAssignment(resourceId.stringify()),
+         resourceAssignment);
+   }
+ 
+   /**
+    * get resource assignment
+    * @param resourceId
+    * @return resource assignment or null
+    */
+   public ResourceAssignment getResourceAssignment(ResourceId resourceId) {
+     return _accessor.getProperty(_keyBuilder.resourceAssignment(resourceId.stringify()));
+   }
+ 
+   /**
+    * Set a physical resource configuration, which may include user-defined configuration, as well as
+    * rebalancer configuration
+    * @param resourceId
+    * @param configuration
+    * @return true if set, false otherwise
+    */
+   private boolean setConfiguration(ResourceId resourceId, ResourceConfiguration configuration) {
+     boolean status =
+         _accessor.setProperty(_keyBuilder.resourceConfig(resourceId.stringify()), configuration);
+     // also set an ideal state if the resource supports it
+     RebalancerConfig rebalancerConfig = new RebalancerConfig(configuration);
+     IdealState idealState =
+         rebalancerConfigToIdealState(rebalancerConfig, configuration.getBucketSize(),
+             configuration.getBatchMessageMode());
+     if (idealState != null) {
 -      _accessor.setProperty(_keyBuilder.idealState(resourceId.stringify()), idealState);
++      _accessor.setProperty(_keyBuilder.idealStates(resourceId.stringify()), idealState);
+     }
+     return status;
+   }
+ 
+   /**
+    * Set the context of the rebalancer. This includes all properties required for rebalancing this
+    * resource
+    * @param resourceId the resource to update
+    * @param context the new rebalancer context
+    * @return true if the context was set, false otherwise
+    */
+   public boolean setRebalancerContext(ResourceId resourceId, RebalancerContext context) {
+     RebalancerConfig config = new RebalancerConfig(context);
+     ResourceConfiguration resourceConfig = new ResourceConfiguration(resourceId);
+     resourceConfig.addNamespacedConfig(config.toNamespacedConfig());
+ 
+     // update the ideal state if applicable
+     IdealState oldIdealState =
 -        _accessor.getProperty(_keyBuilder.idealState(resourceId.stringify()));
++        _accessor.getProperty(_keyBuilder.idealStates(resourceId.stringify()));
+     if (oldIdealState != null) {
+       IdealState idealState =
+           rebalancerConfigToIdealState(config, oldIdealState.getBucketSize(),
+               oldIdealState.getBatchMessageMode());
+       if (idealState != null) {
 -        _accessor.setProperty(_keyBuilder.idealState(resourceId.stringify()), idealState);
++        _accessor.setProperty(_keyBuilder.idealStates(resourceId.stringify()), idealState);
+       }
+     }
+ 
+     return _accessor.updateProperty(_keyBuilder.resourceConfig(resourceId.stringify()),
+         resourceConfig);
+   }
+ 
+   /**
+    * Read the user config of the resource
+    * @param resourceId the resource to look up
+    * @return UserConfig, or null
+    */
+   public UserConfig readUserConfig(ResourceId resourceId) {
+     ResourceConfiguration resourceConfig =
+         _accessor.getProperty(_keyBuilder.resourceConfig(resourceId.stringify()));
+     return resourceConfig != null ? UserConfig.from(resourceConfig) : null;
+   }
+ 
+   /**
+    * Read the rebalancer config of the resource
+    * @param resourceId the resource to look up
+    * @return RebalancerConfig, or null
+    */
+   public RebalancerConfig readRebalancerConfig(ResourceId resourceId) {
+     ResourceConfiguration resourceConfig =
+         _accessor.getProperty(_keyBuilder.resourceConfig(resourceId.stringify()));
+     return resourceConfig != null ? RebalancerConfig.from(resourceConfig) : null;
+   }
+ 
+   /**
+    * Set the user config of the resource, overwriting existing user configs
+    * @param resourceId the resource to update
+    * @param userConfig the new user config
+    * @return true if the user config was set, false otherwise
+    */
+   public boolean setUserConfig(ResourceId resourceId, UserConfig userConfig) {
+     ResourceConfig.Delta delta = new ResourceConfig.Delta(resourceId).setUserConfig(userConfig);
+     return updateResource(resourceId, delta) != null;
+   }
+ 
+   /**
+    * Add user configuration to the existing resource user configuration. Overwrites properties with
+    * the same key
+    * @param resourceId the resource to update
+    * @param userConfig the user config key-value pairs to add
+    * @return true if the user config was updated, false otherwise
+    */
+   public boolean updateUserConfig(ResourceId resourceId, UserConfig userConfig) {
+     ResourceConfiguration resourceConfig = new ResourceConfiguration(resourceId);
+     resourceConfig.addNamespacedConfig(userConfig);
+     return _accessor.updateProperty(_keyBuilder.resourceConfig(resourceId.stringify()),
+         resourceConfig);
+   }
+ 
+   /**
+    * Clear any user-specified configuration from the resource
+    * @param resourceId the resource to update
+    * @return true if the config was cleared, false otherwise
+    */
+   public boolean dropUserConfig(ResourceId resourceId) {
+     return setUserConfig(resourceId, new UserConfig(Scope.resource(resourceId)));
+   }
+ 
+   /**
+    * Persist an existing resource's logical configuration
+    * @param resourceConfig logical resource configuration
+    * @return true if resource is set, false otherwise
+    */
+   public boolean setResource(ResourceConfig resourceConfig) {
+     if (resourceConfig == null || resourceConfig.getRebalancerConfig() == null) {
+       LOG.error("Resource not fully defined with a rebalancer context");
+       return false;
+     }
+     ResourceId resourceId = resourceConfig.getId();
+     ResourceConfiguration config = new ResourceConfiguration(resourceId);
+     config.addNamespacedConfig(resourceConfig.getUserConfig());
+     config.addNamespacedConfig(resourceConfig.getRebalancerConfig().toNamespacedConfig());
+     config.setBucketSize(resourceConfig.getBucketSize());
+     config.setBatchMessageMode(resourceConfig.getBatchMessageMode());
+     setConfiguration(resourceId, config);
+     return true;
+   }
+ 
+   /**
+    * Get a resource configuration, which may include user-defined configuration, as well as
+    * rebalancer configuration
+    * @param resourceId
+    * @return configuration or null
+    */
+   public ResourceConfiguration getConfiguration(ResourceId resourceId) {
+     return _accessor.getProperty(_keyBuilder.resourceConfig(resourceId.stringify()));
+   }
+ 
+   /**
+    * set external view of a resource
+    * @param resourceId
+    * @param extView
+    * @return true if set, false otherwise
+    */
+   public boolean setExternalView(ResourceId resourceId, ExternalView extView) {
+     return _accessor.setProperty(_keyBuilder.externalView(resourceId.stringify()), extView);
+   }
+ 
+   /**
+    * get the external view of a resource
+    * @param resourceId the resource to look up
+    * @return external view or null
+    */
+   public ExternalView readExternalView(ResourceId resourceId) {
+     return _accessor.getProperty(_keyBuilder.externalView(resourceId.stringify()));
+   }
+ 
+   /**
+    * drop external view of a resource
+    * @param resourceId
+    * @return true if dropped, false otherwise
+    */
+   public boolean dropExternalView(ResourceId resourceId) {
+     return _accessor.removeProperty(_keyBuilder.externalView(resourceId.stringify()));
+   }
+ 
+   /**
+    * reset resources for all participants
+    * @param resetResourceIdSet the resources to reset
+    * @return true if they were reset, false otherwise
+    */
+   public boolean resetResources(Set<ResourceId> resetResourceIdSet) {
+     ParticipantAccessor accessor = participantAccessor();
+     List<ExternalView> extViews = _accessor.getChildValues(_keyBuilder.externalViews());
+     for (ExternalView extView : extViews) {
+       if (!resetResourceIdSet.contains(extView.getResourceId())) {
+         continue;
+       }
+ 
+       Map<ParticipantId, Set<PartitionId>> resetPartitionIds = Maps.newHashMap();
+       for (PartitionId partitionId : extView.getPartitionIdSet()) {
+         Map<ParticipantId, State> stateMap = extView.getStateMap(partitionId);
+         for (ParticipantId participantId : stateMap.keySet()) {
+           State state = stateMap.get(participantId);
+           if (state.equals(State.from(HelixDefinedState.ERROR))) {
+             if (!resetPartitionIds.containsKey(participantId)) {
+               resetPartitionIds.put(participantId, new HashSet<PartitionId>());
+             }
+             resetPartitionIds.get(participantId).add(partitionId);
+           }
+         }
+       }
+       for (ParticipantId participantId : resetPartitionIds.keySet()) {
+         accessor.resetPartitionsForParticipant(participantId, extView.getResourceId(),
+             resetPartitionIds.get(participantId));
+       }
+     }
+     return true;
+   }
+ 
+   /**
+    * Generate a default assignment for partitioned resources
+    * @param resourceId the resource to update
+    * @param replicaCount the new replica count (or -1 to use the existing one)
+    * @param participantGroupTag the new participant group tag (or null to use the existing one)
+    * @return true if assignment successful, false otherwise
+    */
+   public boolean generateDefaultAssignment(ResourceId resourceId, int replicaCount,
+       String participantGroupTag) {
+     Resource resource = readResource(resourceId);
+     RebalancerConfig config = resource.getRebalancerConfig();
+     PartitionedRebalancerContext context =
+         config.getRebalancerContext(PartitionedRebalancerContext.class);
+     if (context == null) {
+       LOG.error("Only partitioned resource types are supported");
+       return false;
+     }
+     if (replicaCount != -1) {
+       context.setReplicaCount(replicaCount);
+     }
+     if (participantGroupTag != null) {
+       context.setParticipantGroupTag(participantGroupTag);
+     }
+     StateModelDefinition stateModelDef =
+         _accessor.getProperty(_keyBuilder.stateModelDef(context.getStateModelDefId().stringify()));
+     List<InstanceConfig> participantConfigs =
+         _accessor.getChildValues(_keyBuilder.instanceConfigs());
+     Set<ParticipantId> participantSet = Sets.newHashSet();
+     for (InstanceConfig participantConfig : participantConfigs) {
+       participantSet.add(participantConfig.getParticipantId());
+     }
+     context.generateDefaultConfiguration(stateModelDef, participantSet);
+     setRebalancerContext(resourceId, context);
+     return true;
+   }
+ 
+   /**
+    * Get an ideal state from a rebalancer config if the resource is partitioned
+    * @param config RebalancerConfig instance
+    * @param bucketSize bucket size to use
+    * @param batchMessageMode true if batch messaging allowed, false otherwise
+    * @return IdealState, or null
+    */
+   static IdealState rebalancerConfigToIdealState(RebalancerConfig config, int bucketSize,
+       boolean batchMessageMode) {
+     PartitionedRebalancerContext partitionedContext =
+         config.getRebalancerContext(PartitionedRebalancerContext.class);
+     if (partitionedContext != null) {
+       IdealState idealState = new IdealState(partitionedContext.getResourceId());
+       idealState.setRebalanceMode(partitionedContext.getRebalanceMode());
+       idealState.setRebalancerRef(partitionedContext.getRebalancerRef());
+       String replicas = null;
+       if (partitionedContext.anyLiveParticipant()) {
+         replicas = StateModelToken.ANY_LIVEINSTANCE.toString();
+       } else {
+         replicas = Integer.toString(partitionedContext.getReplicaCount());
+       }
+       idealState.setReplicas(replicas);
+       idealState.setNumPartitions(partitionedContext.getPartitionSet().size());
+       idealState.setInstanceGroupTag(partitionedContext.getParticipantGroupTag());
+       idealState.setMaxPartitionsPerInstance(partitionedContext.getMaxPartitionsPerParticipant());
+       idealState.setStateModelDefId(partitionedContext.getStateModelDefId());
+       idealState.setStateModelFactoryId(partitionedContext.getStateModelFactoryId());
+       idealState.setBucketSize(bucketSize);
+       idealState.setBatchMessageMode(batchMessageMode);
+       if (partitionedContext.getRebalanceMode() == RebalanceMode.SEMI_AUTO) {
+         SemiAutoRebalancerContext semiAutoContext =
+             config.getRebalancerContext(SemiAutoRebalancerContext.class);
+         for (PartitionId partitionId : semiAutoContext.getPartitionSet()) {
+           idealState.setPreferenceList(partitionId, semiAutoContext.getPreferenceList(partitionId));
+         }
+       } else if (partitionedContext.getRebalanceMode() == RebalanceMode.CUSTOMIZED) {
+         CustomRebalancerContext customContext =
+             config.getRebalancerContext(CustomRebalancerContext.class);
+         for (PartitionId partitionId : customContext.getPartitionSet()) {
+           idealState.setParticipantStateMap(partitionId,
+               customContext.getPreferenceMap(partitionId));
+         }
+       } else {
+         for (PartitionId partitionId : partitionedContext.getPartitionSet()) {
+           List<ParticipantId> preferenceList = Collections.emptyList();
+           idealState.setPreferenceList(partitionId, preferenceList);
+           Map<ParticipantId, State> participantStateMap = Collections.emptyMap();
+           idealState.setParticipantStateMap(partitionId, participantStateMap);
+         }
+       }
+       return idealState;
+     }
+     return null;
+   }
+ 
+   /**
+    * Create a resource snapshot instance from the physical model
+    * @param resourceId the resource id
+    * @param resourceConfiguration physical resource configuration
+    * @param idealState ideal state of the resource
+    * @param externalView external view of the resource
+    * @param resourceAssignment current resource assignment
+    * @return Resource
+    */
+   static Resource createResource(ResourceId resourceId,
+       ResourceConfiguration resourceConfiguration, IdealState idealState,
+       ExternalView externalView, ResourceAssignment resourceAssignment) {
+     UserConfig userConfig;
+     RebalancerContext rebalancerContext = null;
+     ResourceType type = ResourceType.DATA;
+     if (resourceConfiguration != null) {
+       userConfig = resourceConfiguration.getUserConfig();
+       type = resourceConfiguration.getType();
+     } else {
+       userConfig = new UserConfig(Scope.resource(resourceId));
+     }
+     int bucketSize = 0;
+     boolean batchMessageMode = false;
+     if (idealState != null) {
+       if (resourceConfiguration != null) {
+         rebalancerContext =
+             resourceConfiguration.getRebalancerContext(PartitionedRebalancerContext.class);
+       }
+       if (rebalancerContext == null) {
+         // fallback: get rebalancer context from ideal state
+         rebalancerContext = PartitionedRebalancerContext.from(idealState);
+       }
+       bucketSize = idealState.getBucketSize();
+       batchMessageMode = idealState.getBatchMessageMode();
+       idealState.updateUserConfig(userConfig);
+     } else if (resourceConfiguration != null) {
+       bucketSize = resourceConfiguration.getBucketSize();
+       batchMessageMode = resourceConfiguration.getBatchMessageMode();
+       RebalancerConfig rebalancerConfig = new RebalancerConfig(resourceConfiguration);
+       rebalancerContext = rebalancerConfig.getRebalancerContext(RebalancerContext.class);
+     }
+     if (rebalancerContext == null) {
+       rebalancerContext = new PartitionedRebalancerContext(RebalanceMode.NONE);
+     }
+     return new Resource(resourceId, type, idealState, resourceAssignment, externalView,
+         rebalancerContext, userConfig, bucketSize, batchMessageMode);
+   }
+ 
+   /**
+    * Get a ParticipantAccessor instance
+    * @return ParticipantAccessor
+    */
+   protected ParticipantAccessor participantAccessor() {
+     return new ParticipantAccessor(_accessor);
+   }
+ }
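
Similarly, a minimal sketch of driving the ResourceAccessor above. Again illustrative only: the resource name is made up, ResourceId.from(String) is used as elsewhere in these diffs, and the HelixDataAccessor is assumed to be obtained separately.

import org.apache.helix.HelixDataAccessor;
import org.apache.helix.api.Resource;
import org.apache.helix.api.accessor.ResourceAccessor;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.model.ExternalView;

public class ResourceAccessorSketch {
  // Read a resource snapshot and regenerate its default assignment with three replicas.
  public static void reassign(HelixDataAccessor dataAccessor) {
    ResourceAccessor resourceAccessor = new ResourceAccessor(dataAccessor);
    ResourceId resourceId = ResourceId.from("MyDB"); // hypothetical name

    // readResource returns null when neither a resource config nor an ideal state exists
    Resource resource = resourceAccessor.readResource(resourceId);
    if (resource == null) {
      return;
    }

    // Per the javadoc above: -1 keeps the existing replica count, null keeps the group tag
    resourceAccessor.generateDefaultAssignment(resourceId, 3, null);

    // The external view (possibly null) reflects what participants are currently serving
    ExternalView externalView = resourceAccessor.readExternalView(resourceId);
    if (externalView != null) {
      System.out.println("Partitions in external view: "
          + externalView.getPartitionIdSet().size());
    }
  }
}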

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
index 0000000,d0a96cc..6d7b0ef
mode 000000,100644..100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
@@@ -1,0 -1,196 +1,211 @@@
+ package org.apache.helix.controller.rebalancer;
+ 
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.LinkedHashMap;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+ 
+ import org.apache.helix.HelixManager;
+ import org.apache.helix.ZNRecord;
+ import org.apache.helix.api.Cluster;
+ import org.apache.helix.api.Participant;
+ import org.apache.helix.api.State;
+ import org.apache.helix.api.id.ParticipantId;
+ import org.apache.helix.api.id.PartitionId;
+ import org.apache.helix.controller.rebalancer.context.FullAutoRebalancerContext;
+ import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+ import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
+ import org.apache.helix.controller.stages.ResourceCurrentState;
+ import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
+ import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme;
+ import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
+ import org.apache.helix.model.ResourceAssignment;
+ import org.apache.helix.model.StateModelDefinition;
+ import org.apache.log4j.Logger;
+ 
+ import com.google.common.base.Function;
+ import com.google.common.collect.Lists;
+ 
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ 
+ public class FullAutoRebalancer implements HelixRebalancer {
+   // This should be final, but it is assigned in computeResourceMapping rather than in a constructor
+   private AutoRebalanceStrategy _algorithm;
+ 
+   private static Logger LOG = Logger.getLogger(FullAutoRebalancer.class);
+ 
+   @Override
+   public void init(HelixManager helixManager) {
+     // do nothing
+   }
+ 
+   @Override
+   public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
+       Cluster cluster, ResourceCurrentState currentState) {
+     FullAutoRebalancerContext config =
+         rebalancerConfig.getRebalancerContext(FullAutoRebalancerContext.class);
+     StateModelDefinition stateModelDef =
+         cluster.getStateModelMap().get(config.getStateModelDefId());
+     // Compute a preference list based on the current ideal state
+     List<PartitionId> partitions = new ArrayList<PartitionId>(config.getPartitionSet());
+     Map<ParticipantId, Participant> liveParticipants = cluster.getLiveParticipantMap();
+     Map<ParticipantId, Participant> allParticipants = cluster.getParticipantMap();
+     int replicas = -1;
+     if (config.anyLiveParticipant()) {
+       replicas = liveParticipants.size();
+     } else {
+       replicas = config.getReplicaCount();
+     }
+ 
+     // count how many replicas should be in each state
+     Map<State, String> upperBounds =
+         ConstraintBasedAssignment.stateConstraints(stateModelDef, config.getResourceId(),
+             cluster.getConfig());
+     LinkedHashMap<State, Integer> stateCountMap =
 -        ConstraintBasedAssignment.stateCount(upperBounds, stateModelDef,
 -            liveParticipants.size(), replicas);
++        ConstraintBasedAssignment.stateCount(upperBounds, stateModelDef, liveParticipants.size(),
++            replicas);
+ 
+     // get the participant lists
+     List<ParticipantId> liveParticipantList =
+         new ArrayList<ParticipantId>(liveParticipants.keySet());
+     List<ParticipantId> allParticipantList =
+         new ArrayList<ParticipantId>(cluster.getParticipantMap().keySet());
+ 
+     // compute the current mapping from the current state
+     Map<PartitionId, Map<ParticipantId, State>> currentMapping =
+         currentMapping(config, currentState, stateCountMap);
+ 
++    // If any participants carry the resource's group tag, restrict placement to those participants
+     Set<ParticipantId> taggedNodes = new HashSet<ParticipantId>();
++    Set<ParticipantId> taggedLiveNodes = new HashSet<ParticipantId>();
+     if (config.getParticipantGroupTag() != null) {
 -      for (ParticipantId participantId : liveParticipantList) {
 -        if (liveParticipants.get(participantId).hasTag(config.getParticipantGroupTag())) {
++      for (ParticipantId participantId : allParticipantList) {
++        if (cluster.getParticipantMap().get(participantId).hasTag(config.getParticipantGroupTag())) {
+           taggedNodes.add(participantId);
++          if (liveParticipants.containsKey(participantId)) {
++            taggedLiveNodes.add(participantId);
++          }
+         }
+       }
 -    }
 -    if (taggedNodes.size() > 0) {
 -      if (LOG.isInfoEnabled()) {
 -        LOG.info("found the following instances with tag " + config.getResourceId() + " "
 -            + taggedNodes);
++      if (!taggedLiveNodes.isEmpty()) {
++        // live nodes exist that have this tag
++        if (LOG.isInfoEnabled()) {
++          LOG.info("found the following participants with tag " + config.getParticipantGroupTag()
++              + " for " + config.getResourceId() + ": " + taggedLiveNodes);
++        }
++      } else if (taggedNodes.isEmpty()) {
++        // no live nodes and no configured nodes have this tag
++        LOG.warn("Resource " + config.getResourceId() + " has tag "
++            + config.getParticipantGroupTag() + " but no configured participants have this tag");
++      } else {
++        // configured nodes have this tag, but no live nodes have this tag
++        LOG.warn("Resource " + config.getResourceId() + " has tag "
++            + config.getParticipantGroupTag() + " but no live participants have this tag");
+       }
 -      liveParticipantList = new ArrayList<ParticipantId>(taggedNodes);
++      allParticipantList = new ArrayList<ParticipantId>(taggedNodes);
++      liveParticipantList = new ArrayList<ParticipantId>(taggedLiveNodes);
+     }
+ 
+     // determine which nodes the replicas should live on
+     int maxPartition = config.getMaxPartitionsPerParticipant();
+     if (LOG.isInfoEnabled()) {
+       LOG.info("currentMapping: " + currentMapping);
+       LOG.info("stateCountMap: " + stateCountMap);
+       LOG.info("liveNodes: " + liveParticipantList);
+       LOG.info("allNodes: " + allParticipantList);
+       LOG.info("maxPartition: " + maxPartition);
+     }
+     ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
+     _algorithm =
+         new AutoRebalanceStrategy(config.getResourceId(), partitions, stateCountMap, maxPartition,
+             placementScheme);
+     ZNRecord newMapping =
+         _algorithm.typedComputePartitionAssignment(liveParticipantList, currentMapping,
+             allParticipantList);
+ 
+     if (LOG.isInfoEnabled()) {
+       LOG.info("newMapping: " + newMapping);
+     }
+ 
+     // compute a full partition mapping for the resource
+     if (LOG.isDebugEnabled()) {
+       LOG.debug("Processing resource:" + config.getResourceId());
+     }
+     ResourceAssignment partitionMapping = new ResourceAssignment(config.getResourceId());
+     for (PartitionId partition : partitions) {
+       Set<ParticipantId> disabledParticipantsForPartition =
+           ConstraintBasedAssignment.getDisabledParticipants(allParticipants, partition);
+       List<String> rawPreferenceList = newMapping.getListField(partition.stringify());
+       if (rawPreferenceList == null) {
+         rawPreferenceList = Collections.emptyList();
+       }
+       List<ParticipantId> preferenceList =
+           Lists.transform(rawPreferenceList, new Function<String, ParticipantId>() {
+             @Override
+             public ParticipantId apply(String participantName) {
+               return ParticipantId.from(participantName);
+             }
+           });
+       preferenceList =
+           ConstraintBasedAssignment.getPreferenceList(cluster, partition, preferenceList);
+       Map<ParticipantId, State> bestStateForPartition =
+           ConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds,
+               liveParticipants.keySet(), stateModelDef, preferenceList,
+               currentState.getCurrentStateMap(config.getResourceId(), partition),
+               disabledParticipantsForPartition);
+       partitionMapping.addReplicaMap(partition, bestStateForPartition);
+     }
+     return partitionMapping;
+   }
+ 
+   private Map<PartitionId, Map<ParticipantId, State>> currentMapping(
+       FullAutoRebalancerContext config, ResourceCurrentState currentStateOutput,
+       Map<State, Integer> stateCountMap) {
+     Map<PartitionId, Map<ParticipantId, State>> map =
+         new HashMap<PartitionId, Map<ParticipantId, State>>();
+ 
+     for (PartitionId partition : config.getPartitionSet()) {
+       Map<ParticipantId, State> curStateMap =
+           currentStateOutput.getCurrentStateMap(config.getResourceId(), partition);
+       map.put(partition, new HashMap<ParticipantId, State>());
+       for (ParticipantId node : curStateMap.keySet()) {
+         State state = curStateMap.get(node);
+         if (stateCountMap.containsKey(state)) {
+           map.get(partition).put(node, state);
+         }
+       }
+ 
+       Map<ParticipantId, State> pendingStateMap =
+           currentStateOutput.getPendingStateMap(config.getResourceId(), partition);
+       for (ParticipantId node : pendingStateMap.keySet()) {
+         State state = pendingStateMap.get(node);
+         if (stateCountMap.containsKey(state)) {
+           map.get(partition).put(node, state);
+         }
+       }
+     }
+     return map;
+   }
+ }
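+ 
Once the partition list, the per-state replica counts, and the (optionally tag-filtered) participant lists are built, the heart of the computation above is the hand-off to AutoRebalanceStrategy. A condensed view of that hand-off, restating only calls that appear in this diff with the variables assumed to have been computed as in computeResourceMapping, looks roughly like this:

    // Condensed restatement of the strategy invocation above.
    ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
    AutoRebalanceStrategy algorithm =
        new AutoRebalanceStrategy(resourceId, partitions, stateCountMap,
            maxPartitionsPerParticipant, placementScheme);
    ZNRecord newMapping =
        algorithm.typedComputePartitionAssignment(liveParticipantList, currentMapping,
            allParticipantList);
    // newMapping carries one list field per partition: the ordered preference list of
    // participant names, which the rebalancer then converts back into ParticipantId
    // objects and resolves into per-replica states.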

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/helix-core/src/main/java/org/apache/helix/controller/restlet/ZKPropertyTransferServer.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/helix-core/src/main/java/org/apache/helix/controller/restlet/ZNRecordUpdateResource.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/controller/restlet/ZNRecordUpdateResource.java
index 33593ae,d489378..3877686
--- a/helix-core/src/main/java/org/apache/helix/controller/restlet/ZNRecordUpdateResource.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/restlet/ZNRecordUpdateResource.java
@@@ -37,41 -35,55 +37,43 @@@ import org.restlet.resource.ServerResou
   * REST resource for ZkPropertyTransfer server to receive PUT requests
  * that submit ZNRecordUpdates
   */
 -public class ZNRecordUpdateResource extends Resource {
 +public class ZNRecordUpdateResource extends ServerResource {
-     public static final String UPDATEKEY = "ZNRecordUpdate";
-     private static Logger LOG = Logger.getLogger(ZNRecordUpdateResource.class);
+   public static final String UPDATEKEY = "ZNRecordUpdate";
+   private static Logger LOG = Logger.getLogger(ZNRecordUpdateResource.class);
  
-     public ZNRecordUpdateResource() { 
-         getVariants().add(new Variant(MediaType.TEXT_PLAIN));
-         getVariants().add(new Variant(MediaType.APPLICATION_JSON));
-         setNegotiated(false);
-     }
-     
-     @Override
-     public Representation put(Representation entity) {
-         try {
-             ZKPropertyTransferServer server = ZKPropertyTransferServer.getInstance();
 -  @Override
 -  public boolean allowGet() {
 -    return false;
 -  }
 -
 -  @Override
 -  public boolean allowPost() {
 -    return false;
++  public ZNRecordUpdateResource() {
++    getVariants().add(new Variant(MediaType.TEXT_PLAIN));
++    getVariants().add(new Variant(MediaType.APPLICATION_JSON));
++    setNegotiated(false);
+   }
+ 
+   @Override
 -  public boolean allowPut() {
 -    return true;
 -  }
 -
 -  @Override
 -  public boolean allowDelete() {
 -    return false;
 -  }
 -
 -  @Override
 -  public void storeRepresentation(Representation entity) {
++  public Representation put(Representation entity) {
+     try {
+       ZKPropertyTransferServer server = ZKPropertyTransferServer.getInstance();
  
-             Form form = new Form(entity);
-             String jsonPayload = form.getFirstValue(UPDATEKEY, true);
+       Form form = new Form(entity);
+       String jsonPayload = form.getFirstValue(UPDATEKEY, true);
  
-             // Parse the map from zkPath --> ZNRecordUpdate from the payload
-             StringReader sr = new StringReader(jsonPayload);
-             ObjectMapper mapper = new ObjectMapper();
-             TypeReference<TreeMap<String, ZNRecordUpdate>> typeRef = new TypeReference<TreeMap<String, ZNRecordUpdate>>() {
-             };
-             Map<String, ZNRecordUpdate> holderMap = mapper.readValue(sr, typeRef);
-             // Enqueue the ZNRecordUpdate for sending
-             for (ZNRecordUpdate holder : holderMap.values()) {
-                 server.enqueueData(holder);
-                 LOG.info("Received " + holder.getPath() + " from " + getRequest().getClientInfo().getAddress());
-             }
-             getResponse().setStatus(Status.SUCCESS_OK);
-         } catch (Exception e) {
-             LOG.error("", e);
-             getResponse().setStatus(Status.SERVER_ERROR_INTERNAL);
-         }
-         return null;
+       // Parse the map from zkPath --> ZNRecordUpdate from the payload
+       StringReader sr = new StringReader(jsonPayload);
+       ObjectMapper mapper = new ObjectMapper();
+       TypeReference<TreeMap<String, ZNRecordUpdate>> typeRef =
+           new TypeReference<TreeMap<String, ZNRecordUpdate>>() {
+           };
+       Map<String, ZNRecordUpdate> holderMap = mapper.readValue(sr, typeRef);
+       // Enqueue the ZNRecordUpdate for sending
+       for (ZNRecordUpdate holder : holderMap.values()) {
+         server.enqueueData(holder);
+         LOG.info("Received " + holder.getPath() + " from "
+             + getRequest().getClientInfo().getAddress());
+       }
+       getResponse().setStatus(Status.SUCCESS_OK);
+     } catch (Exception e) {
+       LOG.error("", e);
+       getResponse().setStatus(Status.SERVER_ERROR_INTERNAL);
      }
++    return null;
+   }
 +
  }
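
The rewritten resource keeps the same wire contract: a PUT whose form contains a single UPDATEKEY field holding a JSON-serialized map from ZooKeeper path to ZNRecordUpdate. A hypothetical sender-side sketch of building that payload is below; ZKPropertyTransferServer normally constructs this itself, and the path and update object are placeholders rather than details taken from the patch.

    // Hypothetical client-side sketch of the expected payload.
    ObjectMapper mapper = new ObjectMapper();
    Map<String, ZNRecordUpdate> updates = new TreeMap<String, ZNRecordUpdate>();
    updates.put(zkPath, znRecordUpdate); // zkPath and znRecordUpdate supplied by the caller
    String jsonPayload = mapper.writeValueAsString(updates);

    Form form = new Form();
    form.add(ZNRecordUpdateResource.UPDATEKEY, jsonPayload);
    // PUT the form to the transfer server's URL (for example with a Restlet client);
    // the resource above parses the field back into the same TreeMap type.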

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 11955f5,051a2f3..933bf78
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@@ -63,68 -70,73 +70,72 @@@ public class BestPossibleStateCalcStag
      event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.toString(), bestPossibleStateOutput);
  
      long endTime = System.currentTimeMillis();
-     logger.info("END BestPossibleStateCalcStage.process(). took: " + (endTime - startTime) + " ms");
+     if (LOG.isInfoEnabled()) {
+       LOG.info("END BestPossibleStateCalcStage.process(). took: " + (endTime - startTime) + " ms");
+     }
    }
  
-   private BestPossibleStateOutput compute(ClusterEvent event, Map<String, Resource> resourceMap,
-       CurrentStateOutput currentStateOutput) {
-     // for each ideal state
-     // read the state model def
-     // for each resource
-     // get the preference list
-     // for each instanceName check if its alive then assign a state
-     ClusterDataCache cache = event.getAttribute("ClusterDataCache");
+   /**
+    * Fallback for cases when the resource has been dropped, but current state exists
+    * @param cluster cluster snapshot
+    * @param resourceId the resource for which to generate an assignment
+    * @param currentStateOutput full snapshot of the current state
+    * @param stateModelDef state model the resource follows
+    * @return assignment for the dropped resource
+    */
+   private ResourceAssignment mapDroppedResource(Cluster cluster, ResourceId resourceId,
+       ResourceCurrentState currentStateOutput, StateModelDefinition stateModelDef) {
+     ResourceAssignment partitionMapping = new ResourceAssignment(resourceId);
+     Set<? extends PartitionId> mappedPartitions =
+         currentStateOutput.getCurrentStateMappedPartitions(resourceId);
+     if (mappedPartitions == null) {
+       return partitionMapping;
+     }
+     for (PartitionId partitionId : mappedPartitions) {
+       Set<ParticipantId> disabledParticipantsForPartition =
+           ConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(),
+               partitionId);
+       Map<State, String> upperBounds =
 -          ConstraintBasedAssignment.stateConstraints(stateModelDef, resourceId,
 -              cluster.getConfig());
++          ConstraintBasedAssignment
++              .stateConstraints(stateModelDef, resourceId, cluster.getConfig());
+       partitionMapping.addReplicaMap(partitionId, ConstraintBasedAssignment
+           .computeAutoBestStateForPartition(upperBounds, cluster.getLiveParticipantMap().keySet(),
+               stateModelDef, null, currentStateOutput.getCurrentStateMap(resourceId, partitionId),
+               disabledParticipantsForPartition));
+     }
+     return partitionMapping;
+   }
  
+   private BestPossibleStateOutput compute(Cluster cluster, ClusterEvent event,
+       Map<ResourceId, ResourceConfig> resourceMap, ResourceCurrentState currentStateOutput) {
      BestPossibleStateOutput output = new BestPossibleStateOutput();
+     Map<StateModelDefId, StateModelDefinition> stateModelDefs = cluster.getStateModelMap();
  
-     for (String resourceName : resourceMap.keySet()) {
-       logger.debug("Processing resource:" + resourceName);
- 
-       Resource resource = resourceMap.get(resourceName);
-       // Ideal state may be gone. In that case we need to get the state model name
-       // from the current state
-       IdealState idealState = cache.getIdealState(resourceName);
- 
-       if (idealState == null) {
-         // if ideal state is deleted, use an empty one
-         logger.info("resource:" + resourceName + " does not exist anymore");
-         idealState = new IdealState(resourceName);
+     for (ResourceId resourceId : resourceMap.keySet()) {
+       if (LOG.isDebugEnabled()) {
+         LOG.debug("Processing resource:" + resourceId);
        }
- 
-       Rebalancer rebalancer = null;
-       if (idealState.getRebalanceMode() == RebalanceMode.USER_DEFINED
-           && idealState.getRebalancerClassName() != null) {
-         String rebalancerClassName = idealState.getRebalancerClassName();
-         logger
-             .info("resource " + resourceName + " use idealStateRebalancer " + rebalancerClassName);
-         try {
-           rebalancer =
-               (Rebalancer) (HelixUtil.loadClass(getClass(), rebalancerClassName).newInstance());
-         } catch (Exception e) {
-           logger.warn("Exception while invoking custom rebalancer class:" + rebalancerClassName, e);
-         }
-       }
-       if (rebalancer == null) {
-         if (idealState.getRebalanceMode() == RebalanceMode.FULL_AUTO) {
-           rebalancer = new AutoRebalancer();
-         } else if (idealState.getRebalanceMode() == RebalanceMode.SEMI_AUTO) {
-           rebalancer = new SemiAutoRebalancer();
-         } else {
-           rebalancer = new CustomRebalancer();
+       ResourceConfig resourceConfig = resourceMap.get(resourceId);
+       RebalancerConfig rebalancerConfig = resourceConfig.getRebalancerConfig();
+       ResourceAssignment resourceAssignment = null;
+       if (rebalancerConfig != null) {
+         HelixRebalancer rebalancer = rebalancerConfig.getRebalancer();
+         if (rebalancer != null) {
+           HelixManager manager = event.getAttribute("helixmanager");
+           rebalancer.init(manager);
+           resourceAssignment =
+               rebalancer.computeResourceMapping(rebalancerConfig, cluster, currentStateOutput);
          }
        }
- 
-       HelixManager manager = event.getAttribute("helixmanager");
-       rebalancer.init(manager);
-       ResourceAssignment partitionStateAssignment =
-           rebalancer.computeResourceMapping(resource, idealState, currentStateOutput, cache);
-       if (partitionStateAssignment != null) {
-         for (Partition partition : resource.getPartitions()) {
-           Map<String, String> newStateMap = partitionStateAssignment.getReplicaMap(partition);
-           output.setState(resourceName, partition, newStateMap);
-         }
+       if (resourceAssignment == null) {
+         RebalancerContext context = rebalancerConfig.getRebalancerContext(RebalancerContext.class);
+         StateModelDefinition stateModelDef = stateModelDefs.get(context.getStateModelDefId());
+         resourceAssignment =
+             mapDroppedResource(cluster, resourceId, currentStateOutput, stateModelDef);
        }
 -
+       output.setResourceAssignment(resourceId, resourceAssignment);
      }
+ 
      return output;
    }
  }
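
With this change the stage no longer hard-codes the rebalancer choice; it asks the resource's RebalancerConfig for a HelixRebalancer, initializes it with the HelixManager, and uses whatever ResourceAssignment it returns, falling back to mapDroppedResource only when no assignment is produced. To illustrate the contract the stage now drives, a minimal hypothetical implementation could look like the sketch below; the class name, the choice of FullAutoRebalancerContext as the context type, the "ONLINE" state name, and the State.from(...) factory are assumptions made for the example, not details from the patch.

    // Minimal hypothetical HelixRebalancer pinning every partition to one live participant.
    // Imports are omitted; they mirror the org.apache.helix.* packages used by
    // FullAutoRebalancer above, plus java.util.
    public class PinToOneParticipantRebalancer implements HelixRebalancer {
      @Override
      public void init(HelixManager helixManager) {
        // no per-manager state needed in this sketch
      }

      @Override
      public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
          Cluster cluster, ResourceCurrentState currentState) {
        FullAutoRebalancerContext context =
            rebalancerConfig.getRebalancerContext(FullAutoRebalancerContext.class);
        ResourceAssignment assignment = new ResourceAssignment(context.getResourceId());
        Iterator<ParticipantId> liveIt = cluster.getLiveParticipantMap().keySet().iterator();
        if (!liveIt.hasNext()) {
          return assignment; // no live participants: nothing to assign
        }
        ParticipantId target = liveIt.next();
        for (PartitionId partition : context.getPartitionSet()) {
          Map<ParticipantId, State> replicaMap = new HashMap<ParticipantId, State>();
          replicaMap.put(target, State.from("ONLINE")); // state name assumed for illustration
          assignment.addReplicaMap(partition, replicaMap);
        }
        return assignment;
      }
    }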

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
index 35ef177,55a5e54..edceed6
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
@@@ -260,7 -274,7 +274,7 @@@ public class ExternalViewComputeStage e
  
        // Remove the finished (COMPLETED or ERROR) tasks from the SCHEDULER_TASK_RESOURCE idealstate
        keyBuilder = accessor.keyBuilder();
-       accessor.updateProperty(keyBuilder.idealStates(taskQueueIdealState.getResourceName()), delta);
 -      accessor.updateProperty(keyBuilder.idealState(resourceId.stringify()), delta);
++      accessor.updateProperty(keyBuilder.idealStates(resourceId.stringify()), delta);
      }
    }
  

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationOutput.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationOutput.java
index 359a959,2069974..0000000
deleted file mode 100644,100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationOutput.java
+++ /dev/null
@@@ -1,65 -1,66 +1,0 @@@
--package org.apache.helix.controller.stages;
--
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements.  See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership.  The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License.  You may obtain a copy of the License at
-- *
-- *   http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing,
-- * software distributed under the License is distributed on an
-- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- * KIND, either express or implied.  See the License for the
-- * specific language governing permissions and limitations
-- * under the License.
-- */
--
--import java.util.ArrayList;
--import java.util.Collections;
--import java.util.HashMap;
--import java.util.List;
--import java.util.Map;
--
--import org.apache.helix.model.Message;
--import org.apache.helix.model.Partition;
--
- public class MessageGenerationOutput {
- 
-   private final Map<String, Map<Partition, List<Message>>> _messagesMap;
- 
-   public MessageGenerationOutput() {
-     _messagesMap = new HashMap<String, Map<Partition, List<Message>>>();
- 
-   }
- 
-   public void addMessage(String resourceName, Partition partition, Message message) {
-     if (!_messagesMap.containsKey(resourceName)) {
-       _messagesMap.put(resourceName, new HashMap<Partition, List<Message>>());
-     }
-     if (!_messagesMap.get(resourceName).containsKey(partition)) {
-       _messagesMap.get(resourceName).put(partition, new ArrayList<Message>());
- 
-     }
-     _messagesMap.get(resourceName).get(partition).add(message);
- 
-   }
- 
-   public List<Message> getMessages(String resourceName, Partition resource) {
-     Map<Partition, List<Message>> map = _messagesMap.get(resourceName);
-     if (map != null) {
-       return map.get(resource);
-     }
-     return Collections.emptyList();
- 
-   }
- 
-   @Override
-   public String toString() {
-     return _messagesMap.toString();
-   }
- }
 -@Deprecated
 -public class MessageGenerationOutput {
 -
 -  private final Map<String, Map<Partition, List<Message>>> _messagesMap;
 -
 -  public MessageGenerationOutput() {
 -    _messagesMap = new HashMap<String, Map<Partition, List<Message>>>();
 -
 -  }
 -
 -  public void addMessage(String resourceName, Partition partition, Message message) {
 -    if (!_messagesMap.containsKey(resourceName)) {
 -      _messagesMap.put(resourceName, new HashMap<Partition, List<Message>>());
 -    }
 -    if (!_messagesMap.get(resourceName).containsKey(partition)) {
 -      _messagesMap.get(resourceName).put(partition, new ArrayList<Message>());
 -
 -    }
 -    _messagesMap.get(resourceName).get(partition).add(message);
 -
 -  }
 -
 -  public List<Message> getMessages(String resourceName, Partition resource) {
 -    Map<Partition, List<Message>> map = _messagesMap.get(resourceName);
 -    if (map != null) {
 -      return map.get(resource);
 -    }
 -    return Collections.emptyList();
 -
 -  }
 -
 -  @Override
 -  public String toString() {
 -    return _messagesMap.toString();
 -  }
 -}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index 99668d3,59bae9f..6e30074
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@@ -906,7 -917,7 +917,7 @@@ public class ZKHelixAdmin implements He
          new ZKHelixDataAccessor(grandCluster, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
      Builder keyBuilder = accessor.keyBuilder();
  
-     accessor.setProperty(keyBuilder.idealStates(idealState.getResourceName()), idealState);
 -    accessor.setProperty(keyBuilder.idealState(idealState.getResourceId().stringify()), idealState);
++    accessor.setProperty(keyBuilder.idealStates(idealState.getResourceId().stringify()), idealState);
    }
  
    @Override

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
index 94e8feb,6fbe690..471530c
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
@@@ -76,18 -76,7 +76,18 @@@ public class ZKHelixDataAccessor implem
      PropertyType type = key.getType();
      String path = key.getPath();
      int options = constructOptions(type);
 -    return _baseDataAccessor.create(path, value == null ? null : value.getRecord(), options);
 +    boolean success = false;
 +    switch (type) {
 +    case STATEMODELDEFS:
-       if (value.isValid()) {
++      if (value != null && value.isValid()) {
 +        success = _baseDataAccessor.create(path, value.getRecord(), options);
 +      }
 +      break;
 +    default:
-       success = _baseDataAccessor.create(path, value.getRecord(), options);
++      success = _baseDataAccessor.create(path, value == null ? null : value.getRecord(), options);
 +      break;
 +    }
 +    return success;
    }
  
    @Override

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java
index e99e173,2e759e6..a74d217
--- a/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java
+++ b/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java
@@@ -30,9 -30,13 +30,13 @@@ import java.util.TreeMap
  import org.apache.helix.HelixDefinedState;
  import org.apache.helix.HelixProperty;
  import org.apache.helix.ZNRecord;
+ import org.apache.helix.api.State;
+ import org.apache.helix.api.id.StateModelDefId;
  import org.apache.helix.model.builder.StateTransitionTableBuilder;
 -import org.apache.log4j.Logger;
 +import org.apache.helix.model.util.StateModelDefinitionValidator;
  
+ import com.google.common.collect.ImmutableList;
+ 
  /**
   * Describe the state model
   */
@@@ -185,9 -248,27 +247,18 @@@ public class StateModelDefinition exten
      return _statesCountMap.get(state);
    }
  
+   /**
+    * Get the maximum number of participants that may be in a given state
+    * @param state the state
+    * @return maximum instance count per state, can be "N" or "R"
+    */
+   public String getNumParticipantsPerState(State state) {
+     return _statesCountMap.get(state.toString());
+   }
+ 
    @Override
    public boolean isValid() {
 -    if (getInitialState() == null) {
 -      _logger.error("State model does not contain init state, statemodel:" + _record.getId());
 -      return false;
 -    }
 -    if (_record.getListField(StateModelDefinitionProperty.STATE_PRIORITY_LIST.toString()) == null) {
 -      _logger.error("CurrentState does not contain StatesPriorityList, state model : "
 -          + _record.getId());
 -      return false;
 -    }
 -    return true;
 +    return StateModelDefinitionValidator.isStateModelDefinitionValid(this);
    }
  
    // TODO move this to model.builder package, refactor StateModelConfigGenerator to use this
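 
The new typed accessor mirrors the existing string-keyed count lookup but accepts a State, so callers in the logical model do not have to stringify first. A small hypothetical usage, assuming a MASTER/SLAVE style model and from(...) factories on State and StateModelDefId, could be:

    // Hypothetical usage of the typed accessor; the state names and the from(...)
    // factories are assumptions for illustration.
    StateModelDefinition masterSlave =
        cluster.getStateModelMap().get(StateModelDefId.from("MasterSlave"));
    String masterCount = masterSlave.getNumParticipantsPerState(State.from("MASTER")); // e.g. "1"
    String slaveCount = masterSlave.getNumParticipantsPerState(State.from("SLAVE"));   // "R" or "N" per the javadoc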

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/18a8c7cf/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java
----------------------------------------------------------------------