You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/06/03 19:35:20 UTC
[06/50] [abbrv] storm git commit: implement/document environment
variable substitution; document command line switch pass-through;
implement/document environment variable substitution; document command line switch pass-through;
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/44d5b5be
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/44d5b5be
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/44d5b5be
Branch: refs/heads/master
Commit: 44d5b5be2dc0e2334fa1e4bd5934eb1f57985084
Parents: d8d6cfa
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Tue Mar 31 00:16:37 2015 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Tue Mar 31 00:16:37 2015 -0400
----------------------------------------------------------------------
README.md | 35 ++++++++++++++--
.../main/java/org/apache/storm/flux/Flux.java | 15 +++++--
.../apache/storm/flux/parser/FluxParser.java | 35 +++++++++++-----
.../java/org/apache/storm/flux/TCKTest.java | 43 ++++++++++++--------
.../resources/configs/substitution-test.yaml | 2 +
5 files changed, 97 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/44d5b5be/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index ebf9d5a..71403e0 100644
--- a/README.md
+++ b/README.md
@@ -149,9 +149,15 @@ usage: storm jar <my_topology_uber_jar.jar> org.apache.storm.flux.Flux
-d,--dry-run Do not run or deploy the topology. Just
build, validate, and print information about
the topology.
- -f,--filter <file> Use the specified file as a source of
- properties, and perform variable
- substitution.
+ -e,--env-filter Perform environment variable substitution.
+ Keys identified with `${ENV-[NAME]}`
+ will be replaced with the corresponding
+ `NAME` environment value
+ -f,--filter <file> Perform property substitution. Use the
+ specified file as a source of properties,
+ and replace keys identified with ${[property
+ name]} with the value defined in the
+ properties file.
-i,--inactive Deploy the topology, but do not activate it.
-l,--local Run the topology in local mode.
-n,--no-splash Suppress the printing of the splash screen.
@@ -167,6 +173,16 @@ usage: storm jar <my_topology_uber_jar.jar> org.apache.storm.flux.Flux
instead of the in-process ZooKeeper.
```
+**NOTE:** Flux tries to avoid command line switch collision with the `storm` command, and allows any other command line
+switches to pass through to the `storm` command.
+
+For example, you can use the `storm` command switch `-c` to override a topology configuration property. The following
+example command will run Flux and override the `nimbus.host` configuration:
+
+```bash
+storm jar myTopology-0.1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --remote my_config.yaml -c nimbus.host=localhost
+```
+
### Sample output
```
███████╗██╗ ██╗ ██╗██╗ ██╗
@@ -281,6 +297,13 @@ You would then be able to reference those properties by key in your `.yaml` file
In this case, Flux would replace `${kafka.zookeeper.hosts}` with `localhost:2181` before parsing the YAML contents.
+### Environment Variable Substitution/Filtering
+Flux also allows environment variable substitution. For example, if an environment variable named `ZK_HOSTS` is defined,
+you can reference it in a Flux YAML file with the following syntax:
+
+```
+${ENV-ZK_HOSTS}
+```
## Components
Components are essentially named object instances that are made available as configuration options for spouts and
@@ -772,6 +795,12 @@ topologySource:
methodName: "getTopologyWithDifferentMethodName"
```
+## Author
+P. Taylor Goetz
+
+## Contributors
+
+
## Contributing
Contributions in any form are more than welcome.
http://git-wip-us.apache.org/repos/asf/storm/blob/44d5b5be/flux-core/src/main/java/org/apache/storm/flux/Flux.java
----------------------------------------------------------------------
diff --git a/flux-core/src/main/java/org/apache/storm/flux/Flux.java b/flux-core/src/main/java/org/apache/storm/flux/Flux.java
index dcd3953..2c2105c 100644
--- a/flux-core/src/main/java/org/apache/storm/flux/Flux.java
+++ b/flux-core/src/main/java/org/apache/storm/flux/Flux.java
@@ -55,6 +55,7 @@ public class Flux {
private static final String OPTION_INACTIVE = "inactive";
private static final String OPTION_ZOOKEEPER = "zookeeper";
private static final String OPTION_FILTER = "filter";
+ private static final String OPTION_ENV_FILTER = "env-filter";
public static void main(String[] args) throws Exception {
Options options = new Options();
@@ -80,8 +81,12 @@ public class Flux {
options.addOption(option(1, "z", OPTION_ZOOKEEPER, "host:port", "When running in local mode, use the ZooKeeper at the " +
"specified <host>:<port> instead of the in-process ZooKeeper."));
- options.addOption(option(1, "f", OPTION_FILTER, "file", "Use the specified file as a source of properties, and " +
- "perform variable substitution."));
+ options.addOption(option(1, "f", OPTION_FILTER, "file", "Perform property substitution. Use the specified file " +
+ "as a source of properties, and replace keys identified with {$[property name]} with the value defined " +
+ "in the properties file."));
+
+ options.addOption(option(0, "e", OPTION_ENV_FILTER, "Perform environment variable substitution. Replace keys" +
+ "identified with `${ENV-[NAME]}` will be replaced with the corresponding `NAME` environment value"));
CommandLineParser parser = new BasicParser();
CommandLine cmd = parser.parse(options, args);
@@ -129,13 +134,15 @@ public class Flux {
filterProps = cmd.getOptionValue(OPTION_FILTER);
}
+
+ boolean envFilter = cmd.hasOption(OPTION_ENV_FILTER);
if(cmd.hasOption(OPTION_RESOURCE)){
printf("Parsing classpath resource: %s", filePath);
- topologyDef = FluxParser.parseResource(filePath, dumpYaml, true, filterProps);
+ topologyDef = FluxParser.parseResource(filePath, dumpYaml, true, filterProps, envFilter);
} else {
printf("Parsing file: %s",
new File(filePath).getAbsolutePath());
- topologyDef = FluxParser.parseFile(filePath, dumpYaml, true, filterProps);
+ topologyDef = FluxParser.parseFile(filePath, dumpYaml, true, filterProps, envFilter);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/44d5b5be/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java
----------------------------------------------------------------------
diff --git a/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java b/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java
index 109330d..78c52d5 100644
--- a/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java
+++ b/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java
@@ -43,46 +43,49 @@ public class FluxParser {
// TODO refactor input stream processing (see parseResource() method).
public static TopologyDef parseFile(String inputFile, boolean dumpYaml, boolean processIncludes,
- String propertiesFile) throws IOException {
+ String propertiesFile, boolean envSub) throws IOException {
Yaml yaml = yaml();
FileInputStream in = new FileInputStream(inputFile);
// TODO process properties, etc.
- TopologyDef topology = loadYaml(yaml, in, propertiesFile);
+ TopologyDef topology = loadYaml(yaml, in, propertiesFile, envSub);
in.close();
if(dumpYaml){
dumpYaml(topology, yaml);
}
if(processIncludes) {
- return processIncludes(yaml, topology, propertiesFile);
+ return processIncludes(yaml, topology, propertiesFile, envSub);
} else {
return topology;
}
}
public static TopologyDef parseResource(String resource, boolean dumpYaml, boolean processIncludes,
- String propertiesFile) throws IOException {
+ String propertiesFile, boolean envSub) throws IOException {
Yaml yaml = yaml();
InputStream in = FluxParser.class.getResourceAsStream(resource);
- TopologyDef topology = loadYaml(yaml, in, propertiesFile);
+ TopologyDef topology = loadYaml(yaml, in, propertiesFile, envSub);
in.close();
if(dumpYaml){
dumpYaml(topology, yaml);
}
if(processIncludes) {
- return processIncludes(yaml, topology, propertiesFile);
+ return processIncludes(yaml, topology, propertiesFile, envSub);
} else {
return topology;
}
}
- private static TopologyDef loadYaml(Yaml yaml, InputStream in, String propsFile) throws IOException {
+ private static TopologyDef loadYaml(Yaml yaml, InputStream in, String propsFile, boolean envSubstitution) throws IOException {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
LOG.info("loading YAML from input stream...");
int b = -1;
while((b = in.read()) != -1){
bos.write(b);
}
+
+ // TODO substitution implementation is not exactly efficient or kind to memory...
String str = bos.toString();
+ // properties file substitution
if(propsFile != null){
LOG.info("Performing property substitution.");
InputStream propsIn = new FileInputStream(propsFile);
@@ -94,6 +97,17 @@ public class FluxParser {
} else {
LOG.info("Not performing property substitution.");
}
+
+ // environment variable substitution
+ if(envSubstitution){
+ LOG.info("Performing environment variable substitution...");
+ Map<String, String> envs = System.getenv();
+ for(String key : envs.keySet()){
+ str = str.replace("${ENV-" + key + "}", envs.get(key));
+ }
+ } else {
+ LOG.info("Not performing environment variable substitution.");
+ }
return (TopologyDef)yaml.load(str);
}
@@ -120,17 +134,18 @@ public class FluxParser {
* @param topologyDef the topology definition containing (possibly zero) includes
* @return The TopologyDef with includes resolved.
*/
- private static TopologyDef processIncludes(Yaml yaml, TopologyDef topologyDef, String propsFile) throws IOException {
+ private static TopologyDef processIncludes(Yaml yaml, TopologyDef topologyDef, String propsFile, boolean envSub)
+ throws IOException {
//TODO support multiple levels of includes
if(topologyDef.getIncludes() != null) {
for (IncludeDef include : topologyDef.getIncludes()){
TopologyDef includeTopologyDef = null;
if (include.isResource()) {
LOG.info("Loading includes from resource: {}", include.getFile());
- includeTopologyDef = parseResource(include.getFile(), true, false, propsFile);
+ includeTopologyDef = parseResource(include.getFile(), true, false, propsFile, envSub);
} else {
LOG.info("Loading includes from file: {}", include.getFile());
- includeTopologyDef = parseFile(include.getFile(), true, false, propsFile);
+ includeTopologyDef = parseFile(include.getFile(), true, false, propsFile, envSub);
}
// if overrides are disabled, we won't replace anything that already exists
http://git-wip-us.apache.org/repos/asf/storm/blob/44d5b5be/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java
----------------------------------------------------------------------
diff --git a/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java b/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java
index f6076cc..6580ef7 100644
--- a/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java
+++ b/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java
@@ -32,7 +32,7 @@ import static org.junit.Assert.*;
public class TCKTest {
@Test
public void testTCK() throws Exception {
- TopologyDef topologyDef = FluxParser.parseResource("/configs/tck.yaml", false, true, null);
+ TopologyDef topologyDef = FluxParser.parseResource("/configs/tck.yaml", false, true, null, false);
Config conf = FluxBuilder.buildConfig(topologyDef);
ExecutionContext context = new ExecutionContext(topologyDef, conf);
StormTopology topology = FluxBuilder.buildTopology(context);
@@ -42,7 +42,7 @@ public class TCKTest {
@Test
public void testShellComponents() throws Exception {
- TopologyDef topologyDef = FluxParser.parseResource("/configs/shell_test.yaml", false, true, null);
+ TopologyDef topologyDef = FluxParser.parseResource("/configs/shell_test.yaml", false, true, null, false);
Config conf = FluxBuilder.buildConfig(topologyDef);
ExecutionContext context = new ExecutionContext(topologyDef, conf);
StormTopology topology = FluxBuilder.buildTopology(context);
@@ -52,7 +52,7 @@ public class TCKTest {
@Test
public void testKafkaSpoutConfig() throws Exception {
- TopologyDef topologyDef = FluxParser.parseResource("/configs/kafka_test.yaml", false, true, null);
+ TopologyDef topologyDef = FluxParser.parseResource("/configs/kafka_test.yaml", false, true, null, false);
Config conf = FluxBuilder.buildConfig(topologyDef);
ExecutionContext context = new ExecutionContext(topologyDef, conf);
StormTopology topology = FluxBuilder.buildTopology(context);
@@ -62,7 +62,7 @@ public class TCKTest {
@Test
public void testLoadFromResource() throws Exception {
- TopologyDef topologyDef = FluxParser.parseResource("/configs/kafka_test.yaml", false, true, null);
+ TopologyDef topologyDef = FluxParser.parseResource("/configs/kafka_test.yaml", false, true, null, false);
Config conf = FluxBuilder.buildConfig(topologyDef);
ExecutionContext context = new ExecutionContext(topologyDef, conf);
StormTopology topology = FluxBuilder.buildTopology(context);
@@ -73,7 +73,7 @@ public class TCKTest {
@Test
public void testHdfs() throws Exception {
- TopologyDef topologyDef = FluxParser.parseResource("/configs/hdfs_test.yaml", false, true, null);
+ TopologyDef topologyDef = FluxParser.parseResource("/configs/hdfs_test.yaml", false, true, null, false);
Config conf = FluxBuilder.buildConfig(topologyDef);
ExecutionContext context = new ExecutionContext(topologyDef, conf);
StormTopology topology = FluxBuilder.buildTopology(context);
@@ -83,7 +83,7 @@ public class TCKTest {
@Test
public void testIncludes() throws Exception {
- TopologyDef topologyDef = FluxParser.parseResource("/configs/include_test.yaml", false, true, null);
+ TopologyDef topologyDef = FluxParser.parseResource("/configs/include_test.yaml", false, true, null, false);
Config conf = FluxBuilder.buildConfig(topologyDef);
ExecutionContext context = new ExecutionContext(topologyDef, conf);
StormTopology topology = FluxBuilder.buildTopology(context);
@@ -96,7 +96,7 @@ public class TCKTest {
@Test
public void testTopologySource() throws Exception {
- TopologyDef topologyDef = FluxParser.parseResource("/configs/existing-topology.yaml", false, true, null);
+ TopologyDef topologyDef = FluxParser.parseResource("/configs/existing-topology.yaml", false, true, null, false);
assertTrue(topologyDef.validate());
Config conf = FluxBuilder.buildConfig(topologyDef);
ExecutionContext context = new ExecutionContext(topologyDef, conf);
@@ -107,7 +107,7 @@ public class TCKTest {
@Test
public void testTopologySourceWithReflection() throws Exception {
- TopologyDef topologyDef = FluxParser.parseResource("/configs/existing-topology-reflection.yaml", false, true, null);
+ TopologyDef topologyDef = FluxParser.parseResource("/configs/existing-topology-reflection.yaml", false, true, null, false);
assertTrue(topologyDef.validate());
Config conf = FluxBuilder.buildConfig(topologyDef);
ExecutionContext context = new ExecutionContext(topologyDef, conf);
@@ -118,7 +118,7 @@ public class TCKTest {
@Test
public void testTopologySourceWithConfigParam() throws Exception {
- TopologyDef topologyDef = FluxParser.parseResource("/configs/existing-topology-reflection-config.yaml", false, true, null);
+ TopologyDef topologyDef = FluxParser.parseResource("/configs/existing-topology-reflection-config.yaml", false, true, null, false);
assertTrue(topologyDef.validate());
Config conf = FluxBuilder.buildConfig(topologyDef);
ExecutionContext context = new ExecutionContext(topologyDef, conf);
@@ -129,7 +129,7 @@ public class TCKTest {
@Test
public void testTopologySourceWithMethodName() throws Exception {
- TopologyDef topologyDef = FluxParser.parseResource("/configs/existing-topology-method-override.yaml", false, true, null);
+ TopologyDef topologyDef = FluxParser.parseResource("/configs/existing-topology-method-override.yaml", false, true, null, false);
assertTrue(topologyDef.validate());
Config conf = FluxBuilder.buildConfig(topologyDef);
ExecutionContext context = new ExecutionContext(topologyDef, conf);
@@ -141,7 +141,7 @@ public class TCKTest {
@Test
public void testTridentTopologySource() throws Exception {
- TopologyDef topologyDef = FluxParser.parseResource("/configs/existing-topology-trident.yaml", false, true, null);
+ TopologyDef topologyDef = FluxParser.parseResource("/configs/existing-topology-trident.yaml", false, true, null, false);
assertTrue(topologyDef.validate());
Config conf = FluxBuilder.buildConfig(topologyDef);
ExecutionContext context = new ExecutionContext(topologyDef, conf);
@@ -152,7 +152,7 @@ public class TCKTest {
@Test(expected = IllegalArgumentException.class)
public void testInvalidTopologySource() throws Exception {
- TopologyDef topologyDef = FluxParser.parseResource("/configs/invalid-existing-topology.yaml", false, true, null);
+ TopologyDef topologyDef = FluxParser.parseResource("/configs/invalid-existing-topology.yaml", false, true, null, false);
assertFalse("Topology config is invalid.", topologyDef.validate());
Config conf = FluxBuilder.buildConfig(topologyDef);
ExecutionContext context = new ExecutionContext(topologyDef, conf);
@@ -162,7 +162,7 @@ public class TCKTest {
@Test
public void testTopologySourceWithGetMethodName() throws Exception {
- TopologyDef topologyDef = FluxParser.parseResource("/configs/existing-topology-reflection.yaml", false, true, null);
+ TopologyDef topologyDef = FluxParser.parseResource("/configs/existing-topology-reflection.yaml", false, true, null, false);
assertTrue(topologyDef.validate());
Config conf = FluxBuilder.buildConfig(topologyDef);
ExecutionContext context = new ExecutionContext(topologyDef, conf);
@@ -173,7 +173,7 @@ public class TCKTest {
@Test
public void testTopologySourceWithConfigMethods() throws Exception {
- TopologyDef topologyDef = FluxParser.parseResource("/configs/config-methods-test.yaml", false, true, null);
+ TopologyDef topologyDef = FluxParser.parseResource("/configs/config-methods-test.yaml", false, true, null, false);
assertTrue(topologyDef.validate());
Config conf = FluxBuilder.buildConfig(topologyDef);
ExecutionContext context = new ExecutionContext(topologyDef, conf);
@@ -190,7 +190,7 @@ public class TCKTest {
@Test
public void testVariableSubstitution() throws Exception {
- TopologyDef topologyDef = FluxParser.parseResource("/configs/substitution-test.yaml", false, true, "src/test/resources/configs/test.properties");
+ TopologyDef topologyDef = FluxParser.parseResource("/configs/substitution-test.yaml", false, true, "src/test/resources/configs/test.properties", true);
assertTrue(topologyDef.validate());
Config conf = FluxBuilder.buildConfig(topologyDef);
ExecutionContext context = new ExecutionContext(topologyDef, conf);
@@ -198,6 +198,17 @@ public class TCKTest {
assertNotNull(topology);
topology.validate();
- assertTrue(context.getTopologyDef().getName().equals("substitution-topology"));
+ // test basic substitution
+ assertEquals("Property not replaced.",
+ "substitution-topology",
+ context.getTopologyDef().getName());
+
+ // test environment variable substitution
+ // $PATH should be defined on most systems
+ String envPath = System.getenv().get("PATH");
+ assertEquals("ENV variable not replaced.",
+ envPath,
+ context.getTopologyDef().getConfig().get("test.env.value"));
+
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/44d5b5be/flux-core/src/test/resources/configs/substitution-test.yaml
----------------------------------------------------------------------
diff --git a/flux-core/src/test/resources/configs/substitution-test.yaml b/flux-core/src/test/resources/configs/substitution-test.yaml
index cbfeea4..13f1960 100644
--- a/flux-core/src/test/resources/configs/substitution-test.yaml
+++ b/flux-core/src/test/resources/configs/substitution-test.yaml
@@ -41,6 +41,8 @@ name: "${topology.name}"
#
config:
topology.workers: 1
+ # test environment variable substitution
+ test.env.value: ${ENV-PATH}
# ...
# spout definitions