You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/02/24 18:33:33 UTC

[1/6] storm git commit: STORM-1267: port backtype.storm.command.set-log-level to java

Repository: storm
Updated Branches:
  refs/heads/master 56bc60374 -> dece08fbd


STORM-1267: port backtype.storm.command.set-log-level to java


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3425e7d1
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3425e7d1
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3425e7d1

Branch: refs/heads/master
Commit: 3425e7d12f514c976fa793f31dcbeafa1527bab4
Parents: 4ca7522
Author: Abhishek Agarwal <ab...@inmobi.com>
Authored: Mon Feb 22 16:37:57 2016 +0530
Committer: Abhishek Agarwal <ab...@inmobi.com>
Committed: Mon Feb 22 16:37:57 2016 +0530

----------------------------------------------------------------------
 .../org/apache/storm/command/set_log_level.clj  |  76 ------------
 .../src/jvm/org/apache/storm/command/CLI.java   |  25 +++-
 .../org/apache/storm/command/SetLogLevel.java   | 116 +++++++++++++++++++
 .../src/jvm/org/apache/storm/utils/Utils.java   |  61 +++++++---
 .../apache/storm/command/SetLogLevelTest.java   |  54 +++++++++
 .../jvm/org/apache/storm/command/TestCLI.java   |  62 ++++++----
 6 files changed, 272 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/3425e7d1/storm-core/src/clj/org/apache/storm/command/set_log_level.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/command/set_log_level.clj b/storm-core/src/clj/org/apache/storm/command/set_log_level.clj
