You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2018/06/20 19:36:05 UTC
[2/3] storm git commit: STORM-3115: Add zookeeper CLI to admin command
STORM-3115: Add zookeeper CLI to admin command
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fce4c6b3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fce4c6b3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fce4c6b3
Branch: refs/heads/master
Commit: fce4c6b3a0d37d02ce01458041b07b99f8827f2e
Parents: 2eae159
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Tue Jun 19 12:54:38 2018 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Tue Jun 19 12:56:51 2018 -0500
----------------------------------------------------------------------
.../org/apache/storm/command/AdminCommands.java | 15 +-
.../src/jvm/org/apache/storm/command/CLI.java | 152 +++++++++---------
.../storm/shade/org/apache/zookeeper/ZkCli.java | 156 +++++++++++++++++++
3 files changed, 243 insertions(+), 80 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/fce4c6b3/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java b/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java
index efa36f7..10eaedc 100644
--- a/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java
+++ b/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java
@@ -31,6 +31,7 @@ import org.apache.storm.cluster.ClusterUtils;
import org.apache.storm.cluster.DaemonType;
import org.apache.storm.cluster.IStormClusterState;
import org.apache.storm.nimbus.NimbusInfo;
+import org.apache.storm.shade.org.apache.zookeeper.ZkCli;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.ServerUtils;
import org.apache.storm.utils.Utils;
@@ -40,10 +41,10 @@ import org.slf4j.LoggerFactory;
public class AdminCommands {
private static final Logger LOG = LoggerFactory.getLogger(AdminCommands.class);
- private interface AdminCommand extends AutoCloseable {
+ public interface AdminCommand {
+
/**
- * Run the command, this will be called at most once. Close will only be called
- * if run was called, so we assume all initialization will be lazy.
+ * Run the command, this will be called at most once.
*/
void run(String [] args, Map<String, Object> conf, String command) throws Exception;
@@ -54,11 +55,6 @@ public class AdminCommands {
* argument - description
*/
void printCliHelp(String command, PrintStream out);
-
- @Override
- default void close() {
- //NOOP
- }
}
private static class RemoveCorruptTopologies implements AdminCommand {
@@ -91,7 +87,7 @@ public class AdminCommands {
if (args.length <= 0) {
help(null, System.out);
} else {
- for (String cn: args) {
+ for (String cn : args) {
AdminCommand c = COMMANDS.get(cn);
if (c == null) {
throw new IllegalArgumentException(cn + " is not a supported command");
@@ -112,6 +108,7 @@ public class AdminCommands {
static {
COMMANDS.put("remove_corrupt_topologies", new RemoveCorruptTopologies());
+ COMMANDS.put("zk_cli", new ZkCli());
COMMANDS.put("help", new Help());
}
http://git-wip-us.apache.org/repos/asf/storm/blob/fce4c6b3/storm-core/src/jvm/org/apache/storm/command/CLI.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/command/CLI.java b/storm-core/src/jvm/org/apache/storm/command/CLI.java
index eb62306..2c3311e 100644
--- a/storm-core/src/jvm/org/apache/storm/command/CLI.java
+++ b/storm-core/src/jvm/org/apache/storm/command/CLI.java
@@ -25,71 +25,50 @@ import org.slf4j.LoggerFactory;
public class CLI {
/**
- * Parse function to return an Integer
+ * Parse function to return an Integer.
*/
- public static final Parse AS_INT = new Parse() {
- @Override
- public Object parse(String value) {
- return Integer.valueOf(value);
- }
- };
+ public static final Parse AS_INT = value -> Integer.valueOf(value);
+
/**
* Noop parse function, returns the String.
*/
- public static final Parse AS_STRING = new Parse() {
- @Override
- public Object parse(String value) {
- return value;
- }
- };
+ public static final Parse AS_STRING = value -> value;
+
/**
- * Last occurance on the command line is the resulting value.
+ * Last occurrence on the command line is the resulting value.
*/
- public static final Assoc LAST_WINS = new Assoc() {
- @Override
- public Object assoc(Object current, Object value) {
- return value;
- }
- };
+ public static final Assoc LAST_WINS = (current, value) -> value;
+
/**
- * First occurance on the command line is the resulting value.
+ * First occurrence on the command line is the resulting value.
*/
- public static final Assoc FIRST_WINS = new Assoc() {
- @Override
- public Object assoc(Object current, Object value) {
- return current == null ? value : current;
- }
- };
+ public static final Assoc FIRST_WINS = (current, value) -> current == null ? value : current;
+
/**
* All values are returned as a List.
*/
- public static final Assoc INTO_LIST = new Assoc() {
- @Override
- public Object assoc(Object current, Object value) {
- if (current == null) {
- current = new ArrayList<Object>();
- }
- ((List<Object>) current).add(value);
- return current;
+ public static final Assoc INTO_LIST = (current, value) -> {
+ if (current == null) {
+ current = new ArrayList<>();
}
+ ((List<Object>) current).add(value);
+ return current;
};
+
/**
- * All values are returned as a map
+ * All values are returned as a map.
*/
- public static final Assoc INTO_MAP = new Assoc() {
- @Override
- public Object assoc(Object current, Object value) {
- if (null == current) {
- current = new HashMap<Object, Object>();
- }
- ((Map<Object, Object>) current).putAll((Map<Object, Object>) value);
- return current;
+ public static final Assoc INTO_MAP = (current, value) -> {
+ if (null == current) {
+ current = new HashMap<>();
}
+ ((Map<Object, Object>) current).putAll((Map<Object, Object>) value);
+ return current;
};
private static final Logger LOG = LoggerFactory.getLogger(CLI.class);
/**
- * Add an option to be parsed
+ * Add an option to be parsed.
* @param shortName the short single character name of the option (no `-` character proceeds it).
* @param longName the multi character name of the option (no `--` characters proceed it).
* @param defaultValue the value that will be returned of the command if none is given. null if none is given.
@@ -100,7 +79,7 @@ public class CLI {
}
/**
- * Add an option to be parsed
+ * Add an option to be parsed.
* @param shortName the short single character name of the option (no `-` character proceeds it).
* @param longName the multi character name of the option (no `--` characters proceed it).
* @param defaultValue the value that will be returned of the command if none is given. null if none is given.
@@ -112,7 +91,7 @@ public class CLI {
}
/**
- * Add an option to be parsed
+ * Add an option to be parsed.
* @param shortName the short single character name of the option (no `-` character proceeds it).
* @param longName the multi character name of the option (no `--` characters proceed it).
* @param defaultValue the value that will be returned of the command if none is given. null if none is given.
@@ -125,6 +104,16 @@ public class CLI {
}
/**
+ * Add a boolean option that enables something.
+ * @param shortName the short single character name of the option (no `-` character proceeds it).
+ * @param longName the multi character name of the option (no `--` characters proceed it).
+ * @return a builder to be used to continue creating the command line.
+ */
+    public static CLIBuilder boolOpt(String shortName, String longName) {
+        // Static, like the other top-level entry points (callers write CLI.opt(...)), so that a
+        // command line definition can start with a flag without needing a CLI instance.
+        return new CLIBuilder().boolOpt(shortName, longName);
+    }
+
+ /**
* Add a named argument.
* @param name the name of the argument.
* @return a builder to be used to continue creating the command line.
@@ -170,17 +159,17 @@ public class CLI {
* @param value the String to parse
* @return the parsed value
*/
- public Object parse(String value);
+ Object parse(String value);
}
public interface Assoc {
/**
- * Associate a value into somthing else
+ * Associate a value into something else.
* @param current what to put value into, will be null if no values have been added yet.
* @param value what to add
* @return the result of combining the two
*/
- public Object assoc(Object current, Object value);
+ Object assoc(Object current, Object value);
}
private static class Opt {
@@ -189,13 +178,15 @@ public class CLI {
final Object defaultValue;
final Parse parse;
final Assoc assoc;
+ final boolean noValue;
- public Opt(String shortName, String longName, Object defaultValue, Parse parse, Assoc assoc) {
+ public Opt(String shortName, String longName, Object defaultValue, Parse parse, Assoc assoc, boolean noValue) {
this.shortName = shortName;
this.longName = longName;
this.defaultValue = defaultValue;
this.parse = parse == null ? AS_STRING : parse;
this.assoc = assoc == null ? LAST_WINS : assoc;
+ this.noValue = noValue;
}
public Object process(Object current, String value) {
@@ -224,7 +215,7 @@ public class CLI {
private final ArrayList<Arg> args = new ArrayList<>();
/**
- * Add an option to be parsed
+ * Add an option to be parsed.
* @param shortName the short single character name of the option (no `-` character proceeds it).
* @param longName the multi character name of the option (no `--` characters proceed it).
* @param defaultValue the value that will be returned of the command if none is given. null if none is given.
@@ -235,7 +226,7 @@ public class CLI {
}
/**
- * Add an option to be parsed
+ * Add an option to be parsed.
* @param shortName the short single character name of the option (no `-` character proceeds it).
* @param longName the multi character name of the option (no `--` characters proceed it).
* @param defaultValue the value that will be returned of the command if none is given. null if none is given.
@@ -247,7 +238,7 @@ public class CLI {
}
/**
- * Add an option to be parsed
+ * Add an option to be parsed.
* @param shortName the short single character name of the option (no `-` character proceeds it).
* @param longName the multi character name of the option (no `--` characters proceed it).
* @param defaultValue the value that will be returned of the command if none is given. null if none is given.
@@ -256,7 +247,18 @@ public class CLI {
* @return a builder to be used to continue creating the command line.
*/
public CLIBuilder opt(String shortName, String longName, Object defaultValue, Parse parse, Assoc assoc) {
- opts.add(new Opt(shortName, longName, defaultValue, parse, assoc));
+ opts.add(new Opt(shortName, longName, defaultValue, parse, assoc, false));
+ return this;
+ }
+
+ /**
+ * Add a boolean option that enables something.
+ * @param shortName the short single character name of the option (no `-` character proceeds it).
+ * @param longName the multi character name of the option (no `--` characters proceed it).
+ * @return a builder to be used to continue creating the command line.
+ */
+ public CLIBuilder boolOpt(String shortName, String longName) {
+ opts.add(new Opt(shortName, longName, false, null, null, true)); // args: defaultValue=false, parse=null, assoc=null, noValue=true (the flag takes no argument)
return this; // return the builder so calls can be chained
}
@@ -304,37 +306,45 @@ public class CLI {
/**
* Parse the command line arguments.
* @param rawArgs the string arguments to be parsed.
- * @throws Exception on any error.
* @return The parsed command line.
- * opts will be stored under the short argument name.
- * args will be stored under the argument name, unless no arguments are configured, and then they will be stored under "ARGS".
- * The last argument comnfigured is greedy and is used to process all remaining command line arguments.
+ * opts will be stored under the short argument name.
+ * args will be stored under the argument name, unless no arguments are configured, and then they will be stored under "ARGS".
+ * The last argument configured is greedy and is used to process all remaining command line arguments.
+ * @throws Exception on any error.
*/
public Map<String, Object> parse(String... rawArgs) throws Exception {
Options options = new Options();
for (Opt opt : opts) {
- options.addOption(Option.builder(opt.shortName).longOpt(opt.longName).hasArg().build());
+ if (opt.noValue) {
+ options.addOption(Option.builder(opt.shortName).longOpt(opt.longName).hasArg(false).build());
+ } else {
+ options.addOption(Option.builder(opt.shortName).longOpt(opt.longName).hasArg().build());
+ }
}
DefaultParser parser = new DefaultParser();
CommandLine cl = parser.parse(options, rawArgs);
HashMap<String, Object> ret = new HashMap<>();
for (Opt opt : opts) {
- Object current = null;
- String[] strings = cl.getOptionValues(opt.shortName);
- if (strings != null) {
- for (String val : strings) {
- current = opt.process(current, val);
+ if (opt.noValue) {
+ ret.put(opt.shortName, cl.hasOption(opt.shortName));
+ } else {
+ Object current = null;
+ String[] strings = cl.getOptionValues(opt.shortName);
+ if (strings != null) {
+ for (String val : strings) {
+ current = opt.process(current, val);
+ }
}
+ if (current == null) {
+ current = opt.defaultValue;
+ }
+ ret.put(opt.shortName, current);
}
- if (current == null) {
- current = opt.defaultValue;
- }
- ret.put(opt.shortName, current);
}
List<String> stringArgs = cl.getArgList();
if (args.size() > stringArgs.size()) {
- throw new RuntimeException("Wrong number of arguments at least " + args.size() +
- " expected, but only " + stringArgs.size() + " found");
+ throw new RuntimeException("Wrong number of arguments at least " + args.size()
+ + " expected, but only " + stringArgs.size() + " found");
}
int argIndex = 0;
http://git-wip-us.apache.org/repos/asf/storm/blob/fce4c6b3/storm-core/src/jvm/org/apache/storm/shade/org/apache/zookeeper/ZkCli.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/shade/org/apache/zookeeper/ZkCli.java b/storm-core/src/jvm/org/apache/storm/shade/org/apache/zookeeper/ZkCli.java
new file mode 100644
index 0000000..57ec899
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/shade/org/apache/zookeeper/ZkCli.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF)
+ * under one or more contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed
+ * under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+// This class is deliberately placed in the (shaded) ZooKeeper package: it is a hack that allows ZooKeeperMain to be called by this command.
+
+package org.apache.storm.shade.org.apache.zookeeper;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.List;
+import java.util.Map;
+import org.apache.storm.Config;
+import org.apache.storm.command.AdminCommands;
+import org.apache.storm.command.CLI;
+import org.apache.storm.shade.org.apache.zookeeper.data.ACL;
+import org.apache.storm.shade.org.apache.zookeeper.data.Stat;
+import org.apache.storm.utils.ObjectReader;
+
+/**
+ * Admin command that starts an interactive ZooKeeper shell by handing a client to {@link ZooKeeperMain}.
+ *
+ * <p>By default the shell connects with the connection string derived from the storm configuration and
+ * uses a client whose mutating operations are rejected; pass {@code -w} to get a regular client.
+ */
+public class ZkCli implements AdminCommands.AdminCommand {
+    /** Name of both the conf entry and the JVM system property that hold the JAAS file location. */
+    private static final String JAAS_CONFIG_KEY = "java.security.auth.login.config";
+
+    /**
+     * Parse the command line options, build the ZooKeeper client and run the shell on it.
+     *
+     * @param args the options for this command, see {@link #printCliHelp(String, PrintStream)}.
+     * @param conf the storm configuration, used for the defaults of the options.
+     * @param command the name this command was invoked as, only used when printing help.
+     * @throws Exception on any error parsing the options, connecting, or running the shell.
+     */
+    @Override
+    public void run(String[] args, Map<String, Object> conf, String command) throws Exception {
+        Map<String, Object> cl = CLI.opt("s", "server", null, CLI.AS_STRING, CLI.LAST_WINS)
+            .opt("t", "time-out", ObjectReader.getInt(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)),
+                CLI.AS_INT, CLI.LAST_WINS)
+            .boolOpt("w", "write")
+            .boolOpt("n", "no-root")
+            .opt("j", "jaas", conf.get(JAAS_CONFIG_KEY), CLI.AS_STRING, CLI.LAST_WINS)
+            .boolOpt("h", "help")
+            .parse(args);
+
+        if ((Boolean) cl.get("h")) {
+            printCliHelp(command, System.out);
+            return;
+        }
+
+        String jaas = (String) cl.get("j");
+        if (jaas != null && !jaas.isEmpty()) {
+            System.setProperty(JAAS_CONFIG_KEY, jaas);
+        }
+
+        String connectionString = (String) cl.get("s");
+        if (connectionString == null) {
+            // The storm server/port/root settings are only consulted when -s was not given, so an
+            // explicit connection string keeps working even if those settings are missing.
+            connectionString = defaultConnectionString(conf, !(Boolean) cl.get("n"));
+        }
+
+        boolean readOnly = !(Boolean) cl.get("w");
+        int timeout = (Integer) cl.get("t");
+        Watcher ignoreEvents = watchedEvent -> { };
+        ZooKeeper zk;
+        if (readOnly) {
+            zk = new ReadOnlyZookeeper(connectionString, timeout, ignoreEvents);
+        } else {
+            zk = new ZooKeeper(connectionString, timeout, ignoreEvents);
+        }
+        try {
+            ZooKeeperMain main = new ZooKeeperMain(zk);
+            main.run();
+        } finally {
+            // Release the client created above once the shell returns or fails.
+            zk.close();
+        }
+    }
+
+    /**
+     * Build the connection string from the storm configuration: a comma separated list of
+     * {@code server:port} entries, optionally followed by the storm ZooKeeper root.
+     *
+     * @param conf the storm configuration.
+     * @param includeRoot true if the configured storm root should be appended.
+     * @return the connection string.
+     * @throws IllegalArgumentException if no ZooKeeper servers are configured.
+     */
+    private static String defaultConnectionString(Map<String, Object> conf, boolean includeRoot) {
+        @SuppressWarnings("unchecked")
+        List<String> servers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
+        if (servers == null || servers.isEmpty()) {
+            throw new IllegalArgumentException(Config.STORM_ZOOKEEPER_SERVERS
+                + " is not configured, use -s to provide a connection string");
+        }
+        int port = ObjectReader.getInt(conf.get(Config.STORM_ZOOKEEPER_PORT));
+
+        StringBuilder sb = new StringBuilder();
+        for (String zkServer : servers) {
+            if (sb.length() > 0) {
+                sb.append(',');
+            }
+            sb.append(zkServer).append(':').append(port);
+        }
+        if (includeRoot) {
+            String root = (String) conf.get(Config.STORM_ZOOKEEPER_ROOT);
+            // Skip a missing root instead of appending the text "null" to the connection string.
+            if (root != null) {
+                sb.append(root);
+            }
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Print the usage of this command.
+     *
+     * @param command the name the command was invoked as.
+     * @param out where to print the help.
+     */
+    @Override
+    public void printCliHelp(String command, PrintStream out) {
+        out.println(command + " [<opts>]:");
+        out.println("\tStart a zookeeper shell");
+        out.println();
+        out.println("\t-s --server <connection string>: Set the connection string to use, defaults to storm connection string.");
+        out.println("\t-t --time-out <timeout>: Set the timeout to use, defaults to storm zookeeper timeout.");
+        out.println("\t-w --write: Allow for writes, defaults to read only, we don't want to cause problems.");
+        out.println("\t-n --no-root: Don't include the storm root on the default connection string.");
+        out.println("\t-j --jaas <jaas_file>: Include a jaas file that should be used when authenticating with\n\t\t"
+            + "ZK defaults to the java.security.auth.login.config conf.");
+        out.println("\t-h --help: Print this help message.");
+    }
+
+    /**
+     * A ZooKeeper client that rejects the mutating calls overridden below. The synchronous variants
+     * throw a {@link KeeperException} with code NOTREADONLY, the asynchronous variants and
+     * {@link #transaction()} throw an {@link IllegalArgumentException}.
+     */
+    private static class ReadOnlyZookeeper extends ZooKeeper {
+        ReadOnlyZookeeper(String connectionString, int timeout, Watcher watcher) throws IOException {
+            super(connectionString, timeout, watcher);
+        }
+
+        @Override
+        public String create(String path, byte[] data, List<ACL> acl, CreateMode createMode) throws KeeperException {
+            throw KeeperException.create(KeeperException.Code.NOTREADONLY, path);
+        }
+
+        @Override
+        public void create(String path, byte[] data, List<ACL> acl, CreateMode createMode, AsyncCallback.StringCallback cb, Object ctx) {
+            throw new IllegalArgumentException("In Read Only Mode");
+        }
+
+        @Override
+        public void delete(String path, int version) throws KeeperException {
+            throw KeeperException.create(KeeperException.Code.NOTREADONLY, path);
+        }
+
+        @Override
+        public void delete(String path, int version, AsyncCallback.VoidCallback cb, Object ctx) {
+            throw new IllegalArgumentException("In Read Only Mode");
+        }
+
+        @Override
+        public List<OpResult> multi(Iterable<Op> ops) throws KeeperException {
+            throw KeeperException.create(KeeperException.Code.NOTREADONLY, "multi opt");
+        }
+
+        @Override
+        public Transaction transaction() {
+            throw new IllegalArgumentException("In Read Only Mode");
+        }
+
+        @Override
+        public Stat setData(String path, byte[] data, int version) throws KeeperException {
+            throw KeeperException.create(KeeperException.Code.NOTREADONLY, path);
+        }
+
+        @Override
+        public void setData(String path, byte[] data, int version, AsyncCallback.StatCallback cb, Object ctx) {
+            throw new IllegalArgumentException("In Read Only Mode");
+        }
+
+        @Override
+        public Stat setACL(String path, List<ACL> acl, int version) throws KeeperException {
+            throw KeeperException.create(KeeperException.Code.NOTREADONLY, path);
+        }
+
+        @Override
+        public void setACL(String path, List<ACL> acl, int version, AsyncCallback.StatCallback cb, Object ctx) {
+            throw new IllegalArgumentException("In Read Only Mode");
+        }
+    }
+}