You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2013/10/29 17:43:38 UTC

git commit: [HELIX-215] YAML Cluster Setup

Updated Branches:
  refs/heads/helix-0.6.2-release 2045ad76e -> e22db5c1c


[HELIX-215] YAML Cluster Setup


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/e22db5c1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/e22db5c1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/e22db5c1

Branch: refs/heads/helix-0.6.2-release
Commit: e22db5c1cfce56f62504943b402bda36fc2f7a5c
Parents: 2045ad7
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Tue Oct 29 09:43:11 2013 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Tue Oct 29 09:43:11 2013 -0700

----------------------------------------------------------------------
 helix-core/pom.xml                              |   9 +
 .../apache/helix/tools/YAMLClusterSetup.java    | 287 +++++++++++++++++++
 pom.xml                                         |  10 +
 3 files changed, 306 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e22db5c1/helix-core/pom.xml
----------------------------------------------------------------------
diff --git a/helix-core/pom.xml b/helix-core/pom.xml
index d4a8bad..0c42af8 100644
--- a/helix-core/pom.xml
+++ b/helix-core/pom.xml
@@ -146,6 +146,11 @@ under the License.
       <artifactId>guava</artifactId>
       <version>r09</version>
     </dependency>
+      <dependency>
+        <groupId>org.yaml</groupId>
+        <artifactId>snakeyaml</artifactId>
+        <version>1.12</version>
+      </dependency>
   </dependencies>
   <build>
     <resources>
@@ -217,6 +222,10 @@ under the License.
               <mainClass>org.apache.helix.tools.JmxDumper</mainClass>
               <name>JmxDumper</name>
             </program>
