You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/02/24 18:33:33 UTC
[1/6] storm git commit: STORM-1267: port
backtype.storm.command.set-log-level to java
Repository: storm
Updated Branches:
refs/heads/master 56bc60374 -> dece08fbd
STORM-1267: port backtype.storm.command.set-log-level to java
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3425e7d1
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3425e7d1
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3425e7d1
Branch: refs/heads/master
Commit: 3425e7d12f514c976fa793f31dcbeafa1527bab4
Parents: 4ca7522
Author: Abhishek Agarwal <ab...@inmobi.com>
Authored: Mon Feb 22 16:37:57 2016 +0530
Committer: Abhishek Agarwal <ab...@inmobi.com>
Committed: Mon Feb 22 16:37:57 2016 +0530
----------------------------------------------------------------------
.../org/apache/storm/command/set_log_level.clj | 76 ------------
.../src/jvm/org/apache/storm/command/CLI.java | 25 +++-
.../org/apache/storm/command/SetLogLevel.java | 116 +++++++++++++++++++
.../src/jvm/org/apache/storm/utils/Utils.java | 61 +++++++---
.../apache/storm/command/SetLogLevelTest.java | 54 +++++++++
.../jvm/org/apache/storm/command/TestCLI.java | 62 ++++++----
6 files changed, 272 insertions(+), 122 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/3425e7d1/storm-core/src/clj/org/apache/storm/command/set_log_level.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/command/set_log_level.clj b/storm-core/src/clj/org/apache/storm/command/set_log_level.clj
deleted file mode 100644
index 6048246..0000000
--- a/storm-core/src/clj/org/apache/storm/command/set_log_level.clj
+++ /dev/null
@@ -1,76 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.command.set-log-level
- (:use [clojure.tools.cli :only [cli]])
- (:use [org.apache.storm log])
- (:use [org.apache.storm.internal thrift])
- (:import [org.apache.logging.log4j Level])
- (:import [org.apache.storm.generated LogConfig LogLevel LogLevelAction])
- (:gen-class))
-
-(defn- get-storm-id
- "Get topology id for a running topology from the topology name."
- [nimbus name]
- (let [info (.getClusterInfo nimbus)
- topologies (.get_topologies info)
- topology (first (filter (fn [topo] (= name (.get_name topo))) topologies))]
- (if topology
- (.get_id topology)
- (throw (.IllegalArgumentException (str name " is not a running topology"))))))
-
-(defn- parse-named-log-levels [action]
- "Parses [logger name]=[level string]:[optional timeout],[logger name2]...
-
- e.g. ROOT=DEBUG:30
- root logger, debug for 30 seconds
-
- org.apache.foo=WARN
- org.apache.foo set to WARN indefinitely"
- (fn [^String s]
- (let [log-args (re-find #"(.*)=([A-Z]+):?(\d*)" s)
- name (if (= action LogLevelAction/REMOVE) s (nth log-args 1))
- level (Level/toLevel (nth log-args 2))
- timeout-str (nth log-args 3)
- log-level (LogLevel.)]
- (if (= action LogLevelAction/REMOVE)
- (.set_action log-level action)
- (do
- (.set_action log-level action)
- (.set_target_log_level log-level (.toString level))
- (.set_reset_log_level_timeout_secs log-level
- (Integer. (if (= timeout-str "") "0" timeout-str)))))
- {name log-level})))
-
-(defn- merge-together [previous key val]
- (assoc previous key
- (if-let [oldval (get previous key)]
- (merge oldval val)
- val)))
-
-(defn -main [& args]
- (let [[{log-setting :log-setting remove-log-setting :remove-log-setting} [name] _]
- (cli args ["-l" "--log-setting"
- :parse-fn (parse-named-log-levels LogLevelAction/UPDATE)
- :assoc-fn merge-together]
- ["-r" "--remove-log-setting"
- :parse-fn (parse-named-log-levels LogLevelAction/REMOVE)
- :assoc-fn merge-together])
- log-config (LogConfig.)]
- (doseq [[log-name log-val] (merge log-setting remove-log-setting)]
- (.put_to_named_logger_level log-config log-name log-val))
- (log-message "Sent log config " log-config " for topology " name)
- (with-configured-nimbus-connection nimbus
- (.setLogConfig nimbus (get-storm-id nimbus name) log-config))))
http://git-wip-us.apache.org/repos/asf/storm/blob/3425e7d1/storm-core/src/jvm/org/apache/storm/command/CLI.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/command/CLI.java b/storm-core/src/jvm/org/apache/storm/command/CLI.java
index d4eaa5d..f29debc 100644
--- a/storm-core/src/jvm/org/apache/storm/command/CLI.java
+++ b/storm-core/src/jvm/org/apache/storm/command/CLI.java
@@ -17,19 +17,18 @@
*/
package org.apache.storm.command;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.List;
-
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
public class CLI {
private static final Logger LOG = LoggerFactory.getLogger(CLI.class);
private static class Opt {
@@ -139,6 +138,20 @@ public class CLI {
}
};
+ /**
+ * All values are returned as a map
+ */
+ public static final Assoc INTO_MAP = new Assoc() {
+ @Override
+ public Object assoc(Object current, Object value) {
+ if (null == current) {
+ current = new HashMap<Object, Object>();
+ }
+ ((Map<Object, Object>) current).putAll((Map<Object, Object>) value);
+ return current;
+ }
+ };
+
public static class CLIBuilder {
private final ArrayList<Opt> opts = new ArrayList<>();
private final ArrayList<Arg> args = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/storm/blob/3425e7d1/storm-core/src/jvm/org/apache/storm/command/SetLogLevel.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/command/SetLogLevel.java b/storm-core/src/jvm/org/apache/storm/command/SetLogLevel.java
new file mode 100644
index 0000000..30cea5f
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/command/SetLogLevel.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.command;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.logging.log4j.Level;
+import org.apache.storm.generated.LogConfig;
+import org.apache.storm.generated.LogLevel;
+import org.apache.storm.generated.LogLevelAction;
+import org.apache.storm.generated.Nimbus;
+import org.apache.storm.utils.NimbusClient;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class SetLogLevel {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SetLogLevel.class);
+
+    /**
+     * Entry point of the set-log-level command.
+     *
+     * <p>Gathers the {@code -l/--log-setting} and {@code -r/--remove-log-setting} options into one
+     * {@link LogConfig} and sends it to Nimbus for the topology named by the positional
+     * {@code topologyName} argument.
+     *
+     * @param args raw command line arguments
+     * @throws Exception if parsing the arguments or talking to Nimbus fails
+     */
+    public static void main(String[] args) throws Exception {
+        Map<String, Object> parsed = CLI.opt("l", "log-setting", null, new LogLevelsParser(LogLevelAction.UPDATE), CLI.INTO_MAP)
+                .opt("r", "remove-log-setting", null, new LogLevelsParser(LogLevelAction.REMOVE), CLI.INTO_MAP)
+                .arg("topologyName", CLI.FIRST_WINS)
+                .parse(args);
+        final String topologyName = (String) parsed.get("topologyName");
+
+        // Updates are merged first, so a removal given for the same logger name replaces the update.
+        Map<String, LogLevel> merged = new HashMap<>();
+        for (String optionName : new String[] {"l", "r"}) {
+            Map<String, LogLevel> settings = (Map<String, LogLevel>) parsed.get(optionName);
+            if (settings != null) {
+                merged.putAll(settings);
+            }
+        }
+
+        final LogConfig logConfig = new LogConfig();
+        for (Map.Entry<String, LogLevel> setting : merged.entrySet()) {
+            logConfig.put_to_named_logger_level(setting.getKey(), setting.getValue());
+        }
+
+        NimbusClient.withConfiguredClient(new NimbusClient.WithNimbus() {
+            @Override
+            public void run(Nimbus.Client nimbus) throws Exception {
+                // The command takes a topology name, while Nimbus is addressed by topology id.
+                String topologyId = Utils.getTopologyId(topologyName, nimbus);
+                if (topologyId == null) {
+                    throw new IllegalArgumentException(topologyName + " is not a running topology");
+                }
+                nimbus.setLogConfig(topologyId, logConfig);
+                LOG.info("Log config {} is sent for topology {}", logConfig, topologyName);
+            }
+        });
+    }
+
+    /**
+     * Parses one logger setting given on the command line.
+     *
+     * <p>For {@link LogLevelAction#REMOVE} the whole option value is taken as the logger name.
+     * For any other action the value must have the form
+     * {@code [logger name]=[level string]:[optional timeout]}, e.g.
+     *
+     * <pre>
+     * ROOT=DEBUG:30
+     *     root logger, debug for 30 seconds
+     *
+     * org.apache.foo=WARN
+     *     org.apache.foo set to WARN indefinitely
+     * </pre>
+     *
+     * <p>Each call handles a single setting and returns a one-entry map from logger name to
+     * {@link LogLevel}, so that repeated options can be merged with {@code CLI.INTO_MAP}.
+     */
+    static final class LogLevelsParser implements CLI.Parse {
+
+        private final LogLevelAction action;
+
+        public LogLevelsParser(LogLevelAction action) {
+            this.action = action;
+        }
+
+        /**
+         * Converts one option value into a single-entry {@code Map<String, LogLevel>}.
+         *
+         * @param value the raw option value
+         * @return a map holding exactly one entry, keyed by logger name
+         * @throws IllegalArgumentException if the value is not of the expected form or the level
+         *     name is not accepted by {@link Level#valueOf(String)}
+         * @throws NumberFormatException if the timeout is not an integer
+         */
+        @Override
+        public Object parse(String value) {
+            final LogLevel logLevel = new LogLevel();
+            logLevel.set_action(action);
+            final String name;
+            if (action == LogLevelAction.REMOVE) {
+                name = value;
+            } else {
+                String[] nameAndSetting = value.split("=");
+                Preconditions.checkArgument(nameAndSetting.length == 2, "Invalid log string '%s'", value);
+                name = nameAndSetting[0];
+                String[] levelAndTimeout = nameAndSetting[1].split(":");
+                // "name=:" splits into an empty array; reject it instead of failing on index 0.
+                Preconditions.checkArgument(levelAndTimeout.length >= 1, "Invalid log string '%s'", value);
+                Level level = Level.valueOf(levelAndTimeout[0]);
+                // The level given on the command line is the level to switch to, so it belongs in
+                // the target level field (the field the Clojure implementation of this command
+                // populated), not in the reset level field.
+                logLevel.set_target_log_level(level.toString());
+                int timeout = 0;
+                if (levelAndTimeout.length > 1) {
+                    timeout = Integer.parseInt(levelAndTimeout[1]);
+                }
+                logLevel.set_reset_log_level_timeout_secs(timeout);
+            }
+            Map<String, LogLevel> result = new HashMap<>();
+            result.put(name, logLevel);
+            return result;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/3425e7d1/storm-core/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/Utils.java b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
index b62f99c..fe0c431 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
@@ -17,11 +17,22 @@
*/
package org.apache.storm.utils;
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.DefaultExecutor;
import org.apache.commons.exec.ExecuteException;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.input.ClassLoaderObjectInputStream;
+import org.apache.commons.lang.StringUtils;
+import org.apache.curator.ensemble.exhibitor.DefaultExhibitorRestClient;
+import org.apache.curator.ensemble.exhibitor.ExhibitorEnsembleProvider;
+import org.apache.curator.ensemble.exhibitor.Exhibitors;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.storm.Config;
import org.apache.storm.blobstore.BlobStore;
import org.apache.storm.blobstore.BlobStoreAclHandler;
@@ -29,22 +40,24 @@ import org.apache.storm.blobstore.ClientBlobStore;
import org.apache.storm.blobstore.InputStreamWithMeta;
import org.apache.storm.blobstore.LocalFsBlobStore;
import org.apache.storm.daemon.JarTransformer;
-import org.apache.storm.generated.*;
+import org.apache.storm.generated.AccessControl;
+import org.apache.storm.generated.AccessControlType;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.ClusterSummary;
+import org.apache.storm.generated.ComponentCommon;
+import org.apache.storm.generated.ComponentObject;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.generated.KeyNotFoundException;
+import org.apache.storm.generated.Nimbus;
+import org.apache.storm.generated.ReadableBlobMeta;
+import org.apache.storm.generated.SettableBlobMeta;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.generated.TopologyInfo;
+import org.apache.storm.generated.TopologySummary;
import org.apache.storm.localizer.Localizer;
import org.apache.storm.nimbus.NimbusInfo;
import org.apache.storm.serialization.DefaultSerializationDelegate;
import org.apache.storm.serialization.SerializationDelegate;
-import clojure.lang.RT;
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
-import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
-import org.apache.commons.io.input.ClassLoaderObjectInputStream;
-import org.apache.commons.lang.StringUtils;
-import org.apache.curator.ensemble.exhibitor.DefaultExhibitorRestClient;
-import org.apache.curator.ensemble.exhibitor.ExhibitorEnsembleProvider;
-import org.apache.curator.ensemble.exhibitor.Exhibitors;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.thrift.TBase;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
@@ -118,6 +131,8 @@ import java.util.zip.GZIPOutputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;
+import clojure.lang.RT;
+
public class Utils {
// A singleton instance allows us to mock delegated static methods in our
// tests by subclassing.
@@ -1430,21 +1445,29 @@ public class Utils {
}
public static TopologyInfo getTopologyInfo(String name, String asUser, Map stormConf) {
- NimbusClient client = NimbusClient.getConfiguredClientAs(stormConf, asUser);
- TopologyInfo topologyInfo = null;
+ try (NimbusClient client = NimbusClient.getConfiguredClientAs(stormConf, asUser)) {
+ String topologyId = getTopologyId(name, client.getClient());
+ if (null != topologyId) {
+ return client.getClient().getTopologyInfo(topologyId);
+ }
+ return null;
+ } catch(Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static String getTopologyId(String name, Nimbus.Client client) {
try {
- ClusterSummary summary = client.getClient().getClusterInfo();
+ ClusterSummary summary = client.getClusterInfo();
for(TopologySummary s : summary.get_topologies()) {
if(s.get_name().equals(name)) {
- topologyInfo = client.getClient().getTopologyInfo(s.get_id());
+ return s.get_id();
}
}
} catch(Exception e) {
throw new RuntimeException(e);
- } finally {
- client.close();
}
- return topologyInfo;
+ return null;
}
/**
http://git-wip-us.apache.org/repos/asf/storm/blob/3425e7d1/storm-core/test/jvm/org/apache/storm/command/SetLogLevelTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/command/SetLogLevelTest.java b/storm-core/test/jvm/org/apache/storm/command/SetLogLevelTest.java
new file mode 100644
index 0000000..4582371
--- /dev/null
+++ b/storm-core/test/jvm/org/apache/storm/command/SetLogLevelTest.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.command;
+
+import org.apache.storm.generated.LogLevel;
+import org.apache.storm.generated.LogLevelAction;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Map;
+
+/**
+ * Tests for the option parser of the set-log-level command.
+ *
+ * <p>The level given on the command line is the level the logger should be switched to, so the
+ * parser is expected to populate the target log level of the resulting {@link LogLevel}.
+ */
+public class SetLogLevelTest {
+
+    /** Parses {@code setting} with the given action and returns the entry stored under {@code loggerName}. */
+    private static LogLevel parse(LogLevelAction action, String setting, String loggerName) {
+        SetLogLevel.LogLevelsParser logLevelsParser = new SetLogLevel.LogLevelsParser(action);
+        return ((Map<String, LogLevel>) logLevelsParser.parse(setting)).get(loggerName);
+    }
+
+    @Test
+    public void testUpdateLogLevelParser() {
+        LogLevel logLevel = parse(LogLevelAction.UPDATE, "com.foo.one=warn", "com.foo.one");
+        Assert.assertEquals(LogLevelAction.UPDATE, logLevel.get_action());
+        Assert.assertEquals(0, logLevel.get_reset_log_level_timeout_secs());
+        Assert.assertEquals("WARN", logLevel.get_target_log_level());
+
+        logLevel = parse(LogLevelAction.UPDATE, "com.foo.two=DEBUG:10", "com.foo.two");
+        Assert.assertEquals(LogLevelAction.UPDATE, logLevel.get_action());
+        Assert.assertEquals(10, logLevel.get_reset_log_level_timeout_secs());
+        Assert.assertEquals("DEBUG", logLevel.get_target_log_level());
+    }
+
+    @Test
+    public void testRemoveLogLevelParser() {
+        // For REMOVE the whole option value is the logger name.
+        LogLevel logLevel = parse(LogLevelAction.REMOVE, "com.foo.one", "com.foo.one");
+        Assert.assertNotNull(logLevel);
+        Assert.assertEquals(LogLevelAction.REMOVE, logLevel.get_action());
+    }
+
+    @Test(expected = NumberFormatException.class)
+    public void testInvalidTimeout() {
+        SetLogLevel.LogLevelsParser logLevelsParser = new SetLogLevel.LogLevelsParser(LogLevelAction.UPDATE);
+        logLevelsParser.parse("com.foo.bar=warn:NaN");
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testInvalidLogLevel() {
+        SetLogLevel.LogLevelsParser logLevelsParser = new SetLogLevel.LogLevelsParser(LogLevelAction.UPDATE);
+        logLevelsParser.parse("com.foo.bar=CRITICAL");
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/3425e7d1/storm-core/test/jvm/org/apache/storm/command/TestCLI.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/command/TestCLI.java b/storm-core/test/jvm/org/apache/storm/command/TestCLI.java
index b647458..c9a4b79 100644
--- a/storm-core/test/jvm/org/apache/storm/command/TestCLI.java
+++ b/storm-core/test/jvm/org/apache/storm/command/TestCLI.java
@@ -18,42 +18,62 @@
package org.apache.storm.command;
-import java.util.Map;
+import org.junit.Test;
+
+import java.util.HashMap;
import java.util.List;
-import java.util.Arrays;
+import java.util.Map;
-import org.junit.Test;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
public class TestCLI {
+
@Test
public void testSimple() throws Exception {
Map<String, Object> values = CLI.opt("a", "aa", null)
- .opt("b", "bb", 1, CLI.AS_INT)
- .opt("c", "cc", 1, CLI.AS_INT, CLI.FIRST_WINS)
- .opt("d", "dd", null, CLI.AS_STRING, CLI.INTO_LIST)
- .arg("A")
- .arg("B", CLI.AS_INT)
- .parse("-a100", "--aa", "200", "-c2", "-b", "50", "--cc", "100", "A-VALUE", "1", "2", "3", "-b40", "-d1", "-d2", "-d3");
- assertEquals(6, values.size());
- assertEquals("200", (String)values.get("a"));
- assertEquals((Integer)40, (Integer)values.get("b"));
- assertEquals((Integer)2, (Integer)values.get("c"));
-
- List<String> d = (List<String>)values.get("d");
+ .opt("b", "bb", 1, CLI.AS_INT)
+ .opt("c", "cc", 1, CLI.AS_INT, CLI.FIRST_WINS)
+ .opt("d", "dd", null, CLI.AS_STRING, CLI.INTO_LIST)
+ .opt("e", "ee", null, new PairParse(), CLI.INTO_MAP)
+ .arg("A")
+ .arg("B", CLI.AS_INT)
+ .parse("-a100", "--aa", "200", "-c2", "-b", "50", "--cc", "100", "A-VALUE", "1", "2", "3", "-b40", "-d1", "-d2", "-d3"
+ , "-e", "key1=value1", "-e", "key2=value2");
+ assertEquals(7, values.size());
+ assertEquals("200", (String) values.get("a"));
+ assertEquals((Integer) 40, (Integer) values.get("b"));
+ assertEquals((Integer) 2, (Integer) values.get("c"));
+
+ List<String> d = (List<String>) values.get("d");
assertEquals(3, d.size());
assertEquals("1", d.get(0));
assertEquals("2", d.get(1));
assertEquals("3", d.get(2));
- List<String> A = (List<String>)values.get("A");
+ List<String> A = (List<String>) values.get("A");
assertEquals(1, A.size());
assertEquals("A-VALUE", A.get(0));
- List<Integer> B = (List<Integer>)values.get("B");
+ List<Integer> B = (List<Integer>) values.get("B");
assertEquals(3, B.size());
- assertEquals((Integer)1, B.get(0));
- assertEquals((Integer)2, B.get(1));
- assertEquals((Integer)3, B.get(2));
+ assertEquals((Integer) 1, B.get(0));
+ assertEquals((Integer) 2, B.get(1));
+ assertEquals((Integer) 3, B.get(2));
+
+ Map<String, String> e = (Map<String, String>) values.get("e");
+ assertEquals(2, e.size());
+ assertEquals("value1", e.get("key1"));
+ assertEquals("value2", e.get("key2"));
+ }
+
+    /**
+     * Test parser that turns a {@code key=value} option into a one-entry map, used to exercise
+     * {@code CLI.INTO_MAP}.
+     */
+    private static final class PairParse implements CLI.Parse {
+
+        @Override
+        public Object parse(String value) {
+            String[] keyAndValue = value.split("=");
+            String key = keyAndValue[0];
+            String mapped = keyAndValue[1];
+            Map<String, String> pair = new HashMap<>();
+            pair.put(key, mapped);
+            return pair;
+        }
+    }
}
[6/6] storm git commit: Added STORM-1267 STORM-1266 and STORM-1265 to
Changelog
Posted by bo...@apache.org.
Added STORM-1267 STORM-1266 and STORM-1265 to Changelog
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/dece08fb
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/dece08fb
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/dece08fb
Branch: refs/heads/master
Commit: dece08fbdee7903c76c227e3ec638e57705856cd
Parents: b837590
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Wed Feb 24 11:32:42 2016 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Wed Feb 24 11:32:42 2016 -0600
----------------------------------------------------------------------
CHANGELOG.md | 3 +++
1 file changed, 3 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/dece08fb/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 285cfe4..b2535d1 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,7 @@
## 2.0.0
+ * STORM-1267: Port set_log_level
+ * STORM-1266: Port rebalance
+ * STORM-1265: Port monitor
* STORM-1572: throw NPE when parsing the command line arguments by CLI
* STORM-1273: port backtype.storm.cluster to java
* STORM-1479: use a simple implemention for IntSerializer
[2/6] storm git commit: STORM-1266: port
backtype.storm.command.rebalance to java
Posted by bo...@apache.org.
STORM-1266: port backtype.storm.command.rebalance to java
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8aaa8388
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8aaa8388
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8aaa8388
Branch: refs/heads/master
Commit: 8aaa83880de7a5bc9595a6e3aabc242c3bec5470
Parents: 3425e7d
Author: Abhishek Agarwal <ab...@inmobi.com>
Authored: Mon Feb 22 16:38:43 2016 +0530
Committer: Abhishek Agarwal <ab...@inmobi.com>
Committed: Mon Feb 22 16:38:43 2016 +0530
----------------------------------------------------------------------
.../clj/org/apache/storm/command/rebalance.clj | 47 -----------
.../jvm/org/apache/storm/command/Rebalance.java | 86 ++++++++++++++++++++
.../org/apache/storm/command/RebalanceTest.java | 41 ++++++++++
3 files changed, 127 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/8aaa8388/storm-core/src/clj/org/apache/storm/command/rebalance.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/command/rebalance.clj b/storm-core/src/clj/org/apache/storm/command/rebalance.clj
deleted file mode 100644
index 8428d14..0000000
--- a/storm-core/src/clj/org/apache/storm/command/rebalance.clj
+++ /dev/null
@@ -1,47 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.command.rebalance
- (:use [clojure.tools.cli :only [cli]])
- (:use [org.apache.storm config log])
- (:use [org.apache.storm.internal thrift])
- (:import [org.apache.storm.generated RebalanceOptions])
- (:gen-class))
-
-(defn- parse-executor [^String s]
- (let [eq-pos (.lastIndexOf s "=")
- name (.substring s 0 eq-pos)
- amt (.substring s (inc eq-pos))]
- {name (Integer/parseInt amt)}
- ))
-
-(defn -main [& args]
- (let [[{wait :wait executor :executor num-workers :num-workers} [name] _]
- (cli args ["-w" "--wait" :default nil :parse-fn #(Integer/parseInt %)]
- ["-n" "--num-workers" :default nil :parse-fn #(Integer/parseInt %)]
- ["-e" "--executor" :parse-fn parse-executor
- :assoc-fn (fn [previous key val]
- (assoc previous key
- (if-let [oldval (get previous key)]
- (merge oldval val)
- val)))])
- opts (RebalanceOptions.)]
- (if wait (.set_wait_secs opts wait))
- (if executor (.set_num_executors opts executor))
- (if num-workers (.set_num_workers opts num-workers))
- (with-configured-nimbus-connection nimbus
- (.rebalance nimbus name opts)
- (log-message "Topology " name " is rebalancing")
- )))
http://git-wip-us.apache.org/repos/asf/storm/blob/8aaa8388/storm-core/src/jvm/org/apache/storm/command/Rebalance.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/command/Rebalance.java b/storm-core/src/jvm/org/apache/storm/command/Rebalance.java
new file mode 100644
index 0000000..ed65950
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/command/Rebalance.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.command;
+
+import org.apache.storm.generated.Nimbus;
+import org.apache.storm.generated.RebalanceOptions;
+import org.apache.storm.utils.NimbusClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static java.lang.String.format;
+
+public class Rebalance {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Rebalance.class);
+
+    /**
+     * Entry point of the rebalance command.
+     *
+     * <p>Reads the optional {@code -w/--wait}, {@code -n/--num-workers} and {@code -e/--executor}
+     * options, copies the ones that were supplied into a {@link RebalanceOptions} and asks Nimbus
+     * to rebalance the topology named by the positional {@code topologyName} argument.
+     *
+     * @param args raw command line arguments
+     * @throws Exception if parsing the arguments or talking to Nimbus fails
+     */
+    public static void main(String[] args) throws Exception {
+        Map<String, Object> parsed = CLI.opt("w", "wait", null, CLI.AS_INT)
+                .opt("n", "num-workers", null, CLI.AS_INT)
+                .opt("e", "executor", null, new ExecutorParser(), CLI.INTO_MAP)
+                .arg("topologyName", CLI.FIRST_WINS)
+                .parse(args);
+        final String name = (String) parsed.get("topologyName");
+        final RebalanceOptions rebalanceOptions = new RebalanceOptions();
+
+        // Every option defaults to null; only values that are present are put on the request.
+        Integer waitSecs = (Integer) parsed.get("w");
+        Integer workerCount = (Integer) parsed.get("n");
+        Map<String, Integer> executorCounts = (Map<String, Integer>) parsed.get("e");
+        if (waitSecs != null) {
+            rebalanceOptions.set_wait_secs(waitSecs);
+        }
+        if (workerCount != null) {
+            rebalanceOptions.set_num_workers(workerCount);
+        }
+        if (executorCounts != null) {
+            rebalanceOptions.set_num_executors(executorCounts);
+        }
+
+        NimbusClient.withConfiguredClient(new NimbusClient.WithNimbus() {
+            @Override
+            public void run(Nimbus.Client nimbus) throws Exception {
+                nimbus.rebalance(name, rebalanceOptions);
+                LOG.info("Topology {} is rebalancing", name);
+            }
+        });
+    }
+
+
+    /**
+     * Parses one {@code <component>=<parallelism>} executor option into a one-entry map.
+     *
+     * <p>The value is split at the <em>last</em> {@code '='}, so everything before it, including
+     * any earlier {@code '='}, is the component name.
+     */
+    static final class ExecutorParser implements CLI.Parse {
+
+        /**
+         * @param value the raw option value, expected as {@code <component>=<parallelism>}
+         * @return a {@code Map<String, Integer>} with the component name mapped to its parallelism
+         * @throws IllegalArgumentException if the value has no {@code '='} or the parallelism is
+         *     not an integer; the underlying exception is kept as the cause
+         */
+        @Override
+        public Object parse(String value) {
+            try {
+                int splitIndex = value.lastIndexOf('=');
+                String componentName = value.substring(0, splitIndex);
+                Integer parallelism = Integer.parseInt(value.substring(splitIndex + 1));
+                Map<String, Integer> result = new HashMap<String, Integer>();
+                result.put(componentName, parallelism);
+                return result;
+            } catch (RuntimeException ex) {
+                // Only malformed input is translated. JVM Errors (e.g. OutOfMemoryError) are not
+                // parse failures and must propagate unchanged.
+                throw new IllegalArgumentException(
+                        format("Failed to parse '%s' correctly. Expected in <component>=<parallelism> format", value), ex);
+            }
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/8aaa8388/storm-core/test/jvm/org/apache/storm/command/RebalanceTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/command/RebalanceTest.java b/storm-core/test/jvm/org/apache/storm/command/RebalanceTest.java
new file mode 100644
index 0000000..cec4958
--- /dev/null
+++ b/storm-core/test/jvm/org/apache/storm/command/RebalanceTest.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.storm.command;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Map;
+
+/**
+ * Unit tests for {@link Rebalance.ExecutorParser}, the parser for
+ * {@code <component>=<parallelism>} option values.
+ */
+public class RebalanceTest {
+
+    /** A well-formed value yields a single entry mapping the component to its parallelism. */
+    @Test
+    public void testParser() throws Exception {
+        Rebalance.ExecutorParser executorParser = new Rebalance.ExecutorParser();
+        // parse() is declared to return Object; for valid input it builds a Map<String, Integer>.
+        @SuppressWarnings("unchecked")
+        Map<String, Integer> componentParallelism = (Map<String, Integer>) executorParser.parse("comp1=3");
+        Assert.assertEquals(1, componentParallelism.size());
+        Assert.assertEquals(3, (int) componentParallelism.get("comp1"));
+    }
+
+    /** A value without the '=' separator is rejected. */
+    @Test(expected = IllegalArgumentException.class)
+    public void testException() throws Exception {
+        Rebalance.ExecutorParser executorParser = new Rebalance.ExecutorParser();
+        executorParser.parse("comp1 3");
+    }
+
+    /** A value whose parallelism part is not an integer is rejected. */
+    @Test(expected = IllegalArgumentException.class)
+    public void testNonNumericParallelism() throws Exception {
+        Rebalance.ExecutorParser executorParser = new Rebalance.ExecutorParser();
+        executorParser.parse("comp1=abc");
+    }
+}
[5/6] storm git commit: Merge branch 'commands' of
https://github.com/abhishekagarwal87/storm into STORM-1267
Posted by bo...@apache.org.
Merge branch 'commands' of https://github.com/abhishekagarwal87/storm into STORM-1267
Conflicts:
storm-core/test/jvm/org/apache/storm/command/TestCLI.java
STORM-1267: Port set_log_level
STORM-1266: Port rebalance
STORM-1265: Port monitor
This closes #1137
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b8375907
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b8375907
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b8375907
Branch: refs/heads/master
Commit: b83759074b7b0e7b1727dd4fac1c2b5ab9c4cdad
Parents: 56bc603 2d4ad6f
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Wed Feb 24 11:30:24 2016 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Wed Feb 24 11:30:24 2016 -0600
----------------------------------------------------------------------
bin/storm.cmd | 2 +-
bin/storm.py | 6 +-
.../clj/org/apache/storm/command/monitor.clj | 37 ------
.../clj/org/apache/storm/command/rebalance.clj | 47 --------
.../org/apache/storm/command/set_log_level.clj | 76 ------------
.../src/jvm/org/apache/storm/command/CLI.java | 25 +++-
.../jvm/org/apache/storm/command/Monitor.java | 65 +++++++++++
.../jvm/org/apache/storm/command/Rebalance.java | 86 ++++++++++++++
.../org/apache/storm/command/SetLogLevel.java | 116 +++++++++++++++++++
.../src/jvm/org/apache/storm/utils/Utils.java | 61 +++++++---
.../org/apache/storm/command/RebalanceTest.java | 41 +++++++
.../apache/storm/command/SetLogLevelTest.java | 54 +++++++++
.../jvm/org/apache/storm/command/TestCLI.java | 42 +++++--
13 files changed, 458 insertions(+), 200 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/b8375907/storm-core/src/jvm/org/apache/storm/command/CLI.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/b8375907/storm-core/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/b8375907/storm-core/test/jvm/org/apache/storm/command/TestCLI.java
----------------------------------------------------------------------
diff --cc storm-core/test/jvm/org/apache/storm/command/TestCLI.java
index 5b2f220,c9a4b79..d1caa3c
--- a/storm-core/test/jvm/org/apache/storm/command/TestCLI.java
+++ b/storm-core/test/jvm/org/apache/storm/command/TestCLI.java
@@@ -29,20 -31,20 +31,22 @@@ public class TestCLI
@Test
public void testSimple() throws Exception {
Map<String, Object> values = CLI.opt("a", "aa", null)
- .opt("b", "bb", 1, CLI.AS_INT)
- .opt("c", "cc", 1, CLI.AS_INT, CLI.FIRST_WINS)
- .opt("d", "dd", null, CLI.AS_STRING, CLI.INTO_LIST)
- .opt("e", "ee", null, new PairParse(), CLI.INTO_MAP)
- .arg("A")
- .arg("B", CLI.AS_INT)
- .parse("-a100", "--aa", "200", "-c2", "-b", "50", "--cc", "100", "A-VALUE", "1", "2", "3", "-b40", "-d1", "-d2", "-d3"
- , "-e", "key1=value1", "-e", "key2=value2");
- assertEquals(7, values.size());
- assertEquals("200", (String) values.get("a"));
- assertEquals((Integer) 40, (Integer) values.get("b"));
- assertEquals((Integer) 2, (Integer) values.get("c"));
+ .opt("b", "bb", 1, CLI.AS_INT)
+ .opt("c", "cc", 1, CLI.AS_INT, CLI.FIRST_WINS)
+ .opt("d", "dd", null, CLI.AS_STRING, CLI.INTO_LIST)
+ .opt("e", "ee", null, CLI.AS_INT)
++ .opt("f", "ff", null, new PairParse(), CLI.INTO_MAP)
+ .arg("A")
+ .arg("B", CLI.AS_INT)
- .parse("-a100", "--aa", "200", "-c2", "-b", "50", "--cc", "100", "A-VALUE", "1", "2", "3", "-b40", "-d1", "-d2", "-d3");
- assertEquals(7, values.size());
++ .parse("-a100", "--aa", "200", "-c2", "-b", "50", "--cc", "100", "A-VALUE", "1", "2", "3", "-b40", "-d1", "-d2", "-d3"
++ , "-f", "key1=value1", "-f", "key2=value2");
++ assertEquals(8, values.size());
+ assertEquals("200", (String)values.get("a"));
+ assertEquals((Integer)40, (Integer)values.get("b"));
+ assertEquals((Integer)2, (Integer)values.get("c"));
+ assertEquals(null, values.get("e"));
- List<String> d = (List<String>) values.get("d");
+ List<String> d = (List<String>)values.get("d");
assertEquals(3, d.size());
assertEquals("1", d.get(0));
assertEquals("2", d.get(1));
@@@ -52,10 -54,26 +56,26 @@@
assertEquals(1, A.size());
assertEquals("A-VALUE", A.get(0));
- List<Integer> B = (List<Integer>)values.get("B");
+ List<Integer> B = (List<Integer>) values.get("B");
assertEquals(3, B.size());
- assertEquals((Integer)1, B.get(0));
- assertEquals((Integer)2, B.get(1));
- assertEquals((Integer)3, B.get(2));
+ assertEquals((Integer) 1, B.get(0));
+ assertEquals((Integer) 2, B.get(1));
+ assertEquals((Integer) 3, B.get(2));
+
- Map<String, String> e = (Map<String, String>) values.get("e");
- assertEquals(2, e.size());
- assertEquals("value1", e.get("key1"));
- assertEquals("value2", e.get("key2"));
++ Map<String, String> f = (Map<String, String>) values.get("f");
++ assertEquals(2, f.size());
++ assertEquals("value1", f.get("key1"));
++ assertEquals("value2", f.get("key2"));
+ }
+
+ private static final class PairParse implements CLI.Parse {
+
+ @Override
+ public Object parse(String value) {
+ Map<String, String> result = new HashMap<>();
+ String[] splits = value.split("=");
+ result.put(splits[0], splits[1]);
+ return result;
+ }
}
}
[3/6] storm git commit: STORM-1265: port
backtype.storm.command.monitor to java
Posted by bo...@apache.org.
STORM-1265: port backtype.storm.command.monitor to java
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/93b314cf
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/93b314cf
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/93b314cf
Branch: refs/heads/master
Commit: 93b314cf6e092e42c6cd6116618bfce94797c5cf
Parents: 8aaa838
Author: Abhishek Agarwal <ab...@inmobi.com>
Authored: Mon Feb 22 16:39:10 2016 +0530
Committer: Abhishek Agarwal <ab...@inmobi.com>
Committed: Mon Feb 22 16:39:10 2016 +0530
----------------------------------------------------------------------
.../clj/org/apache/storm/command/monitor.clj | 37 -----------
.../jvm/org/apache/storm/command/Monitor.java | 65 ++++++++++++++++++++
2 files changed, 65 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/93b314cf/storm-core/src/clj/org/apache/storm/command/monitor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/command/monitor.clj b/storm-core/src/clj/org/apache/storm/command/monitor.clj
deleted file mode 100644
index 4ec49af..0000000
--- a/storm-core/src/clj/org/apache/storm/command/monitor.clj
+++ /dev/null
@@ -1,37 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.command.monitor
- (:use [clojure.tools.cli :only [cli]])
- (:use [org.apache.storm.internal.thrift :only [with-configured-nimbus-connection]])
- (:import [org.apache.storm.utils Monitor])
- (:gen-class)
- )
-
-(defn -main [& args]
- (let [[{interval :interval component :component stream :stream watch :watch} [name] _]
- (cli args ["-i" "--interval" :default 4 :parse-fn #(Integer/parseInt %)]
- ["-m" "--component" :default nil]
- ["-s" "--stream" :default "default"]
- ["-w" "--watch" :default "emitted"])
- mon (Monitor.)]
- (if interval (.set_interval mon interval))
- (if name (.set_topology mon name))
- (if component (.set_component mon component))
- (if stream (.set_stream mon stream))
- (if watch (.set_watch mon watch))
- (with-configured-nimbus-connection nimbus
- (.metrics mon nimbus)
- )))
http://git-wip-us.apache.org/repos/asf/storm/blob/93b314cf/storm-core/src/jvm/org/apache/storm/command/Monitor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/command/Monitor.java b/storm-core/src/jvm/org/apache/storm/command/Monitor.java
new file mode 100644
index 0000000..68a65ea
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/command/Monitor.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.command;
+
+import org.apache.storm.generated.Nimbus;
+import org.apache.storm.utils.NimbusClient;
+
+import java.util.Map;
+
+/**
+ * Command-line entry point that configures an {@code org.apache.storm.utils.Monitor}
+ * from the parsed arguments and runs it against a configured Nimbus client.
+ */
+public class Monitor {
+
+    /**
+     * Parses the command line, copies every non-null setting onto the monitor and
+     * then invokes {@code metrics} on it with a Nimbus client.
+     *
+     * <p>Recognised options: {@code -i/--interval} (integer, default 4),
+     * {@code -m/--component} (no default), {@code -s/--stream} (default "default"),
+     * {@code -w/--watch} (default "emitted"), plus a positional topology name of
+     * which the first occurrence wins.
+     *
+     * @param args raw command-line arguments
+     * @throws Exception whatever argument parsing or the Nimbus call propagates
+     */
+    public static void main(String[] args) throws Exception {
+        final Map<String, Object> parsed = CLI.opt("i", "interval", 4, CLI.AS_INT)
+            .opt("m", "component", null)
+            .opt("s", "stream", "default")
+            .opt("w", "watch", "emitted")
+            .arg("topologyName", CLI.FIRST_WINS)
+            .parse(args);
+
+        // Fully qualified because this class shares its simple name with the utility.
+        final org.apache.storm.utils.Monitor monitor = new org.apache.storm.utils.Monitor();
+
+        // Each setting is applied only when present, in the same order as before.
+        final Integer interval = (Integer) parsed.get("i");
+        if (interval != null) {
+            monitor.set_interval(interval);
+        }
+        final String component = (String) parsed.get("m");
+        if (component != null) {
+            monitor.set_component(component);
+        }
+        final String stream = (String) parsed.get("s");
+        if (stream != null) {
+            monitor.set_stream(stream);
+        }
+        final String watch = (String) parsed.get("w");
+        if (watch != null) {
+            monitor.set_watch(watch);
+        }
+        final String topologyName = (String) parsed.get("topologyName");
+        if (topologyName != null) {
+            monitor.set_topology(topologyName);
+        }
+
+        final NimbusClient.WithNimbus reportMetrics = new NimbusClient.WithNimbus() {
+            @Override
+            public void run(Nimbus.Client nimbus) throws Exception {
+                monitor.metrics(nimbus);
+            }
+        };
+        NimbusClient.withConfiguredClient(reportMetrics);
+    }
+}
[4/6] storm git commit: Update commands in binary files
Posted by bo...@apache.org.
Update commands in binary files
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2d4ad6f4
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2d4ad6f4
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2d4ad6f4
Branch: refs/heads/master
Commit: 2d4ad6f4655ff56057d50f359394ecd01fde43eb
Parents: 93b314c
Author: Abhishek Agarwal <ab...@inmobi.com>
Authored: Mon Feb 22 16:48:28 2016 +0530
Committer: Abhishek Agarwal <ab...@inmobi.com>
Committed: Mon Feb 22 16:48:28 2016 +0530
----------------------------------------------------------------------
bin/storm.cmd | 2 +-
bin/storm.py | 6 +++---
2 files changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/2d4ad6f4/bin/storm.cmd
----------------------------------------------------------------------
diff --git a/bin/storm.cmd b/bin/storm.cmd
index ff3b246..1ef1e42 100644
--- a/bin/storm.cmd
+++ b/bin/storm.cmd
@@ -194,7 +194,7 @@
goto :eof
:rebalance
- set CLASS=org.apache.storm.command.rebalance
+ set CLASS=org.apache.storm.command.Rebalance
set STORM_OPTS=%STORM_CLIENT_OPTS% %STORM_OPTS%
goto :eof
http://git-wip-us.apache.org/repos/asf/storm/blob/2d4ad6f4/bin/storm.py
----------------------------------------------------------------------
diff --git a/bin/storm.py b/bin/storm.py
index acbfe7b..94d6143 100755
--- a/bin/storm.py
+++ b/bin/storm.py
@@ -378,7 +378,7 @@ def set_log_level(*args):
Clears settings, resetting back to the original level
"""
exec_storm_class(
- "org.apache.storm.command.set_log_level",
+ "org.apache.storm.command.SetLogLevel",
args=args,
jvmtype="-client",
extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
@@ -433,7 +433,7 @@ def rebalance(*args):
print_usage(command="rebalance")
sys.exit(2)
exec_storm_class(
- "org.apache.storm.command.rebalance",
+ "org.apache.storm.command.Rebalance",
args=args,
jvmtype="-client",
extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
@@ -685,7 +685,7 @@ def monitor(*args):
watch-item is 'emitted';
"""
exec_storm_class(
- "org.apache.storm.command.monitor",
+ "org.apache.storm.command.Monitor",
args=args,
jvmtype="-client",
extrajars=[USER_CONF_DIR, STORM_BIN_DIR])