You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2018/12/21 18:50:23 UTC

[1/2] storm git commit: STORM-3276: Updated Flux to deal with storm local correctly

Repository: storm
Updated Branches:
  refs/heads/master cf00a537b -> 074fb35a0


STORM-3276: Updated Flux to deal with storm local correctly


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/095bdbc2
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/095bdbc2
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/095bdbc2

Branch: refs/heads/master
Commit: 095bdbc267fe4354c28baa84186807dcd3be2a31
Parents: cf00a53
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Thu Nov 15 14:12:48 2018 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Tue Dec 18 11:21:16 2018 -0600

----------------------------------------------------------------------
 bin/storm.py                                    | 13 +++-
 flux/README.md                                  | 60 +++++-------------
 .../main/java/org/apache/storm/flux/Flux.java   | 47 +++++++-------
 .../java/org/apache/storm/LocalCluster.java     | 66 ++++++++++++++++----
 4 files changed, 106 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/095bdbc2/bin/storm.py
----------------------------------------------------------------------
diff --git a/bin/storm.py b/bin/storm.py
index 013e141..058ada8 100755
--- a/bin/storm.py
+++ b/bin/storm.py
@@ -338,12 +338,16 @@ def local(jarfile, klass, *args):
     local also adds in the option --local-ttl which sets the number of seconds the
     local cluster will run for before it shuts down.
 
