You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2013/12/13 02:32:05 UTC
git commit: [HELIX-344] Add app-specific ideal state validation,
rb=16199
Updated Branches:
refs/heads/master 39c8b3d80 -> ba163ed65
[HELIX-344] Add app-specific ideal state validation, rb=16199
Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/ba163ed6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/ba163ed6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/ba163ed6
Branch: refs/heads/master
Commit: ba163ed650139c678d978a2e2e5a1169ae18473f
Parents: 39c8b3d
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Thu Dec 12 11:25:45 2013 -0800
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Thu Dec 12 17:31:45 2013 -0800
----------------------------------------------------------------------
.../java/org/apache/helix/api/Resource.java | 10 +
.../controller/GenericHelixController.java | 2 +
.../helix/controller/stages/AttributeName.java | 1 +
.../controller/stages/ClusterDataCache.java | 4 +-
.../controller/stages/ReadClusterDataStage.java | 10 +
.../stages/ResourceValidationStage.java | 103 +++++++++
.../helix/model/ClusterConfiguration.java | 28 +++
.../java/org/apache/helix/ZkUnitTestBase.java | 68 ++++++
.../stages/TestResourceValidationStage.java | 216 +++++++++++++++++++
.../integration/TestFullAutoNodeTagging.java | 65 ------
.../TestInvalidResourceRebalance.java | 102 +++++++++
11 files changed, 542 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/ba163ed6/helix-core/src/main/java/org/apache/helix/api/Resource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Resource.java b/helix-core/src/main/java/org/apache/helix/api/Resource.java
index 0726510..8c608ac 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Resource.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Resource.java
@@ -43,6 +43,7 @@ public class Resource {
private final ResourceConfig _config;
private final ExternalView _externalView;
private final ResourceAssignment _resourceAssignment;
+ private final IdealState _idealState;
/**
* Construct a resource
@@ -66,6 +67,7 @@ public class Resource {
batchMessageMode);
_externalView = externalView;
_resourceAssignment = resourceAssignment;
+ _idealState = idealState;
}
/**
@@ -206,4 +208,12 @@ public class Resource {
public ResourceConfig getConfig() {
return _config;
}
+
+ /**
+ * Get the ideal state of the resource
+ * @return IdealState for this resource, if it exists
+ */
+ public IdealState getIdealState() {
+ return _idealState;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/ba163ed6/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 96be0fa..9fef2da 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -57,6 +57,7 @@ import org.apache.helix.controller.stages.PersistContextStage;
import org.apache.helix.controller.stages.ReadClusterDataStage;
import org.apache.helix.controller.stages.ResourceComputationStage;
import org.apache.helix.controller.stages.TaskAssignmentStage;
+import org.apache.helix.controller.stages.ResourceValidationStage;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.HealthStat;
@@ -182,6 +183,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
Pipeline rebalancePipeline = new Pipeline();
rebalancePipeline.addStage(new CompatibilityCheckStage());
rebalancePipeline.addStage(new ResourceComputationStage());
+ rebalancePipeline.addStage(new ResourceValidationStage());
rebalancePipeline.addStage(new CurrentStateComputationStage());
rebalancePipeline.addStage(new BestPossibleStateCalcStage());
rebalancePipeline.addStage(new MessageGenerationStage());
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/ba163ed6/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
index 5cedd7c..90e475a 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
@@ -28,4 +28,5 @@ public enum AttributeName {
MESSAGES_THROTTLE,
LOCAL_STATE,
CONTEXT_PROVIDER,
+ IDEAL_STATE_RULES
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/ba163ed6/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index ac1cef4..6f09d26 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -44,7 +44,6 @@ import org.apache.log4j.Logger;
*/
@Deprecated
public class ClusterDataCache {
-
Map<String, LiveInstance> _liveInstanceMap;
Map<String, IdealState> _idealStateMap;
Map<String, StateModelDefinition> _stateModelDefMap;
@@ -73,7 +72,8 @@ public class ClusterDataCache {
_liveInstanceMap = accessor.getChildValuesMap(keyBuilder.liveInstances());
for (LiveInstance instance : _liveInstanceMap.values()) {
- LOG.trace("live instance: " + instance.getParticipantId() + " " + instance.getTypedSessionId());
+ LOG.trace("live instance: " + instance.getParticipantId() + " "
+ + instance.getTypedSessionId());
}
_stateModelDefMap = accessor.getChildValuesMap(keyBuilder.stateModelDefs());
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/ba163ed6/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
index fb016f1..2279d76 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
@@ -31,6 +31,7 @@ import org.apache.helix.controller.context.ControllerContext;
import org.apache.helix.controller.context.ControllerContextProvider;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.model.ClusterConfiguration;
import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
import org.apache.log4j.Logger;
@@ -84,6 +85,15 @@ public class ReadClusterDataStage extends AbstractBaseStage {
ControllerContextProvider contextProvider = new ControllerContextProvider(persistedContexts);
event.addAttribute(AttributeName.CONTEXT_PROVIDER.toString(), contextProvider);
+ // read ideal state rules (if any)
+ ClusterConfiguration clusterConfiguration =
+ accessor.getProperty(accessor.keyBuilder().clusterConfig());
+ if (clusterConfiguration == null) {
+ clusterConfiguration = new ClusterConfiguration(cluster.getId());
+ }
+ event.addAttribute(AttributeName.IDEAL_STATE_RULES.toString(),
+ clusterConfiguration.getIdealStateRules());
+
long endTime = System.currentTimeMillis();
LOG.info("END ReadClusterDataStage.process(). took: " + (endTime - startTime) + " ms");
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/ba163ed6/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java
new file mode 100644
index 0000000..470de2c
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java
@@ -0,0 +1,103 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Resource;
+import org.apache.helix.api.config.ResourceConfig;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.log4j.Logger;
+
+public class ResourceValidationStage extends AbstractBaseStage {
+ private static final Logger LOG = Logger.getLogger(ResourceValidationStage.class);
+
+ @Override
+ public void process(ClusterEvent event) throws Exception {
+ Cluster cluster = event.getAttribute("ClusterDataCache");
+ if (cluster == null) {
+ throw new StageException("Missing attributes in event:" + event + ". Requires DataCache");
+ }
+ Map<ResourceId, ResourceConfig> resourceConfigMap =
+ event.getAttribute(AttributeName.RESOURCES.toString());
+ if (resourceConfigMap == null) {
+ throw new StageException("Resources must be computed prior to validation!");
+ }
+ Map<ResourceId, Resource> resourceMap = cluster.getResourceMap();
+ Map<String, Map<String, String>> idealStateRuleMap =
+ event.getAttribute(AttributeName.IDEAL_STATE_RULES.toString());
+
+ for (ResourceId resourceId : resourceMap.keySet()) {
+ // check every ideal state against the ideal state rules
+ // the pipeline should not process any resources that have an unsupported ideal state
+ IdealState idealState = resourceMap.get(resourceId).getIdealState();
+ if (idealState == null) {
+ continue;
+ }
+ if (idealStateRuleMap != null && !idealStateRuleMap.isEmpty()) {
+ boolean hasMatchingRule = false;
+ for (String ruleName : idealStateRuleMap.keySet()) {
+ Map<String, String> rule = idealStateRuleMap.get(ruleName);
+ boolean matches = idealStateMatchesRule(idealState, rule);
+ hasMatchingRule = hasMatchingRule || matches;
+ if (matches) {
+ break;
+ }
+ }
+ if (!hasMatchingRule) {
+ LOG.warn("Resource " + resourceId + " does not have a valid ideal state!");
+ resourceConfigMap.remove(resourceId);
+ }
+ }
+
+ // check that every resource to process has a live state model definition
+ StateModelDefId stateModelDefId = idealState.getStateModelDefId();
+ StateModelDefinition stateModelDef = cluster.getStateModelMap().get(stateModelDefId);
+ if (stateModelDef == null) {
+ LOG.warn("Resource " + resourceId + " uses state model " + stateModelDefId
+ + ", but it is not on the cluster!");
+ resourceConfigMap.remove(resourceId);
+ }
+ }
+ }
+
+ /**
+ * Check if the ideal state adheres to a rule
+ * @param idealState the ideal state to check
+ * @param rule the rules of a valid ideal state
+ * @return true if the ideal state is a superset of the entries of the rule, false otherwise
+ */
+ private boolean idealStateMatchesRule(IdealState idealState, Map<String, String> rule) {
+ Map<String, String> simpleFields = idealState.getRecord().getSimpleFields();
+ for (String key : rule.keySet()) {
+ String value = rule.get(key);
+ if (!simpleFields.containsKey(key) || !value.equals(simpleFields.get(key))) {
+ return false;
+ }
+ }
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/ba163ed6/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java
index 8386d6c..5e2daa6 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java
@@ -1,5 +1,7 @@
package org.apache.helix.model;
+import java.util.Map;
+
import org.apache.helix.HelixProperty;
import org.apache.helix.ZNRecord;
import org.apache.helix.api.config.NamespacedConfig;
@@ -7,6 +9,8 @@ import org.apache.helix.api.config.UserConfig;
import org.apache.helix.api.id.ClusterId;
import org.apache.helix.manager.zk.ZKHelixManager;
+import com.google.common.collect.Maps;
+
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -30,6 +34,8 @@ import org.apache.helix.manager.zk.ZKHelixManager;
* Persisted configuration properties for a cluster
*/
public class ClusterConfiguration extends HelixProperty {
+ private static final String IDEAL_STATE_RULE_PREFIX = "IdealStateRule";
+
/**
* Instantiate for an id
* @param id cluster id
@@ -96,6 +102,28 @@ public class ClusterConfiguration extends HelixProperty {
}
/**
+ * Get the rules, if any, that all ideal states in this cluster must follow
+ * @return map of rule name to key-value requirements in the ideal state
+ */
+ public Map<String, Map<String, String>> getIdealStateRules() {
+ NamespacedConfig rules = new NamespacedConfig(this, IDEAL_STATE_RULE_PREFIX);
+ Map<String, Map<String, String>> idealStateRuleMap = Maps.newHashMap();
+ for (String simpleKey : rules.getSimpleFields().keySet()) {
+ String simpleValue = rules.getSimpleField(simpleKey);
+ String[] splitRules = simpleValue.split("(?<!\\\\),");
+ Map<String, String> singleRule = Maps.newHashMap();
+ for (String rule : splitRules) {
+ String[] keyValue = rule.split("(?<!\\\\)=");
+ if (keyValue.length >= 2) {
+ singleRule.put(keyValue[0], keyValue[1]);
+ }
+ }
+ idealStateRuleMap.put(simpleKey, singleRule);
+ }
+ return idealStateRuleMap;
+ }
+
+ /**
* Create a new ClusterConfiguration from a UserConfig
* @param userConfig user-defined configuration properties
* @return ClusterConfiguration
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/ba163ed6/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java b/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java
index 33968f7..75f23e3 100644
--- a/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java
@@ -44,6 +44,8 @@ import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.ExternalView;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
@@ -51,7 +53,9 @@ import org.apache.helix.model.Message.Attributes;
import org.apache.helix.model.Message.MessageType;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.tools.StateModelConfigGenerator;
+import org.apache.helix.tools.ClusterStateVerifier.ZkVerifier;
import org.apache.helix.util.HelixUtil;
+import org.apache.helix.util.ZKClientPool;
import org.apache.log4j.Logger;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
@@ -357,4 +361,68 @@ public class ZkUnitTestBase {
return msg;
}
+ /**
+ * Ensures that external view and current state are empty
+ */
+ protected static class EmptyZkVerifier implements ZkVerifier {
+ private final String _clusterName;
+ private final String _resourceName;
+ private final ZkClient _zkClient;
+
+ /**
+ * Instantiate the verifier
+ * @param clusterName the cluster to verify
+ * @param resourceName the resource to verify
+ */
+ public EmptyZkVerifier(String clusterName, String resourceName) {
+ _clusterName = clusterName;
+ _resourceName = resourceName;
+ _zkClient = ZKClientPool.getZkClient(ZK_ADDR);
+ }
+
+ @Override
+ public boolean verify() {
+ BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
+ HelixDataAccessor accessor = new ZKHelixDataAccessor(_clusterName, baseAccessor);
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ ExternalView externalView = accessor.getProperty(keyBuilder.externalView(_resourceName));
+
+ // verify external view empty
+ if (externalView != null) {
+ for (String partition : externalView.getPartitionSet()) {
+ Map<String, String> stateMap = externalView.getStateMap(partition);
+ if (stateMap != null && !stateMap.isEmpty()) {
+ LOG.error("External view not empty for " + partition);
+ return false;
+ }
+ }
+ }
+
+ // verify current state empty
+ List<String> liveParticipants = accessor.getChildNames(keyBuilder.liveInstances());
+ for (String participant : liveParticipants) {
+ List<String> sessionIds = accessor.getChildNames(keyBuilder.sessions(participant));
+ for (String sessionId : sessionIds) {
+ CurrentState currentState =
+ accessor.getProperty(keyBuilder.currentState(participant, sessionId, _resourceName));
+ Map<String, String> partitionStateMap = currentState.getPartitionStateMap();
+ if (partitionStateMap != null && !partitionStateMap.isEmpty()) {
+ LOG.error("Current state not empty for " + participant);
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public ZkClient getZkClient() {
+ return _zkClient;
+ }
+
+ @Override
+ public String getClusterName() {
+ return _clusterName;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/ba163ed6/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceValidationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceValidationStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceValidationStage.java
new file mode 100644
index 0000000..ece46ff
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceValidationStage.java
@@ -0,0 +1,216 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.Mocks;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.accessor.ClusterAccessor;
+import org.apache.helix.api.config.ResourceConfig;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.model.ClusterConfiguration;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.IdealStateProperty;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.tools.StateModelConfigGenerator;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+
+public class TestResourceValidationStage {
+ private static final String PARTICIPANT = "localhost_1234";
+ private static final String STATE = "OFFLINE";
+
+ @Test
+ public void testIdealStateValidity() throws Exception {
+ Mocks.MockAccessor accessor = new Mocks.MockAccessor();
+
+ // create some ideal states
+ ResourceId masterSlaveCustomResource = ResourceId.from("masterSlaveCustomResource");
+ ResourceId onlineOfflineFullAutoResource = ResourceId.from("onlineOfflineFullAutoResource");
+ ResourceId masterSlaveSemiAutoInvalidResource =
+ ResourceId.from("masterSlaveSemiAutoInvalidResource");
+ createIS(accessor, masterSlaveCustomResource, "MasterSlave", RebalanceMode.CUSTOMIZED);
+ createIS(accessor, onlineOfflineFullAutoResource, "OnlineOffline", RebalanceMode.FULL_AUTO);
+ createIS(accessor, masterSlaveSemiAutoInvalidResource, "MasterSlave", RebalanceMode.SEMI_AUTO);
+
+ // create some ideal state specs
+ createISSpec(accessor, masterSlaveCustomResource + "_spec", "MasterSlave",
+ RebalanceMode.CUSTOMIZED);
+ createISSpec(accessor, onlineOfflineFullAutoResource + "_spec", "OnlineOffline",
+ RebalanceMode.FULL_AUTO);
+ ClusterConfiguration clusterConfiguration =
+ accessor.getProperty(accessor.keyBuilder().clusterConfig());
+
+ // add some state models
+ addStateModels(accessor);
+
+ // refresh the cache
+ ClusterEvent event = new ClusterEvent("testEvent");
+ ClusterId clusterId = new ClusterId("sampleClusterId");
+ ClusterAccessor clusterAccessor = new MockClusterAccessor(clusterId, accessor);
+ Cluster cluster = clusterAccessor.readCluster();
+ event.addAttribute("ClusterDataCache", cluster);
+ event.addAttribute(AttributeName.IDEAL_STATE_RULES.toString(),
+ clusterConfiguration.getIdealStateRules());
+
+ // run resource computation
+ new ResourceComputationStage().process(event);
+ Map<ResourceId, ResourceConfig> resourceMap =
+ event.getAttribute(AttributeName.RESOURCES.toString());
+ Assert.assertTrue(resourceMap.containsKey(masterSlaveCustomResource));
+ Assert.assertTrue(resourceMap.containsKey(onlineOfflineFullAutoResource));
+ Assert.assertTrue(resourceMap.containsKey(masterSlaveSemiAutoInvalidResource));
+
+ // run resource validation
+ new ResourceValidationStage().process(event);
+ Map<ResourceId, ResourceConfig> finalResourceMap =
+ event.getAttribute(AttributeName.RESOURCES.toString());
+ Assert.assertTrue(finalResourceMap.containsKey(masterSlaveCustomResource));
+ Assert.assertTrue(finalResourceMap.containsKey(onlineOfflineFullAutoResource));
+ Assert.assertFalse(finalResourceMap.containsKey(masterSlaveSemiAutoInvalidResource));
+ }
+
+ @Test
+ public void testNoSpec() throws Exception {
+ Mocks.MockAccessor accessor = new Mocks.MockAccessor();
+
+ // create an ideal state and no spec
+ ResourceId masterSlaveCustomResource = ResourceId.from("masterSlaveCustomResource");
+ createIS(accessor, masterSlaveCustomResource, "MasterSlave", RebalanceMode.CUSTOMIZED);
+
+ // add some state models
+ addStateModels(accessor);
+
+ // refresh the cache
+ ClusterEvent event = new ClusterEvent("testEvent");
+ ClusterId clusterId = new ClusterId("sampleClusterId");
+ ClusterAccessor clusterAccessor = new MockClusterAccessor(clusterId, accessor);
+ Cluster cluster = clusterAccessor.readCluster();
+ event.addAttribute("ClusterDataCache", cluster);
+ Map<String, Map<String, String>> emptyMap = Maps.newHashMap();
+ event.addAttribute(AttributeName.IDEAL_STATE_RULES.toString(), emptyMap);
+
+ // run resource computation
+ new ResourceComputationStage().process(event);
+ Map<ResourceId, ResourceConfig> resourceMap =
+ event.getAttribute(AttributeName.RESOURCES.toString());
+ Assert.assertTrue(resourceMap.containsKey(masterSlaveCustomResource));
+
+ // run resource validation
+ new ResourceValidationStage().process(event);
+ Map<ResourceId, ResourceConfig> finalResourceMap =
+ event.getAttribute(AttributeName.RESOURCES.toString());
+ Assert.assertTrue(finalResourceMap.containsKey(masterSlaveCustomResource));
+ }
+
+ @Test
+ public void testMissingStateModel() throws Exception {
+ Mocks.MockAccessor accessor = new Mocks.MockAccessor();
+
+ // create an ideal state and no spec
+ ResourceId masterSlaveCustomResource = ResourceId.from("masterSlaveCustomResource");
+ ResourceId leaderStandbyCustomResource = ResourceId.from("leaderStandbyCustomResource");
+ createIS(accessor, masterSlaveCustomResource, "MasterSlave", RebalanceMode.CUSTOMIZED);
+ createIS(accessor, leaderStandbyCustomResource, "LeaderStandby", RebalanceMode.CUSTOMIZED);
+
+ // add some state models (but not leader standby)
+ addStateModels(accessor);
+
+ // refresh the cache
+ ClusterEvent event = new ClusterEvent("testEvent");
+ ClusterId clusterId = new ClusterId("sampleClusterId");
+ ClusterAccessor clusterAccessor = new MockClusterAccessor(clusterId, accessor);
+ Cluster cluster = clusterAccessor.readCluster();
+ event.addAttribute("ClusterDataCache", cluster);
+ Map<String, Map<String, String>> emptyMap = Maps.newHashMap();
+ event.addAttribute(AttributeName.IDEAL_STATE_RULES.toString(), emptyMap);
+
+ // run resource computation
+ new ResourceComputationStage().process(event);
+ Map<ResourceId, ResourceConfig> resourceMap =
+ event.getAttribute(AttributeName.RESOURCES.toString());
+ Assert.assertTrue(resourceMap.containsKey(masterSlaveCustomResource));
+ Assert.assertTrue(resourceMap.containsKey(leaderStandbyCustomResource));
+
+ // run resource validation
+ new ResourceValidationStage().process(event);
+ Map<ResourceId, ResourceConfig> finalResourceMap =
+ event.getAttribute(AttributeName.RESOURCES.toString());
+ Assert.assertTrue(finalResourceMap.containsKey(masterSlaveCustomResource));
+ Assert.assertFalse(finalResourceMap.containsKey(leaderStandbyCustomResource));
+ }
+
+ private void createIS(HelixDataAccessor accessor, ResourceId resourceId, String stateModelDefRef,
+ RebalanceMode rebalanceMode) {
+ IdealState idealState = new IdealState(resourceId);
+ idealState.setRebalanceMode(rebalanceMode);
+ idealState.setStateModelDefRef(stateModelDefRef);
+ idealState.setNumPartitions(1);
+ idealState.setReplicas("1");
+ idealState.getRecord().setListField(resourceId + "_0", ImmutableList.of(PARTICIPANT));
+ idealState.getRecord().setMapField(resourceId + "_0", ImmutableMap.of(PARTICIPANT, STATE));
+ accessor.setProperty(accessor.keyBuilder().idealStates(resourceId.toString()), idealState);
+ }
+
+ private void createISSpec(HelixDataAccessor accessor, String specId, String stateModelDefRef,
+ RebalanceMode rebalanceMode) {
+ PropertyKey propertyKey = accessor.keyBuilder().clusterConfig();
+ HelixProperty property = accessor.getProperty(propertyKey);
+ if (property == null) {
+ property = new HelixProperty("sampleClusterConfig");
+ }
+ String key = "IdealStateRule!" + specId;
+ String value =
+ IdealStateProperty.REBALANCE_MODE.toString() + "=" + rebalanceMode.toString() + ","
+ + IdealStateProperty.STATE_MODEL_DEF_REF.toString() + "=" + stateModelDefRef;
+ property.getRecord().setSimpleField(key, value);
+ accessor.setProperty(propertyKey, property);
+ }
+
+ private void addStateModels(HelixDataAccessor accessor) {
+ StateModelDefinition masterSlave =
+ new StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave());
+ accessor.setProperty(accessor.keyBuilder().stateModelDef(masterSlave.getId()), masterSlave);
+ StateModelDefinition onlineOffline =
+ new StateModelDefinition(StateModelConfigGenerator.generateConfigForOnlineOffline());
+ accessor.setProperty(accessor.keyBuilder().stateModelDef(onlineOffline.getId()), onlineOffline);
+ }
+
+ private static class MockClusterAccessor extends ClusterAccessor {
+ public MockClusterAccessor(ClusterId clusterId, HelixDataAccessor accessor) {
+ super(clusterId, accessor);
+ }
+
+ @Override
+ public boolean isClusterStructureValid() {
+ return true;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/ba163ed6/helix-core/src/test/java/org/apache/helix/integration/TestFullAutoNodeTagging.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestFullAutoNodeTagging.java b/helix-core/src/test/java/org/apache/helix/integration/TestFullAutoNodeTagging.java
index e0c8b6f..0acd775 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestFullAutoNodeTagging.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestFullAutoNodeTagging.java
@@ -20,7 +20,6 @@ package org.apache.helix.integration;
*/
import java.util.Date;
-import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -37,7 +36,6 @@ import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.manager.zk.ZkClient;
-import org.apache.helix.model.CurrentState;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.RebalanceMode;
@@ -372,67 +370,4 @@ public class TestFullAutoNodeTagging extends ZkUnitTestBase {
return _clusterName;
}
}
-
- /**
- * Ensures that external view and current state are empty
- */
- private static class EmptyZkVerifier implements ZkVerifier {
- private final String _clusterName;
- private final String _resourceName;
- private final ZkClient _zkClient;
-
- /**
- * Instantiate the verifier
- * @param clusterName the cluster to verify
- * @param resourceName the resource to verify
- */
- public EmptyZkVerifier(String clusterName, String resourceName) {
- _clusterName = clusterName;
- _resourceName = resourceName;
- _zkClient = ZKClientPool.getZkClient(ZK_ADDR);
- }
-
- @Override
- public boolean verify() {
- BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
- HelixDataAccessor accessor = new ZKHelixDataAccessor(_clusterName, baseAccessor);
- PropertyKey.Builder keyBuilder = accessor.keyBuilder();
- ExternalView externalView = accessor.getProperty(keyBuilder.externalView(_resourceName));
-
- // verify external view empty
- for (String partition : externalView.getPartitionSet()) {
- Map<String, String> stateMap = externalView.getStateMap(partition);
- if (stateMap != null && !stateMap.isEmpty()) {
- LOG.error("External view not empty for " + partition);
- return false;
- }
- }
-
- // verify current state empty
- List<String> liveParticipants = accessor.getChildNames(keyBuilder.liveInstances());
- for (String participant : liveParticipants) {
- List<String> sessionIds = accessor.getChildNames(keyBuilder.sessions(participant));
- for (String sessionId : sessionIds) {
- CurrentState currentState =
- accessor.getProperty(keyBuilder.currentState(participant, sessionId, _resourceName));
- Map<String, String> partitionStateMap = currentState.getPartitionStateMap();
- if (partitionStateMap != null && !partitionStateMap.isEmpty()) {
- LOG.error("Current state not empty for " + participant);
- return false;
- }
- }
- }
- return true;
- }
-
- @Override
- public ZkClient getZkClient() {
- return _zkClient;
- }
-
- @Override
- public String getClusterName() {
- return _clusterName;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/ba163ed6/helix-core/src/test/java/org/apache/helix/integration/TestInvalidResourceRebalance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestInvalidResourceRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/TestInvalidResourceRebalance.java
new file mode 100644
index 0000000..7b56f5d
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestInvalidResourceRebalance.java
@@ -0,0 +1,102 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+import java.util.Map;
+
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Maps;
+
+public class TestInvalidResourceRebalance extends ZkUnitTestBase {
+ /**
+ * Ensure that the Helix controller doesn't attempt to rebalance resources with invalid ideal
+ * states
+ */
+ @Test
+ public void testResourceRebalanceSkipped() throws Exception {
+ final int NUM_PARTICIPANTS = 2;
+ final int NUM_PARTITIONS = 4;
+ final int NUM_REPLICAS = 2;
+ final String RESOURCE_NAME = "TestDB0";
+
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+ // Set up cluster
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ NUM_PARTITIONS, // partitions per resource
+ NUM_PARTICIPANTS, // number of nodes
+ NUM_REPLICAS, // replicas
+ "MasterSlave", RebalanceMode.SEMI_AUTO, // use SEMI_AUTO mode
+ true); // do rebalance
+
+ // start controller
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
+ controller.syncStart();
+
+ // add the ideal state spec (prevents non-CUSTOMIZED MasterSlave ideal states)
+ HelixAdmin helixAdmin = controller.getClusterManagmentTool();
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put("IdealStateRule!sampleRuleName",
+ "IDEAL_STATE_MODE=CUSTOMIZED,STATE_MODEL_DEF_REF=MasterSlave");
+ helixAdmin.setConfig(
+ new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(clusterName).build(),
+ properties);
+
+ // start participants
+ MockParticipantManager[] participants = new MockParticipantManager[NUM_PARTICIPANTS];
+ for (int i = 0; i < NUM_PARTICIPANTS; i++) {
+ final String instanceName = "localhost_" + (12918 + i);
+
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+ participants[i].syncStart();
+ }
+
+ Thread.sleep(1000);
+ boolean result =
+ ClusterStateVerifier.verifyByZkCallback(new EmptyZkVerifier(clusterName, RESOURCE_NAME));
+ Assert.assertTrue(result, "External view and current state must be empty");
+
+ // cleanup
+ for (int i = 0; i < NUM_PARTICIPANTS; i++) {
+ participants[i].syncStop();
+ }
+ controller.syncStop();
+ }
+
+}