You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2018/06/20 19:36:05 UTC

[2/3] storm git commit: STORM-3115: Add zookeeper CLI to admin command

STORM-3115: Add zookeeper CLI to admin command


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fce4c6b3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fce4c6b3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fce4c6b3

Branch: refs/heads/master
Commit: fce4c6b3a0d37d02ce01458041b07b99f8827f2e
Parents: 2eae159
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Tue Jun 19 12:54:38 2018 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Tue Jun 19 12:56:51 2018 -0500

----------------------------------------------------------------------
 .../org/apache/storm/command/AdminCommands.java |  15 +-
 .../src/jvm/org/apache/storm/command/CLI.java   | 152 +++++++++---------
 .../storm/shade/org/apache/zookeeper/ZkCli.java | 156 +++++++++++++++++++
 3 files changed, 243 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/fce4c6b3/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java b/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java
index efa36f7..10eaedc 100644
--- a/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java
+++ b/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java
@@ -31,6 +31,7 @@ import org.apache.storm.cluster.ClusterUtils;
 import org.apache.storm.cluster.DaemonType;
 import org.apache.storm.cluster.IStormClusterState;
 import org.apache.storm.nimbus.NimbusInfo;
+import org.apache.storm.shade.org.apache.zookeeper.ZkCli;
 import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.ServerUtils;
 import org.apache.storm.utils.Utils;