+    --local-zookeeper if using an external zookeeper sets the connection string to use for it.
+
     --java-debug lets you turn on java debugging and set the parameters passed to -agentlib:jdwp on the JDK
     --java-debug transport=dt_socket,address=localhost:8000
     will open up a debugging server on port 8000.
     """
-    [ttl, debug_args, args] = parse_local_opts(args)
+    [ttl, lzk, debug_args, args] = parse_local_opts(args)
     extrajvmopts = ["-Dstorm.local.sleeptime=" + ttl]
+    if lzk != None:
+        extrajvmopts = extrajvmopts + ["-Dstorm.local.zookeeper=" + lzk]
     if debug_args != None:
         extrajvmopts = extrajvmopts + ["-agentlib:jdwp=" + debug_args]
     run_client_jar(jarfile, "org.apache.storm.LocalCluster", [klass] + list(args), client=False, daemon=False, extrajvmopts=extrajvmopts)
@@ -980,19 +984,22 @@ def parse_local_opts(args):
     curr = list(args[:])
     curr.reverse()
     ttl = "20"
+    lzk = None
     debug_args = None
     args_list = []
 
     while len(curr) > 0:
         token = curr.pop()
-        if token == "--local-ttl":
+        if token == "--local-zookeeper":
+            lzk = curr.pop()
+        elif token == "--local-ttl":
             ttl = curr.pop()
         elif token == "--java-debug":
             debug_args = curr.pop()
         else:
             args_list.append(token)
 
-    return ttl, debug_args, args_list
+    return ttl, lzk, debug_args, args_list
 
 
 def parse_jar_opts(args):

http://git-wip-us.apache.org/repos/asf/storm/blob/095bdbc2/flux/README.md
----------------------------------------------------------------------
diff --git a/flux/README.md b/flux/README.md
index 58ed25d..47bc7d6 100644
--- a/flux/README.md
+++ b/flux/README.md
@@ -17,36 +17,7 @@ order to change configuration.
 Flux is a framework and set of utilities that make defining and deploying Apache Storm topologies less painful and
 deveoper-intensive.
 
-Have you ever found yourself repeating this pattern?:
-
-```java
-
-public static void main(String[] args) throws Exception {
-    // logic to determine if we're running locally or not...
-    // create necessary config options...
-    boolean runLocal = shouldRunLocal();
-    if(runLocal){
-        LocalCluster cluster = new LocalCluster();
-        cluster.submitTopology(name, conf, topology);
-    } else {
-        StormSubmitter.submitTopology(name, conf, topology);
-    }
-}
-```
-
-Wouldn't something like this be easier:
-
-```bash
-storm jar mytopology.jar org.apache.storm.flux.Flux --local config.yaml
-```
-
-or:
-
-```bash
-storm jar mytopology.jar org.apache.storm.flux.Flux --remote config.yaml
-```
-
-Another pain point often mentioned is the fact that the wiring for a Topology graph is often tied up in Java code,
+A Major pain point often mentioned is the fact that the wiring for a Topology graph is often tied up in Java code,
 and that any changes require recompilation and repackaging of the topology jar file. Flux aims to alleviate that
 pain by allowing you to package all your Storm components in a single jar, and use an external text file to define
 the layout and configuration of your topologies.
@@ -202,13 +173,13 @@ The example below illustrates Flux usage with the Maven shade plugin:
  ```
 
 ### Deploying and Running a Flux Topology
-Once your topology components are packaged with the Flux dependency, you can run different topologies either locally
-or remotely using the `storm jar` command. For example, if your fat jar is named `myTopology-0.1.0-SNAPSHOT.jar` you
+Once your topology components are packaged with the Flux dependency, you can run different topologies either locally using the `storm local` command
+or remotely using `storm jar`. For example, if your fat jar is named `myTopology-0.1.0-SNAPSHOT.jar` you
 could run it locally with the command:
 
 
 ```bash
-storm jar myTopology-0.1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --local my_config.yaml
+storm local myTopology-0.1.0-SNAPSHOT.jar org.apache.storm.flux.Flux my_config.yaml
 
 ```
 
@@ -228,20 +199,22 @@ usage: storm jar <my_topology_uber_jar.jar> org.apache.storm.flux.Flux
                               and replace keys identified with {$[property
                               name]} with the value defined in the
                               properties file.
+ -h,--help                    Print this help message
  -i,--inactive                Deploy the topology, but do not activate it.
- -l,--local                   Run the topology in local mode.
+ -l,--local                   Ignored: to run in local mode use `storm
+                              local` instead of `storm jar`
  -n,--no-splash               Suppress the printing of the splash screen.
  -q,--no-detail               Suppress the printing of topology details.
- -r,--remote                  Deploy the topology to a remote cluster.
+ -r,--remote                  Ignored: to run on a remote cluster launch
+                              using `storm jar` to run in a local cluster
+                              use `storm local`
  -R,--resource                Treat the supplied path as a classpath
                               resource instead of a file.
- -s,--sleep <ms>              When running locally, the amount of time to
-                              sleep (in ms.) before killing the topology
-                              and shutting down the local cluster.
- -z,--zookeeper <host:port>   When running in local mode, use the
-                              ZooKeeper at the specified <host>:<port>
-                              instead of the in-process ZooKeeper.
-                              (requires Storm 0.9.3 or later)
+ -s,--sleep <ms>              Ignored: to set cluster run time use
+                              `--local-ttl` with `storm local` instead.
+ -z,--zookeeper <host:port>   Ignored, if you want to set the zookeeper
+                              host/port in local mode use
+                              `--local-zookeeper` instead
 ```
 
 **NOTE:** Flux tries to avoid command line switch collision with the `storm` command, and allows any other command line
@@ -251,7 +224,7 @@ For example, you can use the `storm` command switch `-c` to override a topology
 example command will run Flux and override the `nimus.host` configuration:
 
 ```bash
-storm jar myTopology-0.1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --remote my_config.yaml -c nimbus.host=localhost
+storm jar myTopology-0.1.0-SNAPSHOT.jar org.apache.storm.flux.Flux my_config.yaml -c nimbus.host=localhost
 ```
 
 ### Sample output
@@ -279,7 +252,6 @@ sentence-spout --SHUFFLE--> splitsentence
 splitsentence --FIELDS--> count
 count --SHUFFLE--> log
 --------------------------------------
-Submitting topology: 'shell-topology' to remote cluster...
 ```
 
 ## YAML Configuration

http://git-wip-us.apache.org/repos/asf/storm/blob/095bdbc2/flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java b/flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java
index b801a52..15f492d 100644
--- a/flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java
+++ b/flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java
@@ -52,17 +52,22 @@ import org.slf4j.LoggerFactory;
 public class Flux {
     private static final Logger LOG = LoggerFactory.getLogger(Flux.class);
 
+    @Deprecated
     private static final String OPTION_LOCAL = "local";
+    @Deprecated
     private static final String OPTION_REMOTE = "remote";
     private static final String OPTION_RESOURCE = "resource";
+    @Deprecated
     private static final String OPTION_SLEEP = "sleep";
     private static final String OPTION_DRY_RUN = "dry-run";
     private static final String OPTION_NO_DETAIL = "no-detail";
     private static final String OPTION_NO_SPLASH = "no-splash";
     private static final String OPTION_INACTIVE = "inactive";
+    @Deprecated
     private static final String OPTION_ZOOKEEPER = "zookeeper";
     private static final String OPTION_FILTER = "filter";
     private static final String OPTION_ENV_FILTER = "env-filter";
+    private static final String OPTION_HELP = "help";
 
     /**
      * Flux main entry point.
@@ -72,14 +77,18 @@ public class Flux {
     public static void main(String[] args) throws Exception {
         Options options = new Options();
 
-        options.addOption(option(0, "l", OPTION_LOCAL, "Run the topology in local mode."));
+        options.addOption(option(0, "h", OPTION_HELP, "Print this help message"));
 
-        options.addOption(option(0, "r", OPTION_REMOTE, "Deploy the topology to a remote cluster."));
+        options.addOption(option(0, "l", OPTION_LOCAL, "Ignored: to run in local mode use `storm local`"
+            + " instead of `storm jar`"));
+
+        options.addOption(option(0, "r", OPTION_REMOTE, "Ignored: to run on a remote cluster launch"
+            + " using `storm jar` to run in a local cluster use `storm local`"));
 
         options.addOption(option(0, "R", OPTION_RESOURCE, "Treat the supplied path as a classpath resource instead of a file."));
 
-        options.addOption(option(1, "s", OPTION_SLEEP, "ms", "When running locally, the amount of time to sleep (in ms.) "
-                + "before killing the topology and shutting down the local cluster."));
+        options.addOption(option(1, "s", OPTION_SLEEP, "ms", "Ignored: to set cluster run time"
+            + " use `--local-ttl` with `storm local` instead."));
 
         options.addOption(option(0, "d", OPTION_DRY_RUN, "Do not run or deploy the topology. Just build, validate, "
                 + "and print information about the topology."));
@@ -90,20 +99,20 @@ public class Flux {
 
         options.addOption(option(0, "i", OPTION_INACTIVE, "Deploy the topology, but do not activate it."));
 
-        options.addOption(option(1, "z", OPTION_ZOOKEEPER, "host:port", "When running in local mode, use the ZooKeeper at the "
-                + "specified <host>:<port> instead of the in-process ZooKeeper. (requires Storm 0.9.3 or later)"));
+        options.addOption(option(1, "z", OPTION_ZOOKEEPER, "host:port", "Ignored, if you want to"
+            + " set the zookeeper host/port in local mode use `--local-zookeeper` instead"));
 
         options.addOption(option(1, "f", OPTION_FILTER, "file", "Perform property substitution. Use the specified file "
                 + "as a source of properties, and replace keys identified with {$[property name]} with the value defined "
                 + "in the properties file."));
 
         options.addOption(option(0, "e", OPTION_ENV_FILTER, "Perform environment variable substitution. Replace keys"
-                + "identified with `${ENV-[NAME]}` will be replaced with the corresponding `NAME` environment value"));
+                + " identified with `${ENV-[NAME]}` will be replaced with the corresponding `NAME` environment value"));
 
         CommandLineParser parser = new BasicParser();
         CommandLine cmd = parser.parse(options, args);
 
-        if (cmd.getArgs().length != 1) {
+        if (cmd.getArgs().length != 1 || cmd.hasOption(OPTION_HELP)) {
             usage(options);
             System.exit(1);
         }
@@ -169,22 +178,16 @@ public class Flux {
         }
 
         if (!cmd.hasOption(OPTION_DRY_RUN)) {
-            if (cmd.hasOption(OPTION_REMOTE)) {
-                LOG.info("Running remotely...");
-                // should the topology be active or inactive
-                SubmitOptions submitOptions = null;
-                if (cmd.hasOption(OPTION_INACTIVE)) {
-                    LOG.info("Deploying topology in an INACTIVE state...");
-                    submitOptions = new SubmitOptions(TopologyInitialStatus.INACTIVE);
-                } else {
-                    LOG.info("Deploying topology in an ACTIVE state...");
-                    submitOptions = new SubmitOptions(TopologyInitialStatus.ACTIVE);
-                }
-                StormSubmitter.submitTopology(topologyName, conf, topology, submitOptions, null);
+            // should the topology be active or inactive
+            SubmitOptions submitOptions = null;
+            if (cmd.hasOption(OPTION_INACTIVE)) {
+                LOG.info("Deploying topology in an INACTIVE state...");
+                submitOptions = new SubmitOptions(TopologyInitialStatus.INACTIVE);
             } else {
-                LOG.error("To run in local mode run with 'storm local' instead of 'storm jar'");
-                return;
+                LOG.info("Deploying topology in an ACTIVE state...");
+                submitOptions = new SubmitOptions(TopologyInitialStatus.ACTIVE);
             }
+            StormSubmitter.submitTopology(topologyName, conf, topology, submitOptions, null);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/095bdbc2/storm-server/src/main/java/org/apache/storm/LocalCluster.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/LocalCluster.java b/storm-server/src/main/java/org/apache/storm/LocalCluster.java
index 406bb56..3703546 100644
--- a/storm-server/src/main/java/org/apache/storm/LocalCluster.java
+++ b/storm-server/src/main/java/org/apache/storm/LocalCluster.java
@@ -124,6 +124,7 @@ import org.slf4j.LoggerFactory;
 public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface {
     public static final KillOptions KILL_NOW = new KillOptions();
     private static final Logger LOG = LoggerFactory.getLogger(LocalCluster.class);
+    private static final long DEFAULT_ZK_PORT = 2181;
 
     static {
         KILL_NOW.set_wait_secs(0);
@@ -309,8 +310,27 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface {
      * @throws Exception on any Exception.
      */
     public static <T> T withLocalModeOverride(Callable<T> c, long ttlSec) throws Exception {
+        return withLocalModeOverride(c, ttlSec, null);
+    }
+
+    /**
+     * Run c with a local mode cluster overriding the NimbusClient and DRPCClient calls. NOTE local mode override happens by default now
+     * unless netty is turned on for the local cluster.
+     *
+     * @param c      the callable to run in this mode
+     * @param ttlSec the number of seconds to let the cluster run after c has completed
+     * @param daemonConf configs to set for the daemon processes.
+     * @return the result of calling C
+     *
+     * @throws Exception on any Exception.
+     */
+    public static <T> T withLocalModeOverride(Callable<T> c, long ttlSec, Map<String, Object> daemonConf) throws Exception {
         LOG.info("\n\n\t\tSTARTING LOCAL MODE CLUSTER\n\n");
-        try (LocalCluster local = new LocalCluster();
+        Builder builder = new Builder();
+        if (daemonConf != null) {
+            builder.withDaemonConf(daemonConf);
+        }
+        try (LocalCluster local = builder.build();
              LocalDRPC drpc = new LocalDRPC(local.metricRegistry);
              DRPCClient.LocalOverride drpcOverride = new DRPCClient.LocalOverride(drpc)) {
 
@@ -323,6 +343,11 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface {
         }
     }
 
+    /**
+     * Main entry point to running in local mode.
+     * @param args arguments to be run in local mode
+     * @throws Exception on any error when running.
+     */
     public static void main(final String[] args) throws Exception {
         if (args.length < 1) {
             throw new IllegalArgumentException("No class was specified to run");
@@ -336,6 +361,24 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface {
             LOG.warn("could not parse the sleep time defaulting to {} seconds", ttl);
         }
 
+        Map<String, Object> daemonConf = new HashMap<>();
+        String zkOverride = System.getProperty("storm.local.zookeeper");
+        if (zkOverride != null) {
+            LOG.info("Using ZooKeeper at '{}' instead of in-process one.", zkOverride);
+            long zkPort = DEFAULT_ZK_PORT;
+            String zkHost = null;
+            if (zkOverride.contains(":")) {
+                String[] hostPort = zkOverride.split(":");
+                zkHost = hostPort[0];
+                zkPort = hostPort.length > 1 ? Long.parseLong(hostPort[1]) : DEFAULT_ZK_PORT;
+            } else {
+                zkHost = zkOverride;
+            }
+            daemonConf.put(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS, true);
+            daemonConf.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList(zkHost));
+            daemonConf.put(Config.STORM_ZOOKEEPER_PORT, zkPort);
+        }
+
         withLocalModeOverride(() -> {
             String klass = args[0];
             String[] newArgs = Arrays.copyOfRange(args, 1, args.length);
@@ -345,7 +388,7 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface {
             LOG.info("\n\n\t\tRUNNING {} with args {}\n\n", main, Arrays.toString(newArgs));
             main.invoke(null, (Object) newArgs);
             return (Void) null;
-        }, ttl);
+        }, ttl, daemonConf);
 
         //Sometimes external things used with testing don't shut down all the way
         System.exit(0);
@@ -354,10 +397,7 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface {
     /**
      * Checks if Nimbuses have elected a leader.
      *
-     * @return boolean
-     *
-     * @throws AuthorizationException
-     * @throws TException
+     * @return true if there is a leader else false.
      */
     private boolean hasLeader() throws AuthorizationException, TException {
         ClusterSummary summary = getNimbus().getClusterInfo();
@@ -918,22 +958,22 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface {
     }
 
     @Override
-    public TopologyPageInfo getTopologyPageInfo(String id, String window, boolean is_include_sys)
+    public TopologyPageInfo getTopologyPageInfo(String id, String window, boolean isIncludeSys)
         throws NotAliveException, AuthorizationException, TException {
         // TODO Auto-generated method stub
         throw new RuntimeException("NOT IMPLEMENTED YET");
     }
 
     @Override
-    public SupervisorPageInfo getSupervisorPageInfo(String id, String host, boolean is_include_sys)
+    public SupervisorPageInfo getSupervisorPageInfo(String id, String host, boolean isIncludeSys)
         throws NotAliveException, AuthorizationException, TException {
         // TODO Auto-generated method stub
         throw new RuntimeException("NOT IMPLEMENTED YET");
     }
 
     @Override
-    public ComponentPageInfo getComponentPageInfo(String topology_id, String component_id, String window,
-                                                  boolean is_include_sys) throws NotAliveException, AuthorizationException, TException {
+    public ComponentPageInfo getComponentPageInfo(String topologyId, String componentId, String window,
+                                                  boolean isIncludeSys) throws NotAliveException, AuthorizationException, TException {
         // TODO Auto-generated method stub
         throw new RuntimeException("NOT IMPLEMENTED YET");
     }
@@ -1198,7 +1238,11 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface {
      * an AutoCloseable topology that not only gives you access to the compiled StormTopology but also will kill the topology when it
      * closes.
      *
-     * try (LocalTopology testTopo = cluster.submitTopology("testing", ...)) { // Run Some test } // The topology has been killed
+     * <code>
+     * try (LocalTopology testTopo = cluster.submitTopology("testing", ...)) {
+     *     // Run Some test
+     * } // The topology has been killed
+     * </code>
      */
     public class LocalTopology extends StormTopology implements ILocalTopology {
         private static final long serialVersionUID = 6145919776650637748L;


[2/2] storm git commit: Merge branch 'STORM-3276' of github.com:revans2/incubator-storm into STORM-3276

Posted by bo...@apache.org.
Merge branch 'STORM-3276' of github.com:revans2/incubator-storm into STORM-3276

STORM-3276: Updated Flux to deal with storm local correctly

This closes #2908


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/074fb35a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/074fb35a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/074fb35a

Branch: refs/heads/master
Commit: 074fb35a0a5ab746d5508d2a60b48a7948a2cdda
Parents: cf00a53 095bdbc
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Fri Dec 21 12:20:57 2018 -0600
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Fri Dec 21 12:20:57 2018 -0600

----------------------------------------------------------------------
 bin/storm.py                                    | 13 +++-
 flux/README.md                                  | 60 +++++-------------
 .../main/java/org/apache/storm/flux/Flux.java   | 47 +++++++-------
 .../java/org/apache/storm/LocalCluster.java     | 66 ++++++++++++++++----
 4 files changed, 106 insertions(+), 80 deletions(-)
----------------------------------------------------------------------