You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/06/03 19:35:20 UTC

[06/50] [abbrv] storm git commit: implement/document environment variable substitution; document command line switch pass-through;

implement/document environment variable substitution; document command line switch pass-through;


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/44d5b5be
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/44d5b5be
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/44d5b5be

Branch: refs/heads/master
Commit: 44d5b5be2dc0e2334fa1e4bd5934eb1f57985084
Parents: d8d6cfa
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Tue Mar 31 00:16:37 2015 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Tue Mar 31 00:16:37 2015 -0400

----------------------------------------------------------------------
 README.md                                       | 35 ++++++++++++++--
 .../main/java/org/apache/storm/flux/Flux.java   | 15 +++++--
 .../apache/storm/flux/parser/FluxParser.java    | 35 +++++++++++-----
 .../java/org/apache/storm/flux/TCKTest.java     | 43 ++++++++++++--------
 .../resources/configs/substitution-test.yaml    |  2 +
 5 files changed, 97 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/44d5b5be/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index ebf9d5a..71403e0 100644
--- a/README.md
+++ b/README.md
@@ -149,9 +149,15 @@ usage: storm jar <my_topology_uber_jar.jar> org.apache.storm.flux.Flux
  -d,--dry-run                 Do not run or deploy the topology. Just
                               build, validate, and print information about
                               the topology.
- -f,--filter <file>           Use the specified file as a source of
-                              properties, and perform variable
-                              substitution.
+ -e,--env-filter              Perform environment variable substitution.
+                              Keys identified with `${ENV-[NAME]}` will be
+                              replaced with the corresponding `NAME`
+                              environment value.
+ -f,--filter <file>           Perform property substitution. Use the
+                              specified file as a source of properties,
+                              and replace keys identified with ${[property
+                              name]} with the value defined in the
+                              properties file.
  -i,--inactive                Deploy the topology, but do not activate it.
  -l,--local                   Run the topology in local mode.
  -n,--no-splash               Suppress the printing of the splash screen.
@@ -167,6 +173,16 @@ usage: storm jar <my_topology_uber_jar.jar> org.apache.storm.flux.Flux
                               instead of the in-process ZooKeeper.
 ```
 
+**NOTE:** Flux tries to avoid command line switch collision with the `storm` command, and allows any other command line
+switches to pass through to the `storm` command.
+
+For example, you can use the `storm` command switch `-c` to override a topology configuration property. The following
+example command will run Flux and override the `nimbus.host` configuration:
+
+```bash
+storm jar myTopology-0.1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --remote my_config.yaml -c nimbus.host=localhost
+```
+
 ### Sample output
 ```
 ███████╗██╗     ██╗   ██╗██╗  ██╗
@@ -281,6 +297,13 @@ You would then be able to reference those properties by key in your `.yaml` file
 
 In this case, Flux would replace `${kafka.zookeeper.hosts}` with `localhost:2181` before parsing the YAML contents.
 
+### Environment Variable Substitution/Filtering
+Flux also allows environment variable substitution. For example, if an environment variable named `ZK_HOSTS` is defined,
+you can reference it in a Flux YAML file with the following syntax:
+
+```
+${ENV-ZK_HOSTS}
+```
 
 ## Components
 Components are essentially named object instances that are made available as configuration options for spouts and
