You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2013/09/07 07:40:39 UTC
[2/3] [HELIX-109] adding config classes
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
index af23eb2..a6d9db4 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
@@ -19,8 +19,10 @@ package org.apache.helix.controller.stages;
* under the License.
*/
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.Set;
import org.apache.helix.api.Cluster;
import org.apache.helix.api.Participant;
@@ -28,6 +30,7 @@ import org.apache.helix.api.Partition;
import org.apache.helix.api.PartitionId;
import org.apache.helix.api.RebalancerConfig;
import org.apache.helix.api.Resource;
+import org.apache.helix.api.ResourceConfig;
import org.apache.helix.api.ResourceId;
import org.apache.helix.api.StateModelFactoryId;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
@@ -51,16 +54,17 @@ public class NewResourceComputationStage extends AbstractBaseStage {
throw new StageException("Missing attributes in event:" + event + ". Requires Cluster");
}
- Map<ResourceId, Resource.Builder> resourceBuilderMap =
- new LinkedHashMap<ResourceId, Resource.Builder>();
+ Map<ResourceId, ResourceConfig.Builder> resourceBuilderMap =
+ new LinkedHashMap<ResourceId, ResourceConfig.Builder>();
// include all resources in ideal-state
for (ResourceId resourceId : cluster.getResourceMap().keySet()) {
Resource resource = cluster.getResource(resourceId);
RebalancerConfig rebalancerConfig = resource.getRebalancerConfig();
- Resource.Builder resourceBuilder = new Resource.Builder(resourceId);
+ ResourceConfig.Builder resourceBuilder = new ResourceConfig.Builder(resourceId);
resourceBuilder.rebalancerConfig(rebalancerConfig);
- resourceBuilder.addPartitions(resource.getPartitionSet());
+ Set<Partition> partitionSet = new HashSet<Partition>(resource.getPartitionMap().values());
+ resourceBuilder.addPartitions(partitionSet);
resourceBuilderMap.put(resourceId, resourceBuilder);
}
@@ -87,7 +91,7 @@ public class NewResourceComputationStage extends AbstractBaseStage {
rebalancerConfigBuilder.bucketSize(currentState.getBucketSize());
rebalancerConfigBuilder.batchMessageMode(currentState.getBatchMessageMode());
- Resource.Builder resourceBuilder = new Resource.Builder(resourceId);
+ ResourceConfig.Builder resourceBuilder = new ResourceConfig.Builder(resourceId);
resourceBuilder.rebalancerConfig(rebalancerConfigBuilder.build());
resourceBuilderMap.put(resourceId, resourceBuilder);
}
@@ -99,7 +103,7 @@ public class NewResourceComputationStage extends AbstractBaseStage {
}
// convert builder-map to resource-map
- Map<ResourceId, Resource> resourceMap = new LinkedHashMap<ResourceId, Resource>();
+ Map<ResourceId, ResourceConfig> resourceMap = new LinkedHashMap<ResourceId, ResourceConfig>();
for (ResourceId resourceId : resourceBuilderMap.keySet()) {
resourceMap.put(resourceId, resourceBuilderMap.get(resourceId).build());
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java
index 2b8a0c8..f5bb47f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java
@@ -35,13 +35,11 @@ import org.apache.helix.api.Id;
import org.apache.helix.api.Participant;
import org.apache.helix.api.ParticipantId;
import org.apache.helix.api.PartitionId;
-import org.apache.helix.api.Resource;
+import org.apache.helix.api.ResourceConfig;
import org.apache.helix.api.ResourceId;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
-import org.apache.helix.model.Partition;
import org.apache.log4j.Logger;
public class NewTaskAssignmentStage extends AbstractBaseStage {
@@ -53,9 +51,9 @@ public class NewTaskAssignmentStage extends AbstractBaseStage {
logger.info("START TaskAssignmentStage.process()");
HelixManager manager = event.getAttribute("helixmanager");
- Map<ResourceId, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
- NewMessageOutput messageOutput =
- event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString());
+ Map<ResourceId, ResourceConfig> resourceMap =
+ event.getAttribute(AttributeName.RESOURCES.toString());
+ NewMessageOutput messageOutput = event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString());
Cluster cluster = event.getAttribute("ClusterDataCache");
Map<ParticipantId, Participant> liveParticipantMap = cluster.getLiveParticipantMap();
@@ -68,7 +66,7 @@ public class NewTaskAssignmentStage extends AbstractBaseStage {
HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
List<Message> messagesToSend = new ArrayList<Message>();
for (ResourceId resourceId : resourceMap.keySet()) {
- Resource resource = resourceMap.get(resourceId);
+ ResourceConfig resource = resourceMap.get(resourceId);
for (PartitionId partitionId : resource.getPartitionMap().keySet()) {
List<Message> messages = messageOutput.getMessages(resourceId, partitionId);
messagesToSend.addAll(messages);
@@ -86,8 +84,8 @@ public class NewTaskAssignmentStage extends AbstractBaseStage {
}
List<Message> batchMessage(Builder keyBuilder, List<Message> messages,
- Map<ResourceId, Resource> resourceMap, Map<ParticipantId, Participant> liveParticipantMap,
- HelixManagerProperties properties) {
+ Map<ResourceId, ResourceConfig> resourceMap,
+ Map<ParticipantId, Participant> liveParticipantMap, HelixManagerProperties properties) {
// group messages by its CurrentState path + "/" + fromState + "/" + toState
Map<String, Message> batchMessages = new HashMap<String, Message>();
List<Message> outputMessages = new ArrayList<Message>();
@@ -96,7 +94,7 @@ public class NewTaskAssignmentStage extends AbstractBaseStage {
while (iter.hasNext()) {
Message message = iter.next();
ResourceId resourceId = message.getResourceId();
- Resource resource = resourceMap.get(resourceId);
+ ResourceConfig resource = resourceMap.get(resourceId);
ParticipantId participantId = Id.participant(message.getTgtName());
Participant liveParticipant = liveParticipantMap.get(participantId);
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/model/ClusterConstraints.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConstraints.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConstraints.java
index 3b46c13..ef47a12 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConstraints.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConstraints.java
@@ -76,6 +76,14 @@ public class ClusterConstraints extends HelixProperty {
}
/**
+ * Get the type of constraint this object represents
+ * @return constraint type
+ */
+ public ConstraintType getType() {
+ return ConstraintType.valueOf(getId());
+ }
+
+ /**
* Instantiate constraints from a pre-populated ZNRecord
* @param record ZNRecord containing all constraints
*/
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index 24ec7c9..16b3fa1 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -339,10 +339,13 @@ public class IdealState extends HelixProperty {
Map<String, String> instanceStateMap = getInstanceStateMap(partitionId.stringify());
ImmutableMap.Builder<ParticipantId, State> builder =
new ImmutableMap.Builder<ParticipantId, State>();
- for (String participantId : instanceStateMap.keySet()) {
- builder.put(Id.participant(participantId), State.from(instanceStateMap.get(participantId)));
+ if (instanceStateMap != null) {
+ for (String participantId : instanceStateMap.keySet()) {
+ builder.put(Id.participant(participantId), State.from(instanceStateMap.get(participantId)));
+ }
+ return builder.build();
}
- return builder.build();
+ return null;
}
/**
@@ -433,10 +436,13 @@ public class IdealState extends HelixProperty {
public List<ParticipantId> getPreferenceList(PartitionId partitionId) {
ImmutableList.Builder<ParticipantId> builder = new ImmutableList.Builder<ParticipantId>();
List<String> preferenceStringList = getPreferenceList(partitionId.stringify());
- for (String participantName : preferenceStringList) {
- builder.add(Id.participant(participantName));
+ if (preferenceStringList != null) {
+ for (String participantName : preferenceStringList) {
+ builder.add(Id.participant(participantName));
+ }
+ return builder.build();
}
- return builder.build();
+ return null;
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java b/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
index 8577578..2b06c2b 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
@@ -84,6 +84,14 @@ public class ResourceAssignment extends HelixProperty {
}
/**
+ * Get the entire map of a resource
+ * @return map of partition to participant to state
+ */
+ public Map<PartitionId, Map<ParticipantId, State>> getResourceMap() {
+ return replicaMapsFromStringMaps(_record.getMapFields());
+ }
+
+ /**
* Get the participant, state pairs for a partition
* @param partition the Partition to look up
* @return map of (participant id, state)
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
index 7ceee85..b371c6a 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
@@ -45,6 +45,12 @@ import org.apache.helix.PropertyPathConfig;
import org.apache.helix.PropertyType;
import org.apache.helix.ZNRecord;
import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.ClusterAccessor;
+import org.apache.helix.api.Id;
+import org.apache.helix.api.PartitionId;
+import org.apache.helix.api.ResourceId;
+import org.apache.helix.api.StateModelDefId;
import org.apache.helix.controller.pipeline.Stage;
import org.apache.helix.controller.pipeline.StageContext;
import org.apache.helix.controller.stages.AttributeName;
@@ -53,6 +59,10 @@ import org.apache.helix.controller.stages.BestPossibleStateOutput;
import org.apache.helix.controller.stages.ClusterDataCache;
import org.apache.helix.controller.stages.ClusterEvent;
import org.apache.helix.controller.stages.CurrentStateComputationStage;
+import org.apache.helix.controller.stages.NewBestPossibleStateCalcStage;
+import org.apache.helix.controller.stages.NewBestPossibleStateOutput;
+import org.apache.helix.controller.stages.NewCurrentStateComputationStage;
+import org.apache.helix.controller.stages.NewResourceComputationStage;
import org.apache.helix.controller.stages.ResourceComputationStage;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
@@ -60,6 +70,8 @@ import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Partition;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.helix.store.PropertyJsonComparator;
@@ -156,7 +168,7 @@ public class ClusterStateVerifier {
HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient));
- return ClusterStateVerifier.verifyBestPossAndExtView(accessor, errStates);
+ return ClusterStateVerifier.verifyBestPossAndExtView(accessor, errStates, clusterName);
} catch (Exception e) {
LOG.error("exception in verification", e);
}
@@ -222,10 +234,11 @@ public class ClusterStateVerifier {
}
static boolean verifyBestPossAndExtView(HelixDataAccessor accessor,
- Map<String, Map<String, String>> errStates) {
+ Map<String, Map<String, String>> errStates, String clusterName) {
try {
Builder keyBuilder = accessor.keyBuilder();
// read cluster once and do verification
+ // TODO: stop using ClusterDataCache
ClusterDataCache cache = new ClusterDataCache();
cache.refresh(accessor);
@@ -250,10 +263,31 @@ public class ClusterStateVerifier {
}
}
+ Map<String, StateModelDefinition> stateModelDefs =
+ accessor.getChildValuesMap(keyBuilder.stateModelDefs());
+ Map<StateModelDefId, StateModelDefinition> convertedDefs =
+ new HashMap<StateModelDefId, StateModelDefinition>();
+ for (String defName : stateModelDefs.keySet()) {
+ convertedDefs.put(Id.stateModelDef(defName), stateModelDefs.get(defName));
+ }
+ ClusterAccessor clusterAccessor = new ClusterAccessor(Id.cluster(clusterName), accessor);
+ Cluster cluster = clusterAccessor.readCluster();
// calculate best possible state
- BestPossibleStateOutput bestPossOutput = ClusterStateVerifier.calcBestPossState(cache);
- Map<String, Map<Partition, Map<String, String>>> bestPossStateMap =
- bestPossOutput.getStateMap();
+ NewBestPossibleStateOutput bestPossOutput =
+ ClusterStateVerifier.calcBestPossState(cluster, convertedDefs);
+ Map<String, Map<String, Map<String, String>>> bestPossStateMap =
+ new HashMap<String, Map<String, Map<String, String>>>();
+ for (ResourceId resourceId : bestPossOutput.getAssignedResources()) {
+ ResourceAssignment resourceAssignment = bestPossOutput.getResourceAssignment(resourceId);
+ Map<String, Map<String, String>> resourceMap = new HashMap<String, Map<String, String>>();
+ for (PartitionId partitionId : resourceAssignment.getMappedPartitions()) {
+ Map<String, String> replicaMap =
+ ResourceAssignment.stringMapFromReplicaMap(resourceAssignment
+ .getReplicaMap(partitionId));
+ resourceMap.put(partitionId.stringify(), replicaMap);
+ }
+ bestPossStateMap.put(resourceId.stringify(), resourceMap);
+ }
// set error states
if (errStates != null) {
@@ -263,13 +297,12 @@ public class ClusterStateVerifier {
String instanceName = partErrStates.get(partitionName);
if (!bestPossStateMap.containsKey(resourceName)) {
- bestPossStateMap.put(resourceName, new HashMap<Partition, Map<String, String>>());
+ bestPossStateMap.put(resourceName, new HashMap<String, Map<String, String>>());
}
- Partition partition = new Partition(partitionName);
- if (!bestPossStateMap.get(resourceName).containsKey(partition)) {
- bestPossStateMap.get(resourceName).put(partition, new HashMap<String, String>());
+ if (!bestPossStateMap.get(resourceName).containsKey(partitionName)) {
+ bestPossStateMap.get(resourceName).put(partitionName, new HashMap<String, String>());
}
- bestPossStateMap.get(resourceName).get(partition)
+ bestPossStateMap.get(resourceName).get(partitionName)
.put(instanceName, HelixDefinedState.ERROR.toString());
}
}
@@ -285,11 +318,12 @@ public class ClusterStateVerifier {
}
// step 0: remove empty map and DROPPED state from best possible state
- Map<Partition, Map<String, String>> bpStateMap =
- bestPossOutput.getResourceMap(resourceName);
- Iterator<Entry<Partition, Map<String, String>>> iter = bpStateMap.entrySet().iterator();
+ Map<String, Map<String, String>> bpStateMap =
+ ResourceAssignment.stringMapsFromReplicaMaps(bestPossOutput.getResourceAssignment(
+ Id.resource(resourceName)).getResourceMap());
+ Iterator<Entry<String, Map<String, String>>> iter = bpStateMap.entrySet().iterator();
while (iter.hasNext()) {
- Map.Entry<Partition, Map<String, String>> entry = iter.next();
+ Map.Entry<String, Map<String, String>> entry = iter.next();
Map<String, String> instanceStateMap = entry.getValue();
if (instanceStateMap.isEmpty()) {
iter.remove();
@@ -310,7 +344,9 @@ public class ClusterStateVerifier {
// step 1: externalView and bestPossibleState has equal size
int extViewSize = extView.getRecord().getMapFields().size();
- int bestPossStateSize = bestPossOutput.getResourceMap(resourceName).size();
+ int bestPossStateSize =
+ bestPossOutput.getResourceAssignment(Id.resource(resourceName)).getMappedPartitions()
+ .size();
if (extViewSize != bestPossStateSize) {
LOG.info("exterView size (" + extViewSize + ") is different from bestPossState size ("
+ bestPossStateSize + ") for resource: " + resourceName);
@@ -328,7 +364,8 @@ public class ClusterStateVerifier {
for (String partition : extView.getRecord().getMapFields().keySet()) {
Map<String, String> evInstanceStateMap = extView.getRecord().getMapField(partition);
Map<String, String> bpInstanceStateMap =
- bestPossOutput.getInstanceStateMap(resourceName, new Partition(partition));
+ ResourceAssignment.stringMapFromReplicaMap(bestPossOutput.getResourceAssignment(
+ Id.resource(resourceName)).getReplicaMap(Id.partition(partition)));
boolean result =
ClusterStateVerifier.<String, String> compareMap(evInstanceStateMap,
@@ -404,24 +441,27 @@ public class ClusterStateVerifier {
/**
* calculate the best possible state note that DROPPED states are not checked since when
* kick off the BestPossibleStateCalcStage we are providing an empty current state map
+ * @param convertedDefs
* @param cache
* @return
* @throws Exception
*/
- static BestPossibleStateOutput calcBestPossState(ClusterDataCache cache) throws Exception {
+ static NewBestPossibleStateOutput calcBestPossState(Cluster cluster,
+ Map<StateModelDefId, StateModelDefinition> convertedDefs) throws Exception {
ClusterEvent event = new ClusterEvent("sampleEvent");
- event.addAttribute("ClusterDataCache", cache);
+ event.addAttribute("ClusterDataCache", cluster);
+ event.addAttribute(AttributeName.STATE_MODEL_DEFINITIONS.toString(), convertedDefs);
- ResourceComputationStage rcState = new ResourceComputationStage();
- CurrentStateComputationStage csStage = new CurrentStateComputationStage();
- BestPossibleStateCalcStage bpStage = new BestPossibleStateCalcStage();
+ NewResourceComputationStage rcState = new NewResourceComputationStage();
+ NewCurrentStateComputationStage csStage = new NewCurrentStateComputationStage();
+ NewBestPossibleStateCalcStage bpStage = new NewBestPossibleStateCalcStage();
runStage(event, rcState);
runStage(event, csStage);
runStage(event, bpStage);
- BestPossibleStateOutput output =
+ NewBestPossibleStateOutput output =
event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
// System.out.println("output:" + output);
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java b/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
index cc26596..ce2781f 100644
--- a/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
+++ b/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
@@ -52,6 +52,9 @@ import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import com.google.common.base.Function;
+import com.google.common.collect.Maps;
+
public class TestNewStages extends ZkUnitTestBase {
final int n = 2;
final int p = 8;
@@ -115,7 +118,14 @@ public class TestNewStages extends ZkUnitTestBase {
Cluster cluster = clusterAccessor.readCluster();
ClusterEvent event = new ClusterEvent(testName);
event.addAttribute(AttributeName.CURRENT_STATE.toString(), new NewCurrentStateOutput());
- event.addAttribute(AttributeName.RESOURCES.toString(), cluster.getResourceMap());
+ Map<ResourceId, ResourceConfig> resourceConfigMap =
+ Maps.transformValues(cluster.getResourceMap(), new Function<Resource, ResourceConfig>() {
+ @Override
+ public ResourceConfig apply(Resource resource) {
+ return resource.getConfig();
+ }
+ });
+ event.addAttribute(AttributeName.RESOURCES.toString(), resourceConfigMap);
event.addAttribute("ClusterDataCache", cluster);
Map<StateModelDefId, StateModelDefinition> stateModelMap =
new HashMap<StateModelDefId, StateModelDefinition>();
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java b/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
index 6dcf725..382f036 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
@@ -29,15 +29,19 @@ import java.util.UUID;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.Mocks;
-import org.apache.helix.ZNRecord;
import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.api.Id;
+import org.apache.helix.api.Partition;
+import org.apache.helix.api.ResourceConfig;
+import org.apache.helix.api.ResourceId;
+import org.apache.helix.api.StateModelDefId;
import org.apache.helix.controller.pipeline.Stage;
import org.apache.helix.controller.pipeline.StageContext;
-import org.apache.helix.controller.stages.ClusterEvent;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.Resource;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.tools.StateModelConfigGenerator;
import org.testng.annotations.AfterClass;
@@ -107,11 +111,16 @@ public class BaseStageTest {
protected void setupLiveInstances(int numLiveInstances) {
// setup liveInstances
for (int i = 0; i < numLiveInstances; i++) {
- LiveInstance liveInstance = new LiveInstance("localhost_" + i);
+ String instanceName = "localhost_" + i;
+ InstanceConfig instanceConfig = new InstanceConfig(Id.participant(instanceName));
+ instanceConfig.setHostName("localhost");
+ instanceConfig.setPort(Integer.toString(i));
+ LiveInstance liveInstance = new LiveInstance(instanceName);
liveInstance.setSessionId("session_" + i);
Builder keyBuilder = accessor.keyBuilder();
- accessor.setProperty(keyBuilder.liveInstance("localhost_" + i), liveInstance);
+ accessor.setProperty(keyBuilder.instanceConfig(instanceName), instanceConfig);
+ accessor.setProperty(keyBuilder.liveInstance(instanceName), liveInstance);
}
}
@@ -128,32 +137,38 @@ public class BaseStageTest {
stage.postProcess();
}
- protected void setupStateModel() {
- ZNRecord masterSlave = new StateModelConfigGenerator().generateConfigForMasterSlave();
-
+ protected Map<StateModelDefId, StateModelDefinition> setupStateModel() {
Builder keyBuilder = accessor.keyBuilder();
- accessor.setProperty(keyBuilder.stateModelDef(masterSlave.getId()), new StateModelDefinition(
- masterSlave));
+ Map<StateModelDefId, StateModelDefinition> defs =
+ new HashMap<StateModelDefId, StateModelDefinition>();
+
+ ZNRecord masterSlave = StateModelConfigGenerator.generateConfigForMasterSlave();
+ StateModelDefinition masterSlaveDef = new StateModelDefinition(masterSlave);
+ defs.put(Id.stateModelDef(masterSlaveDef.getId()), masterSlaveDef);
+ accessor.setProperty(keyBuilder.stateModelDef(masterSlave.getId()), masterSlaveDef);
+
+ ZNRecord leaderStandby = StateModelConfigGenerator.generateConfigForLeaderStandby();
+ StateModelDefinition leaderStandbyDef = new StateModelDefinition(leaderStandby);
+ defs.put(Id.stateModelDef(leaderStandbyDef.getId()), leaderStandbyDef);
+ accessor.setProperty(keyBuilder.stateModelDef(leaderStandby.getId()), leaderStandbyDef);
- ZNRecord leaderStandby = new StateModelConfigGenerator().generateConfigForLeaderStandby();
- accessor.setProperty(keyBuilder.stateModelDef(leaderStandby.getId()), new StateModelDefinition(
- leaderStandby));
+ ZNRecord onlineOffline = StateModelConfigGenerator.generateConfigForOnlineOffline();
+ StateModelDefinition onlineOfflineDef = new StateModelDefinition(onlineOffline);
+ defs.put(Id.stateModelDef(onlineOfflineDef.getId()), onlineOfflineDef);
+ accessor.setProperty(keyBuilder.stateModelDef(onlineOffline.getId()), onlineOfflineDef);
- ZNRecord onlineOffline = new StateModelConfigGenerator().generateConfigForOnlineOffline();
- accessor.setProperty(keyBuilder.stateModelDef(onlineOffline.getId()), new StateModelDefinition(
- onlineOffline));
+ return defs;
}
- protected Map<String, Resource> getResourceMap() {
- Map<String, Resource> resourceMap = new HashMap<String, Resource>();
- Resource testResource = new Resource("testResourceName");
- testResource.setStateModelDefRef("MasterSlave");
- testResource.addPartition("testResourceName_0");
- testResource.addPartition("testResourceName_1");
- testResource.addPartition("testResourceName_2");
- testResource.addPartition("testResourceName_3");
- testResource.addPartition("testResourceName_4");
- resourceMap.put("testResourceName", testResource);
+ protected Map<ResourceId, ResourceConfig> getResourceMap() {
+ Map<ResourceId, ResourceConfig> resourceMap = new HashMap<ResourceId, ResourceConfig>();
+ ResourceConfig.Builder builder = new ResourceConfig.Builder(Id.resource("testResourceName"));
+ builder.addPartition(new Partition(Id.partition("testResourceName_0")));
+ builder.addPartition(new Partition(Id.partition("testResourceName_1")));
+ builder.addPartition(new Partition(Id.partition("testResourceName_2")));
+ builder.addPartition(new Partition(Id.partition("testResourceName_3")));
+ builder.addPartition(new Partition(Id.partition("testResourceName_4")));
+ resourceMap.put(Id.resource("testResourceName"), builder.build());
return resourceMap;
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java
index 2453bd8..82b70b1 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java
@@ -24,12 +24,17 @@ import java.util.Date;
import java.util.List;
import java.util.Map;
-import org.apache.helix.ZNRecord;
import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.api.Id;
+import org.apache.helix.api.ParticipantId;
+import org.apache.helix.api.ResourceConfig;
+import org.apache.helix.api.ResourceId;
+import org.apache.helix.api.State;
+import org.apache.helix.api.StateModelDefId;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.IdealStateModeProperty;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.Resource;
+import org.apache.helix.model.StateModelDefinition;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;
@@ -41,68 +46,78 @@ import org.testng.annotations.Test;
public class TestBestPossibleCalcStageCompatibility extends BaseStageTest {
@Test
public void testSemiAutoModeCompatibility() {
- System.out.println("START TestBestPossibleStateCalcStage at "
- + new Date(System.currentTimeMillis()));
+ System.out
+ .println("START TestBestPossibleStateCalcStageCompatibility_testSemiAutoModeCompatibility at "
+ + new Date(System.currentTimeMillis()));
String[] resources = new String[] {
"testResourceName"
};
setupIdealStateDeprecated(5, resources, 10, 1, IdealStateModeProperty.AUTO);
setupLiveInstances(5);
- setupStateModel();
+ Map<StateModelDefId, StateModelDefinition> stateModelDefs = setupStateModel();
- Map<String, Resource> resourceMap = getResourceMap();
- CurrentStateOutput currentStateOutput = new CurrentStateOutput();
+ Map<ResourceId, ResourceConfig> resourceMap = getResourceMap();
+ NewCurrentStateOutput currentStateOutput = new NewCurrentStateOutput();
event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
event.addAttribute(AttributeName.CURRENT_STATE.toString(), currentStateOutput);
+ event.addAttribute(AttributeName.STATE_MODEL_DEFINITIONS.toString(), stateModelDefs);
- ReadClusterDataStage stage1 = new ReadClusterDataStage();
+ NewReadClusterDataStage stage1 = new NewReadClusterDataStage();
runStage(event, stage1);
- BestPossibleStateCalcStage stage2 = new BestPossibleStateCalcStage();
+ NewBestPossibleStateCalcStage stage2 = new NewBestPossibleStateCalcStage();
runStage(event, stage2);
- BestPossibleStateOutput output =
+ NewBestPossibleStateOutput output =
event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
for (int p = 0; p < 5; p++) {
- Partition resource = new Partition("testResourceName_" + p);
- AssertJUnit.assertEquals("MASTER", output.getInstanceStateMap("testResourceName", resource)
- .get("localhost_" + (p + 1) % 5));
+ Map<ParticipantId, State> replicaMap =
+ output.getResourceAssignment(Id.resource("testResourceName")).getReplicaMap(
+ Id.partition("testResourceName_" + p));
+ AssertJUnit.assertEquals(State.from("MASTER"),
+ replicaMap.get(Id.participant("localhost_" + (p + 1) % 5)));
}
- System.out.println("END TestBestPossibleStateCalcStage at "
- + new Date(System.currentTimeMillis()));
+ System.out
+ .println("END TestBestPossibleStateCalcStageCompatibility_testSemiAutoModeCompatibility at "
+ + new Date(System.currentTimeMillis()));
}
@Test
public void testCustomModeCompatibility() {
- System.out.println("START TestBestPossibleStateCalcStage at "
- + new Date(System.currentTimeMillis()));
+ System.out
+ .println("START TestBestPossibleStateCalcStageCompatibility_testCustomModeCompatibility at "
+ + new Date(System.currentTimeMillis()));
String[] resources = new String[] {
"testResourceName"
};
setupIdealStateDeprecated(5, resources, 10, 1, IdealStateModeProperty.CUSTOMIZED);
setupLiveInstances(5);
- setupStateModel();
+ Map<StateModelDefId, StateModelDefinition> stateModelDefs = setupStateModel();
- Map<String, Resource> resourceMap = getResourceMap();
- CurrentStateOutput currentStateOutput = new CurrentStateOutput();
+ Map<ResourceId, ResourceConfig> resourceMap = getResourceMap();
+ NewCurrentStateOutput currentStateOutput = new NewCurrentStateOutput();
event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
event.addAttribute(AttributeName.CURRENT_STATE.toString(), currentStateOutput);
+ event.addAttribute(AttributeName.STATE_MODEL_DEFINITIONS.toString(), stateModelDefs);
- ReadClusterDataStage stage1 = new ReadClusterDataStage();
+ NewReadClusterDataStage stage1 = new NewReadClusterDataStage();
runStage(event, stage1);
- BestPossibleStateCalcStage stage2 = new BestPossibleStateCalcStage();
+ NewBestPossibleStateCalcStage stage2 = new NewBestPossibleStateCalcStage();
runStage(event, stage2);
- BestPossibleStateOutput output =
+ NewBestPossibleStateOutput output =
event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
for (int p = 0; p < 5; p++) {
- Partition resource = new Partition("testResourceName_" + p);
- AssertJUnit.assertNull(output.getInstanceStateMap("testResourceName", resource).get(
- "localhost_" + (p + 1) % 5));
+ Map<ParticipantId, State> replicaMap =
+ output.getResourceAssignment(Id.resource("testResourceName")).getReplicaMap(
+ Id.partition("testResourceName_" + p));
+ AssertJUnit.assertEquals(State.from("MASTER"),
+ replicaMap.get(Id.participant("localhost_" + (p + 1) % 5)));
}
- System.out.println("END TestBestPossibleStateCalcStage at "
- + new Date(System.currentTimeMillis()));
+ System.out
+ .println("END TestBestPossibleStateCalcStageCompatibility_testCustomModeCompatibility at "
+ + new Date(System.currentTimeMillis()));
}
protected List<IdealState> setupIdealStateDeprecated(int nodes, String[] resources,
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
index 82c7b37..1a76615 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
@@ -22,14 +22,14 @@ package org.apache.helix.controller.stages;
import java.util.Date;
import java.util.Map;
-import org.apache.helix.controller.stages.AttributeName;
-import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
-import org.apache.helix.controller.stages.BestPossibleStateOutput;
-import org.apache.helix.controller.stages.CurrentStateOutput;
-import org.apache.helix.controller.stages.ReadClusterDataStage;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.Resource;
+import org.apache.helix.api.Id;
+import org.apache.helix.api.ParticipantId;
+import org.apache.helix.api.ResourceConfig;
+import org.apache.helix.api.ResourceId;
+import org.apache.helix.api.State;
+import org.apache.helix.api.StateModelDefId;
import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.StateModelDefinition;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;
@@ -45,24 +45,27 @@ public class TestBestPossibleStateCalcStage extends BaseStageTest {
};
setupIdealState(5, resources, 10, 1, RebalanceMode.SEMI_AUTO);
setupLiveInstances(5);
- setupStateModel();
+ Map<StateModelDefId, StateModelDefinition> stateModelDefs = setupStateModel();
- Map<String, Resource> resourceMap = getResourceMap();
- CurrentStateOutput currentStateOutput = new CurrentStateOutput();
+ Map<ResourceId, ResourceConfig> resourceMap = getResourceMap();
+ NewCurrentStateOutput currentStateOutput = new NewCurrentStateOutput();
event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
event.addAttribute(AttributeName.CURRENT_STATE.toString(), currentStateOutput);
+ event.addAttribute(AttributeName.STATE_MODEL_DEFINITIONS.toString(), stateModelDefs);
- ReadClusterDataStage stage1 = new ReadClusterDataStage();
+ NewReadClusterDataStage stage1 = new NewReadClusterDataStage();
runStage(event, stage1);
- BestPossibleStateCalcStage stage2 = new BestPossibleStateCalcStage();
+ NewBestPossibleStateCalcStage stage2 = new NewBestPossibleStateCalcStage();
runStage(event, stage2);
- BestPossibleStateOutput output =
+ NewBestPossibleStateOutput output =
event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
for (int p = 0; p < 5; p++) {
- Partition resource = new Partition("testResourceName_" + p);
- AssertJUnit.assertEquals("MASTER", output.getInstanceStateMap("testResourceName", resource)
- .get("localhost_" + (p + 1) % 5));
+ Map<ParticipantId, State> replicaMap =
+ output.getResourceAssignment(Id.resource("testResourceName")).getReplicaMap(
+ Id.partition("testResourceName_" + p));
+ AssertJUnit.assertEquals(State.from("MASTER"),
+ replicaMap.get(Id.participant("localhost_" + (p + 1) % 5)));
}
System.out.println("END TestBestPossibleStateCalcStage at "
+ new Date(System.currentTimeMillis()));
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/test/java/org/apache/helix/controller/stages/TestCompatibilityCheckStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCompatibilityCheckStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCompatibilityCheckStage.java
index bce7c2d..47875fc 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCompatibilityCheckStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCompatibilityCheckStage.java
@@ -28,6 +28,7 @@ import org.apache.helix.ZNRecord;
import org.apache.helix.controller.pipeline.StageContext;
import org.apache.helix.controller.strategy.DefaultTwoStateStrategy;
import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.LiveInstance.LiveInstanceProperty;
import org.testng.Assert;
@@ -64,6 +65,8 @@ public class TestCompatibilityCheckStage extends BaseStageTest {
LiveInstance liveInstance = new LiveInstance(record);
liveInstance.setSessionId("session_0");
accessor.setProperty(keyBuilder.liveInstance("localhost_0"), liveInstance);
+ InstanceConfig config = new InstanceConfig(liveInstance.getInstanceName());
+ accessor.setProperty(keyBuilder.instanceConfig(config.getInstanceName()), config);
if (controllerVersion != null) {
((Mocks.MockManager) manager).setVersion(controllerVersion);
@@ -74,13 +77,13 @@ public class TestCompatibilityCheckStage extends BaseStageTest {
.put("minimum_supported_version.participant", minSupportedParticipantVersion);
}
event.addAttribute("helixmanager", manager);
- runStage(event, new ReadClusterDataStage());
+ runStage(event, new NewReadClusterDataStage());
}
@Test
public void testCompatible() {
prepare("0.4.0", "0.4.0");
- CompatibilityCheckStage stage = new CompatibilityCheckStage();
+ NewCompatibilityCheckStage stage = new NewCompatibilityCheckStage();
StageContext context = new StageContext();
stage.init(context);
stage.preProcess();
@@ -95,7 +98,7 @@ public class TestCompatibilityCheckStage extends BaseStageTest {
@Test
public void testNullParticipantVersion() {
prepare("0.4.0", null);
- CompatibilityCheckStage stage = new CompatibilityCheckStage();
+ NewCompatibilityCheckStage stage = new NewCompatibilityCheckStage();
StageContext context = new StageContext();
stage.init(context);
stage.preProcess();
@@ -111,7 +114,7 @@ public class TestCompatibilityCheckStage extends BaseStageTest {
@Test
public void testNullControllerVersion() {
prepare(null, "0.4.0");
- CompatibilityCheckStage stage = new CompatibilityCheckStage();
+ NewCompatibilityCheckStage stage = new NewCompatibilityCheckStage();
StageContext context = new StageContext();
stage.init(context);
stage.preProcess();
@@ -127,7 +130,7 @@ public class TestCompatibilityCheckStage extends BaseStageTest {
@Test
public void testIncompatible() {
prepare("0.6.1-incubating-SNAPSHOT", "0.3.4", "0.4");
- CompatibilityCheckStage stage = new CompatibilityCheckStage();
+ NewCompatibilityCheckStage stage = new NewCompatibilityCheckStage();
StageContext context = new StageContext();
stage.init(context);
stage.preProcess();
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
index ecad444..3f567ae 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
@@ -24,11 +24,11 @@ import java.util.Map;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.ZNRecord;
import org.apache.helix.api.Id;
+import org.apache.helix.api.ResourceConfig;
+import org.apache.helix.api.ResourceId;
import org.apache.helix.api.State;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.Message;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.Resource;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;
@@ -36,32 +36,32 @@ public class TestCurrentStateComputationStage extends BaseStageTest {
@Test
public void testEmptyCS() {
- Map<String, Resource> resourceMap = getResourceMap();
+ Map<ResourceId, ResourceConfig> resourceMap = getResourceMap();
event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
- CurrentStateComputationStage stage = new CurrentStateComputationStage();
- runStage(event, new ReadClusterDataStage());
+ NewCurrentStateComputationStage stage = new NewCurrentStateComputationStage();
+ runStage(event, new NewReadClusterDataStage());
runStage(event, stage);
- CurrentStateOutput output = event.getAttribute(AttributeName.CURRENT_STATE.toString());
+ NewCurrentStateOutput output = event.getAttribute(AttributeName.CURRENT_STATE.toString());
AssertJUnit.assertEquals(
- output.getCurrentStateMap("testResourceName", new Partition("testResourceName_0")).size(),
- 0);
+ output.getCurrentStateMap(Id.resource("testResourceName"),
+ Id.partition("testResourceName_0")).size(), 0);
}
@Test
public void testSimpleCS() {
// setup resource
- Map<String, Resource> resourceMap = getResourceMap();
+ Map<ResourceId, ResourceConfig> resourceMap = getResourceMap();
setupLiveInstances(5);
event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
- CurrentStateComputationStage stage = new CurrentStateComputationStage();
- runStage(event, new ReadClusterDataStage());
+ NewCurrentStateComputationStage stage = new NewCurrentStateComputationStage();
+ runStage(event, new NewReadClusterDataStage());
runStage(event, stage);
- CurrentStateOutput output1 = event.getAttribute(AttributeName.CURRENT_STATE.toString());
+ NewCurrentStateOutput output1 = event.getAttribute(AttributeName.CURRENT_STATE.toString());
AssertJUnit.assertEquals(
- output1.getCurrentStateMap("testResourceName", new Partition("testResourceName_0")).size(),
- 0);
+ output1.getCurrentStateMap(Id.resource("testResourceName"),
+ Id.partition("testResourceName_0")).size(), 0);
// Add a state transition messages
Message message = new Message(Message.MessageType.STATE_TRANSITION, Id.message("msg1"));
@@ -75,13 +75,13 @@ public class TestCurrentStateComputationStage extends BaseStageTest {
Builder keyBuilder = accessor.keyBuilder();
accessor.setProperty(keyBuilder.message("localhost_" + 3, message.getId()), message);
- runStage(event, new ReadClusterDataStage());
+ runStage(event, new NewReadClusterDataStage());
runStage(event, stage);
- CurrentStateOutput output2 = event.getAttribute(AttributeName.CURRENT_STATE.toString());
- String pendingState =
- output2.getPendingState("testResourceName", new Partition("testResourceName_1"),
- "localhost_3");
- AssertJUnit.assertEquals(pendingState, "SLAVE");
+ NewCurrentStateOutput output2 = event.getAttribute(AttributeName.CURRENT_STATE.toString());
+ State pendingState =
+ output2.getPendingState(Id.resource("testResourceName"),
+ Id.partition("testResourceName_1"), Id.participant("localhost_3"));
+ AssertJUnit.assertEquals(pendingState, State.from("SLAVE"));
ZNRecord record1 = new ZNRecord("testResourceName");
// Add a current state that matches sessionId and one that does not match
@@ -100,13 +100,13 @@ public class TestCurrentStateComputationStage extends BaseStageTest {
accessor.setProperty(
keyBuilder.currentState("localhost_3", "session_dead", "testResourceName"),
stateWithDeadSession);
- runStage(event, new ReadClusterDataStage());
+ runStage(event, new NewReadClusterDataStage());
runStage(event, stage);
- CurrentStateOutput output3 = event.getAttribute(AttributeName.CURRENT_STATE.toString());
- String currentState =
- output3.getCurrentState("testResourceName", new Partition("testResourceName_1"),
- "localhost_3");
- AssertJUnit.assertEquals(currentState, "OFFLINE");
+ NewCurrentStateOutput output3 = event.getAttribute(AttributeName.CURRENT_STATE.toString());
+ State currentState =
+ output3.getCurrentState(Id.resource("testResourceName"),
+ Id.partition("testResourceName_1"), Id.participant("localhost_3"));
+ AssertJUnit.assertEquals(currentState, State.from("OFFLINE"));
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
index 26bbc20..bcd8f4a 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
@@ -41,7 +41,6 @@ import org.apache.helix.model.ClusterConstraints.ConstraintType;
import org.apache.helix.model.ConstraintItem;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.MessageType;
-import org.apache.helix.model.Partition;
import org.apache.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -56,7 +55,7 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
HelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
+ new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
HelixManager manager = new DummyClusterManager(clusterName, accessor);
// ideal state: node0 is MASTER, node1 is SLAVE
@@ -74,7 +73,7 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
ClusterEvent event = new ClusterEvent("testEvent");
event.addAttribute("helixmanager", manager);
- MessageThrottleStage throttleStage = new MessageThrottleStage();
+ NewMessageThrottleStage throttleStage = new NewMessageThrottleStage();
try {
runStage(event, throttleStage);
Assert.fail("Should throw exception since DATA_CACHE is null");
@@ -83,7 +82,7 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
}
Pipeline dataRefresh = new Pipeline();
- dataRefresh.addStage(new ReadClusterDataStage());
+ dataRefresh.addStage(new NewReadClusterDataStage());
runPipeline(event, dataRefresh);
try {
@@ -92,7 +91,7 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
} catch (Exception e) {
// OK
}
- runStage(event, new ResourceComputationStage());
+ runStage(event, new NewResourceComputationStage());
try {
runStage(event, throttleStage);
@@ -100,22 +99,22 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
} catch (Exception e) {
// OK
}
- MessageSelectionStageOutput msgSelectOutput = new MessageSelectionStageOutput();
+ NewMessageOutput msgSelectOutput = new NewMessageOutput();
List<Message> selectMessages = new ArrayList<Message>();
Message msg =
createMessage(MessageType.STATE_TRANSITION, Id.message("msgId-001"), "OFFLINE", "SLAVE",
"TestDB", "localhost_0");
selectMessages.add(msg);
- msgSelectOutput.addMessages("TestDB", new Partition("TestDB_0"), selectMessages);
+ msgSelectOutput.setMessages(Id.resource("TestDB"), Id.partition("TestDB_0"), selectMessages);
event.addAttribute(AttributeName.MESSAGES_SELECTED.toString(), msgSelectOutput);
runStage(event, throttleStage);
- MessageThrottleStageOutput msgThrottleOutput =
+ NewMessageOutput msgThrottleOutput =
event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString());
- Assert.assertEquals(msgThrottleOutput.getMessages("TestDB", new Partition("TestDB_0")).size(),
- 1);
+ Assert.assertEquals(
+ msgThrottleOutput.getMessages(Id.resource("TestDB"), Id.partition("TestDB_0")).size(), 1);
System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
@@ -127,7 +126,7 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
HelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
+ new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
HelixManager manager = new DummyClusterManager(clusterName, accessor);
// ideal state: node0 is MASTER, node1 is SLAVE
@@ -212,7 +211,7 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
ClusterConstraints constraint =
accessor.getProperty(keyBuilder.constraint(ConstraintType.MESSAGE_CONSTRAINT.toString()));
- MessageThrottleStage throttleStage = new MessageThrottleStage();
+ NewMessageThrottleStage throttleStage = new NewMessageThrottleStage();
// test constraintSelection
// message1: hit contraintSelection rule1 and rule2
@@ -262,10 +261,10 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
event.addAttribute("helixmanager", manager);
Pipeline dataRefresh = new Pipeline();
- dataRefresh.addStage(new ReadClusterDataStage());
+ dataRefresh.addStage(new NewReadClusterDataStage());
runPipeline(event, dataRefresh);
- runStage(event, new ResourceComputationStage());
- MessageSelectionStageOutput msgSelectOutput = new MessageSelectionStageOutput();
+ runStage(event, new NewResourceComputationStage());
+ NewMessageOutput msgSelectOutput = new NewMessageOutput();
Message msg3 =
createMessage(MessageType.STATE_TRANSITION, Id.message("msgId-003"), "OFFLINE", "SLAVE",
@@ -291,15 +290,15 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
selectMessages.add(msg5); // should be throttled
selectMessages.add(msg6); // should be throttled
- msgSelectOutput.addMessages("TestDB", new Partition("TestDB_0"), selectMessages);
+ msgSelectOutput.setMessages(Id.resource("TestDB"), Id.partition("TestDB_0"), selectMessages);
event.addAttribute(AttributeName.MESSAGES_SELECTED.toString(), msgSelectOutput);
runStage(event, throttleStage);
- MessageThrottleStageOutput msgThrottleOutput =
+ NewMessageOutput msgThrottleOutput =
event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString());
List<Message> throttleMessages =
- msgThrottleOutput.getMessages("TestDB", new Partition("TestDB_0"));
+ msgThrottleOutput.getMessages(Id.resource("TestDB"), Id.partition("TestDB_0"));
Assert.assertEquals(throttleMessages.size(), 4);
Assert.assertTrue(throttleMessages.contains(msg1));
Assert.assertTrue(throttleMessages.contains(msg2));
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java
index 97d5ec1..825aa05 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java
@@ -20,15 +20,25 @@ package org.apache.helix.controller.stages;
*/
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.helix.TestHelper;
+import org.apache.helix.api.HelixVersion;
import org.apache.helix.api.Id;
-import org.apache.helix.controller.stages.MessageSelectionStage.Bounds;
-import org.apache.helix.model.LiveInstance;
+import org.apache.helix.api.MessageId;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.ParticipantId;
+import org.apache.helix.api.PartitionId;
+import org.apache.helix.api.ResourceId;
+import org.apache.helix.api.RunningInstance;
+import org.apache.helix.api.State;
+import org.apache.helix.controller.stages.NewMessageSelectionStage.Bounds;
+import org.apache.helix.model.CurrentState;
import org.apache.helix.model.Message;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -38,15 +48,27 @@ public class TestMsgSelectionStage {
public void testMasterXfer() {
System.out.println("START testMasterXfer at " + new Date(System.currentTimeMillis()));
- Map<String, LiveInstance> liveInstances = new HashMap<String, LiveInstance>();
- liveInstances.put("localhost_0", new LiveInstance("localhost_0"));
- liveInstances.put("localhost_1", new LiveInstance("localhost_1"));
-
- Map<String, String> currentStates = new HashMap<String, String>();
- currentStates.put("localhost_0", "SLAVE");
- currentStates.put("localhost_1", "MASTER");
-
- Map<String, String> pendingStates = new HashMap<String, String>();
+ Map<ParticipantId, Participant> liveInstances = new HashMap<ParticipantId, Participant>();
+ Set<PartitionId> disabledPartitions = Collections.emptySet();
+ Set<String> tags = Collections.emptySet();
+ Map<ResourceId, CurrentState> currentStateMap = Collections.emptyMap();
+ Map<MessageId, Message> messageMap = Collections.emptyMap();
+ RunningInstance runningInstance0 =
+ new RunningInstance(Id.session("session_0"), HelixVersion.from("1.2.3.4"), Id.process("0"));
+ RunningInstance runningInstance1 =
+ new RunningInstance(Id.session("session_1"), HelixVersion.from("1.2.3.4"), Id.process("1"));
+ liveInstances.put(Id.participant("localhost_0"), new Participant(Id.participant("localhost_0"),
+ "localhost", 0, true, disabledPartitions, tags, runningInstance0, currentStateMap,
+ messageMap));
+ liveInstances.put(Id.participant("localhost_0"), new Participant(Id.participant("localhost_1"),
+ "localhost", 1, true, disabledPartitions, tags, runningInstance1, currentStateMap,
+ messageMap));
+
+ Map<ParticipantId, State> currentStates = new HashMap<ParticipantId, State>();
+ currentStates.put(Id.participant("localhost_0"), State.from("SLAVE"));
+ currentStates.put(Id.participant("localhost_1"), State.from("MASTER"));
+
+ Map<ParticipantId, State> pendingStates = new HashMap<ParticipantId, State>();
List<Message> messages = new ArrayList<Message>();
messages.add(TestHelper.createMessage(Id.message("msgId_0"), "SLAVE", "MASTER", "localhost_0",
@@ -54,17 +76,17 @@ public class TestMsgSelectionStage {
messages.add(TestHelper.createMessage(Id.message("msgId_1"), "MASTER", "SLAVE", "localhost_1",
"TestDB", "TestDB_0"));
- Map<String, Bounds> stateConstraints = new HashMap<String, Bounds>();
- stateConstraints.put("MASTER", new Bounds(0, 1));
- stateConstraints.put("SLAVE", new Bounds(0, 2));
+ Map<State, Bounds> stateConstraints = new HashMap<State, Bounds>();
+ stateConstraints.put(State.from("MASTER"), new Bounds(0, 1));
+ stateConstraints.put(State.from("SLAVE"), new Bounds(0, 2));
Map<String, Integer> stateTransitionPriorities = new HashMap<String, Integer>();
stateTransitionPriorities.put("MASTER-SLAVE", 0);
stateTransitionPriorities.put("SLAVE-MASTER", 1);
List<Message> selectedMsg =
- new MessageSelectionStage().selectMessages(liveInstances, currentStates, pendingStates,
- messages, stateConstraints, stateTransitionPriorities, "OFFLINE");
+ new NewMessageSelectionStage().selectMessages(liveInstances, currentStates, pendingStates,
+ messages, stateConstraints, stateTransitionPriorities, State.from("OFFLINE"));
Assert.assertEquals(selectedMsg.size(), 1);
Assert.assertEquals(selectedMsg.get(0).getMsgId(), Id.message("msgId_1"));
@@ -76,32 +98,44 @@ public class TestMsgSelectionStage {
System.out.println("START testMasterXferAfterMasterResume at "
+ new Date(System.currentTimeMillis()));
- Map<String, LiveInstance> liveInstances = new HashMap<String, LiveInstance>();
- liveInstances.put("localhost_0", new LiveInstance("localhost_0"));
- liveInstances.put("localhost_1", new LiveInstance("localhost_1"));
-
- Map<String, String> currentStates = new HashMap<String, String>();
- currentStates.put("localhost_0", "SLAVE");
- currentStates.put("localhost_1", "SLAVE");
-
- Map<String, String> pendingStates = new HashMap<String, String>();
- pendingStates.put("localhost_1", "MASTER");
+ Map<ParticipantId, Participant> liveInstances = new HashMap<ParticipantId, Participant>();
+ Set<PartitionId> disabledPartitions = Collections.emptySet();
+ Set<String> tags = Collections.emptySet();
+ Map<ResourceId, CurrentState> currentStateMap = Collections.emptyMap();
+ Map<MessageId, Message> messageMap = Collections.emptyMap();
+ RunningInstance runningInstance0 =
+ new RunningInstance(Id.session("session_0"), HelixVersion.from("1.2.3.4"), Id.process("0"));
+ RunningInstance runningInstance1 =
+ new RunningInstance(Id.session("session_1"), HelixVersion.from("1.2.3.4"), Id.process("1"));
+ liveInstances.put(Id.participant("localhost_0"), new Participant(Id.participant("localhost_0"),
+ "localhost", 0, true, disabledPartitions, tags, runningInstance0, currentStateMap,
+ messageMap));
+ liveInstances.put(Id.participant("localhost_0"), new Participant(Id.participant("localhost_1"),
+ "localhost", 1, true, disabledPartitions, tags, runningInstance1, currentStateMap,
+ messageMap));
+
+ Map<ParticipantId, State> currentStates = new HashMap<ParticipantId, State>();
+ currentStates.put(Id.participant("localhost_0"), State.from("SLAVE"));
+ currentStates.put(Id.participant("localhost_1"), State.from("SLAVE"));
+
+ Map<ParticipantId, State> pendingStates = new HashMap<ParticipantId, State>();
+ pendingStates.put(Id.participant("localhost_1"), State.from("MASTER"));
List<Message> messages = new ArrayList<Message>();
messages.add(TestHelper.createMessage(Id.message("msgId_0"), "SLAVE", "MASTER", "localhost_0",
"TestDB", "TestDB_0"));
- Map<String, Bounds> stateConstraints = new HashMap<String, Bounds>();
- stateConstraints.put("MASTER", new Bounds(0, 1));
- stateConstraints.put("SLAVE", new Bounds(0, 2));
+ Map<State, Bounds> stateConstraints = new HashMap<State, Bounds>();
+ stateConstraints.put(State.from("MASTER"), new Bounds(0, 1));
+ stateConstraints.put(State.from("SLAVE"), new Bounds(0, 2));
Map<String, Integer> stateTransitionPriorities = new HashMap<String, Integer>();
stateTransitionPriorities.put("MASTER-SLAVE", 0);
stateTransitionPriorities.put("SLAVE-MASTER", 1);
List<Message> selectedMsg =
- new MessageSelectionStage().selectMessages(liveInstances, currentStates, pendingStates,
- messages, stateConstraints, stateTransitionPriorities, "OFFLINE");
+ new NewMessageSelectionStage().selectMessages(liveInstances, currentStates, pendingStates,
+ messages, stateConstraints, stateTransitionPriorities, State.from("OFFLINE"));
Assert.assertEquals(selectedMsg.size(), 0);
System.out.println("END testMasterXferAfterMasterResume at "
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
index 7cd942e..a3f38ea 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
@@ -39,7 +39,6 @@ import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.Attributes;
-import org.apache.helix.model.Partition;
import org.apache.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -76,17 +75,17 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
// cluster data cache refresh pipeline
Pipeline dataRefresh = new Pipeline();
- dataRefresh.addStage(new ReadClusterDataStage());
+ dataRefresh.addStage(new NewReadClusterDataStage());
// rebalance pipeline
Pipeline rebalancePipeline = new Pipeline();
- rebalancePipeline.addStage(new ResourceComputationStage());
- rebalancePipeline.addStage(new CurrentStateComputationStage());
- rebalancePipeline.addStage(new BestPossibleStateCalcStage());
- rebalancePipeline.addStage(new MessageGenerationPhase());
- rebalancePipeline.addStage(new MessageSelectionStage());
- rebalancePipeline.addStage(new MessageThrottleStage());
- rebalancePipeline.addStage(new TaskAssignmentStage());
+ rebalancePipeline.addStage(new NewResourceComputationStage());
+ rebalancePipeline.addStage(new NewCurrentStateComputationStage());
+ rebalancePipeline.addStage(new NewBestPossibleStateCalcStage());
+ rebalancePipeline.addStage(new NewMessageGenerationStage());
+ rebalancePipeline.addStage(new NewMessageSelectionStage());
+ rebalancePipeline.addStage(new NewMessageThrottleStage());
+ rebalancePipeline.addStage(new NewTaskAssignmentStage());
// round1: set node0 currentState to OFFLINE and node1 currentState to OFFLINE
setCurrentState(clusterName, "localhost_0", resourceName, resourceName + "_0", "session_0",
@@ -96,10 +95,9 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
runPipeline(event, dataRefresh);
runPipeline(event, rebalancePipeline);
- MessageSelectionStageOutput msgSelOutput =
- event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
+ NewMessageOutput msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
List<Message> messages =
- msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
+ msgSelOutput.getMessages(Id.resource(resourceName), Id.partition(resourceName + "_0"));
Assert.assertEquals(messages.size(), 1, "Should output 1 message: OFFLINE-SLAVE for node0");
Message message = messages.get(0);
Assert.assertEquals(message.getFromState().toString(), "OFFLINE");
@@ -114,7 +112,8 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
runPipeline(event, dataRefresh);
runPipeline(event, rebalancePipeline);
msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
- messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
+ messages =
+ msgSelOutput.getMessages(Id.resource(resourceName), Id.partition(resourceName + "_0"));
Assert.assertEquals(messages.size(), 0, "Should NOT output 1 message: SLAVE-MASTER for node1");
System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
@@ -220,17 +219,17 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
// cluster data cache refresh pipeline
Pipeline dataRefresh = new Pipeline();
- dataRefresh.addStage(new ReadClusterDataStage());
+ dataRefresh.addStage(new NewReadClusterDataStage());
// rebalance pipeline
Pipeline rebalancePipeline = new Pipeline();
- rebalancePipeline.addStage(new ResourceComputationStage());
- rebalancePipeline.addStage(new CurrentStateComputationStage());
- rebalancePipeline.addStage(new BestPossibleStateCalcStage());
- rebalancePipeline.addStage(new MessageGenerationPhase());
- rebalancePipeline.addStage(new MessageSelectionStage());
- rebalancePipeline.addStage(new MessageThrottleStage());
- rebalancePipeline.addStage(new TaskAssignmentStage());
+ rebalancePipeline.addStage(new NewResourceComputationStage());
+ rebalancePipeline.addStage(new NewCurrentStateComputationStage());
+ rebalancePipeline.addStage(new NewBestPossibleStateCalcStage());
+ rebalancePipeline.addStage(new NewMessageGenerationStage());
+ rebalancePipeline.addStage(new NewMessageSelectionStage());
+ rebalancePipeline.addStage(new NewMessageThrottleStage());
+ rebalancePipeline.addStage(new NewTaskAssignmentStage());
// round1: set node0 currentState to OFFLINE and node1 currentState to SLAVE
setCurrentState(clusterName, "localhost_0", resourceName, resourceName + "_0", "session_0",
@@ -240,10 +239,9 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
runPipeline(event, dataRefresh);
runPipeline(event, rebalancePipeline);
- MessageSelectionStageOutput msgSelOutput =
- event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
+ NewMessageOutput msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
List<Message> messages =
- msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
+ msgSelOutput.getMessages(Id.resource(resourceName), Id.partition(resourceName + "_0"));
Assert.assertEquals(messages.size(), 1, "Should output 1 message: OFFLINE-SLAVE for node0");
Message message = messages.get(0);
Assert.assertEquals(message.getFromState().toString(), "OFFLINE");
@@ -258,7 +256,8 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
runPipeline(event, dataRefresh);
runPipeline(event, rebalancePipeline);
msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
- messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
+ messages =
+ msgSelOutput.getMessages(Id.resource(resourceName), Id.partition(resourceName + "_0"));
Assert.assertEquals(messages.size(), 1,
"Should output only 1 message: OFFLINE->DROPPED for localhost_1");
@@ -275,7 +274,8 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
runPipeline(event, dataRefresh);
runPipeline(event, rebalancePipeline);
msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
- messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
+ messages =
+ msgSelOutput.getMessages(Id.resource(resourceName), Id.partition(resourceName + "_0"));
Assert.assertEquals(messages.size(), 1,
"Should output 1 message: OFFLINE->DROPPED for localhost_0");
message = messages.get(0);
@@ -315,17 +315,17 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
// cluster data cache refresh pipeline
Pipeline dataRefresh = new Pipeline();
- dataRefresh.addStage(new ReadClusterDataStage());
+ dataRefresh.addStage(new NewReadClusterDataStage());
// rebalance pipeline
Pipeline rebalancePipeline = new Pipeline();
- rebalancePipeline.addStage(new ResourceComputationStage());
- rebalancePipeline.addStage(new CurrentStateComputationStage());
- rebalancePipeline.addStage(new BestPossibleStateCalcStage());
- rebalancePipeline.addStage(new MessageGenerationPhase());
- rebalancePipeline.addStage(new MessageSelectionStage());
- rebalancePipeline.addStage(new MessageThrottleStage());
- rebalancePipeline.addStage(new TaskAssignmentStage());
+ rebalancePipeline.addStage(new NewResourceComputationStage());
+ rebalancePipeline.addStage(new NewCurrentStateComputationStage());
+ rebalancePipeline.addStage(new NewBestPossibleStateCalcStage());
+ rebalancePipeline.addStage(new NewMessageGenerationStage());
+ rebalancePipeline.addStage(new NewMessageSelectionStage());
+ rebalancePipeline.addStage(new NewMessageThrottleStage());
+ rebalancePipeline.addStage(new NewTaskAssignmentStage());
// round1: set node1 currentState to SLAVE
setCurrentState(clusterName, "localhost_1", resourceName, resourceName + "_0", "session_1",
@@ -333,10 +333,9 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
runPipeline(event, dataRefresh);
runPipeline(event, rebalancePipeline);
- MessageSelectionStageOutput msgSelOutput =
- event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
+ NewMessageOutput msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
List<Message> messages =
- msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
+ msgSelOutput.getMessages(Id.resource(resourceName), Id.partition(resourceName + "_0"));
Assert.assertEquals(messages.size(), 1, "Should output 1 message: SLAVE-MASTER for node1");
Message message = messages.get(0);
Assert.assertEquals(message.getFromState().toString(), "SLAVE");
@@ -354,7 +353,8 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
runPipeline(event, dataRefresh);
runPipeline(event, rebalancePipeline);
msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
- messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
+ messages =
+ msgSelOutput.getMessages(Id.resource(resourceName), Id.partition(resourceName + "_0"));
Assert.assertEquals(messages.size(), 0, "Should NOT output 1 message: SLAVE-MASTER for node0");
System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
index 86bd060..d4f3de6 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
@@ -28,6 +28,8 @@ import org.apache.helix.HelixDataAccessor;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.ZNRecord;
import org.apache.helix.api.Id;
+import org.apache.helix.api.ResourceConfig;
+import org.apache.helix.api.ResourceId;
import org.apache.helix.api.State;
import org.apache.helix.controller.pipeline.StageContext;
import org.apache.helix.controller.strategy.DefaultTwoStateStrategy;
@@ -35,7 +37,6 @@ import org.apache.helix.model.CurrentState;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.Resource;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;
@@ -63,19 +64,21 @@ public class TestResourceComputationStage extends BaseStageTest {
HelixDataAccessor accessor = manager.getHelixDataAccessor();
Builder keyBuilder = accessor.keyBuilder();
accessor.setProperty(keyBuilder.idealState(resourceName), idealState);
- ResourceComputationStage stage = new ResourceComputationStage();
- runStage(event, new ReadClusterDataStage());
+ NewResourceComputationStage stage = new NewResourceComputationStage();
+ runStage(event, new NewReadClusterDataStage());
runStage(event, stage);
- Map<String, Resource> resource = event.getAttribute(AttributeName.RESOURCES.toString());
+ Map<ResourceId, ResourceConfig> resource =
+ event.getAttribute(AttributeName.RESOURCES.toString());
AssertJUnit.assertEquals(1, resource.size());
- AssertJUnit.assertEquals(resource.keySet().iterator().next(), resourceName);
- AssertJUnit.assertEquals(resource.values().iterator().next().getResourceName(), resourceName);
- AssertJUnit.assertEquals(resource.values().iterator().next().getStateModelDefRef(),
- idealState.getStateModelDefRef());
+ AssertJUnit.assertEquals(resource.keySet().iterator().next(), Id.resource(resourceName));
AssertJUnit
- .assertEquals(resource.values().iterator().next().getPartitions().size(), partitions);
+ .assertEquals(resource.values().iterator().next().getId(), Id.resource(resourceName));
+ AssertJUnit.assertEquals(resource.values().iterator().next().getRebalancerConfig()
+ .getStateModelDefId(), idealState.getStateModelDefId());
+ AssertJUnit.assertEquals(resource.values().iterator().next().getPartitionSet().size(),
+ partitions);
}
@Test
@@ -85,21 +88,23 @@ public class TestResourceComputationStage extends BaseStageTest {
"testResource1", "testResource2"
};
List<IdealState> idealStates = setupIdealState(5, resources, 10, 1, RebalanceMode.SEMI_AUTO);
- ResourceComputationStage stage = new ResourceComputationStage();
- runStage(event, new ReadClusterDataStage());
+ NewResourceComputationStage stage = new NewResourceComputationStage();
+ runStage(event, new NewReadClusterDataStage());
runStage(event, stage);
- Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+ Map<ResourceId, ResourceConfig> resourceMap =
+ event.getAttribute(AttributeName.RESOURCES.toString());
AssertJUnit.assertEquals(resources.length, resourceMap.size());
for (int i = 0; i < resources.length; i++) {
String resourceName = resources[i];
+ ResourceId resourceId = Id.resource(resourceName);
IdealState idealState = idealStates.get(i);
AssertJUnit.assertTrue(resourceMap.containsKey(resourceName));
- AssertJUnit.assertEquals(resourceMap.get(resourceName).getResourceName(), resourceName);
- AssertJUnit.assertEquals(resourceMap.get(resourceName).getStateModelDefRef(),
- idealState.getStateModelDefRef());
- AssertJUnit.assertEquals(resourceMap.get(resourceName).getPartitions().size(),
+ AssertJUnit.assertEquals(resourceMap.get(resourceId).getId(), resourceId);
+ AssertJUnit.assertEquals(resourceMap.get(resourceId).getRebalancerConfig()
+ .getStateModelDefId(), idealState.getStateModelDefRef());
+ AssertJUnit.assertEquals(resourceMap.get(resourceId).getPartitionSet().size(),
idealState.getNumPartitions());
}
}
@@ -151,41 +156,47 @@ public class TestResourceComputationStage extends BaseStageTest {
accessor.setProperty(keyBuilder.currentState(instanceName, sessionId, oldResource),
currentState);
- ResourceComputationStage stage = new ResourceComputationStage();
- runStage(event, new ReadClusterDataStage());
+ NewResourceComputationStage stage = new NewResourceComputationStage();
+ runStage(event, new NewReadClusterDataStage());
runStage(event, stage);
- Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+ Map<ResourceId, ResourceConfig> resourceMap =
+ event.getAttribute(AttributeName.RESOURCES.toString());
// +1 because it will have one for current state
AssertJUnit.assertEquals(resources.length + 1, resourceMap.size());
for (int i = 0; i < resources.length; i++) {
String resourceName = resources[i];
+ ResourceId resourceId = Id.resource(resourceName);
IdealState idealState = idealStates.get(i);
- AssertJUnit.assertTrue(resourceMap.containsKey(resourceName));
- AssertJUnit.assertEquals(resourceMap.get(resourceName).getResourceName(), resourceName);
- AssertJUnit.assertEquals(resourceMap.get(resourceName).getStateModelDefRef(),
- idealState.getStateModelDefRef());
- AssertJUnit.assertEquals(resourceMap.get(resourceName).getPartitions().size(),
+ AssertJUnit.assertTrue(resourceMap.containsKey(resourceId));
+ AssertJUnit.assertEquals(resourceMap.get(resourceId).getId(), resourceId);
+ AssertJUnit.assertEquals(resourceMap.get(resourceId).getRebalancerConfig()
+ .getStateModelDefId(), idealState.getStateModelDefId());
+ AssertJUnit.assertEquals(resourceMap.get(resourceId).getPartitionSet().size(),
idealState.getNumPartitions());
}
// Test the data derived from CurrentState
- AssertJUnit.assertTrue(resourceMap.containsKey(oldResource));
- AssertJUnit.assertEquals(resourceMap.get(oldResource).getResourceName(), oldResource);
- AssertJUnit.assertEquals(resourceMap.get(oldResource).getStateModelDefRef(),
- currentState.getStateModelDefRef());
- AssertJUnit.assertEquals(resourceMap.get(oldResource).getPartitions().size(), currentState
- .getPartitionStateStringMap().size());
- AssertJUnit.assertNotNull(resourceMap.get(oldResource).getPartition("testResourceOld_0"));
- AssertJUnit.assertNotNull(resourceMap.get(oldResource).getPartition("testResourceOld_1"));
- AssertJUnit.assertNotNull(resourceMap.get(oldResource).getPartition("testResourceOld_2"));
+ ResourceId oldResourceId = Id.resource(oldResource);
+ AssertJUnit.assertTrue(resourceMap.containsKey(oldResourceId));
+ AssertJUnit.assertEquals(resourceMap.get(oldResourceId).getId(), oldResourceId);
+ AssertJUnit.assertEquals(resourceMap.get(oldResourceId).getRebalancerConfig()
+ .getStateModelDefId(), currentState.getStateModelDefId());
+ AssertJUnit.assertEquals(resourceMap.get(oldResourceId).getPartitionSet().size(), currentState
+ .getPartitionStateMap().size());
+ AssertJUnit.assertNotNull(resourceMap.get(oldResourceId).getPartition(
+ Id.partition("testResourceOld_0")));
+ AssertJUnit.assertNotNull(resourceMap.get(oldResourceId).getPartition(
+ Id.partition("testResourceOld_1")));
+ AssertJUnit.assertNotNull(resourceMap.get(oldResourceId).getPartition(
+ Id.partition("testResourceOld_2")));
}
@Test
public void testNull() {
ClusterEvent event = new ClusterEvent("sampleEvent");
- ResourceComputationStage stage = new ResourceComputationStage();
+ NewResourceComputationStage stage = new NewResourceComputationStage();
StageContext context = new StageContext();
stage.init(context);
stage.preProcess();
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
index 747a185..d8afec5 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
@@ -26,10 +26,10 @@ import java.util.Map;
import java.util.Set;
import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.TestHelper;
-import org.apache.helix.ZNRecord;
import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.TestHelper;
import org.apache.helix.TestHelper.StartCMResult;
+import org.apache.helix.ZNRecord;
import org.apache.helix.controller.HelixControllerMain;
import org.apache.helix.controller.stages.ClusterDataCache;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
@@ -176,7 +176,7 @@ public class TestAutoRebalance extends ZkStandAloneCMTestBaseWithPropertyServerC
TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, storageNodeName.replace(':', '_'));
_startCMResultMap.put(storageNodeName, resultx);
}
- Thread.sleep(1000);
+ Thread.sleep(5000);
result =
ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
CLUSTER_NAME, TEST_DB));