@@ -40,10 +41,10 @@ import org.slf4j.LoggerFactory;
 public class AdminCommands {
     private static final Logger LOG = LoggerFactory.getLogger(AdminCommands.class);
 
-    private interface AdminCommand extends AutoCloseable {
+    public interface AdminCommand {
+        
         /**
-         * Run the command, this will be called at most once.  Close will only be called
-         * if run was called, so we assume all initialization will be lazy.
+         * Run the command, this will be called at most once.
          */
         void run(String [] args, Map<String, Object> conf, String command) throws Exception;
 
@@ -54,11 +55,6 @@ public class AdminCommands {
          *     argument - description
          */
         void printCliHelp(String command, PrintStream out);
-
-        @Override
-        default void close() {
-            //NOOP
-        }
     }
 
     private static class RemoveCorruptTopologies implements AdminCommand {
@@ -91,7 +87,7 @@ public class AdminCommands {
             if (args.length <= 0) {
                 help(null, System.out);
             } else {
-                for (String cn: args) {
+                for (String cn : args) {
                     AdminCommand c = COMMANDS.get(cn);
                     if (c == null) {
                         throw new IllegalArgumentException(cn + " is not a supported command");
@@ -112,6 +108,7 @@ public class AdminCommands {
 
     static {
         COMMANDS.put("remove_corrupt_topologies", new RemoveCorruptTopologies());
+        COMMANDS.put("zk_cli", new ZkCli());
         COMMANDS.put("help", new Help());
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/fce4c6b3/storm-core/src/jvm/org/apache/storm/command/CLI.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/command/CLI.java b/storm-core/src/jvm/org/apache/storm/command/CLI.java
index eb62306..2c3311e 100644
--- a/storm-core/src/jvm/org/apache/storm/command/CLI.java
+++ b/storm-core/src/jvm/org/apache/storm/command/CLI.java
@@ -25,71 +25,50 @@ import org.slf4j.LoggerFactory;
 
 public class CLI {
     /**
-     * Parse function to return an Integer
+     * Parse function to return an Integer.
      */
-    public static final Parse AS_INT = new Parse() {
-        @Override
-        public Object parse(String value) {
-            return Integer.valueOf(value);
-        }
-    };
+    public static final Parse AS_INT = value -> Integer.valueOf(value);
+
     /**
      * Noop parse function, returns the String.
      */
-    public static final Parse AS_STRING = new Parse() {
-        @Override
-        public Object parse(String value) {
-            return value;
-        }
-    };
+    public static final Parse AS_STRING = value -> value;
+
     /**
-     * Last occurance on the command line is the resulting value.
+     * Last occurrence on the command line is the resulting value.
      */
-    public static final Assoc LAST_WINS = new Assoc() {
-        @Override
-        public Object assoc(Object current, Object value) {
-            return value;
-        }
-    };
+    public static final Assoc LAST_WINS = (current, value) -> value;
+
     /**
-     * First occurance on the command line is the resulting value.
+     * First occurrence on the command line is the resulting value.
      */
-    public static final Assoc FIRST_WINS = new Assoc() {
-        @Override
-        public Object assoc(Object current, Object value) {
-            return current == null ? value : current;
-        }
-    };
+    public static final Assoc FIRST_WINS = (current, value) -> current == null ? value : current;
+
     /**
      * All values are returned as a List.
      */
-    public static final Assoc INTO_LIST = new Assoc() {
-        @Override
-        public Object assoc(Object current, Object value) {
-            if (current == null) {
-                current = new ArrayList<Object>();
-            }
-            ((List<Object>) current).add(value);
-            return current;
+    public static final Assoc INTO_LIST = (current, value) -> {
+        if (current == null) {
+            current = new ArrayList<>();
         }
+        ((List<Object>) current).add(value);
+        return current;
     };
+
     /**
-     * All values are returned as a map
+     * All values are returned as a map.
      */
-    public static final Assoc INTO_MAP = new Assoc() {
-        @Override
-        public Object assoc(Object current, Object value) {
-            if (null == current) {
-                current = new HashMap<Object, Object>();
-            }
-            ((Map<Object, Object>) current).putAll((Map<Object, Object>) value);
-            return current;
+    public static final Assoc INTO_MAP = (current, value) -> {
+        if (null == current) {
+            current = new HashMap<>();
         }
+        ((Map<Object, Object>) current).putAll((Map<Object, Object>) value);
+        return current;
     };
     private static final Logger LOG = LoggerFactory.getLogger(CLI.class);
 
     /**
-     * Add an option to be parsed
+     * Add an option to be parsed.
      * @param shortName the short single character name of the option (no `-` character proceeds it).
      * @param longName the multi character name of the option (no `--` characters proceed it).
      * @param defaultValue the value that will be returned of the command if none is given. null if none is given.
@@ -100,7 +79,7 @@ public class CLI {
     }
 
     /**
-     * Add an option to be parsed
+     * Add an option to be parsed.
      * @param shortName the short single character name of the option (no `-` character proceeds it).
      * @param longName the multi character name of the option (no `--` characters proceed it).
      * @param defaultValue the value that will be returned of the command if none is given. null if none is given.
@@ -112,7 +91,7 @@ public class CLI {
     }
 
     /**
-     * Add an option to be parsed
+     * Add an option to be parsed.
      * @param shortName the short single character name of the option (no `-` character proceeds it).
      * @param longName the multi character name of the option (no `--` characters proceed it).
      * @param defaultValue the value that will be returned of the command if none is given. null if none is given.
@@ -125,6 +104,16 @@ public class CLI {
     }
 
     /**
+     * Add a boolean option that enables something.
+     * @param shortName the short single character name of the option (no `-` character proceeds it).
+     * @param longName the multi character name of the option (no `--` characters proceed it).
+     * @return a builder to be used to continue creating the command line.
+     */
+    public CLIBuilder boolOpt(String shortName, String longName) {
+        return new CLIBuilder().boolOpt(shortName, longName);
+    }
+
+    /**
      * Add a named argument.
      * @param name the name of the argument.
      * @return a builder to be used to continue creating the command line.
@@ -170,17 +159,17 @@ public class CLI {
          * @param value the String to parse
          * @return the parsed value
          */
-        public Object parse(String value);
+        Object parse(String value);
     }
 
     public interface Assoc {
         /**
-         * Associate a value into somthing else
+         * Associate a value into something else.
          * @param current what to put value into, will be null if no values have been added yet.
          * @param value what to add
          * @return the result of combining the two
          */
-        public Object assoc(Object current, Object value);
+        Object assoc(Object current, Object value);
     }
 
     private static class Opt {
@@ -189,13 +178,15 @@ public class CLI {
         final Object defaultValue;
         final Parse parse;
         final Assoc assoc;
+        final boolean noValue;
 
-        public Opt(String shortName, String longName, Object defaultValue, Parse parse, Assoc assoc) {
+        public Opt(String shortName, String longName, Object defaultValue, Parse parse, Assoc assoc, boolean noValue) {
             this.shortName = shortName;
             this.longName = longName;
             this.defaultValue = defaultValue;
             this.parse = parse == null ? AS_STRING : parse;
             this.assoc = assoc == null ? LAST_WINS : assoc;
+            this.noValue = noValue;
         }
 
         public Object process(Object current, String value) {
@@ -224,7 +215,7 @@ public class CLI {
         private final ArrayList<Arg> args = new ArrayList<>();
 
         /**
-         * Add an option to be parsed
+         * Add an option to be parsed.
          * @param shortName the short single character name of the option (no `-` character proceeds it).
          * @param longName the multi character name of the option (no `--` characters proceed it).
          * @param defaultValue the value that will be returned of the command if none is given. null if none is given.
@@ -235,7 +226,7 @@ public class CLI {
         }
 
         /**
-         * Add an option to be parsed
+         * Add an option to be parsed.
          * @param shortName the short single character name of the option (no `-` character proceeds it).
          * @param longName the multi character name of the option (no `--` characters proceed it).
          * @param defaultValue the value that will be returned of the command if none is given. null if none is given.
@@ -247,7 +238,7 @@ public class CLI {
         }
 
         /**
-         * Add an option to be parsed
+         * Add an option to be parsed.
          * @param shortName the short single character name of the option (no `-` character proceeds it).
          * @param longName the multi character name of the option (no `--` characters proceed it).
          * @param defaultValue the value that will be returned of the command if none is given. null if none is given.
@@ -256,7 +247,18 @@ public class CLI {
          * @return a builder to be used to continue creating the command line.
          */
         public CLIBuilder opt(String shortName, String longName, Object defaultValue, Parse parse, Assoc assoc) {
-            opts.add(new Opt(shortName, longName, defaultValue, parse, assoc));
+            opts.add(new Opt(shortName, longName, defaultValue, parse, assoc, false));
+            return this;
+        }
+
+        /**
+         * Add a boolean option that enables something.
+         * @param shortName the short single character name of the option (no `-` character proceeds it).
+         * @param longName the multi character name of the option (no `--` characters proceed it).
+         * @return a builder to be used to continue creating the command line.
+         */
+        public CLIBuilder boolOpt(String shortName, String longName) {
+            opts.add(new Opt(shortName, longName, false, null, null, true));
             return this;
         }
 
@@ -304,37 +306,45 @@ public class CLI {
         /**
          * Parse the command line arguments.
          * @param rawArgs the string arguments to be parsed.
-         * @throws Exception on any error.
          * @return The parsed command line.
-         * opts will be stored under the short argument name.
-         * args will be stored under the argument name, unless no arguments are configured, and then they will be stored under "ARGS".
-         * The last argument comnfigured is greedy and is used to process all remaining command line arguments.
+         *     opts will be stored under the short argument name.
+         *     args will be stored under the argument name, unless no arguments are configured, and then they will be stored under "ARGS".
+         *     The last argument configured is greedy and is used to process all remaining command line arguments.
+         * @throws Exception on any error.
          */
         public Map<String, Object> parse(String... rawArgs) throws Exception {
             Options options = new Options();
             for (Opt opt : opts) {
-                options.addOption(Option.builder(opt.shortName).longOpt(opt.longName).hasArg().build());
+                if (opt.noValue) {
+                    options.addOption(Option.builder(opt.shortName).longOpt(opt.longName).hasArg(false).build());
+                } else {
+                    options.addOption(Option.builder(opt.shortName).longOpt(opt.longName).hasArg().build());
+                }
             }
             DefaultParser parser = new DefaultParser();
             CommandLine cl = parser.parse(options, rawArgs);
             HashMap<String, Object> ret = new HashMap<>();
             for (Opt opt : opts) {
-                Object current = null;
-                String[] strings = cl.getOptionValues(opt.shortName);
-                if (strings != null) {
-                    for (String val : strings) {
-                        current = opt.process(current, val);
+                if (opt.noValue) {
+                    ret.put(opt.shortName, cl.hasOption(opt.shortName));
+                } else {
+                    Object current = null;
+                    String[] strings = cl.getOptionValues(opt.shortName);
+                    if (strings != null) {
+                        for (String val : strings) {
+                            current = opt.process(current, val);
+                        }
                     }
+                    if (current == null) {
+                        current = opt.defaultValue;
+                    }
+                    ret.put(opt.shortName, current);
                 }
-                if (current == null) {
-                    current = opt.defaultValue;
-                }
-                ret.put(opt.shortName, current);
             }
             List<String> stringArgs = cl.getArgList();
             if (args.size() > stringArgs.size()) {
-                throw new RuntimeException("Wrong number of arguments at least " + args.size() +
-                                           " expected, but only " + stringArgs.size() + " found");
+                throw new RuntimeException("Wrong number of arguments at least " + args.size()
+                    + " expected, but only " + stringArgs.size() + " found");
             }
 
             int argIndex = 0;

http://git-wip-us.apache.org/repos/asf/storm/blob/fce4c6b3/storm-core/src/jvm/org/apache/storm/shade/org/apache/zookeeper/ZkCli.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/shade/org/apache/zookeeper/ZkCli.java b/storm-core/src/jvm/org/apache/storm/shade/org/apache/zookeeper/ZkCli.java
new file mode 100644
index 0000000..57ec899
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/shade/org/apache/zookeeper/ZkCli.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF)
+ * under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed
+ * under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+//This is a hack to allow ZooKeeperMain to be called by this command.
+
+package org.apache.storm.shade.org.apache.zookeeper;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.List;
+import java.util.Map;
+import org.apache.storm.Config;
+import org.apache.storm.command.AdminCommands;
+import org.apache.storm.command.CLI;
+import org.apache.storm.shade.org.apache.zookeeper.data.ACL;
+import org.apache.storm.shade.org.apache.zookeeper.data.Stat;
+import org.apache.storm.utils.ObjectReader;
+
+public class ZkCli implements AdminCommands.AdminCommand {
+    @Override
+    public void run(String[] args, Map<String, Object> conf, String command) throws Exception {
+        List<String> servers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
+        int port = ObjectReader.getInt(conf.get(Config.STORM_ZOOKEEPER_PORT));
+        String root = (String) conf.get(Config.STORM_ZOOKEEPER_ROOT);
+        Map<String, Object> cl = CLI.opt("s","server", null, CLI.AS_STRING, CLI.LAST_WINS)
+            .opt("t", "time-out", ObjectReader.getInt(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)),
+                CLI.AS_INT, CLI.LAST_WINS)
+            .boolOpt("w", "write")
+            .boolOpt("n", "no-root")
+            .opt("j", "jaas", conf.get("java.security.auth.login.config"), CLI.AS_STRING, CLI.LAST_WINS)
+            .boolOpt("h", "help")
+            .parse(args);
+
+        if ((Boolean)cl.get("h")) {
+            printCliHelp(command, System.out);
+            return;
+        }
+
+        String jaas = (String)cl.get("j");
+        if (jaas != null && !jaas.isEmpty()) {
+            System.setProperty("java.security.auth.login.config", jaas);
+        }
+
+        String connectionString = (String) cl.get("s");
+        if (connectionString == null) {
+            StringBuilder sb = new StringBuilder();
+            boolean isFirst = true;
+            for (String zkServer : servers) {
+                if (!isFirst) {
+                    sb.append(',');
+                }
+                isFirst = false;
+                sb.append(zkServer).append(':').append(port);
+            }
+            if (!(Boolean)cl.get("n")) {
+                sb.append(root);
+            }
+            connectionString = sb.toString();
+        }
+
+        boolean readOnly = !(Boolean)cl.get("w");
+        int timeout = (Integer)cl.get("t");
+        ZooKeeper zk;
+        if (readOnly) {
+            zk = new ReadOnlyZookeeper(connectionString, timeout, watchedEvent -> { });
+        } else {
+            zk = new ZooKeeper(connectionString, timeout, watchedEvent -> { });
+        }
+        ZooKeeperMain main = new ZooKeeperMain(zk);
+        main.run();
+    }
+
+    @Override
+    public void printCliHelp(String command, PrintStream out) {
+        out.println(command + " [<opts>]:");
+        out.println("\tStart a zookeeper shell");
+        out.println();
+        out.println("\t-s --server <connection string>: Set the connection string to use, defaults to storm connection string.");
+        out.println("\t-t --time-out <timeout>:         Set the timeout to use, defaults to storm zookeeper timeout.");
+        out.println("\t-w --write:                      Allow for writes, defaults to read only, we don't want to cause problems.");
+        out.println("\t-n --no-root:                    Don't include the storm root on the default connection string.");
+        out.println("\t-j --jaas <jaas_file>:           Include a jaas file that should be used when authenticating with\n\t\t"
+            + "ZK defaults to the java.security.auth.login.config conf.");
+    }
+
+    private static class ReadOnlyZookeeper extends ZooKeeper {
+        ReadOnlyZookeeper(String connectionString, int timeout, Watcher watcher) throws IOException {
+            super(connectionString, timeout, watcher);
+        }
+
+        @Override
+        public String create(String path, byte[] data, List<ACL> acl, CreateMode createMode) throws KeeperException {
+            throw KeeperException.create(KeeperException.Code.NOTREADONLY, path);
+        }
+
+        @Override
+        public void create(String path, byte[] data, List<ACL> acl, CreateMode createMode, AsyncCallback.StringCallback cb, Object ctx) {
+            throw new IllegalArgumentException("In Read Only Mode");
+        }
+
+        @Override
+        public void delete(String path, int version) throws KeeperException {
+            throw KeeperException.create(KeeperException.Code.NOTREADONLY, path);
+        }
+
+        @Override
+        public void delete(String path, int version, AsyncCallback.VoidCallback cb, Object ctx) {
+            throw new IllegalArgumentException("In Read Only Mode");
+        }
+
+        @Override
+        public List<OpResult> multi(Iterable<Op> ops) throws KeeperException {
+            throw KeeperException.create(KeeperException.Code.NOTREADONLY, "multi opt");
+        }
+
+        @Override
+        public Transaction transaction() {
+            throw new IllegalArgumentException("In Read Only Mode");
+        }
+
+        @Override
+        public Stat setData(String path, byte[] data, int version) throws KeeperException {
+            throw KeeperException.create(KeeperException.Code.NOTREADONLY, path);
+        }
+
+        @Override
+        public void setData(String path, byte[] data, int version, AsyncCallback.StatCallback cb, Object ctx) {
+            throw new IllegalArgumentException("In Read Only Mode");
+        }
+
+        @Override
+        public Stat setACL(String path, List<ACL> acl, int version) throws KeeperException {
+            throw KeeperException.create(KeeperException.Code.NOTREADONLY, path);
+        }
+
+        @Override
+        public void setACL(String path, List<ACL> acl, int version, AsyncCallback.StatCallback cb, Object ctx) {
+            throw new IllegalArgumentException("In Read Only Mode");
+        }
+    }
+}