You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2013/12/12 23:04:00 UTC

[1/2] git commit: [HELIX-344] Add app-specific ideal state validation, rb=16199

Updated Branches:
  refs/heads/helix-0.6.2-release d8a618eaf -> af56823c3


[HELIX-344] Add app-specific ideal state validation, rb=16199


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/5937f364
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/5937f364
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/5937f364

Branch: refs/heads/helix-0.6.2-release
Commit: 5937f3640a535193325aff6c3e291c8c0bef6777
Parents: 1689c67
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Thu Dec 12 11:25:45 2013 -0800
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Thu Dec 12 14:03:10 2013 -0800

----------------------------------------------------------------------
 .../controller/GenericHelixController.java      |   2 +
 .../controller/stages/ClusterDataCache.java     |  33 +++-
 .../stages/ResourceValidationStage.java         |  94 ++++++++++
 .../java/org/apache/helix/ZkUnitTestBase.java   |  85 ++++++++-
 .../stages/TestResourceValidationStage.java     | 181 +++++++++++++++++++
 .../integration/TestFullAutoNodeTagging.java    |  65 -------
 .../TestInvalidResourceRebalance.java           | 102 +++++++++++
 7 files changed, 486 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5937f364/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 8e4e1ea..7a8b609 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -56,6 +56,7 @@ import org.apache.helix.controller.stages.ReadClusterDataStage;
 import org.apache.helix.controller.stages.RebalanceIdealStateStage;
 import org.apache.helix.controller.stages.ResourceComputationStage;
 import org.apache.helix.controller.stages.TaskAssignmentStage;
+import org.apache.helix.controller.stages.ResourceValidationStage;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.HealthStat;
@@ -180,6 +181,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
       // rebalance pipeline
       Pipeline rebalancePipeline = new Pipeline();
       rebalancePipeline.addStage(new ResourceComputationStage());
+      rebalancePipeline.addStage(new ResourceValidationStage());
       rebalancePipeline.addStage(new CurrentStateComputationStage());
       rebalancePipeline.addStage(new RebalanceIdealStateStage());
       rebalancePipeline.addStage(new BestPossibleStateCalcStage());

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5937f364/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index b90880e..5c0a94a 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -25,25 +25,30 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixConstants.StateModelToken;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixProperty;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.model.ClusterConstraints;
+import org.apache.helix.model.ClusterConstraints.ConstraintType;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.StateModelDefinition;
-import org.apache.helix.model.ClusterConstraints.ConstraintType;
 import org.apache.log4j.Logger;
 