@@ -772,6 +795,12 @@ topologySource:
   methodName: "getTopologyWithDifferentMethodName"
 ```
 
+## Author
+P. Taylor Goetz
+
+## Contributors
+
+
 ## Contributing
 
 Contributions in any form are more than welcome.

http://git-wip-us.apache.org/repos/asf/storm/blob/44d5b5be/flux-core/src/main/java/org/apache/storm/flux/Flux.java
----------------------------------------------------------------------
diff --git a/flux-core/src/main/java/org/apache/storm/flux/Flux.java b/flux-core/src/main/java/org/apache/storm/flux/Flux.java
index dcd3953..2c2105c 100644
--- a/flux-core/src/main/java/org/apache/storm/flux/Flux.java
+++ b/flux-core/src/main/java/org/apache/storm/flux/Flux.java
@@ -55,6 +55,7 @@ public class Flux {
     private static final String OPTION_INACTIVE = "inactive";
     private static final String OPTION_ZOOKEEPER = "zookeeper";
     private static final String OPTION_FILTER = "filter";
+    private static final String OPTION_ENV_FILTER = "env-filter";
 
     public static void main(String[] args) throws Exception {
         Options options = new Options();
@@ -80,8 +81,12 @@ public class Flux {
         options.addOption(option(1, "z", OPTION_ZOOKEEPER, "host:port", "When running in local mode, use the ZooKeeper at the " +
                 "specified <host>:<port> instead of the in-process ZooKeeper."));
 
-        options.addOption(option(1, "f", OPTION_FILTER, "file", "Use the specified file as a source of properties, and " +
-                "perform variable substitution."));
+        options.addOption(option(1, "f", OPTION_FILTER, "file", "Perform property substitution. Use the specified file " +
+                "as a source of properties, and replace keys identified with {$[property name]} with the value defined " +
+                "in the properties file."));
+
+        options.addOption(option(0, "e", OPTION_ENV_FILTER, "Perform environment variable substitution. Replace keys" +
+                "identified with `${ENV-[NAME]}` will be replaced with the corresponding `NAME` environment value"));
 
         CommandLineParser parser = new BasicParser();
         CommandLine cmd = parser.parse(options, args);
@@ -129,13 +134,15 @@ public class Flux {
             filterProps = cmd.getOptionValue(OPTION_FILTER);
         }
 
+
+        boolean envFilter = cmd.hasOption(OPTION_ENV_FILTER);
         if(cmd.hasOption(OPTION_RESOURCE)){
             printf("Parsing classpath resource: %s", filePath);
-            topologyDef = FluxParser.parseResource(filePath, dumpYaml, true, filterProps);
+            topologyDef = FluxParser.parseResource(filePath, dumpYaml, true, filterProps, envFilter);
         } else {
             printf("Parsing file: %s",
                     new File(filePath).getAbsolutePath());
-            topologyDef = FluxParser.parseFile(filePath, dumpYaml, true, filterProps);
+            topologyDef = FluxParser.parseFile(filePath, dumpYaml, true, filterProps, envFilter);
         }
 
 

http://git-wip-us.apache.org/repos/asf/storm/blob/44d5b5be/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java
----------------------------------------------------------------------
diff --git a/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java b/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java
index 109330d..78c52d5 100644
--- a/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java
+++ b/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java
@@ -43,46 +43,49 @@ public class FluxParser {
 
     // TODO refactor input stream processing (see parseResource() method).
     public static TopologyDef parseFile(String inputFile, boolean dumpYaml, boolean processIncludes,
-                                        String propertiesFile) throws IOException {
+                                        String propertiesFile, boolean envSub) throws IOException {
         Yaml yaml = yaml();
         FileInputStream in = new FileInputStream(inputFile);
         // TODO process properties, etc.
-        TopologyDef topology = loadYaml(yaml, in, propertiesFile);
+        TopologyDef topology = loadYaml(yaml, in, propertiesFile, envSub);
         in.close();
         if(dumpYaml){
             dumpYaml(topology, yaml);
         }
         if(processIncludes) {
-            return processIncludes(yaml, topology, propertiesFile);
+            return processIncludes(yaml, topology, propertiesFile, envSub);
         } else {
             return topology;
         }
     }
 
     public static TopologyDef parseResource(String resource, boolean dumpYaml, boolean processIncludes,
-                                            String propertiesFile) throws IOException {
+                                            String propertiesFile, boolean envSub) throws IOException {
         Yaml yaml = yaml();
         InputStream in = FluxParser.class.getResourceAsStream(resource);
-        TopologyDef topology = loadYaml(yaml, in, propertiesFile);
+        TopologyDef topology = loadYaml(yaml, in, propertiesFile, envSub);
         in.close();
         if(dumpYaml){
             dumpYaml(topology, yaml);
         }
         if(processIncludes) {
-            return processIncludes(yaml, topology, propertiesFile);
+            return processIncludes(yaml, topology, propertiesFile, envSub);
         } else {
             return topology;
         }
     }
 
-    private static TopologyDef loadYaml(Yaml yaml, InputStream in, String propsFile) throws IOException {
+    private static TopologyDef loadYaml(Yaml yaml, InputStream in, String propsFile, boolean envSubstitution) throws IOException {
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         LOG.info("loading YAML from input stream...");
         int b = -1;
         while((b = in.read()) != -1){
             bos.write(b);
         }
+
+        // TODO substitution implementation is not exactly efficient or kind to memory...
         String str = bos.toString();
+        // properties file substitution
         if(propsFile != null){
             LOG.info("Performing property substitution.");
             InputStream propsIn = new FileInputStream(propsFile);
@@ -94,6 +97,17 @@ public class FluxParser {
         } else {
             LOG.info("Not performing property substitution.");
         }
+
+        // environment variable substitution
+        if(envSubstitution){
+            LOG.info("Performing environment variable substitution...");
+            Map<String, String> envs = System.getenv();
+            for(String key : envs.keySet()){
+                str = str.replace("${ENV-" + key + "}", envs.get(key));
+            }
+        } else {
+            LOG.info("Not performing environment variable substitution.");
+        }
         return (TopologyDef)yaml.load(str);
     }
 
@@ -120,17 +134,18 @@ public class FluxParser {
      * @param topologyDef the topology definition containing (possibly zero) includes
      * @return The TopologyDef with includes resolved.
      */
-    private static TopologyDef processIncludes(Yaml yaml, TopologyDef topologyDef, String propsFile) throws IOException {
+    private static TopologyDef processIncludes(Yaml yaml, TopologyDef topologyDef, String propsFile, boolean envSub)
+            throws IOException {
         //TODO support multiple levels of includes
         if(topologyDef.getIncludes() != null) {
             for (IncludeDef include : topologyDef.getIncludes()){
                 TopologyDef includeTopologyDef = null;
                 if (include.isResource()) {
                     LOG.info("Loading includes from resource: {}", include.getFile());
-                    includeTopologyDef = parseResource(include.getFile(), true, false, propsFile);
+                    includeTopologyDef = parseResource(include.getFile(), true, false, propsFile, envSub);
                 } else {
                     LOG.info("Loading includes from file: {}", include.getFile());
-                    includeTopologyDef = parseFile(include.getFile(), true, false, propsFile);
+                    includeTopologyDef = parseFile(include.getFile(), true, false, propsFile, envSub);
                 }
 
                 // if overrides are disabled, we won't replace anything that already exists

http://git-wip-us.apache.org/repos/asf/storm/blob/44d5b5be/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java
----------------------------------------------------------------------
diff --git a/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java b/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java
index f6076cc..6580ef7 100644
--- a/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java
+++ b/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java
@@ -32,7 +32,7 @@ import static org.junit.Assert.*;
 public class TCKTest {
     @Test
     public void testTCK() throws Exception {
-        TopologyDef topologyDef = FluxParser.parseResource("/configs/tck.yaml", false, true, null);
+        TopologyDef topologyDef = FluxParser.parseResource("/configs/tck.yaml", false, true, null, false);
         Config conf = FluxBuilder.buildConfig(topologyDef);
         ExecutionContext context = new ExecutionContext(topologyDef, conf);
         StormTopology topology = FluxBuilder.buildTopology(context);
@@ -42,7 +42,7 @@ public class TCKTest {
 
     @Test
     public void testShellComponents() throws Exception {
-        TopologyDef topologyDef = FluxParser.parseResource("/configs/shell_test.yaml", false, true, null);
+        TopologyDef topologyDef = FluxParser.parseResource("/configs/shell_test.yaml", false, true, null, false);
         Config conf = FluxBuilder.buildConfig(topologyDef);
         ExecutionContext context = new ExecutionContext(topologyDef, conf);
         StormTopology topology = FluxBuilder.buildTopology(context);
@@ -52,7 +52,7 @@ public class TCKTest {
 
     @Test
     public void testKafkaSpoutConfig() throws Exception {
-        TopologyDef topologyDef = FluxParser.parseResource("/configs/kafka_test.yaml", false, true, null);
+        TopologyDef topologyDef = FluxParser.parseResource("/configs/kafka_test.yaml", false, true, null, false);
         Config conf = FluxBuilder.buildConfig(topologyDef);
         ExecutionContext context = new ExecutionContext(topologyDef, conf);
         StormTopology topology = FluxBuilder.buildTopology(context);
@@ -62,7 +62,7 @@ public class TCKTest {
 
     @Test
     public void testLoadFromResource() throws Exception {
-        TopologyDef topologyDef = FluxParser.parseResource("/configs/kafka_test.yaml", false, true, null);
+        TopologyDef topologyDef = FluxParser.parseResource("/configs/kafka_test.yaml", false, true, null, false);
         Config conf = FluxBuilder.buildConfig(topologyDef);
         ExecutionContext context = new ExecutionContext(topologyDef, conf);
         StormTopology topology = FluxBuilder.buildTopology(context);
@@ -73,7 +73,7 @@ public class TCKTest {
 
     @Test
     public void testHdfs() throws Exception {
-        TopologyDef topologyDef = FluxParser.parseResource("/configs/hdfs_test.yaml", false, true, null);
+        TopologyDef topologyDef = FluxParser.parseResource("/configs/hdfs_test.yaml", false, true, null, false);
         Config conf = FluxBuilder.buildConfig(topologyDef);
         ExecutionContext context = new ExecutionContext(topologyDef, conf);
         StormTopology topology = FluxBuilder.buildTopology(context);
@@ -83,7 +83,7 @@ public class TCKTest {
 
     @Test
     public void testIncludes() throws Exception {
-        TopologyDef topologyDef = FluxParser.parseResource("/configs/include_test.yaml", false, true, null);
+        TopologyDef topologyDef = FluxParser.parseResource("/configs/include_test.yaml", false, true, null, false);
         Config conf = FluxBuilder.buildConfig(topologyDef);
         ExecutionContext context = new ExecutionContext(topologyDef, conf);
         StormTopology topology = FluxBuilder.buildTopology(context);
@@ -96,7 +96,7 @@ public class TCKTest {
 
     @Test
     public void testTopologySource() throws Exception {
-        TopologyDef topologyDef = FluxParser.parseResource("/configs/existing-topology.yaml", false, true, null);
+        TopologyDef topologyDef = FluxParser.parseResource("/configs/existing-topology.yaml", false, true, null, false);
         assertTrue(topologyDef.validate());
         Config conf = FluxBuilder.buildConfig(topologyDef);
         ExecutionContext context = new ExecutionContext(topologyDef, conf);
@@ -107,7 +107,7 @@ public class TCKTest {
 
     @Test
     public void testTopologySourceWithReflection() throws Exception {
-        TopologyDef topologyDef = FluxParser.parseResource("/configs/existing-topology-reflection.yaml", false, true, null);
+        TopologyDef topologyDef = FluxParser.parseResource("/configs/existing-topology-reflection.yaml", false, true, null, false);
         assertTrue(topologyDef.validate());
         Config conf = FluxBuilder.buildConfig(topologyDef);
         ExecutionContext context = new ExecutionContext(topologyDef, conf);
@@ -118,7 +118,7 @@ public class TCKTest {
 
     @Test
     public void testTopologySourceWithConfigParam() throws Exception {
-        TopologyDef topologyDef = FluxParser.parseResource("/configs/existing-topology-reflection-config.yaml", false, true, null);
+        TopologyDef topologyDef = FluxParser.parseResource("/configs/existing-topology-reflection-config.yaml", false, true, null, false);
         assertTrue(topologyDef.validate());
         Config conf = FluxBuilder.buildConfig(topologyDef);
         ExecutionContext context = new ExecutionContext(topologyDef, conf);
@@ -129,7 +129,7 @@ public class TCKTest {
 
     @Test
     public void testTopologySourceWithMethodName() throws Exception {
-        TopologyDef topologyDef = FluxParser.parseResource("/configs/existing-topology-method-override.yaml", false, true, null);
+        TopologyDef topologyDef = FluxParser.parseResource("/configs/existing-topology-method-override.yaml", false, true, null, false);
         assertTrue(topologyDef.validate());
         Config conf = FluxBuilder.buildConfig(topologyDef);
         ExecutionContext context = new ExecutionContext(topologyDef, conf);
@@ -141,7 +141,7 @@ public class TCKTest {
 
     @Test
     public void testTridentTopologySource() throws Exception {
-        TopologyDef topologyDef = FluxParser.parseResource("/configs/existing-topology-trident.yaml", false, true, null);
+        TopologyDef topologyDef = FluxParser.parseResource("/configs/existing-topology-trident.yaml", false, true, null, false);
         assertTrue(topologyDef.validate());
         Config conf = FluxBuilder.buildConfig(topologyDef);
         ExecutionContext context = new ExecutionContext(topologyDef, conf);
@@ -152,7 +152,7 @@ public class TCKTest {
 
     @Test(expected = IllegalArgumentException.class)
     public void testInvalidTopologySource() throws Exception {
-        TopologyDef topologyDef = FluxParser.parseResource("/configs/invalid-existing-topology.yaml", false, true, null);
+        TopologyDef topologyDef = FluxParser.parseResource("/configs/invalid-existing-topology.yaml", false, true, null, false);
         assertFalse("Topology config is invalid.", topologyDef.validate());
         Config conf = FluxBuilder.buildConfig(topologyDef);
         ExecutionContext context = new ExecutionContext(topologyDef, conf);
@@ -162,7 +162,7 @@ public class TCKTest {
 
     @Test
     public void testTopologySourceWithGetMethodName() throws Exception {
-        TopologyDef topologyDef = FluxParser.parseResource("/configs/existing-topology-reflection.yaml", false, true, null);
+        TopologyDef topologyDef = FluxParser.parseResource("/configs/existing-topology-reflection.yaml", false, true, null, false);
         assertTrue(topologyDef.validate());
         Config conf = FluxBuilder.buildConfig(topologyDef);
         ExecutionContext context = new ExecutionContext(topologyDef, conf);
@@ -173,7 +173,7 @@ public class TCKTest {
 
     @Test
     public void testTopologySourceWithConfigMethods() throws Exception {
-        TopologyDef topologyDef = FluxParser.parseResource("/configs/config-methods-test.yaml", false, true, null);
+        TopologyDef topologyDef = FluxParser.parseResource("/configs/config-methods-test.yaml", false, true, null, false);
         assertTrue(topologyDef.validate());
         Config conf = FluxBuilder.buildConfig(topologyDef);
         ExecutionContext context = new ExecutionContext(topologyDef, conf);
@@ -190,7 +190,7 @@ public class TCKTest {
 
     @Test
     public void testVariableSubstitution() throws Exception {
-        TopologyDef topologyDef = FluxParser.parseResource("/configs/substitution-test.yaml", false, true, "src/test/resources/configs/test.properties");
+        TopologyDef topologyDef = FluxParser.parseResource("/configs/substitution-test.yaml", false, true, "src/test/resources/configs/test.properties", true);
         assertTrue(topologyDef.validate());
         Config conf = FluxBuilder.buildConfig(topologyDef);
         ExecutionContext context = new ExecutionContext(topologyDef, conf);
@@ -198,6 +198,17 @@ public class TCKTest {
         assertNotNull(topology);
         topology.validate();
 
-        assertTrue(context.getTopologyDef().getName().equals("substitution-topology"));
+        // test basic substitution
+        assertEquals("Property not replaced.",
+                "substitution-topology",
+                context.getTopologyDef().getName());
+
+        // test environment variable substitution
+        // $PATH should be defined on most systems
+        String envPath = System.getenv().get("PATH");
+        assertEquals("ENV variable not replaced.",
+                envPath,
+                context.getTopologyDef().getConfig().get("test.env.value"));
+
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/44d5b5be/flux-core/src/test/resources/configs/substitution-test.yaml
----------------------------------------------------------------------
diff --git a/flux-core/src/test/resources/configs/substitution-test.yaml b/flux-core/src/test/resources/configs/substitution-test.yaml
index cbfeea4..13f1960 100644
--- a/flux-core/src/test/resources/configs/substitution-test.yaml
+++ b/flux-core/src/test/resources/configs/substitution-test.yaml
@@ -41,6 +41,8 @@ name: "${topology.name}"
 #
 config:
   topology.workers: 1
+  # test environment variable substitution
+  test.env.value: ${ENV-PATH}
   # ...
 
 # spout definitions