deleted file mode 100644
index 6048246..0000000
--- a/storm-core/src/clj/org/apache/storm/command/set_log_level.clj
+++ /dev/null
@@ -1,76 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.command.set-log-level
-  (:use [clojure.tools.cli :only [cli]])
-  (:use [org.apache.storm log])
-  (:use [org.apache.storm.internal thrift])
-  (:import [org.apache.logging.log4j Level])
-  (:import [org.apache.storm.generated LogConfig LogLevel LogLevelAction])
-  (:gen-class))
-
-(defn- get-storm-id
-  "Get topology id for a running topology from the topology name."
-  [nimbus name]
-  (let [info (.getClusterInfo nimbus)
-        topologies (.get_topologies info)
-        topology (first (filter (fn [topo] (= name (.get_name topo))) topologies))]
-    (if topology 
-      (.get_id topology)
-      (throw (.IllegalArgumentException (str name " is not a running topology"))))))
-
-(defn- parse-named-log-levels [action]
-  "Parses [logger name]=[level string]:[optional timeout],[logger name2]...
-
-   e.g. ROOT=DEBUG:30
-        root logger, debug for 30 seconds
-
-        org.apache.foo=WARN
-        org.apache.foo set to WARN indefinitely"
-  (fn [^String s]
-    (let [log-args (re-find #"(.*)=([A-Z]+):?(\d*)" s)
-          name (if (= action LogLevelAction/REMOVE) s (nth log-args 1))
-          level (Level/toLevel (nth log-args 2))
-          timeout-str (nth log-args 3)
-          log-level (LogLevel.)]
-      (if (= action LogLevelAction/REMOVE)
-        (.set_action log-level action)
-        (do
-          (.set_action log-level action)
-          (.set_target_log_level log-level (.toString level))
-          (.set_reset_log_level_timeout_secs log-level
-            (Integer. (if (= timeout-str "") "0" timeout-str)))))
-      {name log-level})))
-
-(defn- merge-together [previous key val]
-   (assoc previous key
-      (if-let [oldval (get previous key)]
-         (merge oldval val)
-         val)))
-
-(defn -main [& args]
-  (let [[{log-setting :log-setting remove-log-setting :remove-log-setting} [name] _]
-        (cli args ["-l" "--log-setting"
-                   :parse-fn (parse-named-log-levels LogLevelAction/UPDATE)
-                   :assoc-fn merge-together]
-                  ["-r" "--remove-log-setting"
-                   :parse-fn (parse-named-log-levels LogLevelAction/REMOVE)
-                   :assoc-fn merge-together])
-        log-config (LogConfig.)]
-    (doseq [[log-name log-val] (merge log-setting remove-log-setting)]
-      (.put_to_named_logger_level log-config log-name log-val))
-    (log-message "Sent log config " log-config " for topology " name)
-    (with-configured-nimbus-connection nimbus
-      (.setLogConfig nimbus (get-storm-id nimbus name) log-config))))

http://git-wip-us.apache.org/repos/asf/storm/blob/3425e7d1/storm-core/src/jvm/org/apache/storm/command/CLI.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/command/CLI.java b/storm-core/src/jvm/org/apache/storm/command/CLI.java
index d4eaa5d..f29debc 100644
--- a/storm-core/src/jvm/org/apache/storm/command/CLI.java
+++ b/storm-core/src/jvm/org/apache/storm/command/CLI.java
@@ -17,19 +17,18 @@
  */
 package org.apache.storm.command;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.List;
-
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.DefaultParser;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 public class CLI {
     private static final Logger LOG = LoggerFactory.getLogger(CLI.class);
     private static class Opt {
@@ -139,6 +138,20 @@ public class CLI {
         }
     };
 
+    /**
+     * All values are returned as a map
+     */
+    public static final Assoc INTO_MAP = new Assoc() {
+        @Override
+        public Object assoc(Object current, Object value) {
+            if (null == current) {
+                current = new HashMap<Object, Object>();
+            }
+            ((Map<Object, Object>) current).putAll((Map<Object, Object>) value);
+            return current;
+        }
+    };
+
     public static class CLIBuilder {
         private final ArrayList<Opt> opts = new ArrayList<>();
         private final ArrayList<Arg> args = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/storm/blob/3425e7d1/storm-core/src/jvm/org/apache/storm/command/SetLogLevel.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/command/SetLogLevel.java b/storm-core/src/jvm/org/apache/storm/command/SetLogLevel.java
new file mode 100644
index 0000000..30cea5f
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/command/SetLogLevel.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.command;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.logging.log4j.Level;
+import org.apache.storm.generated.LogConfig;
+import org.apache.storm.generated.LogLevel;
+import org.apache.storm.generated.LogLevelAction;
+import org.apache.storm.generated.Nimbus;
+import org.apache.storm.utils.NimbusClient;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class SetLogLevel {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SetLogLevel.class);
+
+    public static void main(String[] args) throws Exception {
+        Map<String, Object> cl = CLI.opt("l", "log-setting", null, new LogLevelsParser(LogLevelAction.UPDATE), CLI.INTO_MAP)
+            .opt("r", "remove-log-setting", null, new LogLevelsParser(LogLevelAction.REMOVE), CLI.INTO_MAP)
+            .arg("topologyName", CLI.FIRST_WINS)
+            .parse(args);
+        final String topologyName = (String) cl.get("topologyName");
+        final LogConfig logConfig = new LogConfig();
+        Map<String, LogLevel> logLevelMap = new HashMap<>();
+        Map<String, LogLevel> updateLogLevel = (Map<String, LogLevel>) cl.get("l");
+        if (null != updateLogLevel) {
+            logLevelMap.putAll(updateLogLevel);
+        }
+        Map<String, LogLevel> removeLogLevel = (Map<String, LogLevel>) cl.get("r");
+        if (null != removeLogLevel) {
+            logLevelMap.putAll(removeLogLevel);
+        }
+
+        for (Map.Entry<String, LogLevel> entry : logLevelMap.entrySet()) {
+            logConfig.put_to_named_logger_level(entry.getKey(), entry.getValue());
+        }
+
+        NimbusClient.withConfiguredClient(new NimbusClient.WithNimbus() {
+            @Override
+            public void run(Nimbus.Client nimbus) throws Exception {
+                String topologyId = Utils.getTopologyId(topologyName, nimbus);
+                if (null == topologyId) {
+                    throw new IllegalArgumentException(topologyName + " is not a running topology");
+                }
+                nimbus.setLogConfig(topologyId, logConfig);
+                LOG.info("Log config {} is sent for topology {}", logConfig, topologyName);
+            }
+        });
+    }
+
+    /**
+     * Parses [logger name]=[level string]:[optional timeout],[logger name2]...
+     *
+     * e.g. ROOT=DEBUG:30
+     *     root logger, debug for 30 seconds
+     *
+     *     org.apache.foo=WARN
+     *     org.apache.foo set to WARN indefinitely
+     */
+    static final class LogLevelsParser implements CLI.Parse {
+
+        private LogLevelAction action;
+
+        public LogLevelsParser(LogLevelAction action) {
+            this.action = action;
+        }
+
+        @Override
+        public Object parse(String value) {
+            final LogLevel logLevel = new LogLevel();
+            logLevel.set_action(action);
+            String name = null;
+            if (action == LogLevelAction.REMOVE) {
+                name = value;
+            } else {
+                String[] splits = value.split("=");
+                Preconditions.checkArgument(splits.length == 2, "Invalid log string '%s'", value);
+                name = splits[0];
+                splits = splits[1].split(":");
+                Integer timeout = 0;
+                Level level = Level.valueOf(splits[0]);
+                logLevel.set_reset_log_level(level.toString());
+                if (splits.length > 1) {
+                    timeout = Integer.parseInt(splits[1]);
+                }
+                logLevel.set_reset_log_level_timeout_secs(timeout);
+            }
+            Map<String, LogLevel> result = new HashMap<>();
+            result.put(name, logLevel);
+            return result;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3425e7d1/storm-core/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/Utils.java b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
index b62f99c..fe0c431 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
@@ -17,11 +17,22 @@
  */
 package org.apache.storm.utils;
 
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
 import org.apache.commons.exec.CommandLine;
 import org.apache.commons.exec.DefaultExecutor;
 import org.apache.commons.exec.ExecuteException;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.input.ClassLoaderObjectInputStream;
+import org.apache.commons.lang.StringUtils;
+import org.apache.curator.ensemble.exhibitor.DefaultExhibitorRestClient;
+import org.apache.curator.ensemble.exhibitor.ExhibitorEnsembleProvider;
+import org.apache.curator.ensemble.exhibitor.Exhibitors;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.storm.Config;
 import org.apache.storm.blobstore.BlobStore;
 import org.apache.storm.blobstore.BlobStoreAclHandler;
@@ -29,22 +40,24 @@ import org.apache.storm.blobstore.ClientBlobStore;
 import org.apache.storm.blobstore.InputStreamWithMeta;
 import org.apache.storm.blobstore.LocalFsBlobStore;
 import org.apache.storm.daemon.JarTransformer;
-import org.apache.storm.generated.*;
+import org.apache.storm.generated.AccessControl;
+import org.apache.storm.generated.AccessControlType;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.ClusterSummary;
+import org.apache.storm.generated.ComponentCommon;
+import org.apache.storm.generated.ComponentObject;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.generated.KeyNotFoundException;
+import org.apache.storm.generated.Nimbus;
+import org.apache.storm.generated.ReadableBlobMeta;
+import org.apache.storm.generated.SettableBlobMeta;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.generated.TopologyInfo;
+import org.apache.storm.generated.TopologySummary;
 import org.apache.storm.localizer.Localizer;
 import org.apache.storm.nimbus.NimbusInfo;
 import org.apache.storm.serialization.DefaultSerializationDelegate;
 import org.apache.storm.serialization.SerializationDelegate;
-import clojure.lang.RT;
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
-import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
-import org.apache.commons.io.input.ClassLoaderObjectInputStream;
-import org.apache.commons.lang.StringUtils;
-import org.apache.curator.ensemble.exhibitor.DefaultExhibitorRestClient;
-import org.apache.curator.ensemble.exhibitor.ExhibitorEnsembleProvider;
-import org.apache.curator.ensemble.exhibitor.Exhibitors;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.thrift.TBase;
 import org.apache.thrift.TDeserializer;
 import org.apache.thrift.TException;
@@ -118,6 +131,8 @@ import java.util.zip.GZIPOutputStream;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipFile;
 
+import clojure.lang.RT;
+
 public class Utils {
     // A singleton instance allows us to mock delegated static methods in our
     // tests by subclassing.
@@ -1430,21 +1445,29 @@ public class Utils {
     }
 
     public static TopologyInfo getTopologyInfo(String name, String asUser, Map stormConf) {
-        NimbusClient client = NimbusClient.getConfiguredClientAs(stormConf, asUser);
-        TopologyInfo topologyInfo = null;
+        try (NimbusClient client = NimbusClient.getConfiguredClientAs(stormConf, asUser)) {
+            String topologyId = getTopologyId(name, client.getClient());
+            if (null != topologyId) {
+                return client.getClient().getTopologyInfo(topologyId);
+            }
+            return null;
+        } catch(Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static String getTopologyId(String name, Nimbus.Client client) {
         try {
-            ClusterSummary summary = client.getClient().getClusterInfo();
+            ClusterSummary summary = client.getClusterInfo();
             for(TopologySummary s : summary.get_topologies()) {
                 if(s.get_name().equals(name)) {
-                    topologyInfo = client.getClient().getTopologyInfo(s.get_id());
+                    return s.get_id();
                 }
             }
         } catch(Exception e) {
             throw new RuntimeException(e);
-        } finally {
-            client.close();
         }
-        return topologyInfo;
+        return null;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/storm/blob/3425e7d1/storm-core/test/jvm/org/apache/storm/command/SetLogLevelTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/command/SetLogLevelTest.java b/storm-core/test/jvm/org/apache/storm/command/SetLogLevelTest.java
new file mode 100644
index 0000000..4582371
--- /dev/null
+++ b/storm-core/test/jvm/org/apache/storm/command/SetLogLevelTest.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.command;
+
+import org.apache.storm.generated.LogLevel;
+import org.apache.storm.generated.LogLevelAction;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Map;
+
+public class SetLogLevelTest {
+
+    @Test
+    public void testUpdateLogLevelParser() {
+        SetLogLevel.LogLevelsParser logLevelsParser = new SetLogLevel.LogLevelsParser(LogLevelAction.UPDATE);
+        LogLevel logLevel = ((Map<String, LogLevel>) logLevelsParser.parse("com.foo.one=warn")).get("com.foo.one");
+        Assert.assertEquals(0, logLevel.get_reset_log_level_timeout_secs());
+        Assert.assertEquals("WARN", logLevel.get_reset_log_level());
+
+        logLevel = ((Map<String, LogLevel>) logLevelsParser.parse("com.foo.two=DEBUG:10")).get("com.foo.two");
+        Assert.assertEquals(10, logLevel.get_reset_log_level_timeout_secs());
+        Assert.assertEquals("DEBUG", logLevel.get_reset_log_level());
+    }
+
+    @Test(expected = NumberFormatException.class)
+    public void testInvalidTimeout() {
+        SetLogLevel.LogLevelsParser logLevelsParser = new SetLogLevel.LogLevelsParser(LogLevelAction.UPDATE);
+        logLevelsParser.parse("com.foo.bar=warn:NaN");
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testInvalidLogLevel() {
+        SetLogLevel.LogLevelsParser logLevelsParser = new SetLogLevel.LogLevelsParser(LogLevelAction.UPDATE);
+        logLevelsParser.parse("com.foo.bar=CRITICAL");
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3425e7d1/storm-core/test/jvm/org/apache/storm/command/TestCLI.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/command/TestCLI.java b/storm-core/test/jvm/org/apache/storm/command/TestCLI.java
index b647458..c9a4b79 100644
--- a/storm-core/test/jvm/org/apache/storm/command/TestCLI.java
+++ b/storm-core/test/jvm/org/apache/storm/command/TestCLI.java
@@ -18,42 +18,62 @@
 
 package org.apache.storm.command;
 
-import java.util.Map;
+import org.junit.Test;
+
+import java.util.HashMap;
 import java.util.List;
-import java.util.Arrays;
+import java.util.Map;
 
-import org.junit.Test;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
 
 public class TestCLI {
+
     @Test
     public void testSimple() throws Exception {
         Map<String, Object> values = CLI.opt("a", "aa", null)
-           .opt("b", "bb", 1, CLI.AS_INT)
-           .opt("c", "cc", 1, CLI.AS_INT, CLI.FIRST_WINS)
-           .opt("d", "dd", null, CLI.AS_STRING, CLI.INTO_LIST)
-           .arg("A")
-           .arg("B", CLI.AS_INT)
-           .parse("-a100", "--aa", "200", "-c2", "-b", "50", "--cc", "100", "A-VALUE", "1", "2", "3", "-b40", "-d1", "-d2", "-d3");
-        assertEquals(6, values.size());
-        assertEquals("200", (String)values.get("a"));
-        assertEquals((Integer)40, (Integer)values.get("b"));
-        assertEquals((Integer)2, (Integer)values.get("c"));
-
-        List<String> d = (List<String>)values.get("d");
+            .opt("b", "bb", 1, CLI.AS_INT)
+            .opt("c", "cc", 1, CLI.AS_INT, CLI.FIRST_WINS)
+            .opt("d", "dd", null, CLI.AS_STRING, CLI.INTO_LIST)
+            .opt("e", "ee", null, new PairParse(), CLI.INTO_MAP)
+            .arg("A")
+            .arg("B", CLI.AS_INT)
+            .parse("-a100", "--aa", "200", "-c2", "-b", "50", "--cc", "100", "A-VALUE", "1", "2", "3", "-b40", "-d1", "-d2", "-d3"
+                , "-e", "key1=value1", "-e", "key2=value2");
+        assertEquals(7, values.size());
+        assertEquals("200", (String) values.get("a"));
+        assertEquals((Integer) 40, (Integer) values.get("b"));
+        assertEquals((Integer) 2, (Integer) values.get("c"));
+
+        List<String> d = (List<String>) values.get("d");
         assertEquals(3, d.size());
         assertEquals("1", d.get(0));
         assertEquals("2", d.get(1));
         assertEquals("3", d.get(2));
 
-        List<String> A = (List<String>)values.get("A");
+        List<String> A = (List<String>) values.get("A");
         assertEquals(1, A.size());
         assertEquals("A-VALUE", A.get(0));
 
-        List<Integer> B = (List<Integer>)values.get("B");
+        List<Integer> B = (List<Integer>) values.get("B");
         assertEquals(3, B.size());
-        assertEquals((Integer)1, B.get(0));
-        assertEquals((Integer)2, B.get(1));
-        assertEquals((Integer)3, B.get(2));
+        assertEquals((Integer) 1, B.get(0));
+        assertEquals((Integer) 2, B.get(1));
+        assertEquals((Integer) 3, B.get(2));
+
+        Map<String, String> e = (Map<String, String>) values.get("e");
+        assertEquals(2, e.size());
+        assertEquals("value1", e.get("key1"));
+        assertEquals("value2", e.get("key2"));
+    }
+
+    private static final class PairParse implements CLI.Parse {
+
+        @Override
+        public Object parse(String value) {
+            Map<String, String> result = new HashMap<>();
+            String[] splits = value.split("=");
+            result.put(splits[0], splits[1]);
+            return result;
+        }
     }
 }


[6/6] storm git commit: Added STORM-1267 STORM-1266 and STORM-1265 to Changelog

Posted by bo...@apache.org.
Added STORM-1267 STORM-1266 and STORM-1265 to Changelog


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/dece08fb
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/dece08fb
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/dece08fb

Branch: refs/heads/master
Commit: dece08fbdee7903c76c227e3ec638e57705856cd
Parents: b837590
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Wed Feb 24 11:32:42 2016 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Wed Feb 24 11:32:42 2016 -0600

----------------------------------------------------------------------
 CHANGELOG.md | 3 +++
 1 file changed, 3 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/dece08fb/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 285cfe4..b2535d1 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,7 @@
 ## 2.0.0
+ * STORM-1267: Port set_log_level
+ * STORM-1266: Port rebalance
+ * STORM-1265: Port monitor
  * STORM-1572: throw NPE when parsing the command line arguments by CLI
  * STORM-1273: port backtype.storm.cluster to java
  * STORM-1479: use a simple implemention for IntSerializer


[2/6] storm git commit: STORM-1266: port backtype.storm.command.rebalance to java

Posted by bo...@apache.org.
STORM-1266: port backtype.storm.command.rebalance to java


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8aaa8388
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8aaa8388
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8aaa8388

Branch: refs/heads/master
Commit: 8aaa83880de7a5bc9595a6e3aabc242c3bec5470
Parents: 3425e7d
Author: Abhishek Agarwal <ab...@inmobi.com>
Authored: Mon Feb 22 16:38:43 2016 +0530
Committer: Abhishek Agarwal <ab...@inmobi.com>
Committed: Mon Feb 22 16:38:43 2016 +0530

----------------------------------------------------------------------
 .../clj/org/apache/storm/command/rebalance.clj  | 47 -----------
 .../jvm/org/apache/storm/command/Rebalance.java | 86 ++++++++++++++++++++
 .../org/apache/storm/command/RebalanceTest.java | 41 ++++++++++
 3 files changed, 127 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/8aaa8388/storm-core/src/clj/org/apache/storm/command/rebalance.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/command/rebalance.clj b/storm-core/src/clj/org/apache/storm/command/rebalance.clj
deleted file mode 100644
index 8428d14..0000000
--- a/storm-core/src/clj/org/apache/storm/command/rebalance.clj
+++ /dev/null
@@ -1,47 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.command.rebalance
-  (:use [clojure.tools.cli :only [cli]])
-  (:use [org.apache.storm config log])
-  (:use [org.apache.storm.internal thrift])
-  (:import [org.apache.storm.generated RebalanceOptions])
-  (:gen-class))
-
-(defn- parse-executor [^String s]
-  (let [eq-pos (.lastIndexOf s "=")
-        name (.substring s 0 eq-pos)
-        amt (.substring s (inc eq-pos))]
-    {name (Integer/parseInt amt)}
-    ))
-
-(defn -main [& args] 
-  (let [[{wait :wait executor :executor num-workers :num-workers} [name] _]
-                  (cli args ["-w" "--wait" :default nil :parse-fn #(Integer/parseInt %)]
-                            ["-n" "--num-workers" :default nil :parse-fn #(Integer/parseInt %)]
-                            ["-e" "--executor"  :parse-fn parse-executor
-                             :assoc-fn (fn [previous key val]
-                                         (assoc previous key
-                                                (if-let [oldval (get previous key)]
-                                                  (merge oldval val)
-                                                  val)))])
-        opts (RebalanceOptions.)]
-    (if wait (.set_wait_secs opts wait))
-    (if executor (.set_num_executors opts executor))
-    (if num-workers (.set_num_workers opts num-workers))
-    (with-configured-nimbus-connection nimbus
-      (.rebalance nimbus name opts)
-      (log-message "Topology " name " is rebalancing")
-      )))

http://git-wip-us.apache.org/repos/asf/storm/blob/8aaa8388/storm-core/src/jvm/org/apache/storm/command/Rebalance.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/command/Rebalance.java b/storm-core/src/jvm/org/apache/storm/command/Rebalance.java
new file mode 100644
index 0000000..ed65950
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/command/Rebalance.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.command;
+
+import org.apache.storm.generated.Nimbus;
+import org.apache.storm.generated.RebalanceOptions;
+import org.apache.storm.utils.NimbusClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static java.lang.String.format;
+
+public class Rebalance {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Rebalance.class);
+
+    public static void main(String[] args) throws Exception {
+        Map<String, Object> cl = CLI.opt("w", "wait", null, CLI.AS_INT)
+            .opt("n", "num-workers", null, CLI.AS_INT)
+            .opt("e", "executor", null, new ExecutorParser(), CLI.INTO_MAP)
+            .arg("topologyName", CLI.FIRST_WINS)
+            .parse(args);
+        final String name = (String) cl.get("topologyName");
+        final RebalanceOptions rebalanceOptions = new RebalanceOptions();
+        Integer wait = (Integer) cl.get("w");
+        Integer numWorkers = (Integer) cl.get("n");
+        Map<String, Integer> numExecutors = (Map<String, Integer>) cl.get("e");
+
+        if (null != wait) {
+            rebalanceOptions.set_wait_secs(wait);
+        }
+        if (null != numWorkers) {
+            rebalanceOptions.set_num_workers(numWorkers);
+        }
+        if (null != numExecutors) {
+            rebalanceOptions.set_num_executors(numExecutors);
+        }
+
+        NimbusClient.withConfiguredClient(new NimbusClient.WithNimbus() {
+            @Override
+            public void run(Nimbus.Client nimbus) throws Exception {
+                nimbus.rebalance(name, rebalanceOptions);
+                LOG.info("Topology {} is rebalancing", name);
+            }
+        });
+    }
+
+
+    static final class ExecutorParser implements CLI.Parse {
+
+        @Override
+        public Object parse(String value) {
+            try {
+                int splitIndex = value.lastIndexOf('=');
+                String componentName = value.substring(0, splitIndex);
+                Integer parallelism = Integer.parseInt(value.substring(splitIndex + 1));
+                Map<String, Integer> result = new HashMap<String, Integer>();
+                result.put(componentName, parallelism);
+                return result;
+            } catch (Throwable ex) {
+                throw new IllegalArgumentException(
+                    format("Failed to parse '%s' correctly. Expected in <component>=<parallelism> format", value), ex);
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/8aaa8388/storm-core/test/jvm/org/apache/storm/command/RebalanceTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/command/RebalanceTest.java b/storm-core/test/jvm/org/apache/storm/command/RebalanceTest.java
new file mode 100644
index 0000000..cec4958
--- /dev/null
+++ b/storm-core/test/jvm/org/apache/storm/command/RebalanceTest.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.storm.command;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Map;
+
+public class RebalanceTest {
+
+    @Test
+    public void testParser() throws Exception {
+        Rebalance.ExecutorParser executorParser = new Rebalance.ExecutorParser();
+        Map<String, Integer> componentParallelism = (Map<String, Integer>) executorParser.parse("comp1=3");
+        Assert.assertEquals(3, (int) componentParallelism.get("comp1"));
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testExepction() throws Exception {
+        Rebalance.ExecutorParser executorParser = new Rebalance.ExecutorParser();
+        executorParser.parse("comp1 3");
+    }
+}


[5/6] storm git commit: Merge branch 'commands' of https://github.com/abhishekagarwal87/storm into STORM-1267

Posted by bo...@apache.org.
Merge branch 'commands' of https://github.com/abhishekagarwal87/storm into STORM-1267

Conflicts:
	storm-core/test/jvm/org/apache/storm/command/TestCLI.java

STORM-1267: Port set_log_level
STORM-1266: Port rebalance
STORM-1265: Port monitor

This closes #1137


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b8375907
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b8375907
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b8375907

Branch: refs/heads/master
Commit: b83759074b7b0e7b1727dd4fac1c2b5ab9c4cdad
Parents: 56bc603 2d4ad6f
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Wed Feb 24 11:30:24 2016 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Wed Feb 24 11:30:24 2016 -0600

----------------------------------------------------------------------
 bin/storm.cmd                                   |   2 +-
 bin/storm.py                                    |   6 +-
 .../clj/org/apache/storm/command/monitor.clj    |  37 ------
 .../clj/org/apache/storm/command/rebalance.clj  |  47 --------
 .../org/apache/storm/command/set_log_level.clj  |  76 ------------
 .../src/jvm/org/apache/storm/command/CLI.java   |  25 +++-
 .../jvm/org/apache/storm/command/Monitor.java   |  65 +++++++++++
 .../jvm/org/apache/storm/command/Rebalance.java |  86 ++++++++++++++
 .../org/apache/storm/command/SetLogLevel.java   | 116 +++++++++++++++++++
 .../src/jvm/org/apache/storm/utils/Utils.java   |  61 +++++++---
 .../org/apache/storm/command/RebalanceTest.java |  41 +++++++
 .../apache/storm/command/SetLogLevelTest.java   |  54 +++++++++
 .../jvm/org/apache/storm/command/TestCLI.java   |  42 +++++--
 13 files changed, 458 insertions(+), 200 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b8375907/storm-core/src/jvm/org/apache/storm/command/CLI.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/b8375907/storm-core/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/b8375907/storm-core/test/jvm/org/apache/storm/command/TestCLI.java
----------------------------------------------------------------------
diff --cc storm-core/test/jvm/org/apache/storm/command/TestCLI.java
index 5b2f220,c9a4b79..d1caa3c
--- a/storm-core/test/jvm/org/apache/storm/command/TestCLI.java
+++ b/storm-core/test/jvm/org/apache/storm/command/TestCLI.java
@@@ -29,20 -31,20 +31,22 @@@ public class TestCLI 
      @Test
      public void testSimple() throws Exception {
          Map<String, Object> values = CLI.opt("a", "aa", null)
 -            .opt("b", "bb", 1, CLI.AS_INT)
 -            .opt("c", "cc", 1, CLI.AS_INT, CLI.FIRST_WINS)
 -            .opt("d", "dd", null, CLI.AS_STRING, CLI.INTO_LIST)
 -            .opt("e", "ee", null, new PairParse(), CLI.INTO_MAP)
 -            .arg("A")
 -            .arg("B", CLI.AS_INT)
 -            .parse("-a100", "--aa", "200", "-c2", "-b", "50", "--cc", "100", "A-VALUE", "1", "2", "3", "-b40", "-d1", "-d2", "-d3"
 -                , "-e", "key1=value1", "-e", "key2=value2");
 -        assertEquals(7, values.size());
 -        assertEquals("200", (String) values.get("a"));
 -        assertEquals((Integer) 40, (Integer) values.get("b"));
 -        assertEquals((Integer) 2, (Integer) values.get("c"));
 +           .opt("b", "bb", 1, CLI.AS_INT)
 +           .opt("c", "cc", 1, CLI.AS_INT, CLI.FIRST_WINS)
 +           .opt("d", "dd", null, CLI.AS_STRING, CLI.INTO_LIST)
 +           .opt("e", "ee", null, CLI.AS_INT)
++           .opt("f", "ff", null, new PairParse(), CLI.INTO_MAP)
 +           .arg("A")
 +           .arg("B", CLI.AS_INT)
-            .parse("-a100", "--aa", "200", "-c2", "-b", "50", "--cc", "100", "A-VALUE", "1", "2", "3", "-b40", "-d1", "-d2", "-d3");
-         assertEquals(7, values.size());
++           .parse("-a100", "--aa", "200", "-c2", "-b", "50", "--cc", "100", "A-VALUE", "1", "2", "3", "-b40", "-d1", "-d2", "-d3"
++                , "-f", "key1=value1", "-f", "key2=value2");
++        assertEquals(8, values.size());
 +        assertEquals("200", (String)values.get("a"));
 +        assertEquals((Integer)40, (Integer)values.get("b"));
 +        assertEquals((Integer)2, (Integer)values.get("c"));
 +        assertEquals(null, values.get("e"));
  
 -        List<String> d = (List<String>) values.get("d");
 +        List<String> d = (List<String>)values.get("d");
          assertEquals(3, d.size());
          assertEquals("1", d.get(0));
          assertEquals("2", d.get(1));
@@@ -52,10 -54,26 +56,26 @@@
          assertEquals(1, A.size());
          assertEquals("A-VALUE", A.get(0));
  
-         List<Integer> B = (List<Integer>)values.get("B");
+         List<Integer> B = (List<Integer>) values.get("B");
          assertEquals(3, B.size());
-         assertEquals((Integer)1, B.get(0));
-         assertEquals((Integer)2, B.get(1));
-         assertEquals((Integer)3, B.get(2));
+         assertEquals((Integer) 1, B.get(0));
+         assertEquals((Integer) 2, B.get(1));
+         assertEquals((Integer) 3, B.get(2));
+ 
 -        Map<String, String> e = (Map<String, String>) values.get("e");
 -        assertEquals(2, e.size());
 -        assertEquals("value1", e.get("key1"));
 -        assertEquals("value2", e.get("key2"));
++        Map<String, String> f = (Map<String, String>) values.get("f");
++        assertEquals(2, f.size());
++        assertEquals("value1", f.get("key1"));
++        assertEquals("value2", f.get("key2"));
+     }
+ 
+     private static final class PairParse implements CLI.Parse {
+ 
+         @Override
+         public Object parse(String value) {
+             Map<String, String> result = new HashMap<>();
+             String[] splits = value.split("=");
+             result.put(splits[0], splits[1]);
+             return result;
+         }
      }
  }


[3/6] storm git commit: STORM-1265: port backtype.storm.command.monitor to java

Posted by bo...@apache.org.
STORM-1265: port backtype.storm.command.monitor to java


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/93b314cf
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/93b314cf
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/93b314cf

Branch: refs/heads/master
Commit: 93b314cf6e092e42c6cd6116618bfce94797c5cf
Parents: 8aaa838
Author: Abhishek Agarwal <ab...@inmobi.com>
Authored: Mon Feb 22 16:39:10 2016 +0530
Committer: Abhishek Agarwal <ab...@inmobi.com>
Committed: Mon Feb 22 16:39:10 2016 +0530

----------------------------------------------------------------------
 .../clj/org/apache/storm/command/monitor.clj    | 37 -----------
 .../jvm/org/apache/storm/command/Monitor.java   | 65 ++++++++++++++++++++
 2 files changed, 65 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/93b314cf/storm-core/src/clj/org/apache/storm/command/monitor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/command/monitor.clj b/storm-core/src/clj/org/apache/storm/command/monitor.clj
deleted file mode 100644
index 4ec49af..0000000
--- a/storm-core/src/clj/org/apache/storm/command/monitor.clj
+++ /dev/null
@@ -1,37 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.command.monitor
-  (:use [clojure.tools.cli :only [cli]])
-  (:use [org.apache.storm.internal.thrift :only [with-configured-nimbus-connection]])
-  (:import [org.apache.storm.utils Monitor])
-  (:gen-class)
- )
-
-(defn -main [& args]
-  (let [[{interval :interval component :component stream :stream watch :watch} [name] _]
-        (cli args ["-i" "--interval" :default 4 :parse-fn #(Integer/parseInt %)]
-          ["-m" "--component" :default nil]
-          ["-s" "--stream" :default "default"]
-          ["-w" "--watch" :default "emitted"])
-        mon (Monitor.)]
-    (if interval (.set_interval mon interval))
-    (if name (.set_topology mon name))
-    (if component (.set_component mon component))
-    (if stream (.set_stream mon stream))
-    (if watch (.set_watch mon watch))
-    (with-configured-nimbus-connection nimbus
-      (.metrics mon nimbus)
-      )))

http://git-wip-us.apache.org/repos/asf/storm/blob/93b314cf/storm-core/src/jvm/org/apache/storm/command/Monitor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/command/Monitor.java b/storm-core/src/jvm/org/apache/storm/command/Monitor.java
new file mode 100644
index 0000000..68a65ea
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/command/Monitor.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.command;
+
+import org.apache.storm.generated.Nimbus;
+import org.apache.storm.utils.NimbusClient;
+
+import java.util.Map;
+
+public class Monitor {
+
+    public static void main(String[] args) throws Exception {
+        Map<String, Object> cl = CLI.opt("i", "interval", 4, CLI.AS_INT)
+            .opt("m", "component", null)
+            .opt("s", "stream", "default")
+            .opt("w", "watch", "emitted")
+            .arg("topologyName", CLI.FIRST_WINS)
+            .parse(args);
+        final org.apache.storm.utils.Monitor monitor = new org.apache.storm.utils.Monitor();
+        Integer interval = (Integer) cl.get("i");
+        String component = (String) cl.get("m");
+        String stream = (String) cl.get("s");
+        String watch = (String) cl.get("w");
+        String topologyName = (String) cl.get("topologyName");
+
+        if (null != interval) {
+            monitor.set_interval(interval);
+        }
+        if (null != component) {
+            monitor.set_component(component);
+        }
+        if (null != stream) {
+            monitor.set_stream(stream);
+        }
+        if (null != watch) {
+            monitor.set_watch(watch);
+        }
+        if (null != topologyName) {
+            monitor.set_topology(topologyName);
+        }
+
+        NimbusClient.withConfiguredClient(new NimbusClient.WithNimbus() {
+            @Override
+            public void run(Nimbus.Client nimbus) throws Exception {
+                monitor.metrics(nimbus);
+            }
+        });
+    }
+}


[4/6] storm git commit: Update commands in binary files

Posted by bo...@apache.org.
Update commands in binary files


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2d4ad6f4
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2d4ad6f4
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2d4ad6f4

Branch: refs/heads/master
Commit: 2d4ad6f4655ff56057d50f359394ecd01fde43eb
Parents: 93b314c
Author: Abhishek Agarwal <ab...@inmobi.com>
Authored: Mon Feb 22 16:48:28 2016 +0530
Committer: Abhishek Agarwal <ab...@inmobi.com>
Committed: Mon Feb 22 16:48:28 2016 +0530

----------------------------------------------------------------------
 bin/storm.cmd | 2 +-
 bin/storm.py  | 6 +++---
 2 files changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/2d4ad6f4/bin/storm.cmd
----------------------------------------------------------------------
diff --git a/bin/storm.cmd b/bin/storm.cmd
index ff3b246..1ef1e42 100644
--- a/bin/storm.cmd
+++ b/bin/storm.cmd
@@ -194,7 +194,7 @@
   goto :eof
 
 :rebalance
-  set CLASS=org.apache.storm.command.rebalance
+  set CLASS=org.apache.storm.command.Rebalance
   set STORM_OPTS=%STORM_CLIENT_OPTS% %STORM_OPTS%
   goto :eof
 

http://git-wip-us.apache.org/repos/asf/storm/blob/2d4ad6f4/bin/storm.py
----------------------------------------------------------------------
diff --git a/bin/storm.py b/bin/storm.py
index acbfe7b..94d6143 100755
--- a/bin/storm.py
+++ b/bin/storm.py
@@ -378,7 +378,7 @@ def set_log_level(*args):
         Clears settings, resetting back to the original level
     """
     exec_storm_class(
-        "org.apache.storm.command.set_log_level",
+        "org.apache.storm.command.SetLogLevel",
         args=args,
         jvmtype="-client",
         extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
@@ -433,7 +433,7 @@ def rebalance(*args):
         print_usage(command="rebalance")
         sys.exit(2)
     exec_storm_class(
-        "org.apache.storm.command.rebalance",
+        "org.apache.storm.command.Rebalance",
         args=args,
         jvmtype="-client",
         extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
@@ -685,7 +685,7 @@ def monitor(*args):
         watch-item is 'emitted';
     """
     exec_storm_class(
-        "org.apache.storm.command.monitor",
+        "org.apache.storm.command.Monitor",
         args=args,
         jvmtype="-client",
         extrajars=[USER_CONF_DIR, STORM_BIN_DIR])