You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2013/10/29 17:43:38 UTC
git commit: [HELIX-215] YAML Cluster Setup
Updated Branches:
refs/heads/helix-0.6.2-release 2045ad76e -> e22db5c1c
[HELIX-215] YAML Cluster Setup
Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/e22db5c1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/e22db5c1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/e22db5c1
Branch: refs/heads/helix-0.6.2-release
Commit: e22db5c1cfce56f62504943b402bda36fc2f7a5c
Parents: 2045ad7
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Tue Oct 29 09:43:11 2013 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Tue Oct 29 09:43:11 2013 -0700
----------------------------------------------------------------------
helix-core/pom.xml | 9 +
.../apache/helix/tools/YAMLClusterSetup.java | 287 +++++++++++++++++++
pom.xml | 10 +
3 files changed, 306 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e22db5c1/helix-core/pom.xml
----------------------------------------------------------------------
diff --git a/helix-core/pom.xml b/helix-core/pom.xml
index d4a8bad..0c42af8 100644
--- a/helix-core/pom.xml
+++ b/helix-core/pom.xml
@@ -146,6 +146,11 @@ under the License.
<artifactId>guava</artifactId>
<version>r09</version>
</dependency>
+ <dependency>
+ <groupId>org.yaml</groupId>
+ <artifactId>snakeyaml</artifactId>
+ <version>1.12</version>
+ </dependency>
</dependencies>
<build>
<resources>
@@ -217,6 +222,10 @@ under the License.
<mainClass>org.apache.helix.tools.JmxDumper</mainClass>
<name>JmxDumper</name>
</program>
+ <program>
+ <mainClass>org.apache.helix.tools.YAMLClusterSetup</mainClass>
+ <name>yaml-cluster-setup</name>
+ </program>
</programs>
</configuration>
</plugin>
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e22db5c1/helix-core/src/main/java/org/apache/helix/tools/YAMLClusterSetup.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/YAMLClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/YAMLClusterSetup.java
new file mode 100644
index 0000000..c7233ed
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/YAMLClusterSetup.java
@@ -0,0 +1,287 @@
+package org.apache.helix.tools;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixException;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.tools.YAMLClusterSetup.YAMLClusterConfig.ParticipantConfig;
+import org.apache.helix.tools.YAMLClusterSetup.YAMLClusterConfig.ResourceConfig;
+import org.apache.helix.tools.YAMLClusterSetup.YAMLClusterConfig.ResourceConfig.ConstraintsConfig;
+import org.apache.helix.tools.YAMLClusterSetup.YAMLClusterConfig.ResourceConfig.StateModelConfig;
+import org.apache.log4j.Logger;
+import org.yaml.snakeyaml.Yaml;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Supports HelixAdmin operations specified by a YAML configuration file defining a cluster,
+ * resources, participants, etc.
+ * See the user-rebalanced-lock-manager recipe for an annotated example file.
+ */
+public class YAMLClusterSetup {
+ private static final Logger LOG = Logger.getLogger(YAMLClusterSetup.class);
+
+ private final String _zkAddress;
+
+ /**
+ * Create a setup tool bound to a given zookeeper instance. Nothing is parsed or set up
+ * here: the address is only stored, and is handed to {@link ZKHelixAdmin} when
+ * {@link #setupCluster(InputStream)} is called.
+ * @param zkAddress address of the zookeeper instance to set the cluster up on (presumably a
+ * host:port connect string -- confirm against ZKHelixAdmin)
+ */
+ public YAMLClusterSetup(String zkAddress) {
+ _zkAddress = zkAddress;
+ }
+
+  /**
+   * Set up the cluster by parsing a YAML file: create the cluster, add every listed
+   * participant, then add, configure, and rebalance every listed resource.
+   * The parsed configuration is checked for emptiness and for a cluster name before the
+   * {@link ZKHelixAdmin} is created, so an unusable file fails fast with a HelixException.
+   * @param input InputStream representing the file; this method does not close it
+   * @return ClusterConfig Java wrapper of the configuration file
+   * @throws HelixException if the file is empty, the cluster name is missing, or a participant
+   *           or resource entry lacks a required field
+   */
+  public YAMLClusterConfig setupCluster(InputStream input) {
+    // parse the YAML
+    Yaml yaml = new Yaml();
+    YAMLClusterConfig cfg = yaml.loadAs(input, YAMLClusterConfig.class);
+
+    // validate up front: an empty document is parsed to null, and there is no point in
+    // creating an admin for a cluster that cannot be named
+    if (cfg == null) {
+      throw new HelixException("Cluster configuration is empty!");
+    }
+    if (cfg.clusterName == null) {
+      throw new HelixException("Cluster name is required!");
+    }
+
+    // create the cluster
+    HelixAdmin helixAdmin = new ZKHelixAdmin(_zkAddress);
+    helixAdmin.addCluster(cfg.clusterName);
+
+    // add each participant
+    if (cfg.participants != null) {
+      for (ParticipantConfig participant : cfg.participants) {
+        helixAdmin.addInstance(cfg.clusterName, getInstanceCfg(participant));
+      }
+    }
+
+    // add each resource
+    if (cfg.resources != null) {
+      for (ResourceConfig resource : cfg.resources) {
+        // an empty list item is parsed to null, which is just another unnamed resource
+        if (resource == null || resource.name == null) {
+          throw new HelixException("Resources must be named!");
+        }
+        if (resource.stateModel == null || resource.stateModel.name == null) {
+          throw new HelixException("Resource must specify a named state model!");
+        }
+        // if states is null, assume using a built-in or already-added state model
+        if (resource.stateModel.states != null) {
+          StateModelDefinition stateModelDef =
+              getStateModelDef(resource.stateModel, resource.constraints);
+          helixAdmin.addStateModelDef(cfg.clusterName, resource.stateModel.name, stateModelDef);
+        }
+
+        // both values default to 1; a key that is present with an empty value maps to null,
+        // so test the value itself rather than the key to avoid unboxing null
+        int partitions = 1;
+        int replicas = 1;
+        if (resource.partitions != null) {
+          Integer partitionCount = resource.partitions.get("count");
+          if (partitionCount != null) {
+            partitions = partitionCount;
+          }
+          Integer replicaCount = resource.partitions.get("replicas");
+          if (replicaCount != null) {
+            replicas = replicaCount;
+          }
+        }
+
+        // a "mode" key with an empty value is as unusable as a missing key; reject it before
+        // the resource is added instead of failing halfway through
+        String mode = (resource.rebalancer == null) ? null : resource.rebalancer.get("mode");
+        if (mode == null) {
+          throw new HelixException("Rebalance mode is required!");
+        }
+        helixAdmin.addResource(cfg.clusterName, resource.name, partitions,
+            resource.stateModel.name, mode);
+
+        // user-defined rebalancer: the class name is only applied in USER_DEFINED mode
+        if (resource.rebalancer.containsKey("class")
+            && RebalanceMode.USER_DEFINED.toString().equals(mode)) {
+          IdealState idealState = helixAdmin.getResourceIdealState(cfg.clusterName, resource.name);
+          idealState.setRebalancerClassName(resource.rebalancer.get("class"));
+          helixAdmin.setResourceIdealState(cfg.clusterName, resource.name, idealState);
+        }
+        helixAdmin.rebalance(cfg.clusterName, resource.name, replicas);
+      }
+    }
+    return cfg;
+  }
+
+  /**
+   * Convert a participant entry from the YAML file into a Helix instance configuration.
+   * @param participant parsed participant entry
+   * @return InstanceConfig constructed from the participant name, with host name and port set
+   * @throws HelixException if the entry is null or is missing its name, host, or port
+   */
+  private static InstanceConfig getInstanceCfg(ParticipantConfig participant) {
+    boolean fullySpecified =
+        participant != null && participant.name != null && participant.host != null
+            && participant.port != null;
+    if (!fullySpecified) {
+      throw new HelixException("Participant must have a specified name, host, and port!");
+    }
+
+    InstanceConfig result = new InstanceConfig(participant.name);
+    result.setHostName(participant.host);
+    // the instance config takes the port as text
+    String portText = participant.port.toString();
+    result.setPort(portText);
+    return result;
+  }
+
+  /**
+   * Build a state model definition from the state model and constraints sections of a
+   * resource entry.
+   * The constraints section is optional as a whole and in each of its parts: without a state
+   * priority list states are added without an explicit priority, without state counts no upper
+   * bounds are set, and without a transition priority list transitions are added without an
+   * explicit priority.
+   * @param stateModel parsed state model section; must list states, an initial state that is
+   *          one of them, and at least one transition
+   * @param constraints parsed constraints section, may be null
+   * @return the built StateModelDefinition
+   * @throws HelixException if a required part is missing, or a constraint refers to a state
+   *           that is not in the state list
+   */
+  private static StateModelDefinition getStateModelDef(StateModelConfig stateModel,
+      ConstraintsConfig constraints) {
+    // Use a builder to define the state model
+    StateModelDefinition.Builder builder = new StateModelDefinition.Builder(stateModel.name);
+    if (stateModel.states == null || stateModel.states.size() == 0) {
+      throw new HelixException("List of states are required in a state model!");
+    }
+    Set<String> stateSet = new HashSet<String>(stateModel.states);
+    if (stateModel.initialState == null) {
+      throw new HelixException("Initial state is required in a state model!");
+    } else if (!stateSet.contains(stateModel.initialState)) {
+      throw new HelixException("Initial state is not a valid state");
+    }
+    builder.initialState(stateModel.initialState);
+
+    // Build a helper for state priorities: position in the list is the priority, from 0
+    Map<String, Integer> statePriorities = new HashMap<String, Integer>();
+    if (constraints != null && constraints.state != null && constraints.state.priorityList != null) {
+      int statePriority = 0;
+      for (String state : constraints.state.priorityList) {
+        if (!stateSet.contains(state)) {
+          throw new HelixException("State " + state
+              + " in the state priority list is not in the state list!");
+        }
+        statePriorities.put(state, statePriority);
+        statePriority++;
+      }
+    }
+
+    // Add states, set state priorities
+    for (String state : stateModel.states) {
+      if (statePriorities.containsKey(state)) {
+        builder.addState(state, statePriorities.get(state));
+      } else {
+        builder.addState(state);
+      }
+    }
+
+    // Set state counts. Guarded like the priority lists: a resource may define states without
+    // any constraints (or without counts), which must not fail with a NullPointerException
+    if (constraints != null && constraints.state != null && constraints.state.counts != null) {
+      for (Map<String, String> counts : constraints.state.counts) {
+        String state = counts.get("name");
+        if (!stateSet.contains(state)) {
+          throw new HelixException("State " + state + " has a count, but not in the state list!");
+        }
+        builder.dynamicUpperBound(state, counts.get("count"));
+      }
+    }
+
+    // Build a helper for transition priorities, keyed by transition name
+    Map<String, Integer> transitionPriorities = new HashMap<String, Integer>();
+    if (constraints != null && constraints.transition != null
+        && constraints.transition.priorityList != null) {
+      int transitionPriority = 0;
+      for (String transition : constraints.transition.priorityList) {
+        transitionPriorities.put(transition, transitionPriority);
+        transitionPriority++;
+      }
+    }
+
+    // Add the transitions
+    if (stateModel.transitions == null || stateModel.transitions.size() == 0) {
+      throw new HelixException("Transitions are required!");
+    }
+    for (Map<String, String> transitions : stateModel.transitions) {
+      String name = transitions.get("name");
+      String from = transitions.get("from");
+      String to = transitions.get("to");
+      if (name == null || from == null || to == null) {
+        throw new HelixException("All transitions must have a name, a from state, and a to state");
+      }
+      if (transitionPriorities.containsKey(name)) {
+        builder.addTransition(from, to, transitionPriorities.get(name));
+      } else {
+        builder.addTransition(from, to);
+      }
+    }
+
+    return builder.build();
+  }
+
+ /**
+ * Java wrapper for the YAML input file.
+ * The YAML document is loaded directly into this class, so the public field names here are
+ * the keys expected in the file. The per-field notes describe how
+ * {@link YAMLClusterSetup#setupCluster(InputStream)} reads each value.
+ */
+ public static class YAMLClusterConfig {
+ public String clusterName; // required; setup fails if it is missing
+ public List<ResourceConfig> resources; // optional; no resources are added when absent
+ public List<ParticipantConfig> participants; // optional; no instances are added when absent
+
+ public static class ResourceConfig {
+ public String name; // required
+ public Map<String, String> rebalancer; // keys read: "mode" (required), "class" (applied only in USER_DEFINED mode)
+ public Map<String, Integer> partitions; // keys read: "count" and "replicas"; each defaults to 1
+ public StateModelConfig stateModel; // required, and must be named
+ public ConstraintsConfig constraints; // only consulted when the state model lists its states
+
+ public static class StateModelConfig {
+ public String name; // required
+ public List<String> states; // when absent, a built-in or already-added state model is assumed
+ public List<Map<String, String>> transitions; // each entry must have "name", "from", and "to"
+ public String initialState; // must be one of the listed states
+ }
+
+ public static class ConstraintsConfig {
+ public StateConstraintsConfig state;
+ public TransitionConstraintsConfig transition;
+
+ public static class StateConstraintsConfig {
+ public List<Map<String, String>> counts; // each entry is read through its "name" and "count" keys
+ public List<String> priorityList; // state names; position in the list is the priority, starting at 0
+ }
+
+ public static class TransitionConstraintsConfig {
+ public List<String> priorityList; // transition names; position in the list is the priority, starting at 0
+ }
+ }
+ }
+
+ public static class ParticipantConfig {
+ public String name; // required
+ public String host; // required
+ public Integer port; // required
+ }
+ }
+
+  /**
+   * Start a cluster defined by a YAML file.
+   * Logs an error and returns without doing anything if the arguments are incomplete or the
+   * file cannot be opened. The file is always closed once setup has finished or failed.
+   * @param args zkAddr, yamlFile
+   */
+  public static void main(String[] args) {
+    if (args.length < 2) {
+      LOG.error("USAGE: YAMLClusterSetup zkAddr yamlFile");
+      return;
+    }
+    String zkAddress = args[0];
+    String yamlFile = args[1];
+
+    InputStream input;
+    try {
+      input = new FileInputStream(new File(yamlFile));
+    } catch (FileNotFoundException e) {
+      // keep the cause: it says whether the file is missing, a directory, or unreadable
+      LOG.error("Could not open " + yamlFile, e);
+      return;
+    }
+    try {
+      new YAMLClusterSetup(zkAddress).setupCluster(input);
+    } finally {
+      // release the file handle even when setup throws
+      try {
+        input.close();
+      } catch (java.io.IOException e) {
+        LOG.warn("Could not close " + yamlFile, e);
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e22db5c1/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d596f2c..0ec8886 100644
--- a/pom.xml
+++ b/pom.xml
@@ -163,6 +163,11 @@ under the License.
<enabled>false</enabled>
</snapshots>
</repository>
+ <repository>
+ <!-- HTTPS only: artifacts fetched over plain HTTP can be tampered with in transit -->
+ <id>Sonatype-public</id>
+ <name>SnakeYAML repository</name>
+ <url>https://oss.sonatype.org/content/groups/public/</url>
+ </repository>
</repositories>
@@ -284,6 +289,11 @@ under the License.
<artifactId>testng</artifactId>
<version>6.0.1</version>
</dependency>
+ <dependency>
+ <groupId>org.yaml</groupId>
+ <artifactId>snakeyaml</artifactId>
+ <version>1.12</version>
+ </dependency>
</dependencies>
</dependencyManagement>