You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@helix.apache.org by ka...@apache.org on 2013/10/17 22:30:16 UTC
git commit: [HELIX-209] Moving rebalancer code around, take 3
Updated Branches:
refs/heads/helix-logical-model e032132a6 -> a9aa77638
[HELIX-209] Moving rebalancer code around, take 3
Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/a9aa7763
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/a9aa7763
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/a9aa7763
Branch: refs/heads/helix-logical-model
Commit: a9aa77638398dd473b69c9cec64052d0d6486efa
Parents: e032132
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Thu Oct 17 13:29:51 2013 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Thu Oct 17 13:29:51 2013 -0700
----------------------------------------------------------------------
.../stages/RebalanceIdealStateStage.java | 84 --------
.../controller/rebalancer/AutoRebalancer.java | 187 ------------------
.../controller/rebalancer/CustomRebalancer.java | 135 -------------
.../controller/rebalancer/Rebalancer.java | 6 +-
.../rebalancer/SemiAutoRebalancer.java | 83 --------
.../util/ConstraintBasedAssignment.java | 194 -------------------
6 files changed, 2 insertions(+), 687 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/a9aa7763/helix-core/src/main/java/org/apache/helix/controller/stages/RebalanceIdealStateStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/RebalanceIdealStateStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/RebalanceIdealStateStage.java
deleted file mode 100644
index a9ab34e..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/RebalanceIdealStateStage.java
+++ /dev/null
@@ -1,84 +0,0 @@
-package org.apache.helix.controller.stages;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.helix.HelixManager;
-import org.apache.helix.controller.pipeline.AbstractBaseStage;
-import org.apache.helix.deprecated.controller.rebalancer.Rebalancer;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.IdealState.RebalanceMode;
-import org.apache.helix.model.Resource;
-import org.apache.helix.model.ResourceAssignment;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.helix.util.HelixUtil;
-import org.apache.log4j.Logger;
-
-/**
- * Check and invoke custom implementation idealstate rebalancers.<br/>
- * If the resourceConfig has specified className of the customized rebalancer, <br/>
- * the rebalancer will be invoked to re-write the idealstate of the resource<br/>
- */
-@Deprecated
-public class RebalanceIdealStateStage extends AbstractBaseStage {
- private static final Logger LOG = Logger.getLogger(RebalanceIdealStateStage.class.getName());
-
- @Override
- public void process(ClusterEvent event) throws Exception {
- HelixManager manager = event.getAttribute("helixmanager");
- ClusterDataCache cache = event.getAttribute("ClusterDataCache");
- Map<String, IdealState> idealStateMap = cache.getIdealStates();
- CurrentStateOutput currentStateOutput =
- event.getAttribute(AttributeName.CURRENT_STATE.toString());
-
- Map<String, IdealState> updatedIdealStates = new HashMap<String, IdealState>();
- for (String resourceName : idealStateMap.keySet()) {
- IdealState currentIdealState = idealStateMap.get(resourceName);
- if (currentIdealState.getRebalanceMode() == RebalanceMode.USER_DEFINED
- && currentIdealState.getRebalancerRef() != null) {
- String rebalancerClassName = currentIdealState.getRebalancerRef().toString();
- LOG.info("resource " + resourceName + " use idealStateRebalancer " + rebalancerClassName);
- try {
- Rebalancer balancer =
- (Rebalancer) (HelixUtil.loadClass(getClass(), rebalancerClassName).newInstance());
- balancer.init(manager);
- Resource resource = new Resource(resourceName);
- for (String partitionName : currentIdealState.getPartitionSet()) {
- resource.addPartition(partitionName);
- }
- ResourceAssignment resourceAssignment =
- balancer.computeResourceMapping(resource, currentIdealState, currentStateOutput,
- cache);
- StateModelDefinition stateModelDef =
- cache.getStateModelDef(currentIdealState.getStateModelDefRef());
- currentIdealState.updateFromAssignment(resourceAssignment, stateModelDef);
- updatedIdealStates.put(resourceName, currentIdealState);
- } catch (Exception e) {
- LOG.error("Exception while invoking custom rebalancer class:" + rebalancerClassName, e);
- }
- }
- }
- if (updatedIdealStates.size() > 0) {
- cache.getIdealStates().putAll(updatedIdealStates);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/a9aa7763/helix-core/src/main/java/org/apache/helix/deprecated/controller/rebalancer/AutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/deprecated/controller/rebalancer/AutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/deprecated/controller/rebalancer/AutoRebalancer.java
deleted file mode 100644
index afe132e..0000000
--- a/helix-core/src/main/java/org/apache/helix/deprecated/controller/rebalancer/AutoRebalancer.java
+++ /dev/null
@@ -1,187 +0,0 @@
-package org.apache.helix.deprecated.controller.rebalancer;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.HelixManager;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.stages.ClusterDataCache;
-import org.apache.helix.controller.stages.CurrentStateOutput;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
-import org.apache.helix.deprecated.controller.rebalancer.util.ConstraintBasedAssignment;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.IdealState.RebalanceMode;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.Resource;
-import org.apache.helix.model.ResourceAssignment;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.log4j.Logger;
-
-/**
- * This is a Rebalancer specific to full automatic mode. It is tasked with computing the ideal
- * state of a resource, fully adapting to the addition or removal of instances. This includes
- * computation of a new preference list and a partition to instance and state mapping based on the
- * computed instance preferences.
- * The input is the current assignment of partitions to instances, as well as existing instance
- * preferences, if any.
- * The output is a preference list and a mapping based on that preference list, i.e. partition p
- * has a replica on node k with state s.
- */
-@Deprecated
-public class AutoRebalancer implements Rebalancer {
- // These should be final, but are initialized in init rather than a constructor
- private HelixManager _manager;
- private AutoRebalanceStrategy _algorithm;
-
- private static final Logger LOG = Logger.getLogger(AutoRebalancer.class);
-
- @Override
- public void init(HelixManager manager) {
- this._manager = manager;
- this._algorithm = null;
- }
-
- @Override
- public ResourceAssignment computeResourceMapping(Resource resource, IdealState currentIdealState,
- CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) {
- // Compute a preference list based on the current ideal state
- List<String> partitions = new ArrayList<String>(currentIdealState.getPartitionSet());
- String stateModelName = currentIdealState.getStateModelDefRef();
- StateModelDefinition stateModelDef = clusterData.getStateModelDef(stateModelName);
- Map<String, LiveInstance> liveInstance = clusterData.getLiveInstances();
- String replicas = currentIdealState.getReplicas();
-
- LinkedHashMap<String, Integer> stateCountMap = new LinkedHashMap<String, Integer>();
- stateCountMap =
- ConstraintBasedAssignment.stateCount(stateModelDef, liveInstance.size(),
- Integer.parseInt(replicas));
- List<String> liveNodes = new ArrayList<String>(liveInstance.keySet());
- Map<String, Map<String, String>> currentMapping =
- currentMapping(currentStateOutput, resource.getResourceName(), partitions, stateCountMap);
-
- // If there are nodes tagged with resource name, use only those nodes
- Set<String> taggedNodes = new HashSet<String>();
- if (currentIdealState.getInstanceGroupTag() != null) {
- for (String instanceName : liveNodes) {
- if (clusterData.getInstanceConfigMap().get(instanceName)
- .containsTag(currentIdealState.getInstanceGroupTag())) {
- taggedNodes.add(instanceName);
- }
- }
- }
- if (taggedNodes.size() > 0) {
- if (LOG.isInfoEnabled()) {
- LOG.info("found the following instances with tag " + currentIdealState.getResourceName()
- + " " + taggedNodes);
- }
- liveNodes = new ArrayList<String>(taggedNodes);
- }
-
- List<String> allNodes = new ArrayList<String>(clusterData.getInstanceConfigMap().keySet());
- int maxPartition = currentIdealState.getMaxPartitionsPerInstance();
-
- if (LOG.isInfoEnabled()) {
- LOG.info("currentMapping: " + currentMapping);
- LOG.info("stateCountMap: " + stateCountMap);
- LOG.info("liveNodes: " + liveNodes);
- LOG.info("allNodes: " + allNodes);
- LOG.info("maxPartition: " + maxPartition);
- }
- ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
- placementScheme.init(_manager);
- _algorithm =
- new AutoRebalanceStrategy(resource.getResourceName(), partitions, stateCountMap,
- maxPartition, placementScheme);
- ZNRecord newMapping =
- _algorithm.computePartitionAssignment(liveNodes, currentMapping, allNodes);
-
- if (LOG.isInfoEnabled()) {
- LOG.info("newMapping: " + newMapping);
- }
-
- IdealState newIdealState = new IdealState(resource.getResourceName());
- newIdealState.getRecord().setSimpleFields(currentIdealState.getRecord().getSimpleFields());
- newIdealState.setRebalanceMode(RebalanceMode.FULL_AUTO);
- newIdealState.getRecord().setListFields(newMapping.getListFields());
-
- // compute a full partition mapping for the resource
- if (LOG.isDebugEnabled()) {
- LOG.debug("Processing resource:" + resource.getResourceName());
- }
- ResourceAssignment partitionMapping =
- new ResourceAssignment(ResourceId.from(resource.getResourceName()));
- for (String partitionName : partitions) {
- Partition partition = new Partition(partitionName);
- Map<String, String> currentStateMap =
- currentStateOutput.getCurrentStateMap(resource.getResourceName(), partition);
- Set<String> disabledInstancesForPartition =
- clusterData.getDisabledInstancesForPartition(partition.toString());
- List<String> preferenceList =
- ConstraintBasedAssignment.getPreferenceList(clusterData, partition, newIdealState,
- stateModelDef);
- Map<String, String> bestStateForPartition =
- ConstraintBasedAssignment.computeAutoBestStateForPartition(clusterData, stateModelDef,
- preferenceList, currentStateMap, disabledInstancesForPartition);
- partitionMapping.addReplicaMap(PartitionId.from(partitionName),
- ResourceAssignment.replicaMapFromStringMap(bestStateForPartition));
- }
- return partitionMapping;
- }
-
- private Map<String, Map<String, String>> currentMapping(CurrentStateOutput currentStateOutput,
- String resourceName, List<String> partitions, Map<String, Integer> stateCountMap) {
-
- Map<String, Map<String, String>> map = new HashMap<String, Map<String, String>>();
-
- for (String partition : partitions) {
- Map<String, String> curStateMap =
- currentStateOutput.getCurrentStateMap(resourceName, new Partition(partition));
- map.put(partition, new HashMap<String, String>());
- for (String node : curStateMap.keySet()) {
- String state = curStateMap.get(node);
- if (stateCountMap.containsKey(state)) {
- map.get(partition).put(node, state);
- }
- }
-
- Map<String, String> pendingStateMap =
- currentStateOutput.getPendingStateMap(resourceName, new Partition(partition));
- for (String node : pendingStateMap.keySet()) {
- String state = pendingStateMap.get(node);
- if (stateCountMap.containsKey(state)) {
- map.get(partition).put(node, state);
- }
- }
- }
- return map;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/a9aa7763/helix-core/src/main/java/org/apache/helix/deprecated/controller/rebalancer/CustomRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/deprecated/controller/rebalancer/CustomRebalancer.java b/helix-core/src/main/java/org/apache/helix/deprecated/controller/rebalancer/CustomRebalancer.java
deleted file mode 100644
index e1136a2..0000000
--- a/helix-core/src/main/java/org/apache/helix/deprecated/controller/rebalancer/CustomRebalancer.java
+++ /dev/null
@@ -1,135 +0,0 @@
-package org.apache.helix.deprecated.controller.rebalancer;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.HelixDefinedState;
-import org.apache.helix.HelixManager;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.stages.ClusterDataCache;
-import org.apache.helix.controller.stages.CurrentStateOutput;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.Resource;
-import org.apache.helix.model.ResourceAssignment;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.log4j.Logger;
-
-/**
- * This is a Rebalancer specific to custom mode. It is tasked with checking an existing mapping of
- * partitions against the set of live instances to mark assignment states as dropped or erroneous
- * as necessary.
- * The input is the required current assignment of partitions to instances, as well as the required
- * existing instance preferences.
- * The output is a verified mapping based on that preference list, i.e. partition p has a replica
- * on node k with state s, where s may be a dropped or error state if necessary.
- */
-@Deprecated
-public class CustomRebalancer implements Rebalancer {
-
- private static final Logger LOG = Logger.getLogger(CustomRebalancer.class);
-
- @Override
- public void init(HelixManager manager) {
- }
-
- @Override
- public ResourceAssignment computeResourceMapping(Resource resource, IdealState currentIdealState,
- CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) {
- String stateModelDefName = currentIdealState.getStateModelDefRef();
- StateModelDefinition stateModelDef = clusterData.getStateModelDef(stateModelDefName);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Processing resource:" + resource.getResourceName());
- }
- ResourceAssignment partitionMapping =
- new ResourceAssignment(ResourceId.from(resource.getResourceName()));
- for (Partition partition : resource.getPartitions()) {
- Map<String, String> currentStateMap =
- currentStateOutput.getCurrentStateMap(resource.getResourceName(), partition);
- Set<String> disabledInstancesForPartition =
- clusterData.getDisabledInstancesForPartition(partition.toString());
- Map<String, String> idealStateMap =
- IdealState.stringMapFromParticipantStateMap(currentIdealState
- .getParticipantStateMap(PartitionId.from(partition.getPartitionName())));
- Map<String, String> bestStateForPartition =
- computeCustomizedBestStateForPartition(clusterData, stateModelDef, idealStateMap,
- currentStateMap, disabledInstancesForPartition);
- partitionMapping.addReplicaMap(PartitionId.from(partition.getPartitionName()),
- ResourceAssignment.replicaMapFromStringMap(bestStateForPartition));
- }
- return partitionMapping;
- }
-
- /**
- * compute best state for resource in CUSTOMIZED ideal state mode
- * @param cache
- * @param stateModelDef
- * @param idealStateMap
- * @param currentStateMap
- * @param disabledInstancesForPartition
- * @return
- */
- private Map<String, String> computeCustomizedBestStateForPartition(ClusterDataCache cache,
- StateModelDefinition stateModelDef, Map<String, String> idealStateMap,
- Map<String, String> currentStateMap, Set<String> disabledInstancesForPartition) {
- Map<String, String> instanceStateMap = new HashMap<String, String>();
-
- // if the ideal state is deleted, idealStateMap will be null/empty and
- // we should drop all resources.
- if (currentStateMap != null) {
- for (String instance : currentStateMap.keySet()) {
- if ((idealStateMap == null || !idealStateMap.containsKey(instance))
- && !disabledInstancesForPartition.contains(instance)) {
- // if dropped and not disabled, transit to DROPPED
- instanceStateMap.put(instance, HelixDefinedState.DROPPED.toString());
- } else if ((currentStateMap.get(instance) == null || !currentStateMap.get(instance).equals(
- HelixDefinedState.ERROR.toString()))
- && disabledInstancesForPartition.contains(instance)) {
- // if disabled and not in ERROR state, transit to initial-state (e.g. OFFLINE)
- instanceStateMap.put(instance, stateModelDef.getInitialState());
- }
- }
- }
-
- // ideal state is deleted
- if (idealStateMap == null) {
- return instanceStateMap;
- }
-
- Map<String, LiveInstance> liveInstancesMap = cache.getLiveInstances();
- for (String instance : idealStateMap.keySet()) {
- boolean notInErrorState =
- currentStateMap == null || currentStateMap.get(instance) == null
- || !currentStateMap.get(instance).equals(HelixDefinedState.ERROR.toString());
-
- if (liveInstancesMap.containsKey(instance) && notInErrorState
- && !disabledInstancesForPartition.contains(instance)) {
- instanceStateMap.put(instance, idealStateMap.get(instance));
- }
- }
-
- return instanceStateMap;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/a9aa7763/helix-core/src/main/java/org/apache/helix/deprecated/controller/rebalancer/Rebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/deprecated/controller/rebalancer/Rebalancer.java b/helix-core/src/main/java/org/apache/helix/deprecated/controller/rebalancer/Rebalancer.java
index 59b827d..e4c2f26 100644
--- a/helix-core/src/main/java/org/apache/helix/deprecated/controller/rebalancer/Rebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/deprecated/controller/rebalancer/Rebalancer.java
@@ -23,8 +23,6 @@ import org.apache.helix.HelixManager;
import org.apache.helix.controller.stages.ClusterDataCache;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.IdealState;
-import org.apache.helix.model.Resource;
-import org.apache.helix.model.ResourceAssignment;
/**
* Allows one to come up with custom implementation of a rebalancer.<br/>
@@ -39,7 +37,7 @@ public interface Rebalancer {
* Initialize the rebalancer with a HelixManager if necessary
* @param manager
*/
- void init(HelixManager manager);
+ public void init(HelixManager manager);
/**
* Given an ideal state for a resource and liveness of instances, compute a assignment of
@@ -52,7 +50,7 @@ public interface Rebalancer {
* @param currentStateOutput the current states of all partitions
* @param clusterData cache of the cluster state
*/
- ResourceAssignment computeResourceMapping(final Resource resource,
+ public IdealState computeResourceMapping(final String resourceName,
final IdealState currentIdealState, final CurrentStateOutput currentStateOutput,
final ClusterDataCache clusterData);
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/a9aa7763/helix-core/src/main/java/org/apache/helix/deprecated/controller/rebalancer/SemiAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/deprecated/controller/rebalancer/SemiAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/deprecated/controller/rebalancer/SemiAutoRebalancer.java
deleted file mode 100644
index b4259f9..0000000
--- a/helix-core/src/main/java/org/apache/helix/deprecated/controller/rebalancer/SemiAutoRebalancer.java
+++ /dev/null
@@ -1,83 +0,0 @@
-package org.apache.helix.deprecated.controller.rebalancer;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.HelixManager;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.stages.ClusterDataCache;
-import org.apache.helix.controller.stages.CurrentStateOutput;
-import org.apache.helix.deprecated.controller.rebalancer.util.ConstraintBasedAssignment;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.Resource;
-import org.apache.helix.model.ResourceAssignment;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.log4j.Logger;
-
-/**
- * This is a Rebalancer specific to semi-automatic mode. It is tasked with computing the ideal
- * state of a resource based on a predefined preference list of instances willing to accept
- * replicas.
- * The input is the optional current assignment of partitions to instances, as well as the required
- * existing instance preferences.
- * The output is a mapping based on that preference list, i.e. partition p has a replica on node k
- * with state s.
- */
-@Deprecated
-public class SemiAutoRebalancer implements Rebalancer {
-
- private static final Logger LOG = Logger.getLogger(SemiAutoRebalancer.class);
-
- @Override
- public void init(HelixManager manager) {
- }
-
- @Override
- public ResourceAssignment computeResourceMapping(Resource resource, IdealState currentIdealState,
- CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) {
- String stateModelDefName = currentIdealState.getStateModelDefRef();
- StateModelDefinition stateModelDef = clusterData.getStateModelDef(stateModelDefName);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Processing resource:" + resource.getResourceName());
- }
- ResourceAssignment partitionMapping =
- new ResourceAssignment(ResourceId.from(resource.getResourceName()));
- for (Partition partition : resource.getPartitions()) {
- Map<String, String> currentStateMap =
- currentStateOutput.getCurrentStateMap(resource.getResourceName(), partition);
- Set<String> disabledInstancesForPartition =
- clusterData.getDisabledInstancesForPartition(partition.toString());
- List<String> preferenceList =
- ConstraintBasedAssignment.getPreferenceList(clusterData, partition, currentIdealState,
- stateModelDef);
- Map<String, String> bestStateForPartition =
- ConstraintBasedAssignment.computeAutoBestStateForPartition(clusterData, stateModelDef,
- preferenceList, currentStateMap, disabledInstancesForPartition);
- partitionMapping.addReplicaMap(PartitionId.from(partition.getPartitionName()),
- ResourceAssignment.replicaMapFromStringMap(bestStateForPartition));
- }
- return partitionMapping;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/a9aa7763/helix-core/src/main/java/org/apache/helix/deprecated/controller/rebalancer/util/ConstraintBasedAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/deprecated/controller/rebalancer/util/ConstraintBasedAssignment.java b/helix-core/src/main/java/org/apache/helix/deprecated/controller/rebalancer/util/ConstraintBasedAssignment.java
deleted file mode 100644
index c75423f..0000000
--- a/helix-core/src/main/java/org/apache/helix/deprecated/controller/rebalancer/util/ConstraintBasedAssignment.java
+++ /dev/null
@@ -1,194 +0,0 @@
-package org.apache.helix.deprecated.controller.rebalancer.util;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.HelixConstants.StateModelToken;
-import org.apache.helix.HelixDefinedState;
-import org.apache.helix.controller.stages.ClusterDataCache;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.log4j.Logger;
-
-/**
- * Collection of functions that will compute the best possible states given the live instances and
- * an ideal state.<br/>
- * <br/>
- * Deprecated. Use {@link org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment} instead.
- */
-@Deprecated
-public class ConstraintBasedAssignment {
- private static Logger logger = Logger.getLogger(ConstraintBasedAssignment.class);
-
- public static List<String> getPreferenceList(ClusterDataCache cache, Partition resource,
- IdealState idealState, StateModelDefinition stateModelDef) {
- List<String> listField = idealState.getPreferenceList(resource.getPartitionName());
-
- if (listField != null && listField.size() == 1
- && StateModelToken.ANY_LIVEINSTANCE.toString().equals(listField.get(0))) {
- Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
- List<String> prefList = new ArrayList<String>(liveInstances.keySet());
- Collections.sort(prefList);
- return prefList;
- } else {
- return listField;
- }
- }
-
- /**
- * compute best state for resource in AUTO ideal state mode
- * @param cache
- * @param stateModelDef
- * @param instancePreferenceList
- * @param currentStateMap
- * : instance->state for each partition
- * @param disabledInstancesForPartition
- * @return
- */
- public static Map<String, String> computeAutoBestStateForPartition(ClusterDataCache cache,
- StateModelDefinition stateModelDef, List<String> instancePreferenceList,
- Map<String, String> currentStateMap, Set<String> disabledInstancesForPartition) {
- Map<String, String> instanceStateMap = new HashMap<String, String>();
-
- // if the ideal state is deleted, instancePreferenceList will be empty and
- // we should drop all resources.
- if (currentStateMap != null) {
- for (String instance : currentStateMap.keySet()) {
- if ((instancePreferenceList == null || !instancePreferenceList.contains(instance))
- && !disabledInstancesForPartition.contains(instance)) {
- // if dropped and not disabled, transit to DROPPED
- instanceStateMap.put(instance, HelixDefinedState.DROPPED.toString());
- } else if ((currentStateMap.get(instance) == null || !currentStateMap.get(instance).equals(
- HelixDefinedState.ERROR.toString()))
- && disabledInstancesForPartition.contains(instance)) {
- // if disabled and not in ERROR state, transit to initial-state (e.g. OFFLINE)
- instanceStateMap.put(instance, stateModelDef.getInitialState());
- }
- }
- }
-
- // ideal state is deleted
- if (instancePreferenceList == null) {
- return instanceStateMap;
- }
-
- List<String> statesPriorityList = stateModelDef.getStatesPriorityList();
- boolean assigned[] = new boolean[instancePreferenceList.size()];
-
- Map<String, LiveInstance> liveInstancesMap = cache.getLiveInstances();
-
- for (String state : statesPriorityList) {
- String num = stateModelDef.getNumInstancesPerState(state);
- int stateCount = -1;
- if ("N".equals(num)) {
- Set<String> liveAndEnabled = new HashSet<String>(liveInstancesMap.keySet());
- liveAndEnabled.removeAll(disabledInstancesForPartition);
- stateCount = liveAndEnabled.size();
- } else if ("R".equals(num)) {
- stateCount = instancePreferenceList.size();
- } else {
- try {
- stateCount = Integer.parseInt(num);
- } catch (Exception e) {
- logger.error("Invalid count for state:" + state + " ,count=" + num);
- }
- }
- if (stateCount > -1) {
- int count = 0;
- for (int i = 0; i < instancePreferenceList.size(); i++) {
- String instanceName = instancePreferenceList.get(i);
-
- boolean notInErrorState =
- currentStateMap == null || currentStateMap.get(instanceName) == null
- || !currentStateMap.get(instanceName).equals(HelixDefinedState.ERROR.toString());
-
- if (liveInstancesMap.containsKey(instanceName) && !assigned[i] && notInErrorState
- && !disabledInstancesForPartition.contains(instanceName)) {
- instanceStateMap.put(instanceName, state);
- count = count + 1;
- assigned[i] = true;
- if (count == stateCount) {
- break;
- }
- }
- }
- }
- }
- return instanceStateMap;
- }
-
- /**
- * Get the number of replicas that should be in each state for a partition
- * @param stateModelDef StateModelDefinition object
- * @param liveNodesNb number of live nodes
- * @param total number of replicas
- * @return state count map: state->count
- */
- public static LinkedHashMap<String, Integer> stateCount(StateModelDefinition stateModelDef,
- int liveNodesNb, int totalReplicas) {
- LinkedHashMap<String, Integer> stateCountMap = new LinkedHashMap<String, Integer>();
- List<String> statesPriorityList = stateModelDef.getStatesPriorityList();
-
- int replicas = totalReplicas;
- for (String state : statesPriorityList) {
- String num = stateModelDef.getNumInstancesPerState(state);
- if ("N".equals(num)) {
- stateCountMap.put(state, liveNodesNb);
- } else if ("R".equals(num)) {
- // wait until we get the counts for all other states
- continue;
- } else {
- int stateCount = -1;
- try {
- stateCount = Integer.parseInt(num);
- } catch (Exception e) {
- // LOG.error("Invalid count for state: " + state + ", count: " + num +
- // ", use -1 instead");
- }
-
- if (stateCount > 0) {
- stateCountMap.put(state, stateCount);
- replicas -= stateCount;
- }
- }
- }
-
- // get state count for R
- for (String state : statesPriorityList) {
- String num = stateModelDef.getNumInstancesPerState(state);
- if ("R".equals(num)) {
- stateCountMap.put(state, replicas);
- // should have at most one state using R
- break;
- }
- }
- return stateCountMap;
- }
-}