You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2013/10/16 02:09:44 UTC
[1/4] [HELIX-209] Moving rebalancer code around, take 2
Updated Branches:
refs/heads/helix-logical-model 9f229c80c -> e032132a6
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java b/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
index 7fe3314..369ad68 100644
--- a/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
+++ b/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
@@ -33,7 +33,7 @@ import org.apache.helix.api.id.ClusterId;
import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.api.rebalancer.SemiAutoRebalancerContext;
+import org.apache.helix.controller.rebalancer.context.SemiAutoRebalancerContext;
import org.apache.helix.controller.stages.AttributeName;
import org.apache.helix.controller.stages.ClusterEvent;
import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/test/java/org/apache/helix/api/TestUpdateConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/api/TestUpdateConfig.java b/helix-core/src/test/java/org/apache/helix/api/TestUpdateConfig.java
index 0a578c1..74781cd 100644
--- a/helix-core/src/test/java/org/apache/helix/api/TestUpdateConfig.java
+++ b/helix-core/src/test/java/org/apache/helix/api/TestUpdateConfig.java
@@ -9,8 +9,8 @@ import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.api.rebalancer.FullAutoRebalancerContext;
-import org.apache.helix.api.rebalancer.SemiAutoRebalancerContext;
+import org.apache.helix.controller.rebalancer.context.FullAutoRebalancerContext;
+import org.apache.helix.controller.rebalancer.context.SemiAutoRebalancerContext;
import org.testng.Assert;
import org.testng.annotations.Test;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/context/TestSerializeRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/context/TestSerializeRebalancerContext.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/context/TestSerializeRebalancerContext.java
index 8650475..5bbe54f 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/context/TestSerializeRebalancerContext.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/context/TestSerializeRebalancerContext.java
@@ -8,9 +8,6 @@ import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.api.rebalancer.CustomRebalancerContext;
-import org.apache.helix.api.rebalancer.RebalancerConfig;
-import org.apache.helix.api.rebalancer.RebalancerContext;
import org.apache.helix.model.ResourceConfiguration;
import org.testng.Assert;
import org.testng.annotations.Test;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java b/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
index 6279087..ecb8151 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
@@ -39,10 +39,10 @@ import org.apache.helix.api.config.UserConfig;
import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.api.rebalancer.PartitionedRebalancerContext;
-import org.apache.helix.api.rebalancer.RebalancerContext;
import org.apache.helix.controller.pipeline.Stage;
import org.apache.helix.controller.pipeline.StageContext;
+import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
+import org.apache.helix.controller.rebalancer.context.RebalancerContext;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.helix.model.InstanceConfig;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
index 0dba374..45b0dac 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
@@ -32,8 +32,8 @@ import org.apache.helix.api.config.ResourceConfig;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.api.rebalancer.RebalancerContext;
import org.apache.helix.controller.pipeline.StageContext;
+import org.apache.helix.controller.rebalancer.context.RebalancerContext;
import org.apache.helix.controller.strategy.DefaultTwoStateStrategy;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.IdealState;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
index 50d5ad8..f8fa4a2 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
@@ -41,7 +41,7 @@ import org.apache.helix.api.id.ClusterId;
import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.api.rebalancer.util.ConstraintBasedAssignment;
+import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.StateModelDefinition;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestNewAutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestNewAutoRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestNewAutoRebalanceStrategy.java
index 8620383..9f52866 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestNewAutoRebalanceStrategy.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestNewAutoRebalanceStrategy.java
@@ -46,7 +46,7 @@ import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.api.rebalancer.util.ConstraintBasedAssignment;
+import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.Message;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
index 688846e..eba322d 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
@@ -32,9 +32,9 @@ import org.apache.helix.api.Cluster;
import org.apache.helix.api.State;
import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.rebalancer.PartitionedRebalancerContext;
-import org.apache.helix.api.rebalancer.Rebalancer;
-import org.apache.helix.api.rebalancer.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.Rebalancer;
+import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
+import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
import org.apache.helix.controller.stages.ResourceCurrentState;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-examples/src/main/java/org/apache/helix/examples/NewModelExample.java
----------------------------------------------------------------------
diff --git a/helix-examples/src/main/java/org/apache/helix/examples/NewModelExample.java b/helix-examples/src/main/java/org/apache/helix/examples/NewModelExample.java
index df81000..7215707 100644
--- a/helix-examples/src/main/java/org/apache/helix/examples/NewModelExample.java
+++ b/helix-examples/src/main/java/org/apache/helix/examples/NewModelExample.java
@@ -21,7 +21,7 @@ import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.api.rebalancer.FullAutoRebalancerContext;
+import org.apache.helix.controller.rebalancer.context.FullAutoRebalancerContext;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.manager.zk.ZkClient;
[3/4] [HELIX-209] Moving rebalancer code around, take 2
Posted by ka...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/api/rebalancer/SemiAutoRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/rebalancer/SemiAutoRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/api/rebalancer/SemiAutoRebalancerContext.java
deleted file mode 100644
index e1f1ac2..0000000
--- a/helix-core/src/main/java/org/apache/helix/api/rebalancer/SemiAutoRebalancerContext.java
+++ /dev/null
@@ -1,176 +0,0 @@
-package org.apache.helix.api.rebalancer;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.api.State;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.api.rebalancer.util.ConstraintBasedAssignment;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.IdealState.RebalanceMode;
-import org.apache.helix.model.StateModelDefinition;
-import org.codehaus.jackson.annotate.JsonIgnore;
-import org.codehaus.jackson.annotate.JsonProperty;
-
-import com.google.common.collect.Maps;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * RebalancerContext for SEMI_AUTO rebalancer mode. It indicates the preferred locations of each
- * partition replica. By default, it corresponds to {@link SemiAutoRebalancer}
- */
-public final class SemiAutoRebalancerContext extends PartitionedRebalancerContext {
- @JsonProperty("preferenceLists")
- private Map<PartitionId, List<ParticipantId>> _preferenceLists;
-
- /**
- * Instantiate a SEMI_AUTO context that references {@link SemiAutoRebalancer}, with no preference lists
- */
- public SemiAutoRebalancerContext() {
- super(RebalanceMode.SEMI_AUTO);
- setRebalancerRef(RebalancerRef.from(SemiAutoRebalancer.class));
- _preferenceLists = Maps.newHashMap();
- }
-
- /**
- * Get the preference lists of all partitions of the resource
- * @return map of partition id to list of participant ids (the internal map, not a copy)
- */
- public Map<PartitionId, List<ParticipantId>> getPreferenceLists() {
- return _preferenceLists;
- }
-
- /**
- * Set the preference lists of all partitions of the resource
- * @param preferenceLists map of partition id to list of participant ids; stored by reference
- */
- public void setPreferenceLists(Map<PartitionId, List<ParticipantId>> preferenceLists) {
- _preferenceLists = preferenceLists;
- }
-
- /**
- * Get the preference list of a partition
- * @param partitionId the partition to look up
- * @return list of participant ids, or null if the map has no entry for the partition
- */
- @JsonIgnore
- public List<ParticipantId> getPreferenceList(PartitionId partitionId) {
- return _preferenceLists.get(partitionId);
- }
-
- /**
- * Generate preference lists based on a default cluster setup and replace the current ones
- * @param stateModelDef the state model definition to follow
- * @param participantSet the set of participant ids to configure for
- */
- @Override
- @JsonIgnore
- public void generateDefaultConfiguration(StateModelDefinition stateModelDef,
- Set<ParticipantId> participantSet) {
- // compute default upper bounds from the state model definition
- Map<State, String> upperBounds = Maps.newHashMap();
- for (State state : stateModelDef.getTypedStatesPriorityList()) {
- upperBounds.put(state, stateModelDef.getNumParticipantsPerState(state));
- }
-
- // determine the current mapping implied by any existing, non-empty preference lists
- Map<PartitionId, Map<ParticipantId, State>> currentMapping = Maps.newHashMap();
- for (PartitionId partitionId : getPartitionSet()) {
- List<ParticipantId> preferenceList = getPreferenceList(partitionId);
- if (preferenceList != null && !preferenceList.isEmpty()) {
- Set<ParticipantId> disabledParticipants = Collections.emptySet();
- Map<ParticipantId, State> emptyCurrentState = Collections.emptyMap();
- Map<ParticipantId, State> initialMap =
- ConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds,
- participantSet, stateModelDef, preferenceList, emptyCurrentState,
- disabledParticipants);
- currentMapping.put(partitionId, initialMap);
- }
- }
-
- // determine the preference lists by running the auto-rebalance strategy over all participants
- LinkedHashMap<State, Integer> stateCounts =
- ConstraintBasedAssignment.stateCount(upperBounds, stateModelDef, participantSet.size(),
- getReplicaCount());
- ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
- List<ParticipantId> participantList = new ArrayList<ParticipantId>(participantSet);
- List<PartitionId> partitionList = new ArrayList<PartitionId>(getPartitionSet());
- AutoRebalanceStrategy strategy =
- new AutoRebalanceStrategy(ResourceId.from(""), partitionList, stateCounts,
- getMaxPartitionsPerParticipant(), placementScheme);
- Map<String, List<String>> rawPreferenceLists =
- strategy.typedComputePartitionAssignment(participantList, currentMapping, participantList)
- .getListFields();
- Map<PartitionId, List<ParticipantId>> preferenceLists =
- Maps.newHashMap(IdealState.preferenceListsFromStringLists(rawPreferenceLists));
- setPreferenceLists(preferenceLists);
- }
-
- /**
- * Build a SemiAutoRebalancerContext. By default, it corresponds to {@link SemiAutoRebalancer}
- */
- public static final class Builder extends PartitionedRebalancerContext.AbstractBuilder<Builder> {
- private final Map<PartitionId, List<ParticipantId>> _preferenceLists;
-
- /**
- * Instantiate for a resource
- * @param resourceId resource id
- */
- public Builder(ResourceId resourceId) {
- super(resourceId);
- super.rebalancerRef(RebalancerRef.from(SemiAutoRebalancer.class));
- _preferenceLists = Maps.newHashMap();
- }
-
- /**
- * Add a preference list for a partition
- * @param partitionId partition to set
- * @param preferenceList ordered list of participants who can serve the partition
- * @return Builder
- */
- public Builder preferenceList(PartitionId partitionId, List<ParticipantId> preferenceList) {
- _preferenceLists.put(partitionId, preferenceList);
- return self();
- }
-
- @Override
- protected Builder self() {
- return this;
- }
-
- @Override
- public SemiAutoRebalancerContext build() {
- SemiAutoRebalancerContext context = new SemiAutoRebalancerContext();
- super.update(context);
- context.setPreferenceLists(_preferenceLists);
- return context;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/api/rebalancer/util/ConstraintBasedAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/rebalancer/util/ConstraintBasedAssignment.java b/helix-core/src/main/java/org/apache/helix/api/rebalancer/util/ConstraintBasedAssignment.java
deleted file mode 100644
index 0199796..0000000
--- a/helix-core/src/main/java/org/apache/helix/api/rebalancer/util/ConstraintBasedAssignment.java
+++ /dev/null
@@ -1,244 +0,0 @@
-package org.apache.helix.api.rebalancer.util;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.HelixConstants.StateModelToken;
-import org.apache.helix.HelixDefinedState;
-import org.apache.helix.api.Cluster;
-import org.apache.helix.api.Participant;
-import org.apache.helix.api.Scope;
-import org.apache.helix.api.State;
-import org.apache.helix.api.config.ClusterConfig;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.log4j.Logger;
-
-import com.google.common.base.Predicate;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-/**
- * Collection of functions that will compute the best possible state based on the participants and
- * the rebalancer configuration of a resource.
- */
-public class ConstraintBasedAssignment {
- private static Logger logger = Logger.getLogger(ConstraintBasedAssignment.class);
-
- /**
- * Get the participants that are disabled as a whole or have the given partition disabled
- * @param participantMap map of all participants
- * @param partitionId the partition to check
- * @return a filtered view (Guava Sets.filter) of the participants disabled for the partition
- */
- public static Set<ParticipantId> getDisabledParticipants(
- final Map<ParticipantId, Participant> participantMap, final PartitionId partitionId) {
- Set<ParticipantId> participantSet = new HashSet<ParticipantId>(participantMap.keySet());
- Set<ParticipantId> disabledParticipantsForPartition =
- Sets.filter(participantSet, new Predicate<ParticipantId>() {
- @Override
- public boolean apply(ParticipantId participantId) {
- Participant participant = participantMap.get(participantId);
- return !participant.isEnabled()
- || participant.getDisabledPartitionIds().contains(partitionId);
- }
- });
- return disabledParticipantsForPartition;
- }
-
- /**
- * Get an ordered list of participants that can serve a partition
- * @param cluster cluster snapshot
- * @param partitionId the partition to look up (not read by the current implementation)
- * @param prefList configured preference list; may be null
- * @return prefList, or the sorted live participant ids if prefList is only ANY_LIVEINSTANCE
- */
- public static List<ParticipantId> getPreferenceList(Cluster cluster, PartitionId partitionId,
- List<ParticipantId> prefList) {
- if (prefList != null && prefList.size() == 1
- && StateModelToken.ANY_LIVEINSTANCE.toString().equals(prefList.get(0).stringify())) {
- prefList = new ArrayList<ParticipantId>(cluster.getLiveParticipantMap().keySet());
- Collections.sort(prefList);
- }
- return prefList;
- }
-
- /**
- * Get a map of state to upper bound constraint given a cluster
- * @param stateModelDef the state model definition to check
- * @param resourceId the resource that is constrained
- * @param cluster the cluster configuration queried for the upper bound constraints
- * @return map of state to upper bound
- */
- public static Map<State, String> stateConstraints(StateModelDefinition stateModelDef,
- ResourceId resourceId, ClusterConfig cluster) {
- Map<State, String> stateMap = Maps.newHashMap();
- for (State state : stateModelDef.getTypedStatesPriorityList()) {
- String num =
- cluster.getStateUpperBoundConstraint(Scope.resource(resourceId),
- stateModelDef.getStateModelDefId(), state);
- stateMap.put(state, num);
- }
- return stateMap;
- }
-
- /**
- * compute best state for resource in SEMI_AUTO and FULL_AUTO modes
- * @param upperBounds map of state to upper bound
- * @param liveParticipantSet set of live participant ids
- * @param stateModelDef state model definition supplying the state priority list and initial state
- * @param participantPreferenceList ordered participant ids; null means the resource is deleted
- * @param currentStateMap
- * : participant->state for the partition; may be null
- * @param disabledParticipantsForPartition participants that must not be assigned a state here
- * @return map of participant id to the state computed for it
- */
- public static Map<ParticipantId, State> computeAutoBestStateForPartition(
- Map<State, String> upperBounds, Set<ParticipantId> liveParticipantSet,
- StateModelDefinition stateModelDef, List<ParticipantId> participantPreferenceList,
- Map<ParticipantId, State> currentStateMap, Set<ParticipantId> disabledParticipantsForPartition) {
- Map<ParticipantId, State> participantStateMap = new HashMap<ParticipantId, State>();
-
- // if the resource is deleted, participantPreferenceList will be null and
- // we should drop every participant that currently holds the partition.
- if (currentStateMap != null) {
- for (ParticipantId participantId : currentStateMap.keySet()) {
- if ((participantPreferenceList == null || !participantPreferenceList
- .contains(participantId)) && !disabledParticipantsForPartition.contains(participantId)) {
- // if dropped and not disabled, transit to DROPPED
- participantStateMap.put(participantId, State.from(HelixDefinedState.DROPPED));
- } else if ((currentStateMap.get(participantId) == null || !currentStateMap.get(
- participantId).equals(State.from(HelixDefinedState.ERROR)))
- && disabledParticipantsForPartition.contains(participantId)) {
- // if disabled and not in ERROR state, transit to initial-state (e.g. OFFLINE)
- participantStateMap.put(participantId, stateModelDef.getTypedInitialState());
- }
- }
- }
-
- // resource is deleted
- if (participantPreferenceList == null) {
- return participantStateMap;
- }
-
- List<State> statesPriorityList = stateModelDef.getTypedStatesPriorityList();
- boolean assigned[] = new boolean[participantPreferenceList.size()];
-
- for (State state : statesPriorityList) {
- String num = upperBounds.get(state);
- int stateCount = -1; // stays -1 (state skipped below) if the bound cannot be parsed
- if ("N".equals(num)) {
- Set<ParticipantId> liveAndEnabled = new HashSet<ParticipantId>(liveParticipantSet);
- liveAndEnabled.removeAll(disabledParticipantsForPartition);
- stateCount = liveAndEnabled.size();
- } else if ("R".equals(num)) {
- stateCount = participantPreferenceList.size();
- } else {
- try {
- stateCount = Integer.parseInt(num);
- } catch (Exception e) {
- logger.error("Invalid count for state:" + state + " ,count=" + num);
- }
- }
- if (stateCount > -1) {
- int count = 0;
- for (int i = 0; i < participantPreferenceList.size(); i++) {
- ParticipantId participantId = participantPreferenceList.get(i);
-
- boolean notInErrorState =
- currentStateMap == null
- || currentStateMap.get(participantId) == null
- || !currentStateMap.get(participantId)
- .equals(State.from(HelixDefinedState.ERROR));
-
- if (liveParticipantSet.contains(participantId) && !assigned[i] && notInErrorState
- && !disabledParticipantsForPartition.contains(participantId)) {
- participantStateMap.put(participantId, state);
- count = count + 1;
- assigned[i] = true;
- if (count == stateCount) {
- break;
- }
- }
- }
- }
- }
- return participantStateMap;
- }
-
- /**
- * Get the number of replicas that should be in each state for a partition
- * @param upperBounds map of state to upper bound
- * @param stateModelDef StateModelDefinition object
- * @param liveNodesNb number of live nodes
- * @param totalReplicas total number of replicas
- * @return state count map: state->count
- */
- public static LinkedHashMap<State, Integer> stateCount(Map<State, String> upperBounds,
- StateModelDefinition stateModelDef, int liveNodesNb, int totalReplicas) {
- LinkedHashMap<State, Integer> stateCountMap = new LinkedHashMap<State, Integer>();
- List<State> statesPriorityList = stateModelDef.getTypedStatesPriorityList();
-
- int replicas = totalReplicas;
- for (State state : statesPriorityList) {
- String num = upperBounds.get(state);
- if ("N".equals(num)) {
- stateCountMap.put(state, liveNodesNb);
- } else if ("R".equals(num)) {
- // wait until we get the counts for all other states
- continue;
- } else {
- int stateCount = -1;
- try {
- stateCount = Integer.parseInt(num);
- } catch (Exception e) {
- // NOTE(review): the parse failure is silently ignored; stateCount stays -1, so the
- // state gets no entry in the result and nothing is logged.
- }
-
- if (stateCount > 0) {
- stateCountMap.put(state, stateCount);
- replicas -= stateCount;
- }
- }
- }
-
- // get state count for R
- for (State state : statesPriorityList) {
- String num = upperBounds.get(state);
- if ("R".equals(num)) {
- stateCountMap.put(state, replicas);
- // should have at most one state using R
- break;
- }
- }
- return stateCountMap;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
deleted file mode 100644
index 880f2c0..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
+++ /dev/null
@@ -1,187 +0,0 @@
-package org.apache.helix.controller.rebalancer;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.HelixManager;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
-import org.apache.helix.controller.stages.ClusterDataCache;
-import org.apache.helix.controller.stages.CurrentStateOutput;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.IdealState.RebalanceMode;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.Resource;
-import org.apache.helix.model.ResourceAssignment;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.log4j.Logger;
-
-/**
- * This is a Rebalancer specific to full automatic mode. It is tasked with computing the ideal
- * state of a resource, fully adapting to the addition or removal of instances. This includes
- * computation of a new preference list and a partition to instance and state mapping based on the
- * computed instance preferences.
- * The input is the current assignment of partitions to instances, as well as existing instance
- * preferences, if any.
- * The output is a preference list and a mapping based on that preference list, i.e. partition p
- * has a replica on node k with state s.
- */
-@Deprecated
-public class AutoRebalancer implements Rebalancer {
- // These should be final, but are initialized in init rather than a constructor
- private HelixManager _manager;
- private AutoRebalanceStrategy _algorithm;
-
- private static final Logger LOG = Logger.getLogger(AutoRebalancer.class);
-
- @Override
- public void init(HelixManager manager) {
- this._manager = manager;
- this._algorithm = null;
- }
-
- @Override
- public ResourceAssignment computeResourceMapping(Resource resource, IdealState currentIdealState,
- CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) {
- // Compute a preference list based on the current ideal state
- List<String> partitions = new ArrayList<String>(currentIdealState.getPartitionSet());
- String stateModelName = currentIdealState.getStateModelDefRef();
- StateModelDefinition stateModelDef = clusterData.getStateModelDef(stateModelName);
- Map<String, LiveInstance> liveInstance = clusterData.getLiveInstances();
- String replicas = currentIdealState.getReplicas();
-
- LinkedHashMap<String, Integer> stateCountMap = new LinkedHashMap<String, Integer>();
- stateCountMap =
- ConstraintBasedAssignment.stateCount(stateModelDef, liveInstance.size(),
- Integer.parseInt(replicas));
- List<String> liveNodes = new ArrayList<String>(liveInstance.keySet());
- Map<String, Map<String, String>> currentMapping =
- currentMapping(currentStateOutput, resource.getResourceName(), partitions, stateCountMap);
-
- // If there are nodes tagged with resource name, use only those nodes
- Set<String> taggedNodes = new HashSet<String>();
- if (currentIdealState.getInstanceGroupTag() != null) {
- for (String instanceName : liveNodes) {
- if (clusterData.getInstanceConfigMap().get(instanceName)
- .containsTag(currentIdealState.getInstanceGroupTag())) {
- taggedNodes.add(instanceName);
- }
- }
- }
- if (taggedNodes.size() > 0) {
- if (LOG.isInfoEnabled()) {
- LOG.info("found the following instances with tag " + currentIdealState.getResourceName()
- + " " + taggedNodes);
- }
- liveNodes = new ArrayList<String>(taggedNodes);
- }
-
- List<String> allNodes = new ArrayList<String>(clusterData.getInstanceConfigMap().keySet());
- int maxPartition = currentIdealState.getMaxPartitionsPerInstance();
-
- if (LOG.isInfoEnabled()) {
- LOG.info("currentMapping: " + currentMapping);
- LOG.info("stateCountMap: " + stateCountMap);
- LOG.info("liveNodes: " + liveNodes);
- LOG.info("allNodes: " + allNodes);
- LOG.info("maxPartition: " + maxPartition);
- }
- ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
- placementScheme.init(_manager);
- _algorithm =
- new AutoRebalanceStrategy(resource.getResourceName(), partitions, stateCountMap,
- maxPartition, placementScheme);
- ZNRecord newMapping =
- _algorithm.computePartitionAssignment(liveNodes, currentMapping, allNodes);
-
- if (LOG.isInfoEnabled()) {
- LOG.info("newMapping: " + newMapping);
- }
-
- IdealState newIdealState = new IdealState(resource.getResourceName());
- newIdealState.getRecord().setSimpleFields(currentIdealState.getRecord().getSimpleFields());
- newIdealState.setRebalanceMode(RebalanceMode.FULL_AUTO);
- newIdealState.getRecord().setListFields(newMapping.getListFields());
-
- // compute a full partition mapping for the resource
- if (LOG.isDebugEnabled()) {
- LOG.debug("Processing resource:" + resource.getResourceName());
- }
- ResourceAssignment partitionMapping =
- new ResourceAssignment(ResourceId.from(resource.getResourceName()));
- for (String partitionName : partitions) {
- Partition partition = new Partition(partitionName);
- Map<String, String> currentStateMap =
- currentStateOutput.getCurrentStateMap(resource.getResourceName(), partition);
- Set<String> disabledInstancesForPartition =
- clusterData.getDisabledInstancesForPartition(partition.toString());
- List<String> preferenceList =
- ConstraintBasedAssignment.getPreferenceList(clusterData, partition, newIdealState,
- stateModelDef);
- Map<String, String> bestStateForPartition =
- ConstraintBasedAssignment.computeAutoBestStateForPartition(clusterData, stateModelDef,
- preferenceList, currentStateMap, disabledInstancesForPartition);
- partitionMapping.addReplicaMap(PartitionId.from(partitionName),
- ResourceAssignment.replicaMapFromStringMap(bestStateForPartition));
- }
- return partitionMapping;
- }
-
- private Map<String, Map<String, String>> currentMapping(CurrentStateOutput currentStateOutput,
- String resourceName, List<String> partitions, Map<String, Integer> stateCountMap) {
-
- Map<String, Map<String, String>> map = new HashMap<String, Map<String, String>>();
-
- for (String partition : partitions) {
- Map<String, String> curStateMap =
- currentStateOutput.getCurrentStateMap(resourceName, new Partition(partition));
- map.put(partition, new HashMap<String, String>());
- for (String node : curStateMap.keySet()) {
- String state = curStateMap.get(node);
- if (stateCountMap.containsKey(state)) {
- map.get(partition).put(node, state);
- }
- }
-
- Map<String, String> pendingStateMap =
- currentStateOutput.getPendingStateMap(resourceName, new Partition(partition));
- for (String node : pendingStateMap.keySet()) {
- String state = pendingStateMap.get(node);
- if (stateCountMap.containsKey(state)) {
- map.get(partition).put(node, state);
- }
- }
- }
- return map;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
index f6ea60f..ac4d328 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
@@ -1,5 +1,23 @@
package org.apache.helix.controller.rebalancer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixDefinedState;
+import org.apache.helix.HelixManager;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.State;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.controller.rebalancer.context.CustomRebalancerContext;
+import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
+import org.apache.helix.controller.stages.ResourceCurrentState;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.log4j.Logger;
+
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -19,117 +37,89 @@ package org.apache.helix.controller.rebalancer;
* under the License.
*/
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.HelixDefinedState;
-import org.apache.helix.HelixManager;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.stages.ClusterDataCache;
-import org.apache.helix.controller.stages.CurrentStateOutput;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.Resource;
-import org.apache.helix.model.ResourceAssignment;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.log4j.Logger;
-
-/**
- * This is a Rebalancer specific to custom mode. It is tasked with checking an existing mapping of
- * partitions against the set of live instances to mark assignment states as dropped or erroneous
- * as necessary.
- * The input is the required current assignment of partitions to instances, as well as the required
- * existing instance preferences.
- * The output is a verified mapping based on that preference list, i.e. partition p has a replica
- * on node k with state s, where s may be a dropped or error state if necessary.
- */
-@Deprecated
public class CustomRebalancer implements Rebalancer {
private static final Logger LOG = Logger.getLogger(CustomRebalancer.class);
@Override
- public void init(HelixManager manager) {
+ public void init(HelixManager helixManager) {
+ // do nothing
}
@Override
- public ResourceAssignment computeResourceMapping(Resource resource, IdealState currentIdealState,
- CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) {
- String stateModelDefName = currentIdealState.getStateModelDefRef();
- StateModelDefinition stateModelDef = clusterData.getStateModelDef(stateModelDefName);
+ public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
+ Cluster cluster, ResourceCurrentState currentState) {
+ CustomRebalancerContext config =
+ rebalancerConfig.getRebalancerContext(CustomRebalancerContext.class);
+ StateModelDefinition stateModelDef =
+ cluster.getStateModelMap().get(config.getStateModelDefId());
if (LOG.isDebugEnabled()) {
- LOG.debug("Processing resource:" + resource.getResourceName());
+ LOG.debug("Processing resource:" + config.getResourceId());
}
- ResourceAssignment partitionMapping =
- new ResourceAssignment(ResourceId.from(resource.getResourceName()));
- for (Partition partition : resource.getPartitions()) {
- Map<String, String> currentStateMap =
- currentStateOutput.getCurrentStateMap(resource.getResourceName(), partition);
- Set<String> disabledInstancesForPartition =
- clusterData.getDisabledInstancesForPartition(partition.toString());
- Map<String, String> idealStateMap =
- IdealState.stringMapFromParticipantStateMap(currentIdealState
- .getParticipantStateMap(PartitionId.from(partition.getPartitionName())));
- Map<String, String> bestStateForPartition =
- computeCustomizedBestStateForPartition(clusterData, stateModelDef, idealStateMap,
- currentStateMap, disabledInstancesForPartition);
- partitionMapping.addReplicaMap(PartitionId.from(partition.getPartitionName()),
- ResourceAssignment.replicaMapFromStringMap(bestStateForPartition));
+ ResourceAssignment partitionMapping = new ResourceAssignment(config.getResourceId());
+ for (PartitionId partition : config.getPartitionSet()) {
+ Map<ParticipantId, State> currentStateMap =
+ currentState.getCurrentStateMap(config.getResourceId(), partition);
+ Set<ParticipantId> disabledInstancesForPartition =
+ ConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(),
+ partition);
+ Map<ParticipantId, State> bestStateForPartition =
+ computeCustomizedBestStateForPartition(cluster.getLiveParticipantMap().keySet(),
+ stateModelDef, config.getPreferenceMap(partition), currentStateMap,
+ disabledInstancesForPartition);
+ partitionMapping.addReplicaMap(partition, bestStateForPartition);
}
return partitionMapping;
}
/**
- * compute best state for resource in CUSTOMIZED ideal state mode
- * @param cache
+ * compute best state for resource in CUSTOMIZED rebalancer mode
+ * @param liveParticipantSet
* @param stateModelDef
- * @param idealStateMap
+ * @param preferenceMap
* @param currentStateMap
- * @param disabledInstancesForPartition
+ * @param disabledParticipantsForPartition
* @return
*/
- private Map<String, String> computeCustomizedBestStateForPartition(ClusterDataCache cache,
- StateModelDefinition stateModelDef, Map<String, String> idealStateMap,
- Map<String, String> currentStateMap, Set<String> disabledInstancesForPartition) {
- Map<String, String> instanceStateMap = new HashMap<String, String>();
+ private Map<ParticipantId, State> computeCustomizedBestStateForPartition(
+ Set<ParticipantId> liveParticipantSet, StateModelDefinition stateModelDef,
+ Map<ParticipantId, State> preferenceMap, Map<ParticipantId, State> currentStateMap,
+ Set<ParticipantId> disabledParticipantsForPartition) {
+ Map<ParticipantId, State> participantStateMap = new HashMap<ParticipantId, State>();
- // if the ideal state is deleted, idealStateMap will be null/empty and
+ // if the resource is deleted, preferenceMap will be null/empty and
// we should drop all resources.
if (currentStateMap != null) {
- for (String instance : currentStateMap.keySet()) {
- if ((idealStateMap == null || !idealStateMap.containsKey(instance))
- && !disabledInstancesForPartition.contains(instance)) {
+ for (ParticipantId participantId : currentStateMap.keySet()) {
+ if ((preferenceMap == null || !preferenceMap.containsKey(participantId))
+ && !disabledParticipantsForPartition.contains(participantId)) {
// if dropped and not disabled, transit to DROPPED
- instanceStateMap.put(instance, HelixDefinedState.DROPPED.toString());
- } else if ((currentStateMap.get(instance) == null || !currentStateMap.get(instance).equals(
- HelixDefinedState.ERROR.toString()))
- && disabledInstancesForPartition.contains(instance)) {
+ participantStateMap.put(participantId, State.from(HelixDefinedState.DROPPED));
+ } else if ((currentStateMap.get(participantId) == null || !currentStateMap.get(
+ participantId).equals(State.from(HelixDefinedState.ERROR)))
+ && disabledParticipantsForPartition.contains(participantId)) {
// if disabled and not in ERROR state, transit to initial-state (e.g. OFFLINE)
- instanceStateMap.put(instance, stateModelDef.getInitialState());
+ participantStateMap.put(participantId, stateModelDef.getTypedInitialState());
}
}
}
// ideal state is deleted
- if (idealStateMap == null) {
- return instanceStateMap;
+ if (preferenceMap == null) {
+ return participantStateMap;
}
- Map<String, LiveInstance> liveInstancesMap = cache.getLiveInstances();
- for (String instance : idealStateMap.keySet()) {
+ for (ParticipantId participantId : preferenceMap.keySet()) {
boolean notInErrorState =
- currentStateMap == null || currentStateMap.get(instance) == null
- || !currentStateMap.get(instance).equals(HelixDefinedState.ERROR.toString());
+ currentStateMap == null || currentStateMap.get(participantId) == null
+ || !currentStateMap.get(participantId).equals(State.from(HelixDefinedState.ERROR));
- if (liveInstancesMap.containsKey(instance) && notInErrorState
- && !disabledInstancesForPartition.contains(instance)) {
- instanceStateMap.put(instance, idealStateMap.get(instance));
+ if (liveParticipantSet.contains(participantId) && notInErrorState
+ && !disabledParticipantsForPartition.contains(participantId)) {
+ participantStateMap.put(participantId, preferenceMap.get(participantId));
}
}
- return instanceStateMap;
+ return participantStateMap;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
new file mode 100644
index 0000000..b0c11d4
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
@@ -0,0 +1,196 @@
+package org.apache.helix.controller.rebalancer;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.State;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.controller.rebalancer.context.FullAutoRebalancerContext;
+import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
+import org.apache.helix.controller.stages.ResourceCurrentState;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+public class FullAutoRebalancer implements Rebalancer {
+ // Not final: reassigned on each call to computeResourceMapping rather than set in a constructor
+ private AutoRebalanceStrategy _algorithm;
+
+ private static Logger LOG = Logger.getLogger(FullAutoRebalancer.class);
+
+ @Override
+ public void init(HelixManager helixManager) {
+ // do nothing
+ }
+
+ @Override
+ public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
+ Cluster cluster, ResourceCurrentState currentState) {
+ FullAutoRebalancerContext config =
+ rebalancerConfig.getRebalancerContext(FullAutoRebalancerContext.class);
+ StateModelDefinition stateModelDef =
+ cluster.getStateModelMap().get(config.getStateModelDefId());
+ // Compute a preference list based on the current ideal state
+ List<PartitionId> partitions = new ArrayList<PartitionId>(config.getPartitionSet());
+ Map<ParticipantId, Participant> liveParticipants = cluster.getLiveParticipantMap();
+ Map<ParticipantId, Participant> allParticipants = cluster.getParticipantMap();
+ int replicas = -1;
+ if (config.anyLiveParticipant()) {
+ replicas = liveParticipants.size();
+ } else {
+ replicas = config.getReplicaCount();
+ }
+
+ // count how many replicas should be in each state
+ Map<State, String> upperBounds =
+ ConstraintBasedAssignment.stateConstraints(stateModelDef, config.getResourceId(),
+ cluster.getConfig());
+ LinkedHashMap<State, Integer> stateCountMap =
+ ConstraintBasedAssignment.stateCount(upperBounds, stateModelDef,
+ liveParticipants.size(), replicas);
+
+ // get the participant lists
+ List<ParticipantId> liveParticipantList =
+ new ArrayList<ParticipantId>(liveParticipants.keySet());
+ List<ParticipantId> allParticipantList =
+ new ArrayList<ParticipantId>(cluster.getParticipantMap().keySet());
+
+ // compute the current mapping from the current state
+ Map<PartitionId, Map<ParticipantId, State>> currentMapping =
+ currentMapping(config, currentState, stateCountMap);
+
+ // If any live nodes carry the configured participant group tag, use only those nodes
+ Set<ParticipantId> taggedNodes = new HashSet<ParticipantId>();
+ if (config.getParticipantGroupTag() != null) {
+ for (ParticipantId participantId : liveParticipantList) {
+ if (liveParticipants.get(participantId).hasTag(config.getParticipantGroupTag())) {
+ taggedNodes.add(participantId);
+ }
+ }
+ }
+ if (taggedNodes.size() > 0) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("found the following instances with tag " + config.getResourceId() + " "
+ + taggedNodes);
+ }
+ liveParticipantList = new ArrayList<ParticipantId>(taggedNodes);
+ }
+
+ // determine which nodes the replicas should live on
+ int maxPartition = config.getMaxPartitionsPerParticipant();
+ if (LOG.isInfoEnabled()) {
+ LOG.info("currentMapping: " + currentMapping);
+ LOG.info("stateCountMap: " + stateCountMap);
+ LOG.info("liveNodes: " + liveParticipantList);
+ LOG.info("allNodes: " + allParticipantList);
+ LOG.info("maxPartition: " + maxPartition);
+ }
+ ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
+ _algorithm =
+ new AutoRebalanceStrategy(config.getResourceId(), partitions, stateCountMap, maxPartition,
+ placementScheme);
+ ZNRecord newMapping =
+ _algorithm.typedComputePartitionAssignment(liveParticipantList, currentMapping,
+ allParticipantList);
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("newMapping: " + newMapping);
+ }
+
+ // compute a full partition mapping for the resource
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Processing resource:" + config.getResourceId());
+ }
+ ResourceAssignment partitionMapping = new ResourceAssignment(config.getResourceId());
+ for (PartitionId partition : partitions) {
+ Set<ParticipantId> disabledParticipantsForPartition =
+ ConstraintBasedAssignment.getDisabledParticipants(allParticipants, partition);
+ List<String> rawPreferenceList = newMapping.getListField(partition.stringify());
+ if (rawPreferenceList == null) {
+ rawPreferenceList = Collections.emptyList();
+ }
+ List<ParticipantId> preferenceList =
+ Lists.transform(rawPreferenceList, new Function<String, ParticipantId>() {
+ @Override
+ public ParticipantId apply(String participantName) {
+ return ParticipantId.from(participantName);
+ }
+ });
+ preferenceList =
+ ConstraintBasedAssignment.getPreferenceList(cluster, partition, preferenceList);
+ Map<ParticipantId, State> bestStateForPartition =
+ ConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds,
+ liveParticipants.keySet(), stateModelDef, preferenceList,
+ currentState.getCurrentStateMap(config.getResourceId(), partition),
+ disabledParticipantsForPartition);
+ partitionMapping.addReplicaMap(partition, bestStateForPartition);
+ }
+ return partitionMapping;
+ }
+
+ private Map<PartitionId, Map<ParticipantId, State>> currentMapping(
+ FullAutoRebalancerContext config, ResourceCurrentState currentStateOutput,
+ Map<State, Integer> stateCountMap) {
+ Map<PartitionId, Map<ParticipantId, State>> map =
+ new HashMap<PartitionId, Map<ParticipantId, State>>();
+
+ for (PartitionId partition : config.getPartitionSet()) {
+ Map<ParticipantId, State> curStateMap =
+ currentStateOutput.getCurrentStateMap(config.getResourceId(), partition);
+ map.put(partition, new HashMap<ParticipantId, State>());
+ for (ParticipantId node : curStateMap.keySet()) {
+ State state = curStateMap.get(node);
+ if (stateCountMap.containsKey(state)) {
+ map.get(partition).put(node, state);
+ }
+ }
+
+ Map<ParticipantId, State> pendingStateMap =
+ currentStateOutput.getPendingStateMap(config.getResourceId(), partition);
+ for (ParticipantId node : pendingStateMap.keySet()) {
+ State state = pendingStateMap.get(node);
+ if (stateCountMap.containsKey(state)) {
+ map.get(partition).put(node, state);
+ }
+ }
+ }
+ return map;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/Rebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/Rebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/Rebalancer.java
index 26fc2ef..5a6a24e 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/Rebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/Rebalancer.java
@@ -1,5 +1,11 @@
package org.apache.helix.controller.rebalancer;
+import org.apache.helix.HelixManager;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.stages.ResourceCurrentState;
+import org.apache.helix.model.ResourceAssignment;
+
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -19,40 +25,15 @@ package org.apache.helix.controller.rebalancer;
* under the License.
*/
-import org.apache.helix.HelixManager;
-import org.apache.helix.controller.stages.ClusterDataCache;
-import org.apache.helix.controller.stages.CurrentStateOutput;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.Resource;
-import org.apache.helix.model.ResourceAssignment;
-
/**
* Allows one to come up with custom implementation of a rebalancer.<br/>
* This will be invoked on all changes that happen in the cluster.<br/>
- * Simply return the newIdealState for a resource in this method.<br/>
- * <br/>
- * Deprecated. Use {@link org.apache.helix.api.rebalancer.Rebalancer} instead.
+ * Simply return the resource assignment for a resource in this method.<br/>
*/
-@Deprecated
public interface Rebalancer {
- /**
- * Initialize the rebalancer with a HelixManager if necessary
- * @param manager
- */
- void init(HelixManager manager);
- /**
- * Given an ideal state for a resource and liveness of instances, compute a assignment of
- * instances and states to each partition of a resource. This method provides all the relevant
- * information needed to rebalance a resource. If you need additional information use
- * manager.getAccessor to read the cluster data. This allows one to compute the newIdealState
- * according to app specific requirements.
- * @param resourceName the resource for which a mapping will be computed
- * @param currentIdealState the IdealState that corresponds to this resource
- * @param currentStateOutput the current states of all partitions
- * @param clusterData cache of the cluster state
- */
- ResourceAssignment computeResourceMapping(final Resource resource,
- final IdealState currentIdealState, final CurrentStateOutput currentStateOutput,
- final ClusterDataCache clusterData);
+ public void init(HelixManager helixManager);
+
+ public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig, Cluster cluster,
+ ResourceCurrentState currentState);
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/RebalancerRef.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/RebalancerRef.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/RebalancerRef.java
new file mode 100644
index 0000000..79e4ba0
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/RebalancerRef.java
@@ -0,0 +1,94 @@
+package org.apache.helix.controller.rebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.util.HelixUtil;
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonIgnore;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * Reference to a class that implements {@link Rebalancer}. The class is loaded by {@link #getRebalancer()}.
+ */
+public class RebalancerRef {
+ private static final Logger LOG = Logger.getLogger(RebalancerRef.class);
+
+ @JsonProperty("rebalancerClassName")
+ private final String _rebalancerClassName;
+
+ @JsonCreator
+ private RebalancerRef(@JsonProperty("rebalancerClassName") String rebalancerClassName) {
+ _rebalancerClassName = rebalancerClassName;
+ }
+
+ /**
+ * Get an instantiated Rebalancer
+ * @return Rebalancer or null if instantiation failed
+ */
+ @JsonIgnore
+ public Rebalancer getRebalancer() {
+ try {
+ return (Rebalancer) (HelixUtil.loadClass(getClass(), _rebalancerClassName).newInstance());
+ } catch (Exception e) {
+ LOG.warn("Exception while invoking custom rebalancer class:" + _rebalancerClassName, e);
+ }
+ return null;
+ }
+
+ @Override
+ public String toString() {
+ return _rebalancerClassName;
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that instanceof RebalancerRef) {
+ return this.toString().equals(((RebalancerRef) that).toString());
+ } else if (that instanceof String) {
+ return this.toString().equals(that);
+ }
+ return false;
+ }
+
+ /**
+ * Get a rebalancer class reference
+ * @param rebalancerClassName name of the class
+ * @return RebalancerRef or null if name is null
+ */
+ public static RebalancerRef from(String rebalancerClassName) {
+ if (rebalancerClassName == null) {
+ return null;
+ }
+ return new RebalancerRef(rebalancerClassName);
+ }
+
+ /**
+ * Get a RebalancerRef from a class object
+ * @param rebalancerClass class that implements Rebalancer
+ * @return RebalancerRef
+ */
+ public static RebalancerRef from(Class<? extends Rebalancer> rebalancerClass) {
+ if (rebalancerClass == null) {
+ return null;
+ }
+ return RebalancerRef.from(rebalancerClass.getName());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
index dd9fcf1..96e3d4b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
@@ -1,5 +1,22 @@
package org.apache.helix.controller.rebalancer;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.State;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.context.SemiAutoRebalancerContext;
+import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
+import org.apache.helix.controller.stages.ResourceCurrentState;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.log4j.Logger;
+
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -19,65 +36,48 @@ package org.apache.helix.controller.rebalancer;
* under the License.
*/
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.HelixManager;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
-import org.apache.helix.controller.stages.ClusterDataCache;
-import org.apache.helix.controller.stages.CurrentStateOutput;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.Resource;
-import org.apache.helix.model.ResourceAssignment;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.log4j.Logger;
-
/**
- * This is a Rebalancer specific to semi-automatic mode. It is tasked with computing the ideal
- * state of a resource based on a predefined preference list of instances willing to accept
- * replicas.
- * The input is the optional current assignment of partitions to instances, as well as the required
- * existing instance preferences.
- * The output is a mapping based on that preference list, i.e. partition p has a replica on node k
- * with state s.
+ * Rebalancer for the SEMI_AUTO mode. It expects a RebalancerConfig that understands the preferred
+ * locations of each partition replica
*/
-@Deprecated
public class SemiAutoRebalancer implements Rebalancer {
-
private static final Logger LOG = Logger.getLogger(SemiAutoRebalancer.class);
@Override
- public void init(HelixManager manager) {
+ public void init(HelixManager helixManager) {
+ // do nothing
}
@Override
- public ResourceAssignment computeResourceMapping(Resource resource, IdealState currentIdealState,
- CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) {
- String stateModelDefName = currentIdealState.getStateModelDefRef();
- StateModelDefinition stateModelDef = clusterData.getStateModelDef(stateModelDefName);
+ public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
+ Cluster cluster, ResourceCurrentState currentState) {
+ SemiAutoRebalancerContext config =
+ rebalancerConfig.getRebalancerContext(SemiAutoRebalancerContext.class);
+ StateModelDefinition stateModelDef =
+ cluster.getStateModelMap().get(config.getStateModelDefId());
if (LOG.isDebugEnabled()) {
- LOG.debug("Processing resource:" + resource.getResourceName());
+ LOG.debug("Processing resource:" + config.getResourceId());
}
- ResourceAssignment partitionMapping =
- new ResourceAssignment(ResourceId.from(resource.getResourceName()));
- for (Partition partition : resource.getPartitions()) {
- Map<String, String> currentStateMap =
- currentStateOutput.getCurrentStateMap(resource.getResourceName(), partition);
- Set<String> disabledInstancesForPartition =
- clusterData.getDisabledInstancesForPartition(partition.toString());
- List<String> preferenceList =
- ConstraintBasedAssignment.getPreferenceList(clusterData, partition, currentIdealState,
- stateModelDef);
- Map<String, String> bestStateForPartition =
- ConstraintBasedAssignment.computeAutoBestStateForPartition(clusterData, stateModelDef,
- preferenceList, currentStateMap, disabledInstancesForPartition);
- partitionMapping.addReplicaMap(PartitionId.from(partition.getPartitionName()),
- ResourceAssignment.replicaMapFromStringMap(bestStateForPartition));
+ ResourceAssignment partitionMapping = new ResourceAssignment(config.getResourceId());
+ for (PartitionId partition : config.getPartitionSet()) {
+ Map<ParticipantId, State> currentStateMap =
+ currentState.getCurrentStateMap(config.getResourceId(), partition);
+ Set<ParticipantId> disabledInstancesForPartition =
+ ConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(),
+ partition);
+ List<ParticipantId> preferenceList =
+ ConstraintBasedAssignment.getPreferenceList(cluster, partition,
+ config.getPreferenceList(partition));
+ Map<State, String> upperBounds =
+ ConstraintBasedAssignment.stateConstraints(stateModelDef, config.getResourceId(),
+ cluster.getConfig());
+ Map<ParticipantId, State> bestStateForPartition =
+ ConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds, cluster
+ .getLiveParticipantMap().keySet(), stateModelDef, preferenceList, currentStateMap,
+ disabledInstancesForPartition);
+ partitionMapping.addReplicaMap(partition, bestStateForPartition);
}
return partitionMapping;
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/BasicRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/BasicRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/BasicRebalancerContext.java
new file mode 100644
index 0000000..ec765d7
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/BasicRebalancerContext.java
@@ -0,0 +1,240 @@
+package org.apache.helix.controller.rebalancer.context;
+
+import java.util.Set;
+
+import org.apache.helix.api.Partition;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.api.id.StateModelFactoryId;
+import org.apache.helix.controller.rebalancer.RebalancerRef;
+import org.codehaus.jackson.annotate.JsonIgnore;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Abstract RebalancerContext that functions for generic subunits. Use a subclass that more
+ * concretely defines the subunits.
+ */
+public abstract class BasicRebalancerContext implements RebalancerContext {
+ private ResourceId _resourceId;
+ private StateModelDefId _stateModelDefId;
+ private StateModelFactoryId _stateModelFactoryId;
+ private String _participantGroupTag;
+ private Class<? extends ContextSerializer> _serializer;
+ private RebalancerRef _rebalancerRef;
+
+ /**
+ * Instantiate a basic rebalancer context
+ */
+ public BasicRebalancerContext() {
+ _serializer = DefaultContextSerializer.class;
+ }
+
+ @Override
+ public ResourceId getResourceId() {
+ return _resourceId;
+ }
+
+ /**
+ * Set the resource to rebalance
+ * @param resourceId resource id
+ */
+ public void setResourceId(ResourceId resourceId) {
+ _resourceId = resourceId;
+ }
+
+ @Override
+ public StateModelDefId getStateModelDefId() {
+ return _stateModelDefId;
+ }
+
+ /**
+ * Set the state model definition that the resource follows
+ * @param stateModelDefId state model definition id
+ */
+ public void setStateModelDefId(StateModelDefId stateModelDefId) {
+ _stateModelDefId = stateModelDefId;
+ }
+
+ @Override
+ public StateModelFactoryId getStateModelFactoryId() {
+ return _stateModelFactoryId;
+ }
+
+ /**
+ * Set the state model factory that the resource uses
+ * @param stateModelFactoryId state model factory id
+ */
+ public void setStateModelFactoryId(StateModelFactoryId stateModelFactoryId) {
+ _stateModelFactoryId = stateModelFactoryId;
+ }
+
+ @Override
+ public String getParticipantGroupTag() {
+ return _participantGroupTag;
+ }
+
+ /**
+ * Set a tag that participants must have in order to serve this resource
+ * @param participantGroupTag string group tag
+ */
+ public void setParticipantGroupTag(String participantGroupTag) {
+ _participantGroupTag = participantGroupTag;
+ }
+
+ /**
+ * Get the serializer. If none is provided, {@link DefaultContextSerializer} is used
+ */
+ @Override
+ public Class<? extends ContextSerializer> getSerializerClass() {
+ return _serializer;
+ }
+
+ /**
+ * Set the class that can serialize this context
+ * @param serializer serializer class that implements ContextSerializer
+ */
+ public void setSerializerClass(Class<? extends ContextSerializer> serializer) {
+ _serializer = serializer;
+ }
+
+ @Override
+ @JsonIgnore
+ public Set<? extends PartitionId> getSubUnitIdSet() {
+ return getSubUnitMap().keySet();
+ }
+
+ @Override
+ @JsonIgnore
+ public Partition getSubUnit(PartitionId subUnitId) {
+ return getSubUnitMap().get(subUnitId);
+ }
+
+ @Override
+ public RebalancerRef getRebalancerRef() {
+ return _rebalancerRef;
+ }
+
+ /**
+ * Set the reference to the class used to rebalance this resource
+ * @param rebalancerRef RebalancerRef instance
+ */
+ public void setRebalancerRef(RebalancerRef rebalancerRef) {
+ _rebalancerRef = rebalancerRef;
+ }
+
+ /**
+ * Abstract builder for the base rebalancer context
+ */
+ public static abstract class AbstractBuilder<T extends AbstractBuilder<T>> {
+ private final ResourceId _resourceId;
+ private StateModelDefId _stateModelDefId;
+ private StateModelFactoryId _stateModelFactoryId;
+ private String _participantGroupTag;
+ private Class<? extends ContextSerializer> _serializerClass;
+ private RebalancerRef _rebalancerRef;
+
+ /**
+ * Instantiate with a resource id
+ * @param resourceId resource id
+ */
+ public AbstractBuilder(ResourceId resourceId) {
+ _resourceId = resourceId;
+ _serializerClass = DefaultContextSerializer.class;
+ }
+
+ /**
+ * Set the state model definition that the resource should follow
+ * @param stateModelDefId state model definition id
+ * @return Builder
+ */
+ public T stateModelDefId(StateModelDefId stateModelDefId) {
+ _stateModelDefId = stateModelDefId;
+ return self();
+ }
+
+ /**
+ * Set the state model factory that the resource should use
+ * @param stateModelFactoryId state model factory id
+ * @return Builder
+ */
+ public T stateModelFactoryId(StateModelFactoryId stateModelFactoryId) {
+ _stateModelFactoryId = stateModelFactoryId;
+ return self();
+ }
+
+ /**
+ * Set the tag that all participants require in order to serve this resource
+ * @param participantGroupTag the tag
+ * @return Builder
+ */
+ public T participantGroupTag(String participantGroupTag) {
+ _participantGroupTag = participantGroupTag;
+ return self();
+ }
+
+ /**
+ * Set the serializer class for this rebalancer context
+ * @param serializerClass class that implements ContextSerializer
+ * @return Builder
+ */
+ public T serializerClass(Class<? extends ContextSerializer> serializerClass) {
+ _serializerClass = serializerClass;
+ return self();
+ }
+
+ /**
+ * Specify a custom class to use for rebalancing
+ * @param rebalancerRef RebalancerRef instance
+ * @return Builder
+ */
+ public T rebalancerRef(RebalancerRef rebalancerRef) {
+ _rebalancerRef = rebalancerRef;
+ return self();
+ }
+
+ /**
+ * Update an existing context with base fields
+ * @param context derived context
+ */
+ protected final void update(BasicRebalancerContext context) {
+ context.setResourceId(_resourceId);
+ context.setStateModelDefId(_stateModelDefId);
+ context.setStateModelFactoryId(_stateModelFactoryId);
+ context.setParticipantGroupTag(_participantGroupTag);
+ context.setSerializerClass(_serializerClass);
+ context.setRebalancerRef(_rebalancerRef);
+ }
+
+ /**
+ * Get a reference to this builder typed as the most specific builder class. Final derived
+ * classes should simply return {@code this}.
+ * @return this for the most specific type
+ */
+ protected abstract T self();
+
+ /**
+ * Get the rebalancer context from the built fields
+ * @return RebalancerContext
+ */
+ public abstract RebalancerContext build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/ContextSerializer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/ContextSerializer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/ContextSerializer.java
new file mode 100644
index 0000000..ef12a09
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/ContextSerializer.java
@@ -0,0 +1,37 @@
+package org.apache.helix.controller.rebalancer.context;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+public interface ContextSerializer {
+ /**
+ * Convert a RebalancerContext object instance to a String
+ * @param data instance of the rebalancer context type
+ * @return String representing the object
+ */
+ public <T> String serialize(final T data);
+
+ /**
+ * Convert a String to a generic object instance
+ * @param clazz The class represented by the deserialized string
+ * @param string String representing the object
+ * @return instance of the generic type or null if the conversion failed
+ */
+ public <T> T deserialize(final Class<T> clazz, final String string);
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancerContext.java
new file mode 100644
index 0000000..1fc1cda
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancerContext.java
@@ -0,0 +1,163 @@
+package org.apache.helix.controller.rebalancer.context;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.api.State;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.rebalancer.CustomRebalancer;
+import org.apache.helix.controller.rebalancer.RebalancerRef;
+import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.StateModelDefinition;
+import org.codehaus.jackson.annotate.JsonIgnore;
+
+import com.google.common.collect.Maps;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * RebalancerContext for a resource that should be rebalanced in CUSTOMIZED mode. By default, it
+ * corresponds to {@link CustomRebalancer}
+ */
+public class CustomRebalancerContext extends PartitionedRebalancerContext {
+ private Map<PartitionId, Map<ParticipantId, State>> _preferenceMaps;
+
+ /**
+ * Instantiate a CustomRebalancerContext
+ */
+ public CustomRebalancerContext() {
+ super(RebalanceMode.CUSTOMIZED);
+ setRebalancerRef(RebalancerRef.from(CustomRebalancer.class));
+ _preferenceMaps = Maps.newHashMap();
+ }
+
+ /**
+ * Get the preference maps of the partitions and replicas of the resource
+ * @return map of partition to participant and state
+ */
+ public Map<PartitionId, Map<ParticipantId, State>> getPreferenceMaps() {
+ return _preferenceMaps;
+ }
+
+ /**
+ * Set the preference maps of the partitions and replicas of the resource
+ * @param preferenceMaps map of partition to participant and state
+ */
+ public void setPreferenceMaps(Map<PartitionId, Map<ParticipantId, State>> preferenceMaps) {
+ _preferenceMaps = preferenceMaps;
+ }
+
+ /**
+ * Get the preference map of a partition
+ * @param partitionId the partition to look up
+ * @return map of participant to state
+ */
+ @JsonIgnore
+ public Map<ParticipantId, State> getPreferenceMap(PartitionId partitionId) {
+ return _preferenceMaps.get(partitionId);
+ }
+
+ /**
+ * Generate preference maps based on a default cluster setup
+ * @param stateModelDef the state model definition to follow
+ * @param participantSet the set of participant ids to configure for
+ */
+ @Override
+ @JsonIgnore
+ public void generateDefaultConfiguration(StateModelDefinition stateModelDef,
+ Set<ParticipantId> participantSet) {
+ // compute default upper bounds
+ Map<State, String> upperBounds = Maps.newHashMap();
+ for (State state : stateModelDef.getTypedStatesPriorityList()) {
+ upperBounds.put(state, stateModelDef.getNumParticipantsPerState(state));
+ }
+
+ // determine the current mapping
+ Map<PartitionId, Map<ParticipantId, State>> currentMapping = getPreferenceMaps();
+
+ // determine the preference maps
+ LinkedHashMap<State, Integer> stateCounts =
+ ConstraintBasedAssignment.stateCount(upperBounds, stateModelDef, participantSet.size(),
+ getReplicaCount());
+ ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
+ List<ParticipantId> participantList = new ArrayList<ParticipantId>(participantSet);
+ List<PartitionId> partitionList = new ArrayList<PartitionId>(getPartitionSet());
+ AutoRebalanceStrategy strategy =
+ new AutoRebalanceStrategy(ResourceId.from(""), partitionList, stateCounts,
+ getMaxPartitionsPerParticipant(), placementScheme);
+ Map<String, Map<String, String>> rawPreferenceMaps =
+ strategy.typedComputePartitionAssignment(participantList, currentMapping, participantList)
+ .getMapFields();
+ Map<PartitionId, Map<ParticipantId, State>> preferenceMaps =
+ Maps.newHashMap(ResourceAssignment.replicaMapsFromStringMaps(rawPreferenceMaps));
+ setPreferenceMaps(preferenceMaps);
+ }
+
+ /**
+ * Build a CustomRebalancerContext. By default, it corresponds to {@link CustomRebalancer}
+ */
+ public static final class Builder extends PartitionedRebalancerContext.AbstractBuilder<Builder> {
+ private final Map<PartitionId, Map<ParticipantId, State>> _preferenceMaps;
+
+ /**
+ * Instantiate for a resource
+ * @param resourceId resource id
+ */
+ public Builder(ResourceId resourceId) {
+ super(resourceId);
+ super.rebalancerRef(RebalancerRef.from(CustomRebalancer.class));
+ _preferenceMaps = Maps.newHashMap();
+ }
+
+ /**
+ * Add a preference map for a partition
+ * @param partitionId partition to set
+ * @param preferenceMap map of participant id to state indicating where replicas are served
+ * @return Builder
+ */
+ public Builder preferenceMap(PartitionId partitionId, Map<ParticipantId, State> preferenceMap) {
+ _preferenceMaps.put(partitionId, preferenceMap);
+ return self();
+ }
+
+ @Override
+ protected Builder self() {
+ return this;
+ }
+
+ @Override
+ public CustomRebalancerContext build() {
+ CustomRebalancerContext context = new CustomRebalancerContext();
+ super.update(context);
+ context.setPreferenceMaps(_preferenceMaps);
+ return context;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/DefaultContextSerializer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/DefaultContextSerializer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/DefaultContextSerializer.java
new file mode 100644
index 0000000..ecc93fb
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/DefaultContextSerializer.java
@@ -0,0 +1,83 @@
+package org.apache.helix.controller.rebalancer.context;
+
+import java.io.ByteArrayInputStream;
+import java.io.StringWriter;
+
+import org.apache.helix.HelixException;
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.map.DeserializationConfig;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Default serializer implementation for RebalancerContexts. Uses the Jackson JSON library to
+ * convert to and from strings
+ */
+public class DefaultContextSerializer implements ContextSerializer {
+
+ private static Logger logger = Logger.getLogger(DefaultContextSerializer.class);
+
+ /**
+ * Serialize an object to indented JSON text
+ * @param data object to serialize
+ * @return JSON string representing the object, or null if data is null
+ * @throws HelixException if the object cannot be written as JSON
+ */
+ @Override
+ public <T> String serialize(final T data) {
+ if (data == null) {
+ return null;
+ }
+
+ ObjectMapper mapper = new ObjectMapper();
+ SerializationConfig serializationConfig = mapper.getSerializationConfig();
+ serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
+ serializationConfig.set(SerializationConfig.Feature.AUTO_DETECT_FIELDS, true);
+ serializationConfig.set(SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
+ StringWriter sw = new StringWriter();
+ try {
+ mapper.writeValue(sw, data);
+ } catch (Exception e) {
+ logger.error("Exception during payload data serialization.", e);
+ throw new HelixException(e);
+ }
+ return sw.toString();
+ }
+
+ /**
+ * Deserialize JSON text into an instance of the requested class
+ * @param clazz The class represented by the deserialized string
+ * @param string JSON text representing the object
+ * @return instance of the generic type, or null if the string is null, empty, or cannot be
+ * converted
+ */
+ @Override
+ public <T> T deserialize(final Class<T> clazz, final String string) {
+ if (string == null || string.length() == 0) {
+ return null;
+ }
+
+ ObjectMapper mapper = new ObjectMapper();
+ DeserializationConfig deserializationConfig = mapper.getDeserializationConfig();
+ deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_FIELDS, true);
+ deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_SETTERS, true);
+ deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_CREATORS, true);
+ deserializationConfig.set(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, true);
+ deserializationConfig.set(DeserializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
+ try {
+ // Parse the String directly. Converting it with String.getBytes() first would encode it
+ // with the platform default charset, which can corrupt non-ASCII content before Jackson
+ // ever sees it.
+ return mapper.readValue(string, clazz);
+ } catch (Exception e) {
+ logger.error("Exception during deserialization of payload bytes: " + string, e);
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/FullAutoRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/FullAutoRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/FullAutoRebalancerContext.java
new file mode 100644
index 0000000..2db9ac6
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/FullAutoRebalancerContext.java
@@ -0,0 +1,63 @@
+package org.apache.helix.controller.rebalancer.context;
+
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.rebalancer.FullAutoRebalancer;
+import org.apache.helix.controller.rebalancer.RebalancerRef;
+import org.apache.helix.model.IdealState.RebalanceMode;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * RebalancerContext for FULL_AUTO rebalancing mode. By default, it corresponds to
+ * {@link FullAutoRebalancer}
+ */
+public class FullAutoRebalancerContext extends PartitionedRebalancerContext {
+ public FullAutoRebalancerContext() {
+ super(RebalanceMode.FULL_AUTO);
+ setRebalancerRef(RebalancerRef.from(FullAutoRebalancer.class));
+ }
+
+ /**
+ * Builder for a full auto rebalancer context. By default, it corresponds to
+ * {@link FullAutoRebalancer}
+ */
+ public static final class Builder extends PartitionedRebalancerContext.AbstractBuilder<Builder> {
+ /**
+ * Instantiate with a resource
+ * @param resourceId resource id
+ */
+ public Builder(ResourceId resourceId) {
+ super(resourceId);
+ super.rebalancerRef(RebalancerRef.from(FullAutoRebalancer.class));
+ }
+
+ @Override
+ protected Builder self() {
+ return this;
+ }
+
+ @Override
+ public FullAutoRebalancerContext build() {
+ FullAutoRebalancerContext context = new FullAutoRebalancerContext();
+ super.update(context);
+ return context;
+ }
+ }
+}
[4/4] git commit: [HELIX-209] Moving rebalancer code around, take 2
Posted by ka...@apache.org.
[HELIX-209] Moving rebalancer code around, take 2
Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/e032132a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/e032132a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/e032132a
Branch: refs/heads/helix-logical-model
Commit: e032132a6232bb9c66f00dc1d75f4cd61b6f91b1
Parents: 9f229c8
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Tue Oct 15 17:09:14 2013 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Tue Oct 15 17:09:14 2013 -0700
----------------------------------------------------------------------
.../java/org/apache/helix/api/Resource.java | 4 +-
.../api/accessor/AtomicResourceAccessor.java | 2 +-
.../helix/api/accessor/ClusterAccessor.java | 4 +-
.../helix/api/accessor/ParticipantAccessor.java | 4 +-
.../helix/api/accessor/ResourceAccessor.java | 10 +-
.../apache/helix/api/config/ResourceConfig.java | 4 +-
.../api/rebalancer/BasicRebalancerContext.java | 239 ------------
.../helix/api/rebalancer/ContextSerializer.java | 37 --
.../helix/api/rebalancer/CustomRebalancer.java | 123 ------
.../api/rebalancer/CustomRebalancerContext.java | 161 --------
.../rebalancer/DefaultContextSerializer.java | 83 -----
.../api/rebalancer/FullAutoRebalancer.java | 194 ----------
.../rebalancer/FullAutoRebalancerContext.java | 61 ---
.../PartitionedRebalancerContext.java | 372 ------------------
.../apache/helix/api/rebalancer/Rebalancer.java | 38 --
.../helix/api/rebalancer/RebalancerConfig.java | 177 ---------
.../helix/api/rebalancer/RebalancerContext.java | 93 -----
.../helix/api/rebalancer/RebalancerRef.java | 94 -----
.../rebalancer/ReplicatedRebalancerContext.java | 40 --
.../api/rebalancer/SemiAutoRebalancer.java | 81 ----
.../rebalancer/SemiAutoRebalancerContext.java | 176 ---------
.../util/ConstraintBasedAssignment.java | 244 ------------
.../controller/rebalancer/AutoRebalancer.java | 187 ----------
.../controller/rebalancer/CustomRebalancer.java | 142 ++++---
.../rebalancer/FullAutoRebalancer.java | 196 ++++++++++
.../helix/controller/rebalancer/Rebalancer.java | 41 +-
.../controller/rebalancer/RebalancerRef.java | 94 +++++
.../rebalancer/SemiAutoRebalancer.java | 94 ++---
.../context/BasicRebalancerContext.java | 240 ++++++++++++
.../rebalancer/context/ContextSerializer.java | 37 ++
.../context/CustomRebalancerContext.java | 163 ++++++++
.../context/DefaultContextSerializer.java | 83 +++++
.../context/FullAutoRebalancerContext.java | 63 ++++
.../context/PartitionedRebalancerContext.java | 373 +++++++++++++++++++
.../rebalancer/context/RebalancerConfig.java | 178 +++++++++
.../rebalancer/context/RebalancerContext.java | 94 +++++
.../context/ReplicatedRebalancerContext.java | 40 ++
.../context/SemiAutoRebalancerContext.java | 178 +++++++++
.../util/ConstraintBasedAssignment.java | 182 +++++----
.../stages/BestPossibleStateCalcStage.java | 8 +-
.../stages/ExternalViewComputeStage.java | 4 +-
.../stages/MessageGenerationStage.java | 2 +-
.../stages/MessageSelectionStage.java | 6 +-
.../stages/RebalanceIdealStateStage.java | 2 +-
.../stages/ResourceComputationStage.java | 6 +-
.../controller/rebalancer/AutoRebalancer.java | 187 ++++++++++
.../controller/rebalancer/CustomRebalancer.java | 135 +++++++
.../controller/rebalancer/Rebalancer.java | 58 +++
.../rebalancer/SemiAutoRebalancer.java | 83 +++++
.../util/ConstraintBasedAssignment.java | 194 ++++++++++
.../java/org/apache/helix/model/IdealState.java | 2 +-
.../org/apache/helix/tools/NewClusterSetup.java | 8 +-
.../org/apache/helix/api/TestNewStages.java | 2 +-
.../org/apache/helix/api/TestUpdateConfig.java | 4 +-
.../context/TestSerializeRebalancerContext.java | 3 -
.../helix/controller/stages/BaseStageTest.java | 4 +-
.../stages/TestResourceComputationStage.java | 2 +-
.../strategy/TestAutoRebalanceStrategy.java | 2 +-
.../strategy/TestNewAutoRebalanceStrategy.java | 2 +-
.../TestCustomizedIdealStateRebalancer.java | 6 +-
.../apache/helix/examples/NewModelExample.java | 2 +-
61 files changed, 2681 insertions(+), 2667 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/api/Resource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Resource.java b/helix-core/src/main/java/org/apache/helix/api/Resource.java
index a505aeb..79a1e09 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Resource.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Resource.java
@@ -30,8 +30,8 @@ import org.apache.helix.api.config.UserConfig;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.api.rebalancer.RebalancerConfig;
-import org.apache.helix.api.rebalancer.RebalancerContext;
+import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.context.RebalancerContext;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Message;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java
index 74b6bdb..6d69981 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java
@@ -6,7 +6,7 @@ import org.apache.helix.api.Scope;
import org.apache.helix.api.config.ResourceConfig;
import org.apache.helix.api.id.ClusterId;
import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.api.rebalancer.RebalancerContext;
+import org.apache.helix.controller.rebalancer.context.RebalancerContext;
import org.apache.helix.lock.HelixLock;
import org.apache.helix.lock.HelixLockable;
import org.apache.log4j.Logger;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
index ed6c844..85b8432 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
@@ -51,8 +51,8 @@ import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.SessionId;
import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.api.rebalancer.RebalancerConfig;
-import org.apache.helix.api.rebalancer.RebalancerContext;
+import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.context.RebalancerContext;
import org.apache.helix.model.Alerts;
import org.apache.helix.model.ClusterConfiguration;
import org.apache.helix.model.ClusterConstraints;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
index ff24ee9..83dd53e 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
@@ -51,8 +51,8 @@ import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.SessionId;
import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.api.rebalancer.PartitionedRebalancerContext;
-import org.apache.helix.api.rebalancer.RebalancerContext;
+import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
+import org.apache.helix.controller.rebalancer.context.RebalancerContext;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
index b959f17..517c8c4 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
@@ -38,11 +38,11 @@ import org.apache.helix.api.config.UserConfig;
import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.api.rebalancer.CustomRebalancerContext;
-import org.apache.helix.api.rebalancer.PartitionedRebalancerContext;
-import org.apache.helix.api.rebalancer.RebalancerConfig;
-import org.apache.helix.api.rebalancer.RebalancerContext;
-import org.apache.helix.api.rebalancer.SemiAutoRebalancerContext;
+import org.apache.helix.controller.rebalancer.context.CustomRebalancerContext;
+import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
+import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.controller.rebalancer.context.SemiAutoRebalancerContext;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.RebalanceMode;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/api/config/ResourceConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/config/ResourceConfig.java b/helix-core/src/main/java/org/apache/helix/api/config/ResourceConfig.java
index ec1d8f6..38d48ab 100644
--- a/helix-core/src/main/java/org/apache/helix/api/config/ResourceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/config/ResourceConfig.java
@@ -7,8 +7,8 @@ import org.apache.helix.api.Partition;
import org.apache.helix.api.Scope;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.api.rebalancer.RebalancerConfig;
-import org.apache.helix.api.rebalancer.RebalancerContext;
+import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.context.RebalancerContext;
import com.google.common.collect.Sets;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/api/rebalancer/BasicRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/rebalancer/BasicRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/api/rebalancer/BasicRebalancerContext.java
deleted file mode 100644
index b346cda..0000000
--- a/helix-core/src/main/java/org/apache/helix/api/rebalancer/BasicRebalancerContext.java
+++ /dev/null
@@ -1,239 +0,0 @@
-package org.apache.helix.api.rebalancer;
-
-import java.util.Set;
-
-import org.apache.helix.api.Partition;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.api.id.StateModelFactoryId;
-import org.codehaus.jackson.annotate.JsonIgnore;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * Abstract RebalancerContext that functions for generic subunits. Use a subclass that more
- * concretely defines the subunits.
- */
-public abstract class BasicRebalancerContext implements RebalancerContext {
- private ResourceId _resourceId;
- private StateModelDefId _stateModelDefId;
- private StateModelFactoryId _stateModelFactoryId;
- private String _participantGroupTag;
- private Class<? extends ContextSerializer> _serializer;
- private RebalancerRef _rebalancerRef;
-
- /**
- * Instantiate a basic rebalancer context
- */
- public BasicRebalancerContext() {
- _serializer = DefaultContextSerializer.class;
- }
-
- @Override
- public ResourceId getResourceId() {
- return _resourceId;
- }
-
- /**
- * Set the resource to rebalance
- * @param resourceId resource id
- */
- public void setResourceId(ResourceId resourceId) {
- _resourceId = resourceId;
- }
-
- @Override
- public StateModelDefId getStateModelDefId() {
- return _stateModelDefId;
- }
-
- /**
- * Set the state model definition that the resource follows
- * @param stateModelDefId state model definition id
- */
- public void setStateModelDefId(StateModelDefId stateModelDefId) {
- _stateModelDefId = stateModelDefId;
- }
-
- @Override
- public StateModelFactoryId getStateModelFactoryId() {
- return _stateModelFactoryId;
- }
-
- /**
- * Set the state model factory that the resource uses
- * @param stateModelFactoryId state model factory id
- */
- public void setStateModelFactoryId(StateModelFactoryId stateModelFactoryId) {
- _stateModelFactoryId = stateModelFactoryId;
- }
-
- @Override
- public String getParticipantGroupTag() {
- return _participantGroupTag;
- }
-
- /**
- * Set a tag that participants must have in order to serve this resource
- * @param participantGroupTag string group tag
- */
- public void setParticipantGroupTag(String participantGroupTag) {
- _participantGroupTag = participantGroupTag;
- }
-
- /**
- * Get the serializer. If none is provided, {@link DefaultContextSerializer} is used
- */
- @Override
- public Class<? extends ContextSerializer> getSerializerClass() {
- return _serializer;
- }
-
- /**
- * Set the class that can serialize this context
- * @param serializer serializer class that implements ContextSerializer
- */
- public void setSerializerClass(Class<? extends ContextSerializer> serializer) {
- _serializer = serializer;
- }
-
- @Override
- @JsonIgnore
- public Set<? extends PartitionId> getSubUnitIdSet() {
- return getSubUnitMap().keySet();
- }
-
- @Override
- @JsonIgnore
- public Partition getSubUnit(PartitionId subUnitId) {
- return getSubUnitMap().get(subUnitId);
- }
-
- @Override
- public RebalancerRef getRebalancerRef() {
- return _rebalancerRef;
- }
-
- /**
- * Set the reference to the class used to rebalance this resource
- * @param rebalancerRef RebalancerRef instance
- */
- public void setRebalancerRef(RebalancerRef rebalancerRef) {
- _rebalancerRef = rebalancerRef;
- }
-
- /**
- * Abstract builder for the base rebalancer context
- */
- public static abstract class AbstractBuilder<T extends AbstractBuilder<T>> {
- private final ResourceId _resourceId;
- private StateModelDefId _stateModelDefId;
- private StateModelFactoryId _stateModelFactoryId;
- private String _participantGroupTag;
- private Class<? extends ContextSerializer> _serializerClass;
- private RebalancerRef _rebalancerRef;
-
- /**
- * Instantiate with a resource id
- * @param resourceId resource id
- */
- public AbstractBuilder(ResourceId resourceId) {
- _resourceId = resourceId;
- _serializerClass = DefaultContextSerializer.class;
- }
-
- /**
- * Set the state model definition that the resource should follow
- * @param stateModelDefId state model definition id
- * @return Builder
- */
- public T stateModelDefId(StateModelDefId stateModelDefId) {
- _stateModelDefId = stateModelDefId;
- return self();
- }
-
- /**
- * Set the state model factory that the resource should use
- * @param stateModelFactoryId state model factory id
- * @return Builder
- */
- public T stateModelFactoryId(StateModelFactoryId stateModelFactoryId) {
- _stateModelFactoryId = stateModelFactoryId;
- return self();
- }
-
- /**
- * Set the tag that all participants require in order to serve this resource
- * @param participantGroupTag the tag
- * @return Builder
- */
- public T participantGroupTag(String participantGroupTag) {
- _participantGroupTag = participantGroupTag;
- return self();
- }
-
- /**
- * Set the serializer class for this rebalancer context
- * @param serializerClass class that implements ContextSerializer
- * @return Builder
- */
- public T serializerClass(Class<? extends ContextSerializer> serializerClass) {
- _serializerClass = serializerClass;
- return self();
- }
-
- /**
- * Specify a custom class to use for rebalancing
- * @param rebalancerRef RebalancerRef instance
- * @return Builder
- */
- public T rebalancerRef(RebalancerRef rebalancerRef) {
- _rebalancerRef = rebalancerRef;
- return self();
- }
-
- /**
- * Update an existing context with base fields
- * @param context derived context
- */
- protected final void update(BasicRebalancerContext context) {
- context.setResourceId(_resourceId);
- context.setStateModelDefId(_stateModelDefId);
- context.setStateModelFactoryId(_stateModelFactoryId);
- context.setParticipantGroupTag(_participantGroupTag);
- context.setSerializerClass(_serializerClass);
- context.setRebalancerRef(_rebalancerRef);
- }
-
- /**
- * Get a typed reference to "this" class. Final derived classes should simply return the this
- * reference.
- * @return this for the most specific type
- */
- protected abstract T self();
-
- /**
- * Get the rebalancer context from the built fields
- * @return RebalancerContext
- */
- public abstract RebalancerContext build();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/api/rebalancer/ContextSerializer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/rebalancer/ContextSerializer.java b/helix-core/src/main/java/org/apache/helix/api/rebalancer/ContextSerializer.java
deleted file mode 100644
index 10e3ba9..0000000
--- a/helix-core/src/main/java/org/apache/helix/api/rebalancer/ContextSerializer.java
+++ /dev/null
@@ -1,37 +0,0 @@
-package org.apache.helix.api.rebalancer;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-public interface ContextSerializer {
- /**
- * Convert a RebalancerContext object instance to a String
- * @param data instance of the rebalancer context type
- * @return String representing the object
- */
- public <T> String serialize(final T data);
-
- /**
- * Convert a String to a generic object instance
- * @param clazz The class represented by the deserialized string
- * @param string String representing the object
- * @return instance of the generic type or null if the conversion failed
- */
- public <T> T deserialize(final Class<T> clazz, final String string);
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/api/rebalancer/CustomRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/rebalancer/CustomRebalancer.java b/helix-core/src/main/java/org/apache/helix/api/rebalancer/CustomRebalancer.java
deleted file mode 100644
index c5e4e80..0000000
--- a/helix-core/src/main/java/org/apache/helix/api/rebalancer/CustomRebalancer.java
+++ /dev/null
@@ -1,123 +0,0 @@
-package org.apache.helix.api.rebalancer;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.HelixDefinedState;
-import org.apache.helix.HelixManager;
-import org.apache.helix.api.Cluster;
-import org.apache.helix.api.State;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.rebalancer.util.ConstraintBasedAssignment;
-import org.apache.helix.controller.stages.ResourceCurrentState;
-import org.apache.helix.model.ResourceAssignment;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.log4j.Logger;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-public class CustomRebalancer implements Rebalancer {
-
- private static final Logger LOG = Logger.getLogger(CustomRebalancer.class);
-
- @Override
- public void init(HelixManager helixManager) {
- // do nothing
- }
-
- @Override
- public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
- Cluster cluster, ResourceCurrentState currentState) {
- CustomRebalancerContext config =
- rebalancerConfig.getRebalancerContext(CustomRebalancerContext.class);
- StateModelDefinition stateModelDef =
- cluster.getStateModelMap().get(config.getStateModelDefId());
- if (LOG.isDebugEnabled()) {
- LOG.debug("Processing resource:" + config.getResourceId());
- }
- ResourceAssignment partitionMapping = new ResourceAssignment(config.getResourceId());
- for (PartitionId partition : config.getPartitionSet()) {
- Map<ParticipantId, State> currentStateMap =
- currentState.getCurrentStateMap(config.getResourceId(), partition);
- Set<ParticipantId> disabledInstancesForPartition =
- ConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(),
- partition);
- Map<ParticipantId, State> bestStateForPartition =
- computeCustomizedBestStateForPartition(cluster.getLiveParticipantMap().keySet(),
- stateModelDef, config.getPreferenceMap(partition), currentStateMap,
- disabledInstancesForPartition);
- partitionMapping.addReplicaMap(partition, bestStateForPartition);
- }
- return partitionMapping;
- }
-
- /**
- * compute best state for resource in CUSTOMIZED rebalancer mode
- * @param liveParticipantSet
- * @param stateModelDef
- * @param preferenceMap
- * @param currentStateMap
- * @param disabledParticipantsForPartition
- * @return
- */
- private Map<ParticipantId, State> computeCustomizedBestStateForPartition(
- Set<ParticipantId> liveParticipantSet, StateModelDefinition stateModelDef,
- Map<ParticipantId, State> preferenceMap, Map<ParticipantId, State> currentStateMap,
- Set<ParticipantId> disabledParticipantsForPartition) {
- Map<ParticipantId, State> participantStateMap = new HashMap<ParticipantId, State>();
-
- // if the resource is deleted, preferenceMap will be null/empty and
- // we should drop all resources.
- if (currentStateMap != null) {
- for (ParticipantId participantId : currentStateMap.keySet()) {
- if ((preferenceMap == null || !preferenceMap.containsKey(participantId))
- && !disabledParticipantsForPartition.contains(participantId)) {
- // if dropped and not disabled, transit to DROPPED
- participantStateMap.put(participantId, State.from(HelixDefinedState.DROPPED));
- } else if ((currentStateMap.get(participantId) == null || !currentStateMap.get(
- participantId).equals(State.from(HelixDefinedState.ERROR)))
- && disabledParticipantsForPartition.contains(participantId)) {
- // if disabled and not in ERROR state, transit to initial-state (e.g. OFFLINE)
- participantStateMap.put(participantId, stateModelDef.getTypedInitialState());
- }
- }
- }
-
- // ideal state is deleted
- if (preferenceMap == null) {
- return participantStateMap;
- }
-
- for (ParticipantId participantId : preferenceMap.keySet()) {
- boolean notInErrorState =
- currentStateMap == null || currentStateMap.get(participantId) == null
- || !currentStateMap.get(participantId).equals(State.from(HelixDefinedState.ERROR));
-
- if (liveParticipantSet.contains(participantId) && notInErrorState
- && !disabledParticipantsForPartition.contains(participantId)) {
- participantStateMap.put(participantId, preferenceMap.get(participantId));
- }
- }
-
- return participantStateMap;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/api/rebalancer/CustomRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/rebalancer/CustomRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/api/rebalancer/CustomRebalancerContext.java
deleted file mode 100644
index 9d5b7e8..0000000
--- a/helix-core/src/main/java/org/apache/helix/api/rebalancer/CustomRebalancerContext.java
+++ /dev/null
@@ -1,161 +0,0 @@
-package org.apache.helix.api.rebalancer;
-
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.api.State;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.api.rebalancer.util.ConstraintBasedAssignment;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
-import org.apache.helix.model.IdealState.RebalanceMode;
-import org.apache.helix.model.ResourceAssignment;
-import org.apache.helix.model.StateModelDefinition;
-import org.codehaus.jackson.annotate.JsonIgnore;
-
-import com.google.common.collect.Maps;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * RebalancerContext for a resource that should be rebalanced in CUSTOMIZED mode. By default, it
- * corresponds to {@link CustomRebalancer}
- */
-public class CustomRebalancerContext extends PartitionedRebalancerContext {
- private Map<PartitionId, Map<ParticipantId, State>> _preferenceMaps;
-
- /**
- * Instantiate a CustomRebalancerContext
- */
- public CustomRebalancerContext() {
- super(RebalanceMode.CUSTOMIZED);
- setRebalancerRef(RebalancerRef.from(CustomRebalancer.class));
- _preferenceMaps = Maps.newHashMap();
- }
-
- /**
- * Get the preference maps of the partitions and replicas of the resource
- * @return map of partition to participant and state
- */
- public Map<PartitionId, Map<ParticipantId, State>> getPreferenceMaps() {
- return _preferenceMaps;
- }
-
- /**
- * Set the preference maps of the partitions and replicas of the resource
- * @param preferenceMaps map of partition to participant and state
- */
- public void setPreferenceMaps(Map<PartitionId, Map<ParticipantId, State>> preferenceMaps) {
- _preferenceMaps = preferenceMaps;
- }
-
- /**
- * Get the preference map of a partition
- * @param partitionId the partition to look up
- * @return map of participant to state
- */
- @JsonIgnore
- public Map<ParticipantId, State> getPreferenceMap(PartitionId partitionId) {
- return _preferenceMaps.get(partitionId);
- }
-
- /**
- * Generate preference maps based on a default cluster setup
- * @param stateModelDef the state model definition to follow
- * @param participantSet the set of participant ids to configure for
- */
- @Override
- @JsonIgnore
- public void generateDefaultConfiguration(StateModelDefinition stateModelDef,
- Set<ParticipantId> participantSet) {
- // compute default upper bounds
- Map<State, String> upperBounds = Maps.newHashMap();
- for (State state : stateModelDef.getTypedStatesPriorityList()) {
- upperBounds.put(state, stateModelDef.getNumParticipantsPerState(state));
- }
-
- // determine the current mapping
- Map<PartitionId, Map<ParticipantId, State>> currentMapping = getPreferenceMaps();
-
- // determine the preference maps
- LinkedHashMap<State, Integer> stateCounts =
- ConstraintBasedAssignment.stateCount(upperBounds, stateModelDef, participantSet.size(),
- getReplicaCount());
- ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
- List<ParticipantId> participantList = new ArrayList<ParticipantId>(participantSet);
- List<PartitionId> partitionList = new ArrayList<PartitionId>(getPartitionSet());
- AutoRebalanceStrategy strategy =
- new AutoRebalanceStrategy(ResourceId.from(""), partitionList, stateCounts,
- getMaxPartitionsPerParticipant(), placementScheme);
- Map<String, Map<String, String>> rawPreferenceMaps =
- strategy.typedComputePartitionAssignment(participantList, currentMapping, participantList)
- .getMapFields();
- Map<PartitionId, Map<ParticipantId, State>> preferenceMaps =
- Maps.newHashMap(ResourceAssignment.replicaMapsFromStringMaps(rawPreferenceMaps));
- setPreferenceMaps(preferenceMaps);
- }
-
- /**
- * Build a CustomRebalancerContext. By default, it corresponds to {@link CustomRebalancer}
- */
- public static final class Builder extends PartitionedRebalancerContext.AbstractBuilder<Builder> {
- private final Map<PartitionId, Map<ParticipantId, State>> _preferenceMaps;
-
- /**
- * Instantiate for a resource
- * @param resourceId resource id
- */
- public Builder(ResourceId resourceId) {
- super(resourceId);
- super.rebalancerRef(RebalancerRef.from(CustomRebalancer.class));
- _preferenceMaps = Maps.newHashMap();
- }
-
- /**
- * Add a preference map for a partition
- * @param partitionId partition to set
- * @param preferenceMap map of participant id to state indicating where replicas are served
- * @return Builder
- */
- public Builder preferenceMap(PartitionId partitionId, Map<ParticipantId, State> preferenceMap) {
- _preferenceMaps.put(partitionId, preferenceMap);
- return self();
- }
-
- @Override
- protected Builder self() {
- return this;
- }
-
- @Override
- public CustomRebalancerContext build() {
- CustomRebalancerContext context = new CustomRebalancerContext();
- super.update(context);
- context.setPreferenceMaps(_preferenceMaps);
- return context;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/api/rebalancer/DefaultContextSerializer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/rebalancer/DefaultContextSerializer.java b/helix-core/src/main/java/org/apache/helix/api/rebalancer/DefaultContextSerializer.java
deleted file mode 100644
index 64e1f8b..0000000
--- a/helix-core/src/main/java/org/apache/helix/api/rebalancer/DefaultContextSerializer.java
+++ /dev/null
@@ -1,83 +0,0 @@
-package org.apache.helix.api.rebalancer;
-
-import java.io.ByteArrayInputStream;
-import java.io.StringWriter;
-
-import org.apache.helix.HelixException;
-import org.apache.log4j.Logger;
-import org.codehaus.jackson.map.DeserializationConfig;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.SerializationConfig;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * Default serializer implementation for RebalancerContexts. Uses the Jackson JSON library to
- * convert to and from strings
- */
-public class DefaultContextSerializer implements ContextSerializer {
-
- private static Logger logger = Logger.getLogger(DefaultContextSerializer.class);
-
- @Override
- public <T> String serialize(final T data) {
- if (data == null) {
- return null;
- }
-
- ObjectMapper mapper = new ObjectMapper();
- SerializationConfig serializationConfig = mapper.getSerializationConfig();
- serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
- serializationConfig.set(SerializationConfig.Feature.AUTO_DETECT_FIELDS, true);
- serializationConfig.set(SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
- StringWriter sw = new StringWriter();
- try {
- mapper.writeValue(sw, data);
- } catch (Exception e) {
- logger.error("Exception during payload data serialization.", e);
- throw new HelixException(e);
- }
- return sw.toString();
- }
-
- @Override
- public <T> T deserialize(final Class<T> clazz, final String string) {
- if (string == null || string.length() == 0) {
- return null;
- }
-
- ObjectMapper mapper = new ObjectMapper();
- ByteArrayInputStream bais = new ByteArrayInputStream(string.getBytes());
-
- DeserializationConfig deserializationConfig = mapper.getDeserializationConfig();
- deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_FIELDS, true);
- deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_SETTERS, true);
- deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_CREATORS, true);
- deserializationConfig.set(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, true);
- deserializationConfig.set(DeserializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
- try {
- T payload = mapper.readValue(bais, clazz);
- return payload;
- } catch (Exception e) {
- logger.error("Exception during deserialization of payload bytes: " + string, e);
- return null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/api/rebalancer/FullAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/rebalancer/FullAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/api/rebalancer/FullAutoRebalancer.java
deleted file mode 100644
index f5a5abe..0000000
--- a/helix-core/src/main/java/org/apache/helix/api/rebalancer/FullAutoRebalancer.java
+++ /dev/null
@@ -1,194 +0,0 @@
-package org.apache.helix.api.rebalancer;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.HelixManager;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.api.Cluster;
-import org.apache.helix.api.Participant;
-import org.apache.helix.api.State;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.rebalancer.util.ConstraintBasedAssignment;
-import org.apache.helix.controller.stages.ResourceCurrentState;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
-import org.apache.helix.model.ResourceAssignment;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.log4j.Logger;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-public class FullAutoRebalancer implements Rebalancer {
- // Should be final, but is assigned in computeResourceMapping rather than in a constructor
- private AutoRebalanceStrategy _algorithm;
-
- private static Logger LOG = Logger.getLogger(FullAutoRebalancer.class);
-
- @Override
- public void init(HelixManager helixManager) {
- // do nothing
- }
-
- @Override
- public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
- Cluster cluster, ResourceCurrentState currentState) {
- FullAutoRebalancerContext config =
- rebalancerConfig.getRebalancerContext(FullAutoRebalancerContext.class);
- StateModelDefinition stateModelDef =
- cluster.getStateModelMap().get(config.getStateModelDefId());
- // Compute a preference list based on the current ideal state
- List<PartitionId> partitions = new ArrayList<PartitionId>(config.getPartitionSet());
- Map<ParticipantId, Participant> liveParticipants = cluster.getLiveParticipantMap();
- Map<ParticipantId, Participant> allParticipants = cluster.getParticipantMap();
- int replicas = -1;
- if (config.anyLiveParticipant()) {
- replicas = liveParticipants.size();
- } else {
- replicas = config.getReplicaCount();
- }
-
- // count how many replicas should be in each state
- Map<State, String> upperBounds =
- ConstraintBasedAssignment.stateConstraints(stateModelDef, config.getResourceId(),
- cluster.getConfig());
- LinkedHashMap<State, Integer> stateCountMap =
- ConstraintBasedAssignment.stateCount(upperBounds, stateModelDef,
- liveParticipants.size(), replicas);
-
- // get the participant lists
- List<ParticipantId> liveParticipantList =
- new ArrayList<ParticipantId>(liveParticipants.keySet());
- List<ParticipantId> allParticipantList =
- new ArrayList<ParticipantId>(cluster.getParticipantMap().keySet());
-
- // compute the current mapping from the current state
- Map<PartitionId, Map<ParticipantId, State>> currentMapping =
- currentMapping(config, currentState, stateCountMap);
-
- // If there are nodes tagged with resource, use only those nodes
- Set<ParticipantId> taggedNodes = new HashSet<ParticipantId>();
- if (config.getParticipantGroupTag() != null) {
- for (ParticipantId participantId : liveParticipantList) {
- if (liveParticipants.get(participantId).hasTag(config.getParticipantGroupTag())) {
- taggedNodes.add(participantId);
- }
- }
- }
- if (taggedNodes.size() > 0) {
- if (LOG.isInfoEnabled()) {
- LOG.info("found the following instances with tag " + config.getResourceId() + " "
- + taggedNodes);
- }
- liveParticipantList = new ArrayList<ParticipantId>(taggedNodes);
- }
-
- // determine which nodes the replicas should live on
- int maxPartition = config.getMaxPartitionsPerParticipant();
- if (LOG.isInfoEnabled()) {
- LOG.info("currentMapping: " + currentMapping);
- LOG.info("stateCountMap: " + stateCountMap);
- LOG.info("liveNodes: " + liveParticipantList);
- LOG.info("allNodes: " + allParticipantList);
- LOG.info("maxPartition: " + maxPartition);
- }
- ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
- _algorithm =
- new AutoRebalanceStrategy(config.getResourceId(), partitions, stateCountMap, maxPartition,
- placementScheme);
- ZNRecord newMapping =
- _algorithm.typedComputePartitionAssignment(liveParticipantList, currentMapping,
- allParticipantList);
-
- if (LOG.isInfoEnabled()) {
- LOG.info("newMapping: " + newMapping);
- }
-
- // compute a full partition mapping for the resource
- if (LOG.isDebugEnabled()) {
- LOG.debug("Processing resource:" + config.getResourceId());
- }
- ResourceAssignment partitionMapping = new ResourceAssignment(config.getResourceId());
- for (PartitionId partition : partitions) {
- Set<ParticipantId> disabledParticipantsForPartition =
- ConstraintBasedAssignment.getDisabledParticipants(allParticipants, partition);
- List<String> rawPreferenceList = newMapping.getListField(partition.stringify());
- if (rawPreferenceList == null) {
- rawPreferenceList = Collections.emptyList();
- }
- List<ParticipantId> preferenceList =
- Lists.transform(rawPreferenceList, new Function<String, ParticipantId>() {
- @Override
- public ParticipantId apply(String participantName) {
- return ParticipantId.from(participantName);
- }
- });
- preferenceList =
- ConstraintBasedAssignment.getPreferenceList(cluster, partition, preferenceList);
- Map<ParticipantId, State> bestStateForPartition =
- ConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds,
- liveParticipants.keySet(), stateModelDef, preferenceList,
- currentState.getCurrentStateMap(config.getResourceId(), partition),
- disabledParticipantsForPartition);
- partitionMapping.addReplicaMap(partition, bestStateForPartition);
- }
- return partitionMapping;
- }
-
- private Map<PartitionId, Map<ParticipantId, State>> currentMapping(
- FullAutoRebalancerContext config, ResourceCurrentState currentStateOutput,
- Map<State, Integer> stateCountMap) {
- Map<PartitionId, Map<ParticipantId, State>> map =
- new HashMap<PartitionId, Map<ParticipantId, State>>();
-
- for (PartitionId partition : config.getPartitionSet()) {
- Map<ParticipantId, State> curStateMap =
- currentStateOutput.getCurrentStateMap(config.getResourceId(), partition);
- map.put(partition, new HashMap<ParticipantId, State>());
- for (ParticipantId node : curStateMap.keySet()) {
- State state = curStateMap.get(node);
- if (stateCountMap.containsKey(state)) {
- map.get(partition).put(node, state);
- }
- }
-
- Map<ParticipantId, State> pendingStateMap =
- currentStateOutput.getPendingStateMap(config.getResourceId(), partition);
- for (ParticipantId node : pendingStateMap.keySet()) {
- State state = pendingStateMap.get(node);
- if (stateCountMap.containsKey(state)) {
- map.get(partition).put(node, state);
- }
- }
- }
- return map;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/api/rebalancer/FullAutoRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/rebalancer/FullAutoRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/api/rebalancer/FullAutoRebalancerContext.java
deleted file mode 100644
index 6a3fe10..0000000
--- a/helix-core/src/main/java/org/apache/helix/api/rebalancer/FullAutoRebalancerContext.java
+++ /dev/null
@@ -1,61 +0,0 @@
-package org.apache.helix.api.rebalancer;
-
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.model.IdealState.RebalanceMode;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * RebalancerContext for FULL_AUTO rebalancing mode. By default, it corresponds to
- * {@link FullAutoRebalancer}
- */
-public class FullAutoRebalancerContext extends PartitionedRebalancerContext {
- public FullAutoRebalancerContext() {
- super(RebalanceMode.FULL_AUTO);
- setRebalancerRef(RebalancerRef.from(FullAutoRebalancer.class));
- }
-
- /**
- * Builder for a full auto rebalancer context. By default, it corresponds to
- * {@link FullAutoRebalancer}
- */
- public static final class Builder extends PartitionedRebalancerContext.AbstractBuilder<Builder> {
- /**
- * Instantiate with a resource
- * @param resourceId resource id
- */
- public Builder(ResourceId resourceId) {
- super(resourceId);
- super.rebalancerRef(RebalancerRef.from(FullAutoRebalancer.class));
- }
-
- @Override
- protected Builder self() {
- return this;
- }
-
- @Override
- public FullAutoRebalancerContext build() {
- FullAutoRebalancerContext context = new FullAutoRebalancerContext();
- super.update(context);
- return context;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/api/rebalancer/PartitionedRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/rebalancer/PartitionedRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/api/rebalancer/PartitionedRebalancerContext.java
deleted file mode 100644
index 311683d..0000000
--- a/helix-core/src/main/java/org/apache/helix/api/rebalancer/PartitionedRebalancerContext.java
+++ /dev/null
@@ -1,372 +0,0 @@
-package org.apache.helix.api.rebalancer;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.HelixConstants.StateModelToken;
-import org.apache.helix.api.Partition;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.IdealState.RebalanceMode;
-import org.apache.helix.model.StateModelDefinition;
-import org.codehaus.jackson.annotate.JsonIgnore;
-import org.codehaus.jackson.annotate.JsonIgnoreProperties;
-
-import com.google.common.collect.Maps;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * RebalancerContext for a resource whose subunits are partitions. In addition, these partitions can
- * be replicated.
- */
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class PartitionedRebalancerContext extends BasicRebalancerContext implements
- ReplicatedRebalancerContext {
- private Map<PartitionId, Partition> _partitionMap;
- private boolean _anyLiveParticipant;
- private int _replicaCount;
- private int _maxPartitionsPerParticipant;
- private final RebalanceMode _rebalanceMode;
-
- /**
- * Instantiate a PartitionedRebalancerContext for the given rebalance mode
- */
- public PartitionedRebalancerContext(RebalanceMode rebalanceMode) {
- _partitionMap = Collections.emptyMap();
- _replicaCount = 1;
- _anyLiveParticipant = false;
- _maxPartitionsPerParticipant = Integer.MAX_VALUE;
- _rebalanceMode = rebalanceMode;
- }
-
- /**
- * Get a map from partition id to partition
- * @return the backing partition map, not a copy (immutable and empty until a map is set)
- */
- public Map<PartitionId, Partition> getPartitionMap() {
- return _partitionMap;
- }
-
- /**
- * Set a map of partition id to partition
- * @param partitionMap partition map
- */
- public void setPartitionMap(Map<PartitionId, Partition> partitionMap) {
- _partitionMap = Maps.newHashMap(partitionMap);
- }
-
- /**
- * Get the set of partitions for this resource
- * @return set of partition ids
- */
- @JsonIgnore
- public Set<PartitionId> getPartitionSet() {
- return _partitionMap.keySet();
- }
-
- /**
- * Get a partition
- * @param partitionId id of the partition to get
- * @return Partition object, or null if not present
- */
- @JsonIgnore
- public Partition getPartition(PartitionId partitionId) {
- return _partitionMap.get(partitionId);
- }
-
- @Override
- public boolean anyLiveParticipant() {
- return _anyLiveParticipant;
- }
-
- /**
- * Indicate if this resource should be assigned to any live participant
- * @param anyLiveParticipant true if any live participant expected, false otherwise
- */
- public void setAnyLiveParticipant(boolean anyLiveParticipant) {
- _anyLiveParticipant = anyLiveParticipant;
- }
-
- @Override
- public int getReplicaCount() {
- return _replicaCount;
- }
-
- /**
- * Set the number of replicas that each partition should have
- * @param replicaCount number of replicas that each partition should have
- */
- public void setReplicaCount(int replicaCount) {
- _replicaCount = replicaCount;
- }
-
- /**
- * Get the maximum number of partitions that a participant can serve
- * @return maximum number of partitions per participant
- */
- public int getMaxPartitionsPerParticipant() {
- return _maxPartitionsPerParticipant;
- }
-
- /**
- * Set the maximum number of partitions that a participant can serve
- * @param maxPartitionsPerParticipant maximum number of partitions per participant
- */
- public void setMaxPartitionsPerParticipant(int maxPartitionsPerParticipant) {
- _maxPartitionsPerParticipant = maxPartitionsPerParticipant;
- }
-
- /**
- * Get the rebalancer mode of the resource
- * @return RebalanceMode
- */
- public RebalanceMode getRebalanceMode() {
- return _rebalanceMode;
- }
-
- @Override
- @JsonIgnore
- public Map<PartitionId, Partition> getSubUnitMap() {
- return getPartitionMap();
- }
-
- /**
- * Generate a default configuration given the state model and a set of participants.
- * @param stateModelDef the state model definition to follow
- * @param participantSet the set of participant ids to configure for
- */
- @JsonIgnore
- public void generateDefaultConfiguration(StateModelDefinition stateModelDef,
- Set<ParticipantId> participantSet) {
- // the base context does not know enough to do anything here
- }
-
- /**
- * Convert a physically-stored IdealState into a rebalancer context for a partitioned resource
- * @param idealState populated IdealState
- * @return PartitionedRebalancerContext
- */
- public static PartitionedRebalancerContext from(IdealState idealState) {
- PartitionedRebalancerContext context;
- switch (idealState.getRebalanceMode()) {
- case FULL_AUTO:
- FullAutoRebalancerContext.Builder fullAutoBuilder =
- new FullAutoRebalancerContext.Builder(idealState.getResourceId());
- populateContext(fullAutoBuilder, idealState);
- context = fullAutoBuilder.build();
- break;
- case SEMI_AUTO:
- SemiAutoRebalancerContext.Builder semiAutoBuilder =
- new SemiAutoRebalancerContext.Builder(idealState.getResourceId());
- for (PartitionId partitionId : idealState.getPartitionIdSet()) {
- semiAutoBuilder.preferenceList(partitionId, idealState.getPreferenceList(partitionId));
- }
- populateContext(semiAutoBuilder, idealState);
- context = semiAutoBuilder.build();
- break;
- case CUSTOMIZED:
- CustomRebalancerContext.Builder customBuilder =
- new CustomRebalancerContext.Builder(idealState.getResourceId());
- for (PartitionId partitionId : idealState.getPartitionIdSet()) {
- customBuilder.preferenceMap(partitionId, idealState.getParticipantStateMap(partitionId));
- }
- populateContext(customBuilder, idealState);
- context = customBuilder.build();
- break;
- default:
- Builder baseBuilder = new Builder(idealState.getResourceId());
- populateContext(baseBuilder, idealState);
- context = baseBuilder.build();
- break;
- }
- return context;
- }
-
- /**
- * Update a builder subclass with all the fields of the ideal state
- * @param builder builder that extends AbstractBuilder
- * @param idealState populated IdealState
- */
- private static <T extends AbstractBuilder<T>> void populateContext(T builder,
- IdealState idealState) {
- String replicas = idealState.getReplicas();
- int replicaCount = 0;
- boolean anyLiveParticipant = false;
- if (replicas.equals(StateModelToken.ANY_LIVEINSTANCE.toString())) {
- anyLiveParticipant = true;
- } else {
- replicaCount = Integer.parseInt(replicas);
- }
- if (idealState.getNumPartitions() > 0 && idealState.getPartitionIdSet().size() == 0) {
- // backwards compatibility: partition sets were based on pref lists/maps previously
- builder.addPartitions(idealState.getNumPartitions());
- } else {
- for (PartitionId partitionId : idealState.getPartitionIdSet()) {
- builder.addPartition(new Partition(partitionId));
- }
- }
- builder.anyLiveParticipant(anyLiveParticipant).replicaCount(replicaCount)
- .maxPartitionsPerParticipant(idealState.getMaxPartitionsPerInstance())
- .participantGroupTag(idealState.getInstanceGroupTag())
- .stateModelDefId(idealState.getStateModelDefId())
- .stateModelFactoryId(idealState.getStateModelFactoryId());
- RebalancerRef rebalancerRef = idealState.getRebalancerRef();
- if (rebalancerRef != null) {
- builder.rebalancerRef(rebalancerRef);
- }
- }
-
- /**
- * Builder for a basic partitioned rebalancer context
- */
- public static final class Builder extends AbstractBuilder<Builder> {
- /**
- * Instantiate with a resource
- * @param resourceId resource id
- */
- public Builder(ResourceId resourceId) {
- super(resourceId);
- }
-
- @Override
- protected Builder self() {
- return this;
- }
-
- @Override
- public PartitionedRebalancerContext build() {
- PartitionedRebalancerContext context =
- new PartitionedRebalancerContext(RebalanceMode.USER_DEFINED);
- super.update(context);
- return context;
- }
- }
-
- /**
- * Abstract builder for a generic partitioned resource rebalancer context
- */
- public static abstract class AbstractBuilder<T extends BasicRebalancerContext.AbstractBuilder<T>>
- extends BasicRebalancerContext.AbstractBuilder<T> {
- private final ResourceId _resourceId;
- private final Map<PartitionId, Partition> _partitionMap;
- private boolean _anyLiveParticipant;
- private int _replicaCount;
- private int _maxPartitionsPerParticipant;
-
- /**
- * Instantiate with a resource
- * @param resourceId resource id
- */
- public AbstractBuilder(ResourceId resourceId) {
- super(resourceId);
- _resourceId = resourceId;
- _partitionMap = Maps.newHashMap();
- _anyLiveParticipant = false;
- _replicaCount = 1;
- _maxPartitionsPerParticipant = Integer.MAX_VALUE;
- }
-
- /**
- * Add a partition that the resource serves
- * @param partition fully-qualified partition
- * @return Builder
- */
- public T addPartition(Partition partition) {
- _partitionMap.put(partition.getId(), partition);
- return self();
- }
-
- /**
- * Add a collection of partitions
- * @param partitions any collection of Partition objects
- * @return Builder
- */
- public T addPartitions(Collection<Partition> partitions) {
- for (Partition partition : partitions) {
- addPartition(partition);
- }
- return self();
- }
-
- /**
- * Add a specified number of partitions with a default naming scheme, namely
- * resourceId_partitionNumber where partitionNumber starts at 0
- * @param partitionCount number of partitions to add
- * @return Builder
- */
- public T addPartitions(int partitionCount) {
- for (int i = 0; i < partitionCount; i++) {
- addPartition(new Partition(PartitionId.from(_resourceId, Integer.toString(i))));
- }
- return self();
- }
-
- /**
- * Set whether any live participant should be used in rebalancing
- * @param anyLiveParticipant true if any live participant can be used, false otherwise
- * @return Builder
- */
- public T anyLiveParticipant(boolean anyLiveParticipant) {
- _anyLiveParticipant = anyLiveParticipant;
- return self();
- }
-
- /**
- * Set the number of replicas
- * @param replicaCount number of replicas
- * @return Builder
- */
- public T replicaCount(int replicaCount) {
- _replicaCount = replicaCount;
- return self();
- }
-
- /**
- * Set the maximum number of partitions to assign to any participant
- * @param maxPartitionsPerParticipant the maximum
- * @return Builder
- */
- public T maxPartitionsPerParticipant(int maxPartitionsPerParticipant) {
- _maxPartitionsPerParticipant = maxPartitionsPerParticipant;
- return self();
- }
-
- /**
- * Update a PartitionedRebalancerContext with fields from this builder level
- * @param context PartitionedRebalancerContext to update
- */
- protected final void update(PartitionedRebalancerContext context) {
- super.update(context);
- // enforce at least one partition
- if (_partitionMap.isEmpty()) {
- addPartitions(1);
- }
- context.setPartitionMap(_partitionMap);
- context.setAnyLiveParticipant(_anyLiveParticipant);
- context.setMaxPartitionsPerParticipant(_maxPartitionsPerParticipant);
- context.setReplicaCount(_replicaCount);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/api/rebalancer/Rebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/rebalancer/Rebalancer.java b/helix-core/src/main/java/org/apache/helix/api/rebalancer/Rebalancer.java
deleted file mode 100644
index e164920..0000000
--- a/helix-core/src/main/java/org/apache/helix/api/rebalancer/Rebalancer.java
+++ /dev/null
@@ -1,38 +0,0 @@
-package org.apache.helix.api.rebalancer;
-
-import org.apache.helix.HelixManager;
-import org.apache.helix.api.Cluster;
-import org.apache.helix.controller.stages.ResourceCurrentState;
-import org.apache.helix.model.ResourceAssignment;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * Allows one to come up with custom implementation of a rebalancer.<br/>
- * This will be invoked on all changes that happen in the cluster.<br/>
- * Simply return the resource assignment for a resource in this method.<br/>
- */
-public interface Rebalancer {
-
- public void init(HelixManager helixManager);
-
- public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig, Cluster cluster,
- ResourceCurrentState currentState);
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/api/rebalancer/RebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/rebalancer/RebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/api/rebalancer/RebalancerConfig.java
deleted file mode 100644
index 2c57a29..0000000
--- a/helix-core/src/main/java/org/apache/helix/api/rebalancer/RebalancerConfig.java
+++ /dev/null
@@ -1,177 +0,0 @@
-package org.apache.helix.api.rebalancer;
-
-import org.apache.helix.api.Scope;
-import org.apache.helix.api.config.NamespacedConfig;
-import org.apache.helix.model.ResourceConfiguration;
-import org.apache.helix.util.HelixUtil;
-import org.apache.log4j.Logger;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * Configuration for a resource rebalancer. This contains a RebalancerContext, which contains
- * information specific to each rebalancer.
- */
-public final class RebalancerConfig {
- private enum Fields {
- SERIALIZER_CLASS,
- REBALANCER_CONTEXT,
- REBALANCER_CONTEXT_CLASS
- }
-
- private static final Logger LOG = Logger.getLogger(RebalancerConfig.class);
- private ContextSerializer _serializer;
- private Rebalancer _rebalancer;
- private final RebalancerContext _context;
- private final NamespacedConfig _config;
-
- /**
- * Instantiate a RebalancerConfig
- * @param context rebalancer context, which also supplies the serializer class and the
- * reference to the rebalancer class that will be used
- */
- public RebalancerConfig(RebalancerContext context) {
- _config =
- new NamespacedConfig(Scope.resource(context.getResourceId()),
- RebalancerConfig.class.getSimpleName());
- _config.setSimpleField(Fields.SERIALIZER_CLASS.toString(), context.getSerializerClass()
- .getName());
- _config
- .setSimpleField(Fields.REBALANCER_CONTEXT_CLASS.toString(), context.getClass().getName());
- _context = context;
- try {
- _serializer = context.getSerializerClass().newInstance();
- _config.setSimpleField(Fields.REBALANCER_CONTEXT.toString(), _serializer.serialize(context));
- } catch (InstantiationException e) {
- LOG.error("Error initializing the configuration", e);
- } catch (IllegalAccessException e) {
- LOG.error("Error initializing the configuration", e);
- }
- }
-
- /**
- * Instantiate from a physical ResourceConfiguration
- * @param resourceConfiguration populated ResourceConfiguration
- */
- public RebalancerConfig(ResourceConfiguration resourceConfiguration) {
- _config = new NamespacedConfig(resourceConfiguration, RebalancerConfig.class.getSimpleName());
- _serializer = getSerializer();
- _context = getContext();
- }
-
- /**
- * Get the class that can serialize and deserialize the rebalancer context
- * @return ContextSerializer
- */
- private ContextSerializer getSerializer() {
- String serializerClassName = _config.getSimpleField(Fields.SERIALIZER_CLASS.toString());
- if (serializerClassName != null) {
- try {
- return (ContextSerializer) HelixUtil.loadClass(getClass(), serializerClassName)
- .newInstance();
- } catch (InstantiationException e) {
- LOG.error("Error getting the serializer", e);
- } catch (IllegalAccessException e) {
- LOG.error("Error getting the serializer", e);
- } catch (ClassNotFoundException e) {
- LOG.error("Error getting the serializer", e);
- }
- }
- return null;
- }
-
- private RebalancerContext getContext() {
- String className = _config.getSimpleField(Fields.REBALANCER_CONTEXT_CLASS.toString());
- try {
- Class<? extends RebalancerContext> contextClass =
- HelixUtil.loadClass(getClass(), className).asSubclass(RebalancerContext.class);
- String serialized = _config.getSimpleField(Fields.REBALANCER_CONTEXT.toString());
- return _serializer.deserialize(contextClass, serialized);
- } catch (ClassNotFoundException e) {
- LOG.error(className + " is not a valid class");
- } catch (ClassCastException e) {
- LOG.error(className + " does not implement RebalancerContext");
- }
- return null;
- }
-
- /**
- * Get a rebalancer class instance
- * @return Rebalancer
- */
- public Rebalancer getRebalancer() {
- // cache the rebalancer to avoid loading and instantiating it excessively
- if (_rebalancer == null) {
- if (_context == null || _context.getRebalancerRef() == null) {
- return null;
- }
- _rebalancer = _context.getRebalancerRef().getRebalancer();
- }
- return _rebalancer;
- }
-
- /**
- * Get the instantiated RebalancerContext
- * @param contextClass specific class of the RebalancerContext
- * @return RebalancerContext subclass instance, or null if conversion is not possible
- */
- public <T extends RebalancerContext> T getRebalancerContext(Class<T> contextClass) {
- try {
- return contextClass.cast(_context);
- } catch (ClassCastException e) {
- LOG.info(contextClass + " is incompatible with context class: " + _context.getClass());
- }
- return null;
- }
-
- /**
- * Get the rebalancer context serialized as a string
- * @return string representing the context
- */
- public String getSerializedContext() {
- return _config.getSimpleField(Fields.REBALANCER_CONTEXT.toString());
- }
-
- /**
- * Convert this to a namespaced config
- * @return NamespacedConfig
- */
- public NamespacedConfig toNamespacedConfig() {
- return _config;
- }
-
- /**
- * Get a RebalancerConfig from a physical resource config
- * @param resourceConfiguration physical resource config
- * @return RebalancerConfig
- */
- public static RebalancerConfig from(ResourceConfiguration resourceConfiguration) {
- return new RebalancerConfig(resourceConfiguration);
- }
-
- /**
- * Get a RebalancerConfig from a RebalancerContext
- * @param context instantiated RebalancerContext
- * @return RebalancerConfig
- */
- public static RebalancerConfig from(RebalancerContext context) {
- return new RebalancerConfig(context);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/api/rebalancer/RebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/rebalancer/RebalancerContext.java b/helix-core/src/main/java/org/apache/helix/api/rebalancer/RebalancerContext.java
deleted file mode 100644
index 80f6b06..0000000
--- a/helix-core/src/main/java/org/apache/helix/api/rebalancer/RebalancerContext.java
+++ /dev/null
@@ -1,93 +0,0 @@
-package org.apache.helix.api.rebalancer;
-
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.api.Partition;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.api.id.StateModelFactoryId;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * Defines the state available to a rebalancer. The most common use case is to use a
- * {@link PartitionedRebalancerContext} or a subclass and set up a resource with it. A rebalancer
- * configuration, at a minimum, is aware of subunits of a resource, the state model to follow, and
- * how the configuration should be serialized.
- */
-public interface RebalancerContext {
- /**
- * Get a map of resource partition identifiers to partitions. A partition is a subunit of a
- * resource, e.g. a subtask of a task
- * @return map of (subunit id, subunit) pairs
- */
- public Map<? extends PartitionId, ? extends Partition> getSubUnitMap();
-
- /**
- * Get the subunits of the resource (e.g. partitions)
- * @return set of subunit ids
- */
- public Set<? extends PartitionId> getSubUnitIdSet();
-
- /**
- * Get a specific subunit
- * @param subUnitId the id of the subunit
- * @return SubUnit
- */
- public Partition getSubUnit(PartitionId subUnitId);
-
- /**
- * Get the resource to rebalance
- * @return resource id
- */
- public ResourceId getResourceId();
-
- /**
- * Get the state model definition that the resource follows
- * @return state model definition id
- */
- public StateModelDefId getStateModelDefId();
-
- /**
- * Get the state model factory of this resource
- * @return state model factory id
- */
- public StateModelFactoryId getStateModelFactoryId();
-
- /**
- * Get the tag, if any, that participants must have in order to serve this resource
- * @return participant group tag, or null
- */
- public String getParticipantGroupTag();
-
- /**
- * Get the serializer for this context
- * @return ContextSerializer class object
- */
- public Class<? extends ContextSerializer> getSerializerClass();
-
- /**
- * Get a reference to the class used to rebalance this resource
- * @return RebalancerRef
- */
- public RebalancerRef getRebalancerRef();
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/api/rebalancer/RebalancerRef.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/rebalancer/RebalancerRef.java b/helix-core/src/main/java/org/apache/helix/api/rebalancer/RebalancerRef.java
deleted file mode 100644
index 83ae589..0000000
--- a/helix-core/src/main/java/org/apache/helix/api/rebalancer/RebalancerRef.java
+++ /dev/null
@@ -1,94 +0,0 @@
-package org.apache.helix.api.rebalancer;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.util.HelixUtil;
-import org.apache.log4j.Logger;
-import org.codehaus.jackson.annotate.JsonCreator;
-import org.codehaus.jackson.annotate.JsonIgnore;
-import org.codehaus.jackson.annotate.JsonProperty;
-
-/**
- * Reference to a class that extends {@link Rebalancer}. It loads the class automatically.
- */
-public class RebalancerRef {
- private static final Logger LOG = Logger.getLogger(RebalancerRef.class);
-
- @JsonProperty("rebalancerClassName")
- private final String _rebalancerClassName;
-
- @JsonCreator
- private RebalancerRef(@JsonProperty("rebalancerClassName") String rebalancerClassName) {
- _rebalancerClassName = rebalancerClassName;
- }
-
- /**
- * Get an instantiated Rebalancer
- * @return Rebalancer or null if instantiation failed
- */
- @JsonIgnore
- public Rebalancer getRebalancer() {
- try {
- return (Rebalancer) (HelixUtil.loadClass(getClass(), _rebalancerClassName).newInstance());
- } catch (Exception e) {
- LOG.warn("Exception while invoking custom rebalancer class:" + _rebalancerClassName, e);
- }
- return null;
- }
-
- @Override
- public String toString() {
- return _rebalancerClassName;
- }
-
- @Override
- public boolean equals(Object that) {
- if (that instanceof RebalancerRef) {
- return this.toString().equals(((RebalancerRef) that).toString());
- } else if (that instanceof String) {
- return this.toString().equals(that);
- }
- return false;
- }
-
- /**
- * Get a rebalancer class reference
- * @param rebalancerClassName name of the class
- * @return RebalancerRef or null if name is null
- */
- public static RebalancerRef from(String rebalancerClassName) {
- if (rebalancerClassName == null) {
- return null;
- }
- return new RebalancerRef(rebalancerClassName);
- }
-
- /**
- * Get a RebalancerRef from a class object
- * @param rebalancerClass class that implements Rebalancer
- * @return RebalancerRef
- */
- public static RebalancerRef from(Class<? extends Rebalancer> rebalancerClass) {
- if (rebalancerClass == null) {
- return null;
- }
- return RebalancerRef.from(rebalancerClass.getName());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/api/rebalancer/ReplicatedRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/rebalancer/ReplicatedRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/api/rebalancer/ReplicatedRebalancerContext.java
deleted file mode 100644
index 4e98ce7..0000000
--- a/helix-core/src/main/java/org/apache/helix/api/rebalancer/ReplicatedRebalancerContext.java
+++ /dev/null
@@ -1,40 +0,0 @@
-package org.apache.helix.api.rebalancer;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * Methods specifying a rebalancer context that allows replicas. For instance, a rebalancer context
- * with partitions may accept state model definitions that support multiple replicas per partition,
- * and it's possible that the policy is that each live participant in the system should have a
- * replica.
- */
-public interface ReplicatedRebalancerContext extends RebalancerContext {
- /**
- * Check if this resource should be assigned to any live participant
- * @return true if any live participant expected, false otherwise
- */
- public boolean anyLiveParticipant();
-
- /**
- * Get the number of replicas that each resource subunit should have
- * @return replica count
- */
- public int getReplicaCount();
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/api/rebalancer/SemiAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/rebalancer/SemiAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/api/rebalancer/SemiAutoRebalancer.java
deleted file mode 100644
index 80ab256..0000000
--- a/helix-core/src/main/java/org/apache/helix/api/rebalancer/SemiAutoRebalancer.java
+++ /dev/null
@@ -1,81 +0,0 @@
-package org.apache.helix.api.rebalancer;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.HelixManager;
-import org.apache.helix.api.Cluster;
-import org.apache.helix.api.State;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.rebalancer.util.ConstraintBasedAssignment;
-import org.apache.helix.controller.stages.ResourceCurrentState;
-import org.apache.helix.model.ResourceAssignment;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.log4j.Logger;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * Rebalancer for the SEMI_AUTO mode. It expects a RebalancerConfig that understands the preferred
- * locations of each partition replica
- */
-public class SemiAutoRebalancer implements Rebalancer {
- private static final Logger LOG = Logger.getLogger(SemiAutoRebalancer.class);
-
- @Override
- public void init(HelixManager helixManager) {
- // do nothing
- }
-
- @Override
- public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
- Cluster cluster, ResourceCurrentState currentState) {
- SemiAutoRebalancerContext config =
- rebalancerConfig.getRebalancerContext(SemiAutoRebalancerContext.class);
- StateModelDefinition stateModelDef =
- cluster.getStateModelMap().get(config.getStateModelDefId());
- if (LOG.isDebugEnabled()) {
- LOG.debug("Processing resource:" + config.getResourceId());
- }
- ResourceAssignment partitionMapping = new ResourceAssignment(config.getResourceId());
- for (PartitionId partition : config.getPartitionSet()) {
- Map<ParticipantId, State> currentStateMap =
- currentState.getCurrentStateMap(config.getResourceId(), partition);
- Set<ParticipantId> disabledInstancesForPartition =
- ConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(),
- partition);
- List<ParticipantId> preferenceList =
- ConstraintBasedAssignment.getPreferenceList(cluster, partition,
- config.getPreferenceList(partition));
- Map<State, String> upperBounds =
- ConstraintBasedAssignment.stateConstraints(stateModelDef, config.getResourceId(),
- cluster.getConfig());
- Map<ParticipantId, State> bestStateForPartition =
- ConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds, cluster
- .getLiveParticipantMap().keySet(), stateModelDef, preferenceList, currentStateMap,
- disabledInstancesForPartition);
- partitionMapping.addReplicaMap(partition, bestStateForPartition);
- }
- return partitionMapping;
- }
-
-}
[2/4] [HELIX-209] Moving rebalancer code around, take 2
Posted by ka...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/PartitionedRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/PartitionedRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/PartitionedRebalancerContext.java
new file mode 100644
index 0000000..9d24e68
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/PartitionedRebalancerContext.java
@@ -0,0 +1,373 @@
+package org.apache.helix.controller.rebalancer.context;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixConstants.StateModelToken;
+import org.apache.helix.api.Partition;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.rebalancer.RebalancerRef;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.StateModelDefinition;
+import org.codehaus.jackson.annotate.JsonIgnore;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+
+import com.google.common.collect.Maps;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * RebalancerContext for a resource whose subunits are partitions. In addition, these partitions can
+ * be replicated.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class PartitionedRebalancerContext extends BasicRebalancerContext implements
+ ReplicatedRebalancerContext {
+ private Map<PartitionId, Partition> _partitionMap;
+ private boolean _anyLiveParticipant;
+ private int _replicaCount;
+ private int _maxPartitionsPerParticipant;
+ private final RebalanceMode _rebalanceMode;
+
+ /**
+ * Instantiate a PartitionedRebalancerContext for the given rebalance mode
+ */
+ public PartitionedRebalancerContext(RebalanceMode rebalanceMode) {
+ _partitionMap = Collections.emptyMap();
+ _replicaCount = 1;
+ _anyLiveParticipant = false;
+ _maxPartitionsPerParticipant = Integer.MAX_VALUE;
+ _rebalanceMode = rebalanceMode;
+ }
+
+ /**
+ * Get a map from partition id to partition
+ * @return partition map (mutable)
+ */
+ public Map<PartitionId, Partition> getPartitionMap() {
+ return _partitionMap;
+ }
+
+ /**
+ * Set a map of partition id to partition
+ * @param partitionMap partition map
+ */
+ public void setPartitionMap(Map<PartitionId, Partition> partitionMap) {
+ _partitionMap = Maps.newHashMap(partitionMap);
+ }
+
+ /**
+ * Get the set of partitions for this resource
+ * @return set of partition ids
+ */
+ @JsonIgnore
+ public Set<PartitionId> getPartitionSet() {
+ return _partitionMap.keySet();
+ }
+
+ /**
+ * Get a partition
+ * @param partitionId id of the partition to get
+ * @return Partition object, or null if not present
+ */
+ @JsonIgnore
+ public Partition getPartition(PartitionId partitionId) {
+ return _partitionMap.get(partitionId);
+ }
+
+ @Override
+ public boolean anyLiveParticipant() {
+ return _anyLiveParticipant;
+ }
+
+ /**
+ * Indicate if this resource should be assigned to any live participant
+ * @param anyLiveParticipant true if any live participant expected, false otherwise
+ */
+ public void setAnyLiveParticipant(boolean anyLiveParticipant) {
+ _anyLiveParticipant = anyLiveParticipant;
+ }
+
+ @Override
+ public int getReplicaCount() {
+ return _replicaCount;
+ }
+
+ /**
+ * Set the number of replicas that each partition should have
+ * @param replicaCount number of replicas per partition
+ */
+ public void setReplicaCount(int replicaCount) {
+ _replicaCount = replicaCount;
+ }
+
+ /**
+ * Get the maximum number of partitions that a participant can serve
+ * @return maximum number of partitions per participant
+ */
+ public int getMaxPartitionsPerParticipant() {
+ return _maxPartitionsPerParticipant;
+ }
+
+ /**
+ * Set the maximum number of partitions that a participant can serve
+ * @param maxPartitionsPerParticipant maximum number of partitions per participant
+ */
+ public void setMaxPartitionsPerParticipant(int maxPartitionsPerParticipant) {
+ _maxPartitionsPerParticipant = maxPartitionsPerParticipant;
+ }
+
+ /**
+ * Get the rebalancer mode of the resource
+ * @return RebalanceMode
+ */
+ public RebalanceMode getRebalanceMode() {
+ return _rebalanceMode;
+ }
+
+ @Override
+ @JsonIgnore
+ public Map<PartitionId, Partition> getSubUnitMap() {
+ return getPartitionMap();
+ }
+
+ /**
+ * Generate a default configuration given the state model and a set of participants.
+ * @param stateModelDef the state model definition to follow
+ * @param participantSet the set of participant ids to configure for
+ */
+ @JsonIgnore
+ public void generateDefaultConfiguration(StateModelDefinition stateModelDef,
+ Set<ParticipantId> participantSet) {
+ // the base context does not know enough to do anything here
+ }
+
+ /**
+ * Convert a physically-stored IdealState into a rebalancer context for a partitioned resource
+ * @param idealState populated IdealState
+ * @return PartitionedRebalancerContext
+ */
+ public static PartitionedRebalancerContext from(IdealState idealState) {
+ PartitionedRebalancerContext context;
+ switch (idealState.getRebalanceMode()) {
+ case FULL_AUTO:
+ FullAutoRebalancerContext.Builder fullAutoBuilder =
+ new FullAutoRebalancerContext.Builder(idealState.getResourceId());
+ populateContext(fullAutoBuilder, idealState);
+ context = fullAutoBuilder.build();
+ break;
+ case SEMI_AUTO:
+ SemiAutoRebalancerContext.Builder semiAutoBuilder =
+ new SemiAutoRebalancerContext.Builder(idealState.getResourceId());
+ for (PartitionId partitionId : idealState.getPartitionIdSet()) {
+ semiAutoBuilder.preferenceList(partitionId, idealState.getPreferenceList(partitionId));
+ }
+ populateContext(semiAutoBuilder, idealState);
+ context = semiAutoBuilder.build();
+ break;
+ case CUSTOMIZED:
+ CustomRebalancerContext.Builder customBuilder =
+ new CustomRebalancerContext.Builder(idealState.getResourceId());
+ for (PartitionId partitionId : idealState.getPartitionIdSet()) {
+ customBuilder.preferenceMap(partitionId, idealState.getParticipantStateMap(partitionId));
+ }
+ populateContext(customBuilder, idealState);
+ context = customBuilder.build();
+ break;
+ default:
+ Builder baseBuilder = new Builder(idealState.getResourceId());
+ populateContext(baseBuilder, idealState);
+ context = baseBuilder.build();
+ break;
+ }
+ return context;
+ }
+
+ /**
+ * Update a builder subclass with all the fields of the ideal state
+ * @param builder builder that extends AbstractBuilder
+ * @param idealState populated IdealState
+ */
+ private static <T extends AbstractBuilder<T>> void populateContext(T builder,
+ IdealState idealState) {
+ String replicas = idealState.getReplicas();
+ int replicaCount = 0;
+ boolean anyLiveParticipant = false;
+ if (replicas.equals(StateModelToken.ANY_LIVEINSTANCE.toString())) {
+ anyLiveParticipant = true;
+ } else {
+ replicaCount = Integer.parseInt(replicas);
+ }
+ if (idealState.getNumPartitions() > 0 && idealState.getPartitionIdSet().size() == 0) {
+ // backwards compatibility: partition sets were based on pref lists/maps previously
+ builder.addPartitions(idealState.getNumPartitions());
+ } else {
+ for (PartitionId partitionId : idealState.getPartitionIdSet()) {
+ builder.addPartition(new Partition(partitionId));
+ }
+ }
+ builder.anyLiveParticipant(anyLiveParticipant).replicaCount(replicaCount)
+ .maxPartitionsPerParticipant(idealState.getMaxPartitionsPerInstance())
+ .participantGroupTag(idealState.getInstanceGroupTag())
+ .stateModelDefId(idealState.getStateModelDefId())
+ .stateModelFactoryId(idealState.getStateModelFactoryId());
+ RebalancerRef rebalancerRef = idealState.getRebalancerRef();
+ if (rebalancerRef != null) {
+ builder.rebalancerRef(rebalancerRef);
+ }
+ }
+
+ /**
+ * Builder for a basic partitioned rebalancer context
+ */
+ public static final class Builder extends AbstractBuilder<Builder> {
+ /**
+ * Instantiate with a resource
+ * @param resourceId resource id
+ */
+ public Builder(ResourceId resourceId) {
+ super(resourceId);
+ }
+
+ @Override
+ protected Builder self() {
+ return this;
+ }
+
+ @Override
+ public PartitionedRebalancerContext build() {
+ PartitionedRebalancerContext context =
+ new PartitionedRebalancerContext(RebalanceMode.USER_DEFINED);
+ super.update(context);
+ return context;
+ }
+ }
+
+ /**
+ * Abstract builder for a generic partitioned resource rebalancer context
+ */
+ public static abstract class AbstractBuilder<T extends BasicRebalancerContext.AbstractBuilder<T>>
+ extends BasicRebalancerContext.AbstractBuilder<T> {
+ private final ResourceId _resourceId;
+ private final Map<PartitionId, Partition> _partitionMap;
+ private boolean _anyLiveParticipant;
+ private int _replicaCount;
+ private int _maxPartitionsPerParticipant;
+
+ /**
+ * Instantiate with a resource
+ * @param resourceId resource id
+ */
+ public AbstractBuilder(ResourceId resourceId) {
+ super(resourceId);
+ _resourceId = resourceId;
+ _partitionMap = Maps.newHashMap();
+ _anyLiveParticipant = false;
+ _replicaCount = 1;
+ _maxPartitionsPerParticipant = Integer.MAX_VALUE;
+ }
+
+ /**
+ * Add a partition that the resource serves
+ * @param partition fully-qualified partition
+ * @return Builder
+ */
+ public T addPartition(Partition partition) {
+ _partitionMap.put(partition.getId(), partition);
+ return self();
+ }
+
+ /**
+ * Add a collection of partitions
+ * @param partitions any collection of Partition objects
+ * @return Builder
+ */
+ public T addPartitions(Collection<Partition> partitions) {
+ for (Partition partition : partitions) {
+ addPartition(partition);
+ }
+ return self();
+ }
+
+ /**
+ * Add a specified number of partitions with a default naming scheme, namely
+ * resourceId_partitionNumber where partitionNumber starts at 0
+ * @param partitionCount number of partitions to add
+ * @return Builder
+ */
+ public T addPartitions(int partitionCount) {
+ for (int i = 0; i < partitionCount; i++) {
+ addPartition(new Partition(PartitionId.from(_resourceId, Integer.toString(i))));
+ }
+ return self();
+ }
+
+ /**
+ * Set whether any live participant should be used in rebalancing
+ * @param anyLiveParticipant true if any live participant can be used, false otherwise
+ * @return Builder
+ */
+ public T anyLiveParticipant(boolean anyLiveParticipant) {
+ _anyLiveParticipant = anyLiveParticipant;
+ return self();
+ }
+
+ /**
+ * Set the number of replicas
+ * @param replicaCount number of replicas
+ * @return Builder
+ */
+ public T replicaCount(int replicaCount) {
+ _replicaCount = replicaCount;
+ return self();
+ }
+
+ /**
+ * Set the maximum number of partitions to assign to any participant
+ * @param maxPartitionsPerParticipant the maximum
+ * @return Builder
+ */
+ public T maxPartitionsPerParticipant(int maxPartitionsPerParticipant) {
+ _maxPartitionsPerParticipant = maxPartitionsPerParticipant;
+ return self();
+ }
+
+ /**
+ * Update a PartitionedRebalancerContext with fields from this builder level
+ * @param context PartitionedRebalancerContext
+ */
+ protected final void update(PartitionedRebalancerContext context) {
+ super.update(context);
+ // enforce at least one partition
+ if (_partitionMap.isEmpty()) {
+ addPartitions(1);
+ }
+ context.setPartitionMap(_partitionMap);
+ context.setAnyLiveParticipant(_anyLiveParticipant);
+ context.setMaxPartitionsPerParticipant(_maxPartitionsPerParticipant);
+ context.setReplicaCount(_replicaCount);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerConfig.java
new file mode 100644
index 0000000..846fd01
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerConfig.java
@@ -0,0 +1,178 @@
+package org.apache.helix.controller.rebalancer.context;
+
+import org.apache.helix.api.Scope;
+import org.apache.helix.api.config.NamespacedConfig;
+import org.apache.helix.controller.rebalancer.Rebalancer;
+import org.apache.helix.model.ResourceConfiguration;
+import org.apache.helix.util.HelixUtil;
+import org.apache.log4j.Logger;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Configuration for a resource rebalancer. This contains a RebalancerContext, which contains
+ * information specific to each rebalancer.
+ */
+public final class RebalancerConfig {
+ private enum Fields {
+ SERIALIZER_CLASS,
+ REBALANCER_CONTEXT,
+ REBALANCER_CONTEXT_CLASS
+ }
+
+ private static final Logger LOG = Logger.getLogger(RebalancerConfig.class);
+ private ContextSerializer _serializer;
+ private Rebalancer _rebalancer;
+ private final RebalancerContext _context;
+ private final NamespacedConfig _config;
+
+ /**
+ * Instantiate a RebalancerConfig
+ * @param context rebalancer context, which also carries the reference to the rebalancer class
+ * that will be used
+ */
+ public RebalancerConfig(RebalancerContext context) {
+ _config =
+ new NamespacedConfig(Scope.resource(context.getResourceId()),
+ RebalancerConfig.class.getSimpleName());
+ _config.setSimpleField(Fields.SERIALIZER_CLASS.toString(), context.getSerializerClass()
+ .getName());
+ _config
+ .setSimpleField(Fields.REBALANCER_CONTEXT_CLASS.toString(), context.getClass().getName());
+ _context = context;
+ try {
+ _serializer = context.getSerializerClass().newInstance();
+ _config.setSimpleField(Fields.REBALANCER_CONTEXT.toString(), _serializer.serialize(context));
+ } catch (InstantiationException e) {
+ LOG.error("Error initializing the configuration", e);
+ } catch (IllegalAccessException e) {
+ LOG.error("Error initializing the configuration", e);
+ }
+ }
+
+ /**
+ * Instantiate from a physical ResourceConfiguration
+ * @param resourceConfiguration populated ResourceConfiguration
+ */
+ public RebalancerConfig(ResourceConfiguration resourceConfiguration) {
+ _config = new NamespacedConfig(resourceConfiguration, RebalancerConfig.class.getSimpleName());
+ _serializer = getSerializer();
+ _context = getContext();
+ }
+
+ /**
+ * Get the class that can serialize and deserialize the rebalancer context
+ * @return ContextSerializer
+ */
+ private ContextSerializer getSerializer() {
+ String serializerClassName = _config.getSimpleField(Fields.SERIALIZER_CLASS.toString());
+ if (serializerClassName != null) {
+ try {
+ return (ContextSerializer) HelixUtil.loadClass(getClass(), serializerClassName)
+ .newInstance();
+ } catch (InstantiationException e) {
+ LOG.error("Error getting the serializer", e);
+ } catch (IllegalAccessException e) {
+ LOG.error("Error getting the serializer", e);
+ } catch (ClassNotFoundException e) {
+ LOG.error("Error getting the serializer", e);
+ }
+ }
+ return null;
+ }
+
+ private RebalancerContext getContext() {
+ String className = _config.getSimpleField(Fields.REBALANCER_CONTEXT_CLASS.toString());
+ try {
+ Class<? extends RebalancerContext> contextClass =
+ HelixUtil.loadClass(getClass(), className).asSubclass(RebalancerContext.class);
+ String serialized = _config.getSimpleField(Fields.REBALANCER_CONTEXT.toString());
+ return _serializer.deserialize(contextClass, serialized);
+ } catch (ClassNotFoundException e) {
+ LOG.error(className + " is not a valid class");
+ } catch (ClassCastException e) {
+ LOG.error(className + " does not implement RebalancerContext");
+ }
+ return null;
+ }
+
+ /**
+ * Get a rebalancer class instance
+ * @return Rebalancer
+ */
+ public Rebalancer getRebalancer() {
+ // cache the rebalancer to avoid loading and instantiating it excessively
+ if (_rebalancer == null) {
+ if (_context == null || _context.getRebalancerRef() == null) {
+ return null;
+ }
+ _rebalancer = _context.getRebalancerRef().getRebalancer();
+ }
+ return _rebalancer;
+ }
+
+ /**
+ * Get the instantiated RebalancerContext
+ * @param contextClass specific class of the RebalancerContext
+ * @return RebalancerContext subclass instance, or null if conversion is not possible
+ */
+ public <T extends RebalancerContext> T getRebalancerContext(Class<T> contextClass) {
+ try {
+ return contextClass.cast(_context);
+ } catch (ClassCastException e) {
+ LOG.info(contextClass + " is incompatible with context class: " + _context.getClass());
+ }
+ return null;
+ }
+
+ /**
+ * Get the rebalancer context serialized as a string
+ * @return string representing the context
+ */
+ public String getSerializedContext() {
+ return _config.getSimpleField(Fields.REBALANCER_CONTEXT.toString());
+ }
+
+ /**
+ * Convert this to a namespaced config
+ * @return NamespacedConfig
+ */
+ public NamespacedConfig toNamespacedConfig() {
+ return _config;
+ }
+
+ /**
+ * Get a RebalancerConfig from a physical resource config
+ * @param resourceConfiguration physical resource config
+ * @return RebalancerConfig
+ */
+ public static RebalancerConfig from(ResourceConfiguration resourceConfiguration) {
+ return new RebalancerConfig(resourceConfiguration);
+ }
+
+ /**
+ * Get a RebalancerConfig from a RebalancerContext
+ * @param context instantiated RebalancerContext
+ * @return RebalancerConfig
+ */
+ public static RebalancerConfig from(RebalancerContext context) {
+ return new RebalancerConfig(context);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerContext.java
new file mode 100644
index 0000000..981891b
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerContext.java
@@ -0,0 +1,94 @@
+package org.apache.helix.controller.rebalancer.context;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.api.Partition;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.api.id.StateModelFactoryId;
+import org.apache.helix.controller.rebalancer.RebalancerRef;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Defines the state available to a rebalancer. The most common use case is to use a
+ * {@link PartitionedRebalancerContext} or a subclass and set up a resource with it. A rebalancer
+ * context, at a minimum, is aware of subunits of a resource, the state model to follow, and
+ * how the context should be serialized.
+ */
+public interface RebalancerContext {
+ /**
+ * Get a map of resource partition identifiers to partitions. A partition is a subunit of a
+ * resource, e.g. a subtask of a task
+ * @return map of (subunit id, subunit) pairs
+ */
+ public Map<? extends PartitionId, ? extends Partition> getSubUnitMap();
+
+ /**
+ * Get the subunits of the resource (e.g. partitions)
+ * @return set of subunit ids
+ */
+ public Set<? extends PartitionId> getSubUnitIdSet();
+
+ /**
+ * Get a specific subunit
+ * @param subUnitId the id of the subunit
+ * @return the Partition corresponding to the subunit id
+ */
+ public Partition getSubUnit(PartitionId subUnitId);
+
+ /**
+ * Get the resource to rebalance
+ * @return resource id
+ */
+ public ResourceId getResourceId();
+
+ /**
+ * Get the state model definition that the resource follows
+ * @return state model definition id
+ */
+ public StateModelDefId getStateModelDefId();
+
+ /**
+ * Get the state model factory of this resource
+ * @return state model factory id
+ */
+ public StateModelFactoryId getStateModelFactoryId();
+
+ /**
+ * Get the tag, if any, that participants must have in order to serve this resource
+ * @return participant group tag, or null
+ */
+ public String getParticipantGroupTag();
+
+ /**
+ * Get the serializer for this context
+ * @return ContextSerializer class object
+ */
+ public Class<? extends ContextSerializer> getSerializerClass();
+
+ /**
+ * Get a reference to the class used to rebalance this resource
+ * @return RebalancerRef
+ */
+ public RebalancerRef getRebalancerRef();
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/ReplicatedRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/ReplicatedRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/ReplicatedRebalancerContext.java
new file mode 100644
index 0000000..525931d
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/ReplicatedRebalancerContext.java
@@ -0,0 +1,40 @@
+package org.apache.helix.controller.rebalancer.context;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Methods specifying a rebalancer context that allows replicas. For instance, a rebalancer context
+ * with partitions may accept state model definitions that support multiple replicas per partition,
+ * and it's possible that the policy is that each live participant in the system should have a
+ * replica.
+ */
+public interface ReplicatedRebalancerContext extends RebalancerContext {
+ /**
+ * Check if this resource should be assigned to any live participant
+ * @return true if any live participant expected, false otherwise
+ */
+ public boolean anyLiveParticipant();
+
+ /**
+ * Get the number of replicas that each resource subunit should have
+ * @return replica count
+ */
+ public int getReplicaCount();
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancerContext.java
new file mode 100644
index 0000000..f574a62
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancerContext.java
@@ -0,0 +1,178 @@
+package org.apache.helix.controller.rebalancer.context;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.api.State;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.rebalancer.RebalancerRef;
+import org.apache.helix.controller.rebalancer.SemiAutoRebalancer;
+import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.StateModelDefinition;
+import org.codehaus.jackson.annotate.JsonIgnore;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+import com.google.common.collect.Maps;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * RebalancerContext for SEMI_AUTO rebalancer mode. It indicates the preferred locations of each
+ * partition replica. By default, it corresponds to {@link SemiAutoRebalancer}
+ */
+public final class SemiAutoRebalancerContext extends PartitionedRebalancerContext {
+ @JsonProperty("preferenceLists")
+ private Map<PartitionId, List<ParticipantId>> _preferenceLists;
+
+ /**
+ * Instantiate a SemiAutoRebalancerContext
+ */
+ public SemiAutoRebalancerContext() {
+ super(RebalanceMode.SEMI_AUTO);
+ setRebalancerRef(RebalancerRef.from(SemiAutoRebalancer.class));
+ _preferenceLists = Maps.newHashMap();
+ }
+
+ /**
+ * Get the preference lists of all partitions of the resource
+ * @return map of partition id to list of participant ids
+ */
+ public Map<PartitionId, List<ParticipantId>> getPreferenceLists() {
+ return _preferenceLists;
+ }
+
+ /**
+ * Set the preference lists of all partitions of the resource
+ * @param preferenceLists map of partition id to ordered list of participant ids
+ */
+ public void setPreferenceLists(Map<PartitionId, List<ParticipantId>> preferenceLists) {
+ _preferenceLists = preferenceLists;
+ }
+
+ /**
+ * Get the preference list of a partition
+ * @param partitionId the partition to look up
+ * @return list of participant ids
+ */
+ @JsonIgnore
+ public List<ParticipantId> getPreferenceList(PartitionId partitionId) {
+ return _preferenceLists.get(partitionId);
+ }
+
+ /**
+ * Generate preference lists based on a default cluster setup
+ * @param stateModelDef the state model definition to follow
+ * @param participantSet the set of participant ids to configure for
+ */
+ @Override
+ @JsonIgnore
+ public void generateDefaultConfiguration(StateModelDefinition stateModelDef,
+ Set<ParticipantId> participantSet) {
+ // compute default upper bounds
+ Map<State, String> upperBounds = Maps.newHashMap();
+ for (State state : stateModelDef.getTypedStatesPriorityList()) {
+ upperBounds.put(state, stateModelDef.getNumParticipantsPerState(state));
+ }
+
+ // determine the current mapping
+ Map<PartitionId, Map<ParticipantId, State>> currentMapping = Maps.newHashMap();
+ for (PartitionId partitionId : getPartitionSet()) {
+ List<ParticipantId> preferenceList = getPreferenceList(partitionId);
+ if (preferenceList != null && !preferenceList.isEmpty()) {
+ Set<ParticipantId> disabledParticipants = Collections.emptySet();
+ Map<ParticipantId, State> emptyCurrentState = Collections.emptyMap();
+ Map<ParticipantId, State> initialMap =
+ ConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds,
+ participantSet, stateModelDef, preferenceList, emptyCurrentState,
+ disabledParticipants);
+ currentMapping.put(partitionId, initialMap);
+ }
+ }
+
+ // determine the preference
+ LinkedHashMap<State, Integer> stateCounts =
+ ConstraintBasedAssignment.stateCount(upperBounds, stateModelDef, participantSet.size(),
+ getReplicaCount());
+ ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
+ List<ParticipantId> participantList = new ArrayList<ParticipantId>(participantSet);
+ List<PartitionId> partitionList = new ArrayList<PartitionId>(getPartitionSet());
+ AutoRebalanceStrategy strategy =
+ new AutoRebalanceStrategy(ResourceId.from(""), partitionList, stateCounts,
+ getMaxPartitionsPerParticipant(), placementScheme);
+ Map<String, List<String>> rawPreferenceLists =
+ strategy.typedComputePartitionAssignment(participantList, currentMapping, participantList)
+ .getListFields();
+ Map<PartitionId, List<ParticipantId>> preferenceLists =
+ Maps.newHashMap(IdealState.preferenceListsFromStringLists(rawPreferenceLists));
+ setPreferenceLists(preferenceLists);
+ }
+
+ /**
+ * Build a SemiAutoRebalancerContext. By default, it corresponds to {@link SemiAutoRebalancer}
+ */
+ public static final class Builder extends PartitionedRebalancerContext.AbstractBuilder<Builder> {
+ private final Map<PartitionId, List<ParticipantId>> _preferenceLists;
+
+ /**
+ * Instantiate for a resource
+ * @param resourceId resource id
+ */
+ public Builder(ResourceId resourceId) {
+ super(resourceId);
+ super.rebalancerRef(RebalancerRef.from(SemiAutoRebalancer.class));
+ _preferenceLists = Maps.newHashMap();
+ }
+
+ /**
+ * Add a preference list for a partition
+ * @param partitionId partition to set
+ * @param preferenceList ordered list of participants who can serve the partition
+ * @return Builder
+ */
+ public Builder preferenceList(PartitionId partitionId, List<ParticipantId> preferenceList) {
+ _preferenceLists.put(partitionId, preferenceList);
+ return self();
+ }
+
+ @Override
+ protected Builder self() {
+ return this;
+ }
+
+ @Override
+ public SemiAutoRebalancerContext build() {
+ SemiAutoRebalancerContext context = new SemiAutoRebalancerContext();
+ super.update(context);
+ context.setPreferenceLists(_preferenceLists);
+ return context;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
index ce52a19..a0ee1c1 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
@@ -30,89 +30,136 @@ import java.util.Set;
import org.apache.helix.HelixConstants.StateModelToken;
import org.apache.helix.HelixDefinedState;
-import org.apache.helix.controller.stages.ClusterDataCache;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.Partition;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.Scope;
+import org.apache.helix.api.State;
+import org.apache.helix.api.config.ClusterConfig;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
import org.apache.helix.model.StateModelDefinition;
import org.apache.log4j.Logger;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
/**
- * Collection of functions that will compute the best possible states given the live instances and
- * an ideal state.<br/>
- * <br/>
- * Deprecated. Use {@link org.apache.helix.api.rebalancer.util.ConstraintBasedAssignment} instead.
+ * Collection of functions that will compute the best possible state based on the participants and
+ * the rebalancer configuration of a resource.
*/
-@Deprecated
public class ConstraintBasedAssignment {
private static Logger logger = Logger.getLogger(ConstraintBasedAssignment.class);
- public static List<String> getPreferenceList(ClusterDataCache cache, Partition resource,
- IdealState idealState, StateModelDefinition stateModelDef) {
- List<String> listField = idealState.getPreferenceList(resource.getPartitionName());
+ /**
+ * Get a set of disabled participants for a partition
+ * @param participantMap map of all participants
+ * @param partitionId the partition to check
+ * @return a set of all participants that are disabled for the partition
+ */
+ public static Set<ParticipantId> getDisabledParticipants(
+ final Map<ParticipantId, Participant> participantMap, final PartitionId partitionId) {
+ Set<ParticipantId> participantSet = new HashSet<ParticipantId>(participantMap.keySet());
+ Set<ParticipantId> disabledParticipantsForPartition =
+ Sets.filter(participantSet, new Predicate<ParticipantId>() {
+ @Override
+ public boolean apply(ParticipantId participantId) {
+ Participant participant = participantMap.get(participantId);
+ return !participant.isEnabled()
+ || participant.getDisabledPartitionIds().contains(partitionId);
+ }
+ });
+ return disabledParticipantsForPartition;
+ }
- if (listField != null && listField.size() == 1
- && StateModelToken.ANY_LIVEINSTANCE.toString().equals(listField.get(0))) {
- Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
- List<String> prefList = new ArrayList<String>(liveInstances.keySet());
+ /**
+ * Get an ordered list of participants that can serve a partition
+ * @param cluster cluster snapshot
+ * @param partitionId the partition to look up
+ * @param prefList configured preference list for the partition, may be null
+ * @return list with most preferred participants first
+ */
+ public static List<ParticipantId> getPreferenceList(Cluster cluster, PartitionId partitionId,
+ List<ParticipantId> prefList) {
+ if (prefList != null && prefList.size() == 1
+ && StateModelToken.ANY_LIVEINSTANCE.toString().equals(prefList.get(0).stringify())) {
+ prefList = new ArrayList<ParticipantId>(cluster.getLiveParticipantMap().keySet());
Collections.sort(prefList);
- return prefList;
- } else {
- return listField;
}
+ return prefList;
}
/**
- * compute best state for resource in AUTO ideal state mode
- * @param cache
+ * Get a map of state to upper bound constraint given a cluster
+ * @param stateModelDef the state model definition to check
+ * @param resourceId the resource that is constrained
+ * @param cluster the cluster the resource belongs to
+ * @return map of state to upper bound
+ */
+ public static Map<State, String> stateConstraints(StateModelDefinition stateModelDef,
+ ResourceId resourceId, ClusterConfig cluster) {
+ Map<State, String> stateMap = Maps.newHashMap();
+ for (State state : stateModelDef.getTypedStatesPriorityList()) {
+ String num =
+ cluster.getStateUpperBoundConstraint(Scope.resource(resourceId),
+ stateModelDef.getStateModelDefId(), state);
+ stateMap.put(state, num);
+ }
+ return stateMap;
+ }
+
+ /**
+ * compute best state for resource in SEMI_AUTO and FULL_AUTO modes
+ * @param upperBounds map of state to upper bound
+ * @param liveParticipantSet set of live participant ids
* @param stateModelDef
- * @param instancePreferenceList
+ * @param participantPreferenceList ordered list of preferred participants, null if resource deleted
* @param currentStateMap
- * : instance->state for each partition
- * @param disabledInstancesForPartition
+ * : participant->state for each partition
+ * @param disabledParticipantsForPartition participants that are disabled for the partition
* @return
*/
- public static Map<String, String> computeAutoBestStateForPartition(ClusterDataCache cache,
- StateModelDefinition stateModelDef, List<String> instancePreferenceList,
- Map<String, String> currentStateMap, Set<String> disabledInstancesForPartition) {
- Map<String, String> instanceStateMap = new HashMap<String, String>();
+ public static Map<ParticipantId, State> computeAutoBestStateForPartition(
+ Map<State, String> upperBounds, Set<ParticipantId> liveParticipantSet,
+ StateModelDefinition stateModelDef, List<ParticipantId> participantPreferenceList,
+ Map<ParticipantId, State> currentStateMap, Set<ParticipantId> disabledParticipantsForPartition) {
+ Map<ParticipantId, State> participantStateMap = new HashMap<ParticipantId, State>();
- // if the ideal state is deleted, instancePreferenceList will be empty and
+ // if the resource is deleted, participantPreferenceList will be null or empty and
// we should drop all resources.
if (currentStateMap != null) {
- for (String instance : currentStateMap.keySet()) {
- if ((instancePreferenceList == null || !instancePreferenceList.contains(instance))
- && !disabledInstancesForPartition.contains(instance)) {
+ for (ParticipantId participantId : currentStateMap.keySet()) {
+ if ((participantPreferenceList == null || !participantPreferenceList
+ .contains(participantId)) && !disabledParticipantsForPartition.contains(participantId)) {
// if dropped and not disabled, transit to DROPPED
- instanceStateMap.put(instance, HelixDefinedState.DROPPED.toString());
- } else if ((currentStateMap.get(instance) == null || !currentStateMap.get(instance).equals(
- HelixDefinedState.ERROR.toString()))
- && disabledInstancesForPartition.contains(instance)) {
+ participantStateMap.put(participantId, State.from(HelixDefinedState.DROPPED));
+ } else if ((currentStateMap.get(participantId) == null || !currentStateMap.get(
+ participantId).equals(State.from(HelixDefinedState.ERROR)))
+ && disabledParticipantsForPartition.contains(participantId)) {
// if disabled and not in ERROR state, transit to initial-state (e.g. OFFLINE)
- instanceStateMap.put(instance, stateModelDef.getInitialState());
+ participantStateMap.put(participantId, stateModelDef.getTypedInitialState());
}
}
}
- // ideal state is deleted
- if (instancePreferenceList == null) {
- return instanceStateMap;
+ // resource is deleted
+ if (participantPreferenceList == null) {
+ return participantStateMap;
}
- List<String> statesPriorityList = stateModelDef.getStatesPriorityList();
- boolean assigned[] = new boolean[instancePreferenceList.size()];
-
- Map<String, LiveInstance> liveInstancesMap = cache.getLiveInstances();
+ List<State> statesPriorityList = stateModelDef.getTypedStatesPriorityList();
+ boolean assigned[] = new boolean[participantPreferenceList.size()];
- for (String state : statesPriorityList) {
- String num = stateModelDef.getNumInstancesPerState(state);
+ for (State state : statesPriorityList) {
+ String num = upperBounds.get(state);
int stateCount = -1;
if ("N".equals(num)) {
- Set<String> liveAndEnabled = new HashSet<String>(liveInstancesMap.keySet());
- liveAndEnabled.removeAll(disabledInstancesForPartition);
+ Set<ParticipantId> liveAndEnabled = new HashSet<ParticipantId>(liveParticipantSet);
+ liveAndEnabled.removeAll(disabledParticipantsForPartition);
stateCount = liveAndEnabled.size();
} else if ("R".equals(num)) {
- stateCount = instancePreferenceList.size();
+ stateCount = participantPreferenceList.size();
} else {
try {
stateCount = Integer.parseInt(num);
@@ -122,16 +169,18 @@ public class ConstraintBasedAssignment {
}
if (stateCount > -1) {
int count = 0;
- for (int i = 0; i < instancePreferenceList.size(); i++) {
- String instanceName = instancePreferenceList.get(i);
+ for (int i = 0; i < participantPreferenceList.size(); i++) {
+ ParticipantId participantId = participantPreferenceList.get(i);
boolean notInErrorState =
- currentStateMap == null || currentStateMap.get(instanceName) == null
- || !currentStateMap.get(instanceName).equals(HelixDefinedState.ERROR.toString());
-
- if (liveInstancesMap.containsKey(instanceName) && !assigned[i] && notInErrorState
- && !disabledInstancesForPartition.contains(instanceName)) {
- instanceStateMap.put(instanceName, state);
+ currentStateMap == null
+ || currentStateMap.get(participantId) == null
+ || !currentStateMap.get(participantId)
+ .equals(State.from(HelixDefinedState.ERROR));
+
+ if (liveParticipantSet.contains(participantId) && !assigned[i] && notInErrorState
+ && !disabledParticipantsForPartition.contains(participantId)) {
+ participantStateMap.put(participantId, state);
count = count + 1;
assigned[i] = true;
if (count == stateCount) {
@@ -141,24 +190,25 @@ public class ConstraintBasedAssignment {
}
}
}
- return instanceStateMap;
+ return participantStateMap;
}
/**
* Get the number of replicas that should be in each state for a partition
+ * @param upperBounds map of state to upper bound
* @param stateModelDef StateModelDefinition object
* @param liveNodesNb number of live nodes
* @param total number of replicas
* @return state count map: state->count
*/
- public static LinkedHashMap<String, Integer> stateCount(StateModelDefinition stateModelDef,
- int liveNodesNb, int totalReplicas) {
- LinkedHashMap<String, Integer> stateCountMap = new LinkedHashMap<String, Integer>();
- List<String> statesPriorityList = stateModelDef.getStatesPriorityList();
+ public static LinkedHashMap<State, Integer> stateCount(Map<State, String> upperBounds,
+ StateModelDefinition stateModelDef, int liveNodesNb, int totalReplicas) {
+ LinkedHashMap<State, Integer> stateCountMap = new LinkedHashMap<State, Integer>();
+ List<State> statesPriorityList = stateModelDef.getTypedStatesPriorityList();
int replicas = totalReplicas;
- for (String state : statesPriorityList) {
- String num = stateModelDef.getNumInstancesPerState(state);
+ for (State state : statesPriorityList) {
+ String num = upperBounds.get(state);
if ("N".equals(num)) {
stateCountMap.put(state, liveNodesNb);
} else if ("R".equals(num)) {
@@ -181,8 +231,8 @@ public class ConstraintBasedAssignment {
}
// get state count for R
- for (String state : statesPriorityList) {
- String num = stateModelDef.getNumInstancesPerState(state);
+ for (State state : statesPriorityList) {
+ String num = upperBounds.get(state);
if ("R".equals(num)) {
stateCountMap.put(state, replicas);
// should have at most one state using R
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 51301f0..aae33b4 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -30,12 +30,12 @@ import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.api.rebalancer.Rebalancer;
-import org.apache.helix.api.rebalancer.RebalancerConfig;
-import org.apache.helix.api.rebalancer.RebalancerContext;
-import org.apache.helix.api.rebalancer.util.ConstraintBasedAssignment;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.controller.rebalancer.Rebalancer;
+import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.StateModelDefinition;
import org.apache.log4j.Logger;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
index 5ecbddf..1d287fa 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
@@ -42,10 +42,10 @@ import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.api.rebalancer.RebalancerConfig;
-import org.apache.helix.api.rebalancer.RebalancerContext;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.context.RebalancerContext;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Message;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java
index 5d9746b..d6fe8c3 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java
@@ -37,9 +37,9 @@ import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.SessionId;
import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.api.id.StateModelFactoryId;
-import org.apache.helix.api.rebalancer.RebalancerContext;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.controller.rebalancer.context.RebalancerContext;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.MessageState;
import org.apache.helix.model.Message.MessageType;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
index 15004a6..5c24314 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
@@ -36,11 +36,11 @@ import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.api.rebalancer.RebalancerConfig;
-import org.apache.helix.api.rebalancer.RebalancerContext;
-import org.apache.helix.api.rebalancer.ReplicatedRebalancerContext;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.controller.rebalancer.context.ReplicatedRebalancerContext;
import org.apache.helix.model.Message;
import org.apache.helix.model.StateModelDefinition;
import org.apache.log4j.Logger;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/controller/stages/RebalanceIdealStateStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/RebalanceIdealStateStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/RebalanceIdealStateStage.java
index 949cfca..a9ab34e 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/RebalanceIdealStateStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/RebalanceIdealStateStage.java
@@ -24,7 +24,7 @@ import java.util.Map;
import org.apache.helix.HelixManager;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
-import org.apache.helix.controller.rebalancer.Rebalancer;
+import org.apache.helix.deprecated.controller.rebalancer.Rebalancer;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.helix.model.Resource;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
index dc56b89..1fdd892 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
@@ -30,11 +30,11 @@ import org.apache.helix.api.config.ResourceConfig;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.StateModelFactoryId;
-import org.apache.helix.api.rebalancer.PartitionedRebalancerContext;
-import org.apache.helix.api.rebalancer.RebalancerConfig;
-import org.apache.helix.api.rebalancer.RebalancerContext;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
+import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.context.RebalancerContext;
import org.apache.helix.model.CurrentState;
import org.apache.log4j.Logger;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/deprecated/controller/rebalancer/AutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/deprecated/controller/rebalancer/AutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/deprecated/controller/rebalancer/AutoRebalancer.java
new file mode 100644
index 0000000..afe132e
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/deprecated/controller/rebalancer/AutoRebalancer.java
@@ -0,0 +1,187 @@
+package org.apache.helix.deprecated.controller.rebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
+import org.apache.helix.deprecated.controller.rebalancer.util.ConstraintBasedAssignment;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.log4j.Logger;
+
+/**
+ * This is a Rebalancer specific to full automatic mode. It is tasked with computing the ideal
+ * state of a resource, fully adapting to the addition or removal of instances. This includes
+ * computation of a new preference list and a partition to instance and state mapping based on the
+ * computed instance preferences.
+ * The input is the current assignment of partitions to instances, as well as existing instance
+ * preferences, if any.
+ * The output is a preference list and a mapping based on that preference list, i.e. partition p
+ * has a replica on node k with state s.
+ */
+@Deprecated
+public class AutoRebalancer implements Rebalancer {
+  // These should be final, but are initialized in init rather than a constructor
+  private HelixManager _manager;
+  private AutoRebalanceStrategy _algorithm;
+
+  private static final Logger LOG = Logger.getLogger(AutoRebalancer.class);
+
+  /**
+   * Store the manager for the placement scheme; the strategy itself is rebuilt on every call to
+   * computeResourceMapping.
+   * @param manager the HelixManager later handed to the replica placement scheme
+   */
+  @Override
+  public void init(HelixManager manager) {
+    this._manager = manager;
+    this._algorithm = null;
+  }
+
+  /**
+   * Compute new list fields for the resource with AutoRebalanceStrategy, then derive the state of
+   * each replica from them.
+   * @param resource the resource for which a mapping will be computed
+   * @param currentIdealState the IdealState that corresponds to this resource; it supplies the
+   *          partitions, replica count, instance group tag and per-instance partition limit
+   * @param currentStateOutput the current and pending states of all partitions
+   * @param clusterData cache of the cluster state
+   * @return a ResourceAssignment holding one replica map for every partition of the ideal state
+   */
+  @Override
+  public ResourceAssignment computeResourceMapping(Resource resource, IdealState currentIdealState,
+      CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) {
+    // Compute a preference list based on the current ideal state
+    String resourceName = resource.getResourceName();
+    List<String> partitions = new ArrayList<String>(currentIdealState.getPartitionSet());
+    String stateModelName = currentIdealState.getStateModelDefRef();
+    StateModelDefinition stateModelDef = clusterData.getStateModelDef(stateModelName);
+    Map<String, LiveInstance> liveInstance = clusterData.getLiveInstances();
+    String replicas = currentIdealState.getReplicas();
+
+    LinkedHashMap<String, Integer> stateCountMap =
+        ConstraintBasedAssignment.stateCount(stateModelDef, liveInstance.size(),
+            Integer.parseInt(replicas));
+    List<String> liveNodes = new ArrayList<String>(liveInstance.keySet());
+    Map<String, Map<String, String>> currentMapping =
+        currentMapping(currentStateOutput, resourceName, partitions, stateCountMap);
+
+    // If there are live nodes carrying the instance group tag, use only those nodes
+    String instanceGroupTag = currentIdealState.getInstanceGroupTag();
+    Set<String> taggedNodes = new HashSet<String>();
+    if (instanceGroupTag != null) {
+      for (String instanceName : liveNodes) {
+        if (clusterData.getInstanceConfigMap().get(instanceName).containsTag(instanceGroupTag)) {
+          taggedNodes.add(instanceName);
+        }
+      }
+    }
+    if (!taggedNodes.isEmpty()) {
+      if (LOG.isInfoEnabled()) {
+        // log the tag that was actually matched, not the resource name
+        LOG.info("found the following instances with tag " + instanceGroupTag + " "
+            + taggedNodes);
+      }
+      liveNodes = new ArrayList<String>(taggedNodes);
+    }
+
+    List<String> allNodes = new ArrayList<String>(clusterData.getInstanceConfigMap().keySet());
+    int maxPartition = currentIdealState.getMaxPartitionsPerInstance();
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("currentMapping: " + currentMapping);
+      LOG.info("stateCountMap: " + stateCountMap);
+      LOG.info("liveNodes: " + liveNodes);
+      LOG.info("allNodes: " + allNodes);
+      LOG.info("maxPartition: " + maxPartition);
+    }
+    ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
+    placementScheme.init(_manager);
+    _algorithm =
+        new AutoRebalanceStrategy(resourceName, partitions, stateCountMap, maxPartition,
+            placementScheme);
+    ZNRecord newMapping =
+        _algorithm.computePartitionAssignment(liveNodes, currentMapping, allNodes);
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("newMapping: " + newMapping);
+    }
+
+    // copy the simple fields and install the list fields computed by the strategy
+    IdealState newIdealState = new IdealState(resourceName);
+    newIdealState.getRecord().setSimpleFields(currentIdealState.getRecord().getSimpleFields());
+    newIdealState.setRebalanceMode(RebalanceMode.FULL_AUTO);
+    newIdealState.getRecord().setListFields(newMapping.getListFields());
+
+    // compute a full partition mapping for the resource
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Processing resource:" + resourceName);
+    }
+    ResourceAssignment partitionMapping = new ResourceAssignment(ResourceId.from(resourceName));
+    for (String partitionName : partitions) {
+      Partition partition = new Partition(partitionName);
+      Map<String, String> currentStateMap =
+          currentStateOutput.getCurrentStateMap(resourceName, partition);
+      Set<String> disabledInstancesForPartition =
+          clusterData.getDisabledInstancesForPartition(partition.toString());
+      List<String> preferenceList =
+          ConstraintBasedAssignment.getPreferenceList(clusterData, partition, newIdealState,
+              stateModelDef);
+      Map<String, String> bestStateForPartition =
+          ConstraintBasedAssignment.computeAutoBestStateForPartition(clusterData, stateModelDef,
+              preferenceList, currentStateMap, disabledInstancesForPartition);
+      partitionMapping.addReplicaMap(PartitionId.from(partitionName),
+          ResourceAssignment.replicaMapFromStringMap(bestStateForPartition));
+    }
+    return partitionMapping;
+  }
+
+  /**
+   * Build the partition to (instance to state) map that is fed to the rebalance strategy.
+   * Only states that appear in stateCountMap are kept. Pending states are applied after current
+   * states, so a pending state replaces the current state of the same instance.
+   * @param currentStateOutput source of the current and pending state maps
+   * @param resourceName name of the resource being rebalanced
+   * @param partitions names of the partitions to include; each one gets an entry, possibly empty
+   * @param stateCountMap states that count towards the assignment
+   * @return partition name to (instance name to state)
+   */
+  private Map<String, Map<String, String>> currentMapping(CurrentStateOutput currentStateOutput,
+      String resourceName, List<String> partitions, Map<String, Integer> stateCountMap) {
+    Map<String, Map<String, String>> map = new HashMap<String, Map<String, String>>();
+
+    for (String partitionName : partitions) {
+      Partition partition = new Partition(partitionName);
+      Map<String, String> replicaStates = new HashMap<String, String>();
+      copyKnownStates(currentStateOutput.getCurrentStateMap(resourceName, partition),
+          stateCountMap, replicaStates);
+      copyKnownStates(currentStateOutput.getPendingStateMap(resourceName, partition),
+          stateCountMap, replicaStates);
+      map.put(partitionName, replicaStates);
+    }
+    return map;
+  }
+
+  /**
+   * Copy every instance to state entry whose state is a key of stateCountMap into target.
+   */
+  private static void copyKnownStates(Map<String, String> source,
+      Map<String, Integer> stateCountMap, Map<String, String> target) {
+    for (Map.Entry<String, String> entry : source.entrySet()) {
+      if (stateCountMap.containsKey(entry.getValue())) {
+        target.put(entry.getKey(), entry.getValue());
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/deprecated/controller/rebalancer/CustomRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/deprecated/controller/rebalancer/CustomRebalancer.java b/helix-core/src/main/java/org/apache/helix/deprecated/controller/rebalancer/CustomRebalancer.java
new file mode 100644
index 0000000..e1136a2
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/deprecated/controller/rebalancer/CustomRebalancer.java
@@ -0,0 +1,135 @@
+package org.apache.helix.deprecated.controller.rebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixDefinedState;
+import org.apache.helix.HelixManager;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.log4j.Logger;
+
+/**
+ * This is a Rebalancer specific to custom mode. It is tasked with checking an existing mapping of
+ * partitions against the set of live instances to mark assignment states as dropped or erroneous
+ * as necessary.
+ * The input is the required current assignment of partitions to instances, as well as the required
+ * existing instance preferences.
+ * The output is a verified mapping based on that preference list, i.e. partition p has a replica
+ * on node k with state s, where s may be a dropped or error state if necessary.
+ */
+@Deprecated
+public class CustomRebalancer implements Rebalancer {
+
+ private static final Logger LOG = Logger.getLogger(CustomRebalancer.class);
+
+ @Override
+ public void init(HelixManager manager) {
+ }
+
+ @Override
+ public ResourceAssignment computeResourceMapping(Resource resource, IdealState currentIdealState,
+ CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) {
+ String stateModelDefName = currentIdealState.getStateModelDefRef();
+ StateModelDefinition stateModelDef = clusterData.getStateModelDef(stateModelDefName);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Processing resource:" + resource.getResourceName());
+ }
+ ResourceAssignment partitionMapping =
+ new ResourceAssignment(ResourceId.from(resource.getResourceName()));
+ for (Partition partition : resource.getPartitions()) {
+ Map<String, String> currentStateMap =
+ currentStateOutput.getCurrentStateMap(resource.getResourceName(), partition);
+ Set<String> disabledInstancesForPartition =
+ clusterData.getDisabledInstancesForPartition(partition.toString());
+ Map<String, String> idealStateMap =
+ IdealState.stringMapFromParticipantStateMap(currentIdealState
+ .getParticipantStateMap(PartitionId.from(partition.getPartitionName())));
+ Map<String, String> bestStateForPartition =
+ computeCustomizedBestStateForPartition(clusterData, stateModelDef, idealStateMap,
+ currentStateMap, disabledInstancesForPartition);
+ partitionMapping.addReplicaMap(PartitionId.from(partition.getPartitionName()),
+ ResourceAssignment.replicaMapFromStringMap(bestStateForPartition));
+ }
+ return partitionMapping;
+ }
+
+ /**
+ * compute best state for resource in CUSTOMIZED ideal state mode
+ * @param cache
+ * @param stateModelDef
+ * @param idealStateMap
+ * @param currentStateMap
+ * @param disabledInstancesForPartition
+ * @return
+ */
+ private Map<String, String> computeCustomizedBestStateForPartition(ClusterDataCache cache,
+ StateModelDefinition stateModelDef, Map<String, String> idealStateMap,
+ Map<String, String> currentStateMap, Set<String> disabledInstancesForPartition) {
+ Map<String, String> instanceStateMap = new HashMap<String, String>();
+
+ // if the ideal state is deleted, idealStateMap will be null/empty and
+ // we should drop all resources.
+ if (currentStateMap != null) {
+ for (String instance : currentStateMap.keySet()) {
+ if ((idealStateMap == null || !idealStateMap.containsKey(instance))
+ && !disabledInstancesForPartition.contains(instance)) {
+ // if dropped and not disabled, transit to DROPPED
+ instanceStateMap.put(instance, HelixDefinedState.DROPPED.toString());
+ } else if ((currentStateMap.get(instance) == null || !currentStateMap.get(instance).equals(
+ HelixDefinedState.ERROR.toString()))
+ && disabledInstancesForPartition.contains(instance)) {
+ // if disabled and not in ERROR state, transit to initial-state (e.g. OFFLINE)
+ instanceStateMap.put(instance, stateModelDef.getInitialState());
+ }
+ }
+ }
+
+ // ideal state is deleted
+ if (idealStateMap == null) {
+ return instanceStateMap;
+ }
+
+ Map<String, LiveInstance> liveInstancesMap = cache.getLiveInstances();
+ for (String instance : idealStateMap.keySet()) {
+ boolean notInErrorState =
+ currentStateMap == null || currentStateMap.get(instance) == null
+ || !currentStateMap.get(instance).equals(HelixDefinedState.ERROR.toString());
+
+ if (liveInstancesMap.containsKey(instance) && notInErrorState
+ && !disabledInstancesForPartition.contains(instance)) {
+ instanceStateMap.put(instance, idealStateMap.get(instance));
+ }
+ }
+
+ return instanceStateMap;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/deprecated/controller/rebalancer/Rebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/deprecated/controller/rebalancer/Rebalancer.java b/helix-core/src/main/java/org/apache/helix/deprecated/controller/rebalancer/Rebalancer.java
new file mode 100644
index 0000000..59b827d
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/deprecated/controller/rebalancer/Rebalancer.java
@@ -0,0 +1,58 @@
+package org.apache.helix.deprecated.controller.rebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceAssignment;
+
+/**
+ * Allows one to come up with a custom implementation of a rebalancer.<br/>
+ * This will be invoked on all changes that happen in the cluster.<br/>
+ * Simply return the ResourceAssignment for a resource from computeResourceMapping.<br/>
+ * <br/>
+ * @deprecated Use {@link org.apache.helix.controller.rebalancer.Rebalancer} instead.
+ */
+@Deprecated
+public interface Rebalancer {
+ /**
+ * Initialize the rebalancer with a HelixManager if necessary
+ * @param manager the HelixManager made available to the rebalancer
+ */
+ void init(HelixManager manager);
+
+ /**
+ * Given an ideal state for a resource and liveness of instances, compute an assignment of
+ * instances and states to each partition of a resource. This method provides all the relevant
+ * information needed to rebalance a resource. If you need additional information, use the
+ * manager passed to init to read the cluster data. This allows one to compute the assignment
+ * according to app specific requirements.
+ * @param resource the resource for which a mapping will be computed
+ * @param currentIdealState the IdealState that corresponds to this resource
+ * @param currentStateOutput the current states of all partitions
+ * @param clusterData cache of the cluster state
+ * @return the computed assignment of instances and states to the partitions of the resource
+ */
+ ResourceAssignment computeResourceMapping(final Resource resource,
+ final IdealState currentIdealState, final CurrentStateOutput currentStateOutput,
+ final ClusterDataCache clusterData);
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/deprecated/controller/rebalancer/SemiAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/deprecated/controller/rebalancer/SemiAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/deprecated/controller/rebalancer/SemiAutoRebalancer.java
new file mode 100644
index 0000000..b4259f9
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/deprecated/controller/rebalancer/SemiAutoRebalancer.java
@@ -0,0 +1,83 @@
+package org.apache.helix.deprecated.controller.rebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.deprecated.controller.rebalancer.util.ConstraintBasedAssignment;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.log4j.Logger;
+
+/**
+ * Rebalancer for semi-automatic mode. The ideal state carries a predefined preference list of
+ * instances willing to accept replicas of each partition; this class only decides which state
+ * each of those instances should hold.
+ * The input is the optional current assignment of partitions to instances, as well as the required
+ * existing instance preferences.
+ * The output is a mapping based on that preference list, i.e. partition p has a replica on node k
+ * with state s.
+ */
+@Deprecated
+public class SemiAutoRebalancer implements Rebalancer {
+
+  private static final Logger LOG = Logger.getLogger(SemiAutoRebalancer.class);
+
+  @Override
+  public void init(HelixManager manager) {
+    // nothing to set up: the manager is not used by this rebalancer
+  }
+
+  @Override
+  public ResourceAssignment computeResourceMapping(Resource resource, IdealState currentIdealState,
+      CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) {
+    StateModelDefinition stateModelDef =
+        clusterData.getStateModelDef(currentIdealState.getStateModelDefRef());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Processing resource:" + resource.getResourceName());
+    }
+    String resourceName = resource.getResourceName();
+    ResourceAssignment assignment = new ResourceAssignment(ResourceId.from(resourceName));
+    for (Partition partition : resource.getPartitions()) {
+      Map<String, String> bestStates =
+          bestStatesFor(partition, resourceName, currentIdealState, stateModelDef,
+              currentStateOutput, clusterData);
+      assignment.addReplicaMap(PartitionId.from(partition.getPartitionName()),
+          ResourceAssignment.replicaMapFromStringMap(bestStates));
+    }
+    return assignment;
+  }
+
+  /**
+   * Compute the instance to state map of a single partition from its preference list.
+   */
+  private static Map<String, String> bestStatesFor(Partition partition, String resourceName,
+      IdealState idealState, StateModelDefinition stateModelDef,
+      CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) {
+    Map<String, String> currentStates =
+        currentStateOutput.getCurrentStateMap(resourceName, partition);
+    Set<String> disabledInstances =
+        clusterData.getDisabledInstancesForPartition(partition.toString());
+    List<String> preferences =
+        ConstraintBasedAssignment.getPreferenceList(clusterData, partition, idealState,
+            stateModelDef);
+    return ConstraintBasedAssignment.computeAutoBestStateForPartition(clusterData, stateModelDef,
+        preferences, currentStates, disabledInstances);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/deprecated/controller/rebalancer/util/ConstraintBasedAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/deprecated/controller/rebalancer/util/ConstraintBasedAssignment.java b/helix-core/src/main/java/org/apache/helix/deprecated/controller/rebalancer/util/ConstraintBasedAssignment.java
new file mode 100644
index 0000000..c75423f
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/deprecated/controller/rebalancer/util/ConstraintBasedAssignment.java
@@ -0,0 +1,194 @@
+package org.apache.helix.deprecated.controller.rebalancer.util;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixConstants.StateModelToken;
+import org.apache.helix.HelixDefinedState;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.log4j.Logger;
+
+/**
+ * Collection of functions that will compute the best possible states given the live instances and
+ * an ideal state.<br/>
+ * <br/>
+ * Deprecated. Use {@link org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment} instead.
+ */
+@Deprecated
+public class ConstraintBasedAssignment {
+ private static Logger logger = Logger.getLogger(ConstraintBasedAssignment.class);
+
+ public static List<String> getPreferenceList(ClusterDataCache cache, Partition resource,
+ IdealState idealState, StateModelDefinition stateModelDef) {
+ List<String> listField = idealState.getPreferenceList(resource.getPartitionName());
+
+ if (listField != null && listField.size() == 1
+ && StateModelToken.ANY_LIVEINSTANCE.toString().equals(listField.get(0))) {
+ Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
+ List<String> prefList = new ArrayList<String>(liveInstances.keySet());
+ Collections.sort(prefList);
+ return prefList;
+ } else {
+ return listField;
+ }
+ }
+
+ /**
+ * compute best state for resource in AUTO ideal state mode
+ * @param cache
+ * @param stateModelDef
+ * @param instancePreferenceList
+ * @param currentStateMap
+ * : instance->state for each partition
+ * @param disabledInstancesForPartition
+ * @return
+ */
+ public static Map<String, String> computeAutoBestStateForPartition(ClusterDataCache cache,
+ StateModelDefinition stateModelDef, List<String> instancePreferenceList,
+ Map<String, String> currentStateMap, Set<String> disabledInstancesForPartition) {
+ Map<String, String> instanceStateMap = new HashMap<String, String>();
+
+ // if the ideal state is deleted, instancePreferenceList will be empty and
+ // we should drop all resources.
+ if (currentStateMap != null) {
+ for (String instance : currentStateMap.keySet()) {
+ if ((instancePreferenceList == null || !instancePreferenceList.contains(instance))
+ && !disabledInstancesForPartition.contains(instance)) {
+ // if dropped and not disabled, transit to DROPPED
+ instanceStateMap.put(instance, HelixDefinedState.DROPPED.toString());
+ } else if ((currentStateMap.get(instance) == null || !currentStateMap.get(instance).equals(
+ HelixDefinedState.ERROR.toString()))
+ && disabledInstancesForPartition.contains(instance)) {
+ // if disabled and not in ERROR state, transit to initial-state (e.g. OFFLINE)
+ instanceStateMap.put(instance, stateModelDef.getInitialState());
+ }
+ }
+ }
+
+ // ideal state is deleted
+ if (instancePreferenceList == null) {
+ return instanceStateMap;
+ }
+
+ List<String> statesPriorityList = stateModelDef.getStatesPriorityList();
+ boolean assigned[] = new boolean[instancePreferenceList.size()];
+
+ Map<String, LiveInstance> liveInstancesMap = cache.getLiveInstances();
+
+ for (String state : statesPriorityList) {
+ String num = stateModelDef.getNumInstancesPerState(state);
+ int stateCount = -1;
+ if ("N".equals(num)) {
+ Set<String> liveAndEnabled = new HashSet<String>(liveInstancesMap.keySet());
+ liveAndEnabled.removeAll(disabledInstancesForPartition);
+ stateCount = liveAndEnabled.size();
+ } else if ("R".equals(num)) {
+ stateCount = instancePreferenceList.size();
+ } else {
+ try {
+ stateCount = Integer.parseInt(num);
+ } catch (Exception e) {
+ logger.error("Invalid count for state:" + state + " ,count=" + num);
+ }
+ }
+ if (stateCount > -1) {
+ int count = 0;
+ for (int i = 0; i < instancePreferenceList.size(); i++) {
+ String instanceName = instancePreferenceList.get(i);
+
+ boolean notInErrorState =
+ currentStateMap == null || currentStateMap.get(instanceName) == null
+ || !currentStateMap.get(instanceName).equals(HelixDefinedState.ERROR.toString());
+
+ if (liveInstancesMap.containsKey(instanceName) && !assigned[i] && notInErrorState
+ && !disabledInstancesForPartition.contains(instanceName)) {
+ instanceStateMap.put(instanceName, state);
+ count = count + 1;
+ assigned[i] = true;
+ if (count == stateCount) {
+ break;
+ }
+ }
+ }
+ }
+ }
+ return instanceStateMap;
+ }
+
+ /**
+ * Get the number of replicas that should be in each state for a partition
+ * @param stateModelDef StateModelDefinition object
+ * @param liveNodesNb number of live nodes
+ * @param total number of replicas
+ * @return state count map: state->count
+ */
+ public static LinkedHashMap<String, Integer> stateCount(StateModelDefinition stateModelDef,
+ int liveNodesNb, int totalReplicas) {
+ LinkedHashMap<String, Integer> stateCountMap = new LinkedHashMap<String, Integer>();
+ List<String> statesPriorityList = stateModelDef.getStatesPriorityList();
+
+ int replicas = totalReplicas;
+ for (String state : statesPriorityList) {
+ String num = stateModelDef.getNumInstancesPerState(state);
+ if ("N".equals(num)) {
+ stateCountMap.put(state, liveNodesNb);
+ } else if ("R".equals(num)) {
+ // wait until we get the counts for all other states
+ continue;
+ } else {
+ int stateCount = -1;
+ try {
+ stateCount = Integer.parseInt(num);
+ } catch (Exception e) {
+ // LOG.error("Invalid count for state: " + state + ", count: " + num +
+ // ", use -1 instead");
+ }
+
+ if (stateCount > 0) {
+ stateCountMap.put(state, stateCount);
+ replicas -= stateCount;
+ }
+ }
+ }
+
+ // get state count for R
+ for (String state : statesPriorityList) {
+ String num = stateModelDef.getNumInstancesPerState(state);
+ if ("R".equals(num)) {
+ stateCountMap.put(state, replicas);
+ // should have at most one state using R
+ break;
+ }
+ }
+ return stateCountMap;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index 805f6bf..df4043a 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -38,7 +38,7 @@ import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.api.id.StateModelFactoryId;
-import org.apache.helix.api.rebalancer.RebalancerRef;
+import org.apache.helix.controller.rebalancer.RebalancerRef;
import org.apache.log4j.Logger;
import com.google.common.base.Function;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/tools/NewClusterSetup.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/NewClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/NewClusterSetup.java
index f48ebbc..85330be 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/NewClusterSetup.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/NewClusterSetup.java
@@ -45,10 +45,10 @@ import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.api.rebalancer.CustomRebalancerContext;
-import org.apache.helix.api.rebalancer.PartitionedRebalancerContext;
-import org.apache.helix.api.rebalancer.RebalancerContext;
-import org.apache.helix.api.rebalancer.SemiAutoRebalancerContext;
+import org.apache.helix.controller.rebalancer.context.CustomRebalancerContext;
+import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
+import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.controller.rebalancer.context.SemiAutoRebalancerContext;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;