+            <program>
+              <mainClass>org.apache.helix.tools.YAMLClusterSetup</mainClass>
+              <name>yaml-cluster-setup</name>
+            </program>
           </programs>
         </configuration>
       </plugin>

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e22db5c1/helix-core/src/main/java/org/apache/helix/tools/YAMLClusterSetup.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/YAMLClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/YAMLClusterSetup.java
new file mode 100644
index 0000000..c7233ed
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/YAMLClusterSetup.java
@@ -0,0 +1,287 @@
+package org.apache.helix.tools;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixException;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.tools.YAMLClusterSetup.YAMLClusterConfig.ParticipantConfig;
+import org.apache.helix.tools.YAMLClusterSetup.YAMLClusterConfig.ResourceConfig;
+import org.apache.helix.tools.YAMLClusterSetup.YAMLClusterConfig.ResourceConfig.ConstraintsConfig;
+import org.apache.helix.tools.YAMLClusterSetup.YAMLClusterConfig.ResourceConfig.StateModelConfig;
+import org.apache.log4j.Logger;
+import org.yaml.snakeyaml.Yaml;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Supports HelixAdmin operations specified by a YAML configuration file defining a cluster,
+ * resources, participants, etc.
+ * See the user-rebalanced-lock-manager recipe for an annotated example file.
+ */
+public class YAMLClusterSetup {
+  private static final Logger LOG = Logger.getLogger(YAMLClusterSetup.class);
+
+  private final String _zkAddress;
+
+  /**
+   * Create a setup helper bound to a single zookeeper instance. The address is stored and used
+   * later, when a cluster is actually set up.
+   * @param zkAddress address of the zookeeper instance to operate against
+   */
+  public YAMLClusterSetup(String zkAddress) {
+    this._zkAddress = zkAddress;
+  }
+
+  /**
+   * Set up the cluster by parsing a YAML file.
+   * <p>
+   * The stream is bound to a {@link YAMLClusterConfig}, after which the cluster, each listed
+   * participant, and each listed resource are added in that order through a {@link HelixAdmin}.
+   * Validation is interleaved with those calls and no rollback is attempted here, so an exception
+   * thrown part-way through leaves everything added before it in place. This method does not
+   * close {@code input} itself.
+   * @param input InputStream representing the file
+   * @return ClusterConfig Java wrapper of the configuration file
+   * @throws HelixException if the cluster name, a resource name, a resource's state model name,
+   *           or a resource's rebalance mode is missing, or if a participant or state model
+   *           definition fails validation
+   */
+  public YAMLClusterConfig setupCluster(InputStream input) {
+    // parse the YAML
+    Yaml yaml = new Yaml();
+    YAMLClusterConfig cfg = yaml.loadAs(input, YAMLClusterConfig.class);
+
+    // create the cluster
+    // NOTE(review): the admin is constructed before the cluster name is validated, and is never
+    // explicitly released in this method -- confirm whether ZKHelixAdmin needs closing.
+    HelixAdmin helixAdmin = new ZKHelixAdmin(_zkAddress);
+    if (cfg.clusterName == null) {
+      throw new HelixException("Cluster name is required!");
+    }
+    helixAdmin.addCluster(cfg.clusterName);
+
+    // add each participant (the participants section is optional)
+    if (cfg.participants != null) {
+      for (ParticipantConfig participant : cfg.participants) {
+        helixAdmin.addInstance(cfg.clusterName, getInstanceCfg(participant));
+      }
+    }
+
+    // add each resource (the resources section is optional)
+    if (cfg.resources != null) {
+      for (ResourceConfig resource : cfg.resources) {
+        if (resource.name == null) {
+          throw new HelixException("Resources must be named!");
+        }
+        if (resource.stateModel == null || resource.stateModel.name == null) {
+          throw new HelixException("Resource must specify a named state model!");
+        }
+        // if states is null, assume using a built-in or already-added state model
+        if (resource.stateModel.states != null) {
+          StateModelDefinition stateModelDef =
+              getStateModelDef(resource.stateModel, resource.constraints);
+          helixAdmin.addStateModelDef(cfg.clusterName, resource.stateModel.name, stateModelDef);
+        }
+        // both the partition count and the replica count default to 1 when not given
+        int partitions = 1;
+        int replicas = 1;
+        if (resource.partitions != null) {
+          if (resource.partitions.containsKey("count")) {
+            partitions = resource.partitions.get("count");
+          }
+          if (resource.partitions.containsKey("replicas")) {
+            replicas = resource.partitions.get("replicas");
+          }
+        }
+
+        // the mode is checked only after any custom state model has already been added above
+        if (resource.rebalancer == null || !resource.rebalancer.containsKey("mode")) {
+          throw new HelixException("Rebalance mode is required!");
+        }
+        helixAdmin.addResource(cfg.clusterName, resource.name, partitions,
+            resource.stateModel.name, resource.rebalancer.get("mode"));
+        // user-defined rebalancer: the "class" entry is applied only when the mode string equals
+        // RebalanceMode.USER_DEFINED; for any other mode it is ignored
+        if (resource.rebalancer.containsKey("class")
+            && resource.rebalancer.get("mode").equals(RebalanceMode.USER_DEFINED.toString())) {
+          IdealState idealState = helixAdmin.getResourceIdealState(cfg.clusterName, resource.name);
+          idealState.setRebalancerClassName(resource.rebalancer.get("class"));
+          helixAdmin.setResourceIdealState(cfg.clusterName, resource.name, idealState);
+        }
+        helixAdmin.rebalance(cfg.clusterName, resource.name, replicas);
+      }
+    }
+    return cfg;
+  }
+
+  /**
+   * Convert a participant entry from the YAML file into a Helix instance configuration.
+   * @param participant the parsed participant entry
+   * @return an InstanceConfig named after the participant, carrying its host and port
+   * @throws HelixException if the participant, or its name, host, or port, is missing
+   */
+  private static InstanceConfig getInstanceCfg(ParticipantConfig participant) {
+    boolean fullySpecified =
+        participant != null && participant.name != null && participant.host != null
+            && participant.port != null;
+    if (!fullySpecified) {
+      throw new HelixException("Participant must have a specified name, host, and port!");
+    }
+    String portText = participant.port.toString();
+    InstanceConfig result = new InstanceConfig(participant.name);
+    result.setHostName(participant.host);
+    result.setPort(portText);
+    return result;
+  }
+
+  /**
+   * Build a state model definition from the state model section of a resource and its optional
+   * constraints.
+   * @param stateModel name, states, transitions, and initial state of the model
+   * @param constraints state/transition priority lists and per-state counts; may be null, and
+   *          any of its sections may be null, in which case the corresponding settings are
+   *          simply not applied
+   * @return the state model definition produced by the builder
+   * @throws HelixException if the state list is empty, the initial state is missing or unknown,
+   *           a constrained state is not in the state list, no transitions are given, or a
+   *           transition lacks a name, from state, or to state
+   */
+  private static StateModelDefinition getStateModelDef(StateModelConfig stateModel,
+      ConstraintsConfig constraints) {
+    // Use a builder to define the state model
+    StateModelDefinition.Builder builder = new StateModelDefinition.Builder(stateModel.name);
+    if (stateModel.states == null || stateModel.states.size() == 0) {
+      throw new HelixException("List of states are required in a state model!");
+    }
+    Set<String> stateSet = new HashSet<String>(stateModel.states);
+    if (stateModel.initialState == null) {
+      throw new HelixException("Initial state is required in a state model!");
+    } else if (!stateSet.contains(stateModel.initialState)) {
+      throw new HelixException("Initial state is not a valid state");
+    }
+    builder.initialState(stateModel.initialState);
+
+    // Build a helper for state priorities: a state's position in the list is its priority
+    Map<String, Integer> statePriorities = new HashMap<String, Integer>();
+    if (constraints != null && constraints.state != null && constraints.state.priorityList != null) {
+      int statePriority = 0;
+      for (String state : constraints.state.priorityList) {
+        if (!stateSet.contains(state)) {
+          throw new HelixException("State " + state
+              + " in the state priority list is not in the state list!");
+        }
+        statePriorities.put(state, statePriority);
+        statePriority++;
+      }
+    }
+
+    // Add states, set state priorities
+    for (String state : stateModel.states) {
+      if (statePriorities.containsKey(state)) {
+        builder.addState(state, statePriorities.get(state));
+      } else {
+        builder.addState(state);
+      }
+    }
+
+    // Set state counts. Constraints are optional (the priority lists above and below are already
+    // null-guarded), so guard every level here as well rather than dereferencing unconditionally,
+    // which failed with a NullPointerException when no constraints or no counts were given.
+    if (constraints != null && constraints.state != null && constraints.state.counts != null) {
+      for (Map<String, String> counts : constraints.state.counts) {
+        String state = counts.get("name");
+        if (!stateSet.contains(state)) {
+          throw new HelixException("State " + state + " has a count, but not in the state list!");
+        }
+        builder.dynamicUpperBound(state, counts.get("count"));
+      }
+    }
+
+    // Build a helper for transition priorities: position in the list is the priority
+    Map<String, Integer> transitionPriorities = new HashMap<String, Integer>();
+    if (constraints != null && constraints.transition != null
+        && constraints.transition.priorityList != null) {
+      int transitionPriority = 0;
+      for (String transition : constraints.transition.priorityList) {
+        transitionPriorities.put(transition, transitionPriority);
+        transitionPriority++;
+      }
+    }
+
+    // Add the transitions
+    if (stateModel.transitions == null || stateModel.transitions.size() == 0) {
+      throw new HelixException("Transitions are required!");
+    }
+    for (Map<String, String> transitions : stateModel.transitions) {
+      String name = transitions.get("name");
+      String from = transitions.get("from");
+      String to = transitions.get("to");
+      if (name == null || from == null || to == null) {
+        throw new HelixException("All transitions must have a name, a from state, and a to state");
+      }
+      // priorities are looked up by transition name, not by the from/to pair
+      if (transitionPriorities.containsKey(name)) {
+        builder.addTransition(from, to, transitionPriorities.get(name));
+      } else {
+        builder.addTransition(from, to);
+      }
+    }
+
+    return builder.build();
+  }
+
+  /**
+   * Java wrapper for the YAML input file. The parser is asked to load the document as this
+   * class, and the setup code reads the public fields directly.
+   */
+  public static class YAMLClusterConfig {
+    /** Name of the cluster to create; setup fails with a HelixException when this is null. */
+    public String clusterName;
+    /** Resources to add to the cluster; may be null, in which case none are added. */
+    public List<ResourceConfig> resources;
+    /** Participants to add to the cluster; may be null, in which case none are added. */
+    public List<ParticipantConfig> participants;
+
+    /** One resource entry of the configuration file. */
+    public static class ResourceConfig {
+      /** Resource name; required. */
+      public String name;
+      /**
+       * Rebalancer settings; required, and must contain the key "mode". The key "class" is read
+       * only when the mode is the user-defined rebalance mode.
+       */
+      public Map<String, String> rebalancer;
+      /**
+       * Partitioning settings; may be null. The keys "count" and "replicas" are read, each
+       * defaulting to 1 when absent.
+       */
+      public Map<String, Integer> partitions;
+      /** State model of the resource; required, and must have a name. */
+      public StateModelConfig stateModel;
+      /** Constraints handed to the state model builder when the state model lists states. */
+      public ConstraintsConfig constraints;
+
+      /** State model section of a resource. */
+      public static class StateModelConfig {
+        /** State model name; required. */
+        public String name;
+        /**
+         * States of the model. When null, no state model definition is added and the named
+         * model is assumed to be built-in or already added.
+         */
+        public List<String> states;
+        /** Transitions, each a map read through the keys "name", "from", and "to". */
+        public List<Map<String, String>> transitions;
+        /** Initial state; must be one of the listed states when a definition is built. */
+        public String initialState;
+      }
+
+      /** Constraints section of a resource. */
+      public static class ConstraintsConfig {
+        /** Constraints on states. */
+        public StateConstraintsConfig state;
+        /** Constraints on transitions. */
+        public TransitionConstraintsConfig transition;
+
+        /** Per-state constraints. */
+        public static class StateConstraintsConfig {
+          /** State counts, each a map read through the keys "name" and "count". */
+          public List<Map<String, String>> counts;
+          /** State names; a state's position in this list is used as its priority. */
+          public List<String> priorityList;
+        }
+
+        /** Per-transition constraints. */
+        public static class TransitionConstraintsConfig {
+          /** Transition names; a transition's position in this list is used as its priority. */
+          public List<String> priorityList;
+        }
+      }
+    }
+
+    /** One participant entry; name, host, and port are all required. */
+    public static class ParticipantConfig {
+      public String name;
+      public String host;
+      public Integer port;
+    }
+  }
+
+  /**
+   * Start a cluster defined by a YAML file. Logs an error and returns without doing anything
+   * when fewer than two arguments are given or the file cannot be opened.
+   * @param args zkAddr, yamlFile
+   */
+  public static void main(String[] args) {
+    if (args.length < 2) {
+      LOG.error("USAGE: YAMLClusterSetup zkAddr yamlFile");
+      return;
+    }
+    String zkAddress = args[0];
+    String yamlFile = args[1];
+
+    InputStream input;
+    try {
+      input = new FileInputStream(new File(yamlFile));
+    } catch (FileNotFoundException e) {
+      // keep the cause so the log shows why the file could not be opened
+      LOG.error("Could not open " + yamlFile, e);
+      return;
+    }
+    try {
+      new YAMLClusterSetup(zkAddress).setupCluster(input);
+    } finally {
+      // setupCluster does not close the stream, so release the file handle here whether or not
+      // the setup succeeded
+      try {
+        input.close();
+      } catch (java.io.IOException e) {
+        LOG.warn("Could not close " + yamlFile, e);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e22db5c1/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d596f2c..0ec8886 100644
--- a/pom.xml
+++ b/pom.xml
@@ -163,6 +163,11 @@ under the License.
         <enabled>false</enabled>
       </snapshots>
     </repository>
+    <repository>
+      <id>Sonatype-public</id>
+      <name>SnakeYAML repository</name>
+      <url>http://oss.sonatype.org/content/groups/public/</url>
+    </repository>
   </repositories>
 
  
@@ -284,6 +289,11 @@ under the License.
         <artifactId>testng</artifactId>
         <version>6.0.1</version>
       </dependency>
+      <dependency>
+        <groupId>org.yaml</groupId>
+        <artifactId>snakeyaml</artifactId>
+        <version>1.12</version>
+      </dependency>
     </dependencies>
   </dependencyManagement>