+import com.google.common.collect.Maps;
+
 /**
  * Reads the data from the cluster using data accessor. This output ClusterData which
  * provides useful methods to search/lookup properties
  */
 public class ClusterDataCache {
 
+  private static final String IDEAL_STATE_RULE_PREFIX = "IdealStateRule!";
+
   Map<String, LiveInstance> _liveInstanceMap;
   Map<String, IdealState> _idealStateMap;
   Map<String, StateModelDefinition> _stateModelDefMap;
@@ -51,6 +56,7 @@ public class ClusterDataCache {
   Map<String, ClusterConstraints> _constraintMap;
   Map<String, Map<String, Map<String, CurrentState>>> _currentStateMap;
   Map<String, Map<String, Message>> _messageMap;
+  Map<String, Map<String, String>> _idealStateRuleMap;
 
   // Map<String, Map<String, HealthStat>> _healthStatMap;
   // private HealthStat _globalStats; // DON'T THINK I WILL USE THIS ANYMORE
@@ -105,6 +111,25 @@ public class ClusterDataCache {
     }
     _currentStateMap = Collections.unmodifiableMap(allCurStateMap);
 
+    _idealStateRuleMap = Maps.newHashMap();
+    HelixProperty clusterConfig = accessor.getProperty(keyBuilder.clusterConfig());
+    if (clusterConfig != null) {
+      for (String simpleKey : clusterConfig.getRecord().getSimpleFields().keySet()) {
+        if (simpleKey.startsWith(IDEAL_STATE_RULE_PREFIX)) {
+          String simpleValue = clusterConfig.getRecord().getSimpleField(simpleKey);
+          String[] rules = simpleValue.split("(?<!\\\\),");
+          Map<String, String> singleRule = Maps.newHashMap();
+          for (String rule : rules) {
+            String[] keyValue = rule.split("(?<!\\\\)=");
+            if (keyValue.length >= 2) {
+              singleRule.put(keyValue[0], keyValue[1]);
+            }
+          }
+          _idealStateRuleMap.put(simpleKey, singleRule);
+        }
+      }
+    }
+
     return true;
   }
 
@@ -116,6 +141,10 @@ public class ClusterDataCache {
     return _idealStateMap;
   }
 
+  public Map<String, Map<String, String>> getIdealStateRules() {
+    return _idealStateRuleMap;
+  }
+
   /**
    * Returns the LiveInstances for each of the instances that are curretnly up and running
    * @return

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5937f364/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java
new file mode 100644
index 0000000..e552797
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java
@@ -0,0 +1,94 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.log4j.Logger;
+
+public class ResourceValidationStage extends AbstractBaseStage {
+  private static final Logger LOG = Logger.getLogger(ResourceValidationStage.class);
+
+  @Override
+  public void process(ClusterEvent event) throws Exception {
+    ClusterDataCache cache = event.getAttribute("ClusterDataCache");
+    if (cache == null) {
+      throw new StageException("Missing attributes in event:" + event + ". Requires DataCache");
+    }
+    Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+    if (resourceMap == null) {
+      throw new StageException("Resources must be computed prior to validation!");
+    }
+    Map<String, IdealState> idealStateMap = cache.getIdealStates();
+    Map<String, Map<String, String>> idealStateRuleMap = cache.getIdealStateRules();
+
+    for (String resourceName : idealStateMap.keySet()) {
+      // check every ideal state against the ideal state rules
+      // the pipeline should not process any resources that have an unsupported ideal state
+      IdealState idealState = idealStateMap.get(resourceName);
+      if (!idealStateRuleMap.isEmpty()) {
+        boolean hasMatchingRule = false;
+        for (String ruleName : idealStateRuleMap.keySet()) {
+          Map<String, String> rule = idealStateRuleMap.get(ruleName);
+          boolean matches = idealStateMatchesRule(idealState, rule);
+          hasMatchingRule = hasMatchingRule || matches;
+          if (matches) {
+            break;
+          }
+        }
+        if (!hasMatchingRule) {
+          LOG.warn("Resource " + resourceName + " does not have a valid ideal state!");
+          resourceMap.remove(resourceName);
+        }
+      }
+
+      // check that every resource to process has a live state model definition
+      String stateModelDefRef = idealState.getStateModelDefRef();
+      StateModelDefinition stateModelDef = cache.getStateModelDef(stateModelDefRef);
+      if (stateModelDef == null) {
+        LOG.warn("Resource " + resourceName + " uses state model " + stateModelDefRef
+            + ", but it is not on the cluster!");
+        resourceMap.remove(resourceName);
+      }
+    }
+  }
+
+  /**
+   * Check if the ideal state adheres to a rule
+   * @param idealState the ideal state to check
+   * @param rule the rules of a valid ideal state
+   * @return true if the ideal state is a superset of the entries of the rule, false otherwise
+   */
+  private boolean idealStateMatchesRule(IdealState idealState, Map<String, String> rule) {
+    Map<String, String> simpleFields = idealState.getRecord().getSimpleFields();
+    for (String key : rule.keySet()) {
+      String value = rule.get(key);
+      if (!simpleFields.containsKey(key) || !value.equals(simpleFields.get(key))) {
+        return false;
+      }
+    }
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5937f364/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java b/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java
index abf75be..0752481 100644
--- a/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java
@@ -40,6 +40,8 @@ import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
@@ -47,7 +49,9 @@ import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.model.Message.Attributes;
 import org.apache.helix.model.Message.MessageType;
 import org.apache.helix.tools.StateModelConfigGenerator;
+import org.apache.helix.tools.ClusterStateVerifier.ZkVerifier;
 import org.apache.helix.util.HelixUtil;
+import org.apache.helix.util.ZKClientPool;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
@@ -164,7 +168,7 @@ public class ZkUnitTestBase {
   public void verifyEnabled(ZkClient zkClient, String clusterName, String instance,
       boolean wantEnabled) {
     ZKHelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(zkClient));
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient));
     Builder keyBuilder = accessor.keyBuilder();
 
     InstanceConfig config = accessor.getProperty(keyBuilder.instanceConfig(instance));
@@ -173,7 +177,7 @@ public class ZkUnitTestBase {
 
   public void verifyReplication(ZkClient zkClient, String clusterName, String resource, int repl) {
     ZKHelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(zkClient));
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient));
     Builder keyBuilder = accessor.keyBuilder();
 
     IdealState idealState = accessor.getProperty(keyBuilder.idealStates(resource));
