You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2013/11/20 22:12:29 UTC
[14/52] [abbrv] git commit: [HELIX-297] Support old rebalancer for
backward compatibility, rb=15431
[HELIX-297] Support old rebalancer for backward compatibility, rb=15431
Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/55c93516
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/55c93516
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/55c93516
Branch: refs/heads/helix-yarn
Commit: 55c935169eeb1e86825a533f4f5c101083d88364
Parents: ec36112
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Mon Nov 11 14:24:05 2013 -0800
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Mon Nov 11 14:24:05 2013 -0800
----------------------------------------------------------------------
.../controller/rebalancer/CustomRebalancer.java | 62 +------
.../rebalancer/FallbackRebalancer.java | 185 +++++++++++++++++++
.../util/ConstraintBasedAssignment.java | 50 +++++
.../stages/BestPossibleStateCalcStage.java | 16 +-
.../controller/stages/ResourceCurrentState.java | 11 ++
.../java/org/apache/helix/model/Partition.java | 2 +
.../java/org/apache/helix/model/Resource.java | 4 +-
.../TestUserDefRebalancerCompatibility.java | 104 +++++++++++
8 files changed, 366 insertions(+), 68 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/55c93516/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
index 69c379a..5209e2c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
@@ -1,10 +1,8 @@
package org.apache.helix.controller.rebalancer;
-import java.util.HashMap;
import java.util.Map;
import java.util.Set;
-import org.apache.helix.HelixDefinedState;
import org.apache.helix.HelixManager;
import org.apache.helix.api.Cluster;
import org.apache.helix.api.State;
@@ -61,65 +59,13 @@ public class CustomRebalancer implements HelixRebalancer {
Map<ParticipantId, State> currentStateMap =
currentState.getCurrentStateMap(config.getResourceId(), partition);
Set<ParticipantId> disabledInstancesForPartition =
- ConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(),
- partition);
+ ConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(), partition);
Map<ParticipantId, State> bestStateForPartition =
- computeCustomizedBestStateForPartition(cluster.getLiveParticipantMap().keySet(),
- stateModelDef, config.getPreferenceMap(partition), currentStateMap,
- disabledInstancesForPartition);
+ ConstraintBasedAssignment.computeCustomizedBestStateForPartition(cluster
+ .getLiveParticipantMap().keySet(), stateModelDef, config.getPreferenceMap(partition),
+ currentStateMap, disabledInstancesForPartition);
partitionMapping.addReplicaMap(partition, bestStateForPartition);
}
return partitionMapping;
}
-
- /**
- * compute best state for resource in CUSTOMIZED rebalancer mode
- * @param liveParticipantMap
- * @param stateModelDef
- * @param preferenceMap
- * @param currentStateMap
- * @param disabledParticipantsForPartition
- * @return
- */
- private Map<ParticipantId, State> computeCustomizedBestStateForPartition(
- Set<ParticipantId> liveParticipantSet, StateModelDefinition stateModelDef,
- Map<ParticipantId, State> preferenceMap, Map<ParticipantId, State> currentStateMap,
- Set<ParticipantId> disabledParticipantsForPartition) {
- Map<ParticipantId, State> participantStateMap = new HashMap<ParticipantId, State>();
-
- // if the resource is deleted, idealStateMap will be null/empty and
- // we should drop all resources.
- if (currentStateMap != null) {
- for (ParticipantId participantId : currentStateMap.keySet()) {
- if ((preferenceMap == null || !preferenceMap.containsKey(participantId))
- && !disabledParticipantsForPartition.contains(participantId)) {
- // if dropped and not disabled, transit to DROPPED
- participantStateMap.put(participantId, State.from(HelixDefinedState.DROPPED));
- } else if ((currentStateMap.get(participantId) == null || !currentStateMap.get(
- participantId).equals(State.from(HelixDefinedState.ERROR)))
- && disabledParticipantsForPartition.contains(participantId)) {
- // if disabled and not in ERROR state, transit to initial-state (e.g. OFFLINE)
- participantStateMap.put(participantId, stateModelDef.getTypedInitialState());
- }
- }
- }
-
- // ideal state is deleted
- if (preferenceMap == null) {
- return participantStateMap;
- }
-
- for (ParticipantId participantId : preferenceMap.keySet()) {
- boolean notInErrorState =
- currentStateMap == null || currentStateMap.get(participantId) == null
- || !currentStateMap.get(participantId).equals(State.from(HelixDefinedState.ERROR));
-
- if (liveParticipantSet.contains(participantId) && notInErrorState
- && !disabledParticipantsForPartition.contains(participantId)) {
- participantStateMap.put(participantId, preferenceMap.get(participantId));
- }
- }
-
- return participantStateMap;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/55c93516/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FallbackRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FallbackRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FallbackRebalancer.java
new file mode 100644
index 0000000..fc4bfa0
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FallbackRebalancer.java
@@ -0,0 +1,185 @@
+package org.apache.helix.controller.rebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.State;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
+import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.controller.stages.ResourceCurrentState;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.util.HelixUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * Adapter that runs a legacy {@link Rebalancer}, named by class in the IdealState, behind the
+ * {@link HelixRebalancer} interface. It is subject to removal once {@link Rebalancer} is removed.
+ */
+@SuppressWarnings("deprecation")
+public class FallbackRebalancer implements HelixRebalancer {
+ private static final Logger LOG = Logger.getLogger(FallbackRebalancer.class);
+ private HelixManager _helixManager;
+
+ @Override
+ public void init(HelixManager helixManager) {
+ _helixManager = helixManager;
+ }
+
+ @Override
+ public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
+ Cluster cluster, ResourceCurrentState currentState) {
+ // init() must have supplied a manager; it provides the data accessor used below
+ if (_helixManager == null) {
+ LOG.info("HelixManager is null!");
+ return null;
+ }
+
+ // get the partitioned rebalancer context; a null context means nothing can be computed here
+ PartitionedRebalancerContext context =
+ rebalancerConfig.getRebalancerContext(PartitionedRebalancerContext.class);
+ if (context == null) {
+ LOG.info("Resource is not partitioned");
+ return null;
+ }
+
+ // look up the state model definition, then read the ideal state and its rebalancer class name
+ ResourceId resourceId = context.getResourceId();
+ StateModelDefinition stateModelDef =
+ cluster.getStateModelMap().get(context.getStateModelDefId());
+ if (stateModelDef == null) {
+ LOG.info("StateModelDefinition unavailable for " + resourceId);
+ return null;
+ }
+ HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ IdealState idealState = accessor.getProperty(keyBuilder.idealStates(resourceId.stringify()));
+ if (idealState == null) {
+ LOG.info("No IdealState available for " + resourceId);
+ return null;
+ }
+ String rebalancerClassName = idealState.getRebalancerClassName();
+ if (rebalancerClassName == null) {
+ LOG.info("No Rebalancer class available for " + resourceId);
+ return null;
+ }
+
+ // try to instantiate the rebalancer class reflectively; any failure is logged and yields null
+ Rebalancer rebalancer = null;
+ try {
+ rebalancer =
+ (Rebalancer) (HelixUtil.loadClass(getClass(), rebalancerClassName).newInstance());
+ } catch (Exception e) {
+ LOG.warn("rebalancer " + rebalancerClassName + " not available", e);
+ }
+ if (rebalancer == null) {
+ LOG.warn("Rebalancer class " + rebalancerClassName + " could not be instantiated for "
+ + resourceId);
+ return null;
+ }
+
+ // get the cluster data cache (unfortunately involves a second read of the cluster)
+ ClusterDataCache cache = new ClusterDataCache();
+ cache.refresh(accessor);
+
+ // adapt ResourceCurrentState to the CurrentStateOutput type that the legacy interface takes
+ CurrentStateOutput currentStateOutput = new CurrentStateOutput();
+ for (ResourceId resource : currentState.getResourceIds()) {
+ currentStateOutput.setBucketSize(resource.stringify(), currentState.getBucketSize(resource));
+ currentStateOutput.setResourceStateModelDef(resource.stringify(), currentState
+ .getResourceStateModelDef(resource).stringify()); // NOTE(review): no null check on the def
+ Set<PartitionId> partitions = currentState.getCurrentStateMappedPartitions(resource);
+ for (PartitionId partitionId : partitions) {
+ // copy the current state of every participant for this partition
+ Map<ParticipantId, State> currentStateMap =
+ currentState.getCurrentStateMap(resource, partitionId);
+ for (ParticipantId participantId : currentStateMap.keySet()) {
+ currentStateOutput.setCurrentState(resource.stringify(),
+ new Partition(partitionId.stringify()), participantId.stringify(), currentStateMap
+ .get(participantId).toString());
+ }
+
+ // copy the pending state of every participant for this partition
+ Map<ParticipantId, State> pendingStateMap =
+ currentState.getPendingStateMap(resource, partitionId);
+ for (ParticipantId participantId : pendingStateMap.keySet()) {
+ currentStateOutput.setPendingState(resource.stringify(),
+ new Partition(partitionId.stringify()), participantId.stringify(), pendingStateMap
+ .get(participantId).toString());
+ }
+ }
+ }
+
+ // call the legacy rebalancer; NOTE(review): newIdealState is used below without a null check
+ rebalancer.init(_helixManager);
+ IdealState newIdealState =
+ rebalancer.computeResourceMapping(resourceId.stringify(), idealState, currentStateOutput,
+ cache);
+
+ // convert the returned ideal state into a ResourceAssignment; mode is read from idealState
+ ResourceAssignment assignment = new ResourceAssignment(resourceId);
+ if (idealState.getRebalanceMode() == RebalanceMode.CUSTOMIZED) {
+ // customized ideal state uses a participant-to-state map for each partition
+ for (PartitionId partitionId : newIdealState.getPartitionIdSet()) {
+ Set<ParticipantId> disabledParticipants =
+ ConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(),
+ partitionId);
+ Map<ParticipantId, State> replicaMap =
+ ConstraintBasedAssignment.computeCustomizedBestStateForPartition(cluster
+ .getLiveParticipantMap().keySet(), stateModelDef, newIdealState
+ .getParticipantStateMap(partitionId), currentState.getCurrentStateMap(resourceId,
+ partitionId), disabledParticipants);
+ assignment.addReplicaMap(partitionId, replicaMap);
+ }
+ } else {
+ // other modes use auto assignment from the preference list and per-state upper bounds
+ Map<State, String> upperBounds =
+ ConstraintBasedAssignment
+ .stateConstraints(stateModelDef, resourceId, cluster.getConfig());
+ for (PartitionId partitionId : newIdealState.getPartitionIdSet()) {
+ Set<ParticipantId> disabledParticipants =
+ ConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(),
+ partitionId);
+ Map<ParticipantId, State> replicaMap =
+ ConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds, cluster
+ .getLiveParticipantMap().keySet(), stateModelDef, newIdealState
+ .getPreferenceList(partitionId), currentState.getCurrentStateMap(resourceId,
+ partitionId), disabledParticipants);
+ assignment.addReplicaMap(partitionId, replicaMap);
+ }
+ }
+ return assignment;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/55c93516/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
index 84129de..7951784 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
@@ -260,4 +260,54 @@ public class ConstraintBasedAssignment {
}
return stateCountMap;
}
+
+ /**
+ * Compute the best state for one partition of a resource in CUSTOMIZED rebalancer mode
+ * @param liveParticipantSet ids of live participants; only these can receive a preferred state
+ * @param stateModelDef supplies the initial state assigned to disabled participants
+ * @param preferenceMap desired participant-to-state map, or null if the ideal state is deleted
+ * @param currentStateMap current participant-to-state map for the partition, may be null
+ * @param disabledParticipantsForPartition participants that are disabled for this partition
+ * @return map from participant to the state it should be assigned
+ */
+ public static Map<ParticipantId, State> computeCustomizedBestStateForPartition(
+ Set<ParticipantId> liveParticipantSet, StateModelDefinition stateModelDef,
+ Map<ParticipantId, State> preferenceMap, Map<ParticipantId, State> currentStateMap,
+ Set<ParticipantId> disabledParticipantsForPartition) {
+ Map<ParticipantId, State> participantStateMap = new HashMap<ParticipantId, State>();
+
+ // if the resource is deleted, preferenceMap will be null/empty and
+ // every enabled participant that currently holds the partition is dropped.
+ if (currentStateMap != null) {
+ for (ParticipantId participantId : currentStateMap.keySet()) {
+ if ((preferenceMap == null || !preferenceMap.containsKey(participantId))
+ && !disabledParticipantsForPartition.contains(participantId)) {
+ // if dropped and not disabled, transit to DROPPED
+ participantStateMap.put(participantId, State.from(HelixDefinedState.DROPPED));
+ } else if ((currentStateMap.get(participantId) == null || !currentStateMap.get(
+ participantId).equals(State.from(HelixDefinedState.ERROR)))
+ && disabledParticipantsForPartition.contains(participantId)) {
+ // if disabled and not in ERROR state, transit to initial-state (e.g. OFFLINE)
+ participantStateMap.put(participantId, stateModelDef.getTypedInitialState());
+ }
+ }
+ }
+
+ // ideal state is deleted: return only the drops and resets computed above
+ if (preferenceMap == null) {
+ return participantStateMap;
+ }
+
+ for (ParticipantId participantId : preferenceMap.keySet()) {
+ boolean notInErrorState =
+ currentStateMap == null || currentStateMap.get(participantId) == null
+ || !currentStateMap.get(participantId).equals(State.from(HelixDefinedState.ERROR));
+
+ if (liveParticipantSet.contains(participantId) && notInErrorState
+ && !disabledParticipantsForPartition.contains(participantId)) {
+ participantStateMap.put(participantId, preferenceMap.get(participantId));
+ }
+ }
+ return participantStateMap;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/55c93516/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 96b1ac8..7b143bd 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -33,6 +33,7 @@ import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.controller.rebalancer.FallbackRebalancer;
import org.apache.helix.controller.rebalancer.HelixRebalancer;
import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
import org.apache.helix.controller.rebalancer.context.RebalancerContext;
@@ -172,18 +173,19 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
}
ResourceConfig resourceConfig = resourceMap.get(resourceId);
RebalancerConfig rebalancerConfig = resourceConfig.getRebalancerConfig();
+ RebalancerContext context = rebalancerConfig.getRebalancerContext(RebalancerContext.class);
+ StateModelDefinition stateModelDef = stateModelDefs.get(context.getStateModelDefId());
ResourceAssignment resourceAssignment = null;
if (rebalancerConfig != null) {
HelixRebalancer rebalancer = rebalancerConfig.getRebalancer();
- if (rebalancer != null) {
- HelixManager manager = event.getAttribute("helixmanager");
- rebalancer.init(manager);
- resourceAssignment =
- rebalancer.computeResourceMapping(rebalancerConfig, cluster, currentStateOutput);
+ HelixManager manager = event.getAttribute("helixmanager");
+ if (rebalancer == null) {
+ rebalancer = new FallbackRebalancer();
}
+ rebalancer.init(manager);
+ resourceAssignment =
+ rebalancer.computeResourceMapping(rebalancerConfig, cluster, currentStateOutput);
}
- RebalancerContext context = rebalancerConfig.getRebalancerContext(RebalancerContext.class);
- StateModelDefinition stateModelDef = stateModelDefs.get(context.getStateModelDefId());
if (resourceAssignment == null) {
resourceAssignment =
mapDroppedResource(cluster, resourceId, currentStateOutput, stateModelDef);
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/55c93516/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceCurrentState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceCurrentState.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceCurrentState.java
index 2f5ec1d..f04afd0 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceCurrentState.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceCurrentState.java
@@ -68,6 +68,17 @@ public class ResourceCurrentState {
}
/**
+ * Get all the resources that have either a current state or a pending state recorded
+ * @return a new set of ResourceId, the union of the current-state and pending-state map keys
+ */
+ public Set<ResourceId> getResourceIds() {
+ Set<ResourceId> allResources = Sets.newHashSet();
+ allResources.addAll(_currentStateMap.keySet());
+ allResources.addAll(_pendingStateMap.keySet());
+ return allResources;
+ }
+
+ /**
* @param resourceId
* @param stateModelDefId
*/
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/55c93516/helix-core/src/main/java/org/apache/helix/model/Partition.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/Partition.java b/helix-core/src/main/java/org/apache/helix/model/Partition.java
index 1d694ab..6a3e054 100644
--- a/helix-core/src/main/java/org/apache/helix/model/Partition.java
+++ b/helix-core/src/main/java/org/apache/helix/model/Partition.java
@@ -21,7 +21,9 @@ package org.apache.helix.model;
/**
* A distinct partition of a resource
+ * Deprecated. Use {@link org.apache.helix.api.Partition}
*/
+@Deprecated
public class Partition {
private final String _partitionName;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/55c93516/helix-core/src/main/java/org/apache/helix/model/Resource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/Resource.java b/helix-core/src/main/java/org/apache/helix/model/Resource.java
index 1544514..437aca6 100644
--- a/helix-core/src/main/java/org/apache/helix/model/Resource.java
+++ b/helix-core/src/main/java/org/apache/helix/model/Resource.java
@@ -24,14 +24,12 @@ import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.helix.HelixConstants;
-import org.apache.log4j.Logger;
/**
* A resource contains a set of partitions and its replicas are managed by a state model
*/
+@Deprecated
public class Resource {
- private static Logger LOG = Logger.getLogger(Resource.class);
-
private final String _resourceName;
private final Map<String, Partition> _partitionMap;
private String _stateModelDefRef;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/55c93516/helix-core/src/test/java/org/apache/helix/integration/TestUserDefRebalancerCompatibility.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestUserDefRebalancerCompatibility.java b/helix-core/src/test/java/org/apache/helix/integration/TestUserDefRebalancerCompatibility.java
new file mode 100644
index 0000000..27004fe
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestUserDefRebalancerCompatibility.java
@@ -0,0 +1,104 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.controller.rebalancer.Rebalancer;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.IdealStateProperty;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+@SuppressWarnings("deprecation")
+public class TestUserDefRebalancerCompatibility extends
+ ZkStandAloneCMTestBaseWithPropertyServerCheck {
+ String db2 = TEST_DB + "2";
+ static boolean testRebalancerCreated = false;
+ static boolean testRebalancerInvoked = false;
+
+ public static class TestRebalancer implements Rebalancer {
+ @Override
+ public void init(HelixManager helixManager) {
+ testRebalancerCreated = true;
+ }
+
+ /**
+ * Very basic mapping that keeps only the first instance in the preference list of each
+ * partition, assigns that instance the MASTER state, and sets the replica count to 1.
+ */
+ @Override
+ public IdealState computeResourceMapping(String resourceName, IdealState currentIdealState,
+ CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) {
+ testRebalancerInvoked = true;
+ for (String partition : currentIdealState.getPartitionSet()) {
+ String instance = currentIdealState.getPreferenceList(partition).get(0);
+ currentIdealState.getPreferenceList(partition).clear();
+ currentIdealState.getPreferenceList(partition).add(instance);
+
+ currentIdealState.getInstanceStateMap(partition).clear();
+ currentIdealState.getInstanceStateMap(partition).put(instance, "MASTER");
+ }
+ currentIdealState.setReplicas("1");
+ return currentIdealState;
+ }
+ }
+
+ @Test
+ public void testCustomizedIdealStateRebalancer() throws InterruptedException {
+ _setupTool.addResourceToCluster(CLUSTER_NAME, db2, 60, "MasterSlave");
+ _setupTool.addResourceProperty(CLUSTER_NAME, db2,
+ IdealStateProperty.REBALANCER_CLASS_NAME.toString(),
+ TestUserDefRebalancerCompatibility.TestRebalancer.class.getName());
+
+ _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db2, 3);
+
+ boolean result =
+ ClusterStateVerifier
+ .verifyByZkCallback(new TestCustomizedIdealStateRebalancer.ExternalViewBalancedVerifier(
+ _gZkClient, CLUSTER_NAME, db2));
+ Assert.assertTrue(result);
+ Thread.sleep(1000);
+ HelixDataAccessor accessor =
+ new ZKHelixDataAccessor(CLUSTER_NAME, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+ Builder keyBuilder = accessor.keyBuilder();
+ ExternalView ev = accessor.getProperty(keyBuilder.externalView(db2));
+ Assert.assertEquals(ev.getPartitionSet().size(), 60);
+ for (String partition : ev.getPartitionSet()) {
+ Assert.assertEquals(ev.getStateMap(partition).size(), 1); // one replica, as TestRebalancer maps
+ }
+ IdealState is = accessor.getProperty(keyBuilder.idealStates(db2));
+ for (PartitionId partition : is.getPartitionIdSet()) {
+ Assert.assertEquals(is.getPreferenceList(partition).size(), 3); // stored ideal state keeps 3
+ Assert.assertEquals(is.getParticipantStateMap(partition).size(), 3);
+ }
+ Assert.assertTrue(testRebalancerCreated);
+ Assert.assertTrue(testRebalancerInvoked);
+ }
+}