@@ -247,20 +251,19 @@ public class ZkUnitTestBase {
 
   protected void setupStateModel(String clusterName) {
     ZKHelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
     Builder keyBuilder = accessor.keyBuilder();
 
-    StateModelConfigGenerator generator = new StateModelConfigGenerator();
     StateModelDefinition masterSlave =
-        new StateModelDefinition(generator.generateConfigForMasterSlave());
+        new StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave());
     accessor.setProperty(keyBuilder.stateModelDef(masterSlave.getId()), masterSlave);
 
     StateModelDefinition leaderStandby =
-        new StateModelDefinition(generator.generateConfigForLeaderStandby());
+        new StateModelDefinition(StateModelConfigGenerator.generateConfigForLeaderStandby());
     accessor.setProperty(keyBuilder.stateModelDef(leaderStandby.getId()), leaderStandby);
 
     StateModelDefinition onlineOffline =
-        new StateModelDefinition(generator.generateConfigForOnlineOffline());
+        new StateModelDefinition(StateModelConfigGenerator.generateConfigForOnlineOffline());
     accessor.setProperty(keyBuilder.stateModelDef(onlineOffline.getId()), onlineOffline);
 
   }
@@ -268,7 +271,7 @@ public class ZkUnitTestBase {
   protected List<IdealState> setupIdealState(String clusterName, int[] nodes, String[] resources,
       int partitions, int replicas) {
     ZKHelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
     Builder keyBuilder = accessor.keyBuilder();
 
     List<IdealState> idealStates = new ArrayList<IdealState>();
@@ -302,7 +305,7 @@ public class ZkUnitTestBase {
 
   protected void setupLiveInstances(String clusterName, int[] liveInstances) {
     ZKHelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
     Builder keyBuilder = accessor.keyBuilder();
 
     for (int i = 0; i < liveInstances.length; i++) {
@@ -354,4 +357,68 @@ public class ZkUnitTestBase {
     return msg;
   }
 
+  /**
+   * Ensures that external view and current state are empty
+   */
+  protected static class EmptyZkVerifier implements ZkVerifier {
+    private final String _clusterName;
+    private final String _resourceName;
+    private final ZkClient _zkClient;
+
+    /**
+     * Instantiate the verifier
+     * @param clusterName the cluster to verify
+     * @param resourceName the resource to verify
+     */
+    public EmptyZkVerifier(String clusterName, String resourceName) {
+      _clusterName = clusterName;
+      _resourceName = resourceName;
+      _zkClient = ZKClientPool.getZkClient(ZK_ADDR);
+    }
+
+    @Override
+    public boolean verify() {
+      BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
+      HelixDataAccessor accessor = new ZKHelixDataAccessor(_clusterName, baseAccessor);
+      PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+      ExternalView externalView = accessor.getProperty(keyBuilder.externalView(_resourceName));
+
+      // verify external view empty
+      if (externalView != null) {
+        for (String partition : externalView.getPartitionSet()) {
+          Map<String, String> stateMap = externalView.getStateMap(partition);
+          if (stateMap != null && !stateMap.isEmpty()) {
+            LOG.error("External view not empty for " + partition);
+            return false;
+          }
+        }
+      }
+
+      // verify current state empty
+      List<String> liveParticipants = accessor.getChildNames(keyBuilder.liveInstances());
+      for (String participant : liveParticipants) {
+        List<String> sessionIds = accessor.getChildNames(keyBuilder.sessions(participant));
+        for (String sessionId : sessionIds) {
+          CurrentState currentState =
+              accessor.getProperty(keyBuilder.currentState(participant, sessionId, _resourceName));
+          Map<String, String> partitionStateMap = currentState.getPartitionStateMap();
+          if (partitionStateMap != null && !partitionStateMap.isEmpty()) {
+            LOG.error("Current state not empty for " + participant);
+            return false;
+          }
+        }
+      }
+      return true;
+    }
+
+    @Override
+    public ZkClient getZkClient() {
+      return _zkClient;
+    }
+
+    @Override
+    public String getClusterName() {
+      return _clusterName;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5937f364/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceValidationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceValidationStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceValidationStage.java
new file mode 100644
index 0000000..15d7fd8
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceValidationStage.java
@@ -0,0 +1,181 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.Mocks;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.IdealStateProperty;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.tools.StateModelConfigGenerator;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+public class TestResourceValidationStage {
+  private static final String PARTICIPANT = "localhost_1234";
+  private static final String STATE = "OFFLINE";
+
+  @Test
+  public void testIdealStateValidity() throws Exception {
+    Mocks.MockAccessor accessor = new Mocks.MockAccessor();
+
+    // create some ideal states
+    String masterSlaveCustomResource = "masterSlaveCustomResource";
+    String onlineOfflineFullAutoResource = "onlineOfflineFullAutoResource";
+    String masterSlaveSemiAutoInvalidResource = "masterSlaveSemiAutoInvalidResource";
+    createIS(accessor, masterSlaveCustomResource, "MasterSlave", RebalanceMode.CUSTOMIZED);
+    createIS(accessor, onlineOfflineFullAutoResource, "OnlineOffline", RebalanceMode.FULL_AUTO);
+    createIS(accessor, masterSlaveSemiAutoInvalidResource, "MasterSlave", RebalanceMode.SEMI_AUTO);
+
+    // create some ideal state specs
+    createISSpec(accessor, masterSlaveCustomResource + "_spec", "MasterSlave",
+        RebalanceMode.CUSTOMIZED);
+    createISSpec(accessor, onlineOfflineFullAutoResource + "_spec", "OnlineOffline",
+        RebalanceMode.FULL_AUTO);
+
+    // add some state models
+    addStateModels(accessor);
+
+    // refresh the cache
+    ClusterEvent event = new ClusterEvent("testEvent");
+    ClusterDataCache cache = new ClusterDataCache();
+    cache.refresh(accessor);
+    event.addAttribute("ClusterDataCache", cache);
+
+    // run resource computation
+    new ResourceComputationStage().process(event);
+    Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+    Assert.assertTrue(resourceMap.containsKey(masterSlaveCustomResource));
+    Assert.assertTrue(resourceMap.containsKey(onlineOfflineFullAutoResource));
+    Assert.assertTrue(resourceMap.containsKey(masterSlaveSemiAutoInvalidResource));
+
+    // run resource validation
+    new ResourceValidationStage().process(event);
+    Map<String, Resource> finalResourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+    Assert.assertTrue(finalResourceMap.containsKey(masterSlaveCustomResource));
+    Assert.assertTrue(finalResourceMap.containsKey(onlineOfflineFullAutoResource));
+    Assert.assertFalse(finalResourceMap.containsKey(masterSlaveSemiAutoInvalidResource));
+  }
+
+  @Test
+  public void testNoSpec() throws Exception {
+    Mocks.MockAccessor accessor = new Mocks.MockAccessor();
+
+    // create an ideal state and no spec
+    String masterSlaveCustomResource = "masterSlaveCustomResource";
+    createIS(accessor, masterSlaveCustomResource, "MasterSlave", RebalanceMode.CUSTOMIZED);
+
+    // add some state models
+    addStateModels(accessor);
+
+    // refresh the cache
+    ClusterEvent event = new ClusterEvent("testEvent");
+    ClusterDataCache cache = new ClusterDataCache();
+    cache.refresh(accessor);
+    event.addAttribute("ClusterDataCache", cache);
+
+    // run resource computation
+    new ResourceComputationStage().process(event);
+    Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+    Assert.assertTrue(resourceMap.containsKey(masterSlaveCustomResource));
+
+    // run resource validation
+    new ResourceValidationStage().process(event);
+    Map<String, Resource> finalResourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+    Assert.assertTrue(finalResourceMap.containsKey(masterSlaveCustomResource));
+  }
+
+  @Test
+  public void testMissingStateModel() throws Exception {
+    Mocks.MockAccessor accessor = new Mocks.MockAccessor();
+
+    // create an ideal state and no spec
+    String masterSlaveCustomResource = "masterSlaveCustomResource";
+    String leaderStandbyCustomResource = "leaderStandbyCustomResource";
+    createIS(accessor, masterSlaveCustomResource, "MasterSlave", RebalanceMode.CUSTOMIZED);
+    createIS(accessor, leaderStandbyCustomResource, "LeaderStandby", RebalanceMode.CUSTOMIZED);
+
+    // add some state models (but not leader standby)
+    addStateModels(accessor);
+
+    // refresh the cache
+    ClusterEvent event = new ClusterEvent("testEvent");
+    ClusterDataCache cache = new ClusterDataCache();
+    cache.refresh(accessor);
+    event.addAttribute("ClusterDataCache", cache);
+
+    // run resource computation
+    new ResourceComputationStage().process(event);
+    Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+    Assert.assertTrue(resourceMap.containsKey(masterSlaveCustomResource));
+    Assert.assertTrue(resourceMap.containsKey(leaderStandbyCustomResource));
+
+    // run resource validation
+    new ResourceValidationStage().process(event);
+    Map<String, Resource> finalResourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+    Assert.assertTrue(finalResourceMap.containsKey(masterSlaveCustomResource));
+    Assert.assertFalse(finalResourceMap.containsKey(leaderStandbyCustomResource));
+  }
+
+  private void createIS(HelixDataAccessor accessor, String resourceId, String stateModelDefRef,
+      RebalanceMode rebalanceMode) {
+    IdealState idealState = new IdealState(resourceId);
+    idealState.setRebalanceMode(rebalanceMode);
+    idealState.setStateModelDefRef(stateModelDefRef);
+    idealState.setNumPartitions(1);
+    idealState.setReplicas("1");
+    idealState.getRecord().setListField(resourceId + "_0", ImmutableList.of(PARTICIPANT));
+    idealState.getRecord().setMapField(resourceId + "_0", ImmutableMap.of(PARTICIPANT, STATE));
+    accessor.setProperty(accessor.keyBuilder().idealStates(resourceId), idealState);
+  }
+
+  private void createISSpec(HelixDataAccessor accessor, String specId, String stateModelDefRef,
+      RebalanceMode rebalanceMode) {
+    PropertyKey propertyKey = accessor.keyBuilder().clusterConfig();
+    HelixProperty property = accessor.getProperty(propertyKey);
+    if (property == null) {
+      property = new HelixProperty("sampleClusterConfig");
+    }
+    String key = "IdealStateRule!" + specId;
+    String value =
+        IdealStateProperty.REBALANCE_MODE.toString() + "=" + rebalanceMode.toString() + ","
+            + IdealStateProperty.STATE_MODEL_DEF_REF.toString() + "=" + stateModelDefRef;
+    property.getRecord().setSimpleField(key, value);
+    accessor.setProperty(propertyKey, property);
+  }
+
+  private void addStateModels(HelixDataAccessor accessor) {
+    StateModelDefinition masterSlave =
+        new StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave());
+    accessor.setProperty(accessor.keyBuilder().stateModelDef(masterSlave.getId()), masterSlave);
+    StateModelDefinition onlineOffline =
+        new StateModelDefinition(StateModelConfigGenerator.generateConfigForOnlineOffline());
+    accessor.setProperty(accessor.keyBuilder().stateModelDef(onlineOffline.getId()), onlineOffline);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5937f364/helix-core/src/test/java/org/apache/helix/integration/TestFullAutoNodeTagging.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestFullAutoNodeTagging.java b/helix-core/src/test/java/org/apache/helix/integration/TestFullAutoNodeTagging.java
index e0c8b6f..0acd775 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestFullAutoNodeTagging.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestFullAutoNodeTagging.java
@@ -20,7 +20,6 @@ package org.apache.helix.integration;
  */
 
 import java.util.Date;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -37,7 +36,6 @@ import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkClient;
-import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
@@ -372,67 +370,4 @@ public class TestFullAutoNodeTagging extends ZkUnitTestBase {
       return _clusterName;
     }
   }
-
-  /**
-   * Ensures that external view and current state are empty
-   */
-  private static class EmptyZkVerifier implements ZkVerifier {
-    private final String _clusterName;
-    private final String _resourceName;
-    private final ZkClient _zkClient;
-
-    /**
-     * Instantiate the verifier
-     * @param clusterName the cluster to verify
-     * @param resourceName the resource to verify
-     */
-    public EmptyZkVerifier(String clusterName, String resourceName) {
-      _clusterName = clusterName;
-      _resourceName = resourceName;
-      _zkClient = ZKClientPool.getZkClient(ZK_ADDR);
-    }
-
-    @Override
-    public boolean verify() {
-      BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
-      HelixDataAccessor accessor = new ZKHelixDataAccessor(_clusterName, baseAccessor);
-      PropertyKey.Builder keyBuilder = accessor.keyBuilder();
-      ExternalView externalView = accessor.getProperty(keyBuilder.externalView(_resourceName));
-
-      // verify external view empty
-      for (String partition : externalView.getPartitionSet()) {
-        Map<String, String> stateMap = externalView.getStateMap(partition);
-        if (stateMap != null && !stateMap.isEmpty()) {
-          LOG.error("External view not empty for " + partition);
-          return false;
-        }
-      }
-
-      // verify current state empty
-      List<String> liveParticipants = accessor.getChildNames(keyBuilder.liveInstances());
-      for (String participant : liveParticipants) {
-        List<String> sessionIds = accessor.getChildNames(keyBuilder.sessions(participant));
-        for (String sessionId : sessionIds) {
-          CurrentState currentState =
-              accessor.getProperty(keyBuilder.currentState(participant, sessionId, _resourceName));
-          Map<String, String> partitionStateMap = currentState.getPartitionStateMap();
-          if (partitionStateMap != null && !partitionStateMap.isEmpty()) {
-            LOG.error("Current state not empty for " + participant);
-            return false;
-          }
-        }
-      }
-      return true;
-    }
-
-    @Override
-    public ZkClient getZkClient() {
-      return _zkClient;
-    }
-
-    @Override
-    public String getClusterName() {
-      return _clusterName;
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5937f364/helix-core/src/test/java/org/apache/helix/integration/TestInvalidResourceRebalance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestInvalidResourceRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/TestInvalidResourceRebalance.java
new file mode 100644
index 0000000..7b56f5d
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestInvalidResourceRebalance.java
@@ -0,0 +1,102 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+import java.util.Map;
+
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Maps;
+
+public class TestInvalidResourceRebalance extends ZkUnitTestBase {
+  /**
+   * Ensure that the Helix controller doesn't attempt to rebalance resources with invalid ideal
+   * states
+   */
+  @Test
+  public void testResourceRebalanceSkipped() throws Exception {
+    final int NUM_PARTICIPANTS = 2;
+    final int NUM_PARTITIONS = 4;
+    final int NUM_REPLICAS = 2;
+    final String RESOURCE_NAME = "TestDB0";
+
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    // Set up cluster
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        NUM_PARTITIONS, // partitions per resource
+        NUM_PARTICIPANTS, // number of nodes
+        NUM_REPLICAS, // replicas
+        "MasterSlave", RebalanceMode.SEMI_AUTO, // use SEMI_AUTO mode
+        true); // do rebalance
+
+    // start controller
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
+    controller.syncStart();
+
+    // add the ideal state spec (prevents non-CUSTOMIZED MasterSlave ideal states)
+    HelixAdmin helixAdmin = controller.getClusterManagmentTool();
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put("IdealStateRule!sampleRuleName",
+        "IDEAL_STATE_MODE=CUSTOMIZED,STATE_MODEL_DEF_REF=MasterSlave");
+    helixAdmin.setConfig(
+        new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(clusterName).build(),
+        properties);
+
+    // start participants
+    MockParticipantManager[] participants = new MockParticipantManager[NUM_PARTICIPANTS];
+    for (int i = 0; i < NUM_PARTICIPANTS; i++) {
+      final String instanceName = "localhost_" + (12918 + i);
+
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+      participants[i].syncStart();
+    }
+
+    Thread.sleep(1000);
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback(new EmptyZkVerifier(clusterName, RESOURCE_NAME));
+    Assert.assertTrue(result, "External view and current state must be empty");
+
+    // cleanup
+    for (int i = 0; i < NUM_PARTICIPANTS; i++) {
+      participants[i].syncStop();
+    }
+    controller.syncStop();
+  }
+
+}


[2/2] git commit: Merge branch 'helix-0.6.2-release' of https://git-wip-us.apache.org/repos/asf/incubator-helix into helix-0.6.2-release

Posted by ka...@apache.org.
Merge branch 'helix-0.6.2-release' of https://git-wip-us.apache.org/repos/asf/incubator-helix into helix-0.6.2-release


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/af56823c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/af56823c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/af56823c

Branch: refs/heads/helix-0.6.2-release
Commit: af56823c39f1a02deb9a2d63e4132b3668e2da62
Parents: 5937f36 d8a618e
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Thu Dec 12 14:03:45 2013 -0800
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Thu Dec 12 14:03:45 2013 -0800

----------------------------------------------------------------------
 .../helix-admin-webapp-0.6.3-incubating-SNAPSHOT.ivy              | 3 +--
 helix-core/helix-core-0.6.3-incubating-SNAPSHOT.ivy               | 3 +--
 2 files changed, 2 insertions(+), 4 deletions(-)
----------------------------------------------------------------------