You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ni...@apache.org on 2022/07/19 12:24:36 UTC

[pulsar] branch master updated: [feature][cli] Pulsar shell - non interactive mode - part 3 (#16268)

This is an automated email from the ASF dual-hosted git repository.

nicoloboschi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new de56ac07c14 [feature][cli] Pulsar shell - non interactive mode - part 3 (#16268)
de56ac07c14 is described below

commit de56ac07c14b3f80093caaef05b3bb92c5ffa161
Author: Nicolò Boschi <bo...@gmail.com>
AuthorDate: Tue Jul 19 14:24:29 2022 +0200

    [feature][cli] Pulsar shell - non interactive mode - part 3 (#16268)
    
    * [feature][cli] Pulsar shell - add support for input files and pipe for running multiple commands - part 2
    
    * [feature][cli] Pulsar shell - Env variables interpolation
    
    * Pulsar Shell: fix sh command and better error message for non-interactive mode failures
    
    * Add commons-text to LICENSE
---
 bin/pulsar-admin-common.sh                         |   6 +-
 bin/pulsar-shell                                   |  14 +-
 distribution/server/src/assemble/LICENSE.bin.txt   |   1 +
 distribution/shell/pom.xml                         |  17 +
 pom.xml                                            |   1 +
 pulsar-client-tools/pom.xml                        |   5 +
 .../java/org/apache/pulsar/shell/AdminShell.java   |   5 +-
 .../java/org/apache/pulsar/shell/ClientShell.java  |   6 +-
 .../apache/pulsar/shell/JCommanderCompleter.java   |   3 +-
 .../java/org/apache/pulsar/shell/PulsarShell.java  | 344 ++++++++++++++++++---
 .../apache/pulsar/shell/ShellCommandsProvider.java |   2 +-
 .../org/apache/pulsar/shell/PulsarShellTest.java   | 157 ++++++++--
 .../src/test/resources/test-shell-file             |  22 +-
 .../src/test/resources/test-shell-file-error       |  24 +-
 14 files changed, 480 insertions(+), 127 deletions(-)

diff --git a/bin/pulsar-admin-common.sh b/bin/pulsar-admin-common.sh
index 670d2d9354f..fdfed60beda 100755
--- a/bin/pulsar-admin-common.sh
+++ b/bin/pulsar-admin-common.sh
@@ -56,18 +56,18 @@ fi
 add_maven_deps_to_classpath() {
     MVN="mvn"
     if [ "$MAVEN_HOME" != "" ]; then
-	MVN=${MAVEN_HOME}/bin/mvn
+      MVN=${MAVEN_HOME}/bin/mvn
     fi
 
     # Need to generate classpath from maven pom. This is costly so generate it
     # and cache it. Save the file into our target dir so a mvn clean will get
     # clean it up and force us create a new one.
-    f="${PULSAR_HOME}/distribution/server/target/classpath.txt"
+    f="${PULSAR_HOME}/distribution/shell/target/classpath.txt"
     if [ ! -f "${f}" ]
     then
     (
       cd "${PULSAR_HOME}"
-      ${MVN} -pl distribution/server generate-sources &> /dev/null
+      ${MVN} -pl distribution/shell generate-sources &> /dev/null
     )
     fi
     PULSAR_CLASSPATH=${CLASSPATH}:`cat "${f}"`
diff --git a/bin/pulsar-shell b/bin/pulsar-shell
index 66127265c48..f2aa9a56a20 100755
--- a/bin/pulsar-shell
+++ b/bin/pulsar-shell
@@ -37,4 +37,16 @@ OPTS="-Dorg.jline.terminal.jansi=false $OPTS"
 
 #Change to PULSAR_HOME to support relative paths
 cd "$PULSAR_HOME"
-exec $JAVA $OPTS org.apache.pulsar.shell.PulsarShell $PULSAR_CLIENT_CONF "$@"
+DEFAULT_SHELL_ARGS="--config $PULSAR_CLIENT_CONF"
+PASSED_SHELL_ARGS=""
+while [[ $# -gt 0 ]]
+do
+  key="$1"
+  if [[ "$key" == "-c" || "$key" == "--config" ]]; then
+    DEFAULT_SHELL_ARGS=""
+  fi
+  PASSED_SHELL_ARGS="$PASSED_SHELL_ARGS $key"
+  shift
+done
+
+exec $JAVA $OPTS org.apache.pulsar.shell.PulsarShell $DEFAULT_SHELL_ARGS $PASSED_SHELL_ARGS
\ No newline at end of file
diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index 7acd46fa8e7..898c579809a 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -350,6 +350,7 @@ The Apache Software License, Version 2.0
     - org.apache.commons-commons-collections4-4.4.jar
     - org.apache.commons-commons-compress-1.21.jar
     - org.apache.commons-commons-lang3-3.11.jar
+    - org.apache.commons-commons-text-1.9.jar
  * Netty
     - io.netty-netty-buffer-4.1.77.Final.jar
     - io.netty-netty-codec-4.1.77.Final.jar
diff --git a/distribution/shell/pom.xml b/distribution/shell/pom.xml
index cdf82c0a9be..450e7abb061 100644
--- a/distribution/shell/pom.xml
+++ b/distribution/shell/pom.xml
@@ -84,6 +84,23 @@
           </execution>
         </executions>
       </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>build-classpath</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>build-classpath</goal>
+            </goals>
+            <configuration>
+              <outputFile>target/classpath.txt</outputFile>
+              <includeScope>runtime</includeScope>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
     </plugins>
   </build>
 </project>
diff --git a/pom.xml b/pom.xml
index bca893dcf62..757cb034e10 100644
--- a/pom.xml
+++ b/pom.xml
@@ -117,6 +117,7 @@ flexible messaging model and an intuitive client API.</description>
     <bookkeeper.version>4.15.0</bookkeeper.version>
     <zookeeper.version>3.8.0</zookeeper.version>
     <commons-cli.version>1.5.0</commons-cli.version>
+    <commons-text.version>1.9</commons-text.version>
     <snappy.version>1.1.8.4</snappy.version> <!-- ZooKeeper server -->
     <dropwizardmetrics.version>4.1.12.1</dropwizardmetrics.version> <!-- ZooKeeper server -->
     <curator.version>5.1.0</curator.version>
diff --git a/pulsar-client-tools/pom.xml b/pulsar-client-tools/pom.xml
index 31e73a43085..87dd76b48d4 100644
--- a/pulsar-client-tools/pom.xml
+++ b/pulsar-client-tools/pom.xml
@@ -118,6 +118,11 @@
       <artifactId>jline</artifactId>
       <version>${jline3.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-text</artifactId>
+      <version>${commons-text.version}</version>
+    </dependency>
 
   </dependencies>
 
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/shell/AdminShell.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/shell/AdminShell.java
index 3d7186a2c8d..67eb0cc604e 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/shell/AdminShell.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/shell/AdminShell.java
@@ -50,6 +50,7 @@ public class AdminShell extends PulsarAdminTool implements ShellCommandsProvider
 
     @Override
     public void setupState(Properties properties) {
+        getJCommander().setProgramName(getName());
         setupCommands(b -> null);
     }
 
@@ -67,7 +68,7 @@ public class AdminShell extends PulsarAdminTool implements ShellCommandsProvider
 
 
     @Override
-    public void runCommand(String[] args) throws Exception {
-        run(args);
+    public boolean runCommand(String[] args) throws Exception {
+        return run(args);
     }
 }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/shell/ClientShell.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/shell/ClientShell.java
index 60e2baaa1d1..71cce52408f 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/shell/ClientShell.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/shell/ClientShell.java
@@ -50,6 +50,7 @@ public class ClientShell extends PulsarClientTool implements ShellCommandsProvid
 
     @Override
     public void setupState(Properties properties) {
+        getJCommander().setProgramName(getName());
     }
 
     @Override
@@ -65,7 +66,8 @@ public class ClientShell extends PulsarClientTool implements ShellCommandsProvid
     }
 
     @Override
-    public void runCommand(String[] args) throws Exception {
-        run(args);
+    public boolean runCommand(String[] args) throws Exception {
+        final int returnCode = run(args);
+        return returnCode == 0;
     }
 }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/shell/JCommanderCompleter.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/shell/JCommanderCompleter.java
index 17cbb918fcd..d3aaf7297b0 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/shell/JCommanderCompleter.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/shell/JCommanderCompleter.java
@@ -22,6 +22,7 @@ import com.beust.jcommander.JCommander;
 import com.beust.jcommander.ParameterDescription;
 import com.beust.jcommander.WrappedParameter;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -46,7 +47,7 @@ public class JCommanderCompleter {
         command.setProgramName(program);
         return createCompletersForCommand(Collections.emptyList(),
                 command,
-                List.of(NullCompleter.INSTANCE));
+                Arrays.asList(NullCompleter.INSTANCE));
     }
 
     private static List<Completer> createCompletersForCommand(List<Completer> preCompleters,
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/shell/PulsarShell.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/shell/PulsarShell.java
index 068e10aaf73..2b72c0574eb 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/shell/PulsarShell.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/shell/PulsarShell.java
@@ -20,7 +20,12 @@ package org.apache.pulsar.shell;
 
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
+import java.io.BufferedReader;
 import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.URI;
+import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -28,15 +33,19 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.function.Function;
+import java.util.stream.Collectors;
+import lombok.AllArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.text.StringSubstitutor;
 import org.jline.reader.Completer;
 import org.jline.reader.LineReader;
 import org.jline.reader.LineReaderBuilder;
-import org.jline.reader.ParsedLine;
 import org.jline.reader.impl.completer.AggregateCompleter;
 import org.jline.terminal.Terminal;
 import org.jline.terminal.TerminalBuilder;
 import org.jline.utils.AttributedStringBuilder;
 import org.jline.utils.AttributedStyle;
+import org.jline.utils.InfoCmp;
 
 /**
  * Main Pulsar shell class invokable from the pulsar-shell script.
@@ -46,30 +55,100 @@ public class PulsarShell {
     private static final String EXIT_MESSAGE = "Goodbye!";
     private static final String PROPERTY_PERSIST_HISTORY_ENABLED = "shellHistoryPersistEnabled";
     private static final String PROPERTY_PERSIST_HISTORY_PATH = "shellHistoryPersistPath";
+    private static final String CHECKMARK = new String(Character.toChars(0x2714));
+    private static final String XMARK = new String(Character.toChars(0x2716));
+    private static final AttributedStyle LOG_STYLE = AttributedStyle.DEFAULT
+            .foreground(25, 143, 255)
+            .background(230, 241, 255);
+
+    private static final Substitutor[] SUBSTITUTORS = { // substitution passes, applied in this order
+            (str, vars) -> new StringSubstitutor(vars, "${", "}", '\\').replace(str), // ${NAME} form, '\' escapes
+            (str, vars) -> { // bare $NAME form: only when the whole word is "$" followed by the key
+                // unfortunately StringSubstitutor doesn't handle empty suffix regex
+                if (str.startsWith("\\$")) { // escaped reference: drop the backslash, keep the literal "$..."
+                    return str.substring(1);
+                }
+                if (str.startsWith("$")) {
+                    final String key = str.substring(1);
+                    if (!vars.containsKey(key)) { // unknown variable: leave the word untouched
+                        return str;
+                    }
+                    return vars.get(key);
+                }
+                return str;
+            }
+    };
+
+    interface Substitutor {
+        String replace(String str, Map<String, String> vars);
+    }
+
+    static final class ShellOptions {
 
-    static final class MainOptions {
         @Parameter(names = {"-h", "--help"}, help = true, description = "Show this help.")
         boolean help;
     }
 
-    public static void main(String[] args) throws Exception {
-        if (args.length == 0) {
-            System.out.println("Usage: pulsar-shell CONF_FILE_PATH");
-            System.exit(0);
-            return;
-        }
+    static final class MainOptions {
+
+        @Parameter(names = {"-c", "--config"}, description = "Client configuration file.")
+        String configFile;
+
+        @Parameter(names = {"-f", "--filename"}, description = "Input filename with a list of commands to be executed."
+                + " Each command must be separated by a newline.")
+        String filename;
+
+        @Parameter(names = {"-e", "--exit-on-error"}, description = "If true, the shell will be interrupted "
+                + "if a command throws an exception.")
+        boolean exitOnError;
+
+        @Parameter(names = {"-"}, description = "Read commands from the standard input.")
+        boolean readFromStdin;
 
-        String configFile = args[0];
-        Properties properties = new Properties();
-        try (FileInputStream fis = new FileInputStream(configFile)) {
-            properties.load(fis);
+        @Parameter(names = {"-np", "--no-progress"}, description = "Display raw output of the commands without the "
+                + "fancy progress visualization.")
+        boolean noProgress;
+    }
+
+    private final Properties properties;
+    private final JCommander mainCommander;
+    private final MainOptions mainOptions;
+    private final String[] args;
+
+    public PulsarShell(String args[]) throws IOException { // convenience: starts from empty Properties
+        this(args, new Properties());
+    }
+    public PulsarShell(String args[], Properties props) throws IOException { // parses args into mainOptions
+        properties = props;
+        mainCommander = new JCommander();
+        mainOptions = new MainOptions();
+        mainCommander.addObject(mainOptions);
+        try {
+            mainCommander.parse(args);
+        } catch (Exception e) { // invalid arguments: print the message and usage, then exit with code 1
+            System.err.println(e.getMessage());
+            System.err.println();
+            mainCommander.usage();
+            exit(1);
+            throw new IllegalArgumentException(e); // System.exit never returns; reached only if exit() is overridden
+        }
+        if (mainOptions.configFile != null) { // -c/--config is optional
+            String configFile = mainOptions.configFile;
+            try (FileInputStream fis = new FileInputStream(configFile)) {
+                properties.load(fis); // entries from the file are loaded on top of the supplied props
+            }
+        }
+        this.args = args;
+    }
+
+    public static void main(String[] args) throws Exception {
+        new PulsarShell(args).run();
     }
 
-    public void run(Properties properties) throws Exception {
+    public void run() throws Exception {
+        System.setProperty("org.jline.terminal.dumb", "true");
         final Terminal terminal = TerminalBuilder.builder().build();
-        run(properties, (providersMap) -> {
+        run((providersMap) -> {
             List<Completer> completers = new ArrayList<>();
             String serviceUrl = "";
             String adminUrl = "";
@@ -104,11 +183,25 @@ public class PulsarShell {
             LineReader reader = readerBuilder.build();
 
             final String welcomeMessage =
-                    String.format("Welcome to Pulsar shell!\n  Service URL: %s\n  Admin URL: %s\n\n "
+                    String.format("Welcome to Pulsar shell!\n  %s: %s\n  %s: %s\n\n "
                                     + "Type 'help' to get started or try the autocompletion (TAB button).\n",
-                            serviceUrl, adminUrl);
+                            new AttributedStringBuilder().style(AttributedStyle.BOLD).append("Service URL").toAnsi(),
+                            serviceUrl,
+                            new AttributedStringBuilder().style(AttributedStyle.BOLD).append("Admin URL").toAnsi(),
+                            adminUrl);
             output(welcomeMessage, terminal);
-            return reader;
+            final String prompt = createPrompt(getHostFromUrl(serviceUrl));
+            return new InteractiveLineReader() {
+                @Override
+                public String readLine() {
+                    return reader.readLine(prompt);
+                }
+
+                @Override
+                public List<String> parseLine(String line) {
+                    return reader.getParser().parse(line, 0).words();
+                }
+            };
         }, (providerMap) -> terminal);
     }
 
@@ -124,57 +217,187 @@ public class PulsarShell {
         }
     }
 
-    public void run(Properties properties,
-                    Function<Map<String, ShellCommandsProvider>, LineReader> readerBuilder,
-                    Function<Map<String, ShellCommandsProvider>, Terminal> terminalBuilder) throws Exception {
-        System.setProperty("org.jline.terminal.dumb", "true");
+    private interface CommandReader {
+        List<String> readCommand() throws InterruptShellException;
+    }
 
-        final JCommander mainCommander = new JCommander();
-        final MainOptions mainOptions = new MainOptions();
-        mainCommander.addObject(mainOptions);
+    private static class InterruptShellException extends RuntimeException {
+    }
+
+    private static class CommandsInfo {
+
+        @AllArgsConstructor
+        static class ExecutedCommandInfo {
+            String command;
+            boolean ok;
+        }
+        int totalCommands;
+        List<ExecutedCommandInfo> executedCommands = new ArrayList<>();
+        String executingCommand;
+    }
+
+    interface InteractiveLineReader {
+
+        String readLine();
 
-        final Map<String, ShellCommandsProvider> providersMap = registerProviders(mainCommander, properties);
+        List<String> parseLine(String line);
+    }
+
+    public void run(Function<Map<String, ShellCommandsProvider>, InteractiveLineReader> readerBuilder,
+                    Function<Map<String, ShellCommandsProvider>, Terminal> terminalBuilder) throws Exception {
+        /**
+         * Options read from the shell session
+         */
+        final JCommander shellCommander = new JCommander();
+        final ShellOptions shellOptions = new ShellOptions();
+        shellCommander.addObject(shellOptions);
 
-        final LineReader reader = readerBuilder.apply(providersMap);
+        final Map<String, ShellCommandsProvider> providersMap = registerProviders(shellCommander, properties);
+
+        final InteractiveLineReader reader = readerBuilder.apply(providersMap);
         final Terminal terminal = terminalBuilder.apply(providersMap);
-        final String prompt = createPrompt();
+        final Map<String, String> variables = System.getenv();
+
+        CommandReader commandReader;
+        CommandsInfo commandsInfo = null;
+
+        if (mainOptions.readFromStdin && mainOptions.filename != null) {
+            throw new IllegalArgumentException("Cannot use stdin and -f/--filename option at same time");
+        }
+        boolean isNonInteractiveMode = mainOptions.filename != null || mainOptions.readFromStdin;
+
+        if (isNonInteractiveMode) {
+            final List<String> lines;
+            if (mainOptions.filename != null) {
+                lines = Files.readAllLines(Paths.get(mainOptions.filename))
+                        .stream()
+                        .filter(PulsarShell::filterLine)
+                        .collect(Collectors.toList());
+            } else {
+                try (BufferedReader stdinReader = new BufferedReader(new InputStreamReader(System.in))) {
+                    lines = stdinReader.lines().filter(PulsarShell::filterLine).collect(Collectors.toList());
+                }
+            }
+            if (!mainOptions.noProgress) {
+                commandsInfo = new CommandsInfo();
+                commandsInfo.totalCommands = lines.size();
+            }
+
+            final CommandsInfo finalCommandsInfo = commandsInfo;
+            commandReader = new CommandReader() {
+                private int index = 0;
+
+                @Override
+                public List<String> readCommand() {
+                    if (index == lines.size()) {
+                        throw new InterruptShellException();
+                    }
+                    String command = lines.get(index++).trim();
+                    final List<String> words = substituteVariables(reader.parseLine(command), variables);
+                    command = words.stream().collect(Collectors.joining(" "));
+                    if (finalCommandsInfo != null) {
+                        finalCommandsInfo.executingCommand = command;
+                    } else {
+                        output(String.format("[%d/%d] Executing %s", new Object[]{index,
+                                lines.size(), command}), terminal);
+                    }
+                    return words;
+                }
+            };
+        } else {
+            commandReader = () -> {
+                try {
+                    final String line = reader.readLine().trim();
+                    return substituteVariables(reader.parseLine(line), variables);
+                } catch (org.jline.reader.UserInterruptException userInterruptException) {
+                    throw new InterruptShellException();
+                }
+            };
+        }
+
         Runtime.getRuntime().addShutdownHook(new Thread(() -> quit(terminal)));
         while (true) {
-            String line;
+            final List<String> words;
             try {
-                line = reader.readLine(prompt).trim();
-            } catch (org.jline.reader.UserInterruptException userInterruptException) {
-                break;
+                words = commandReader.readCommand();
+            } catch (InterruptShellException interruptShellException) {
+                exit(0);
+                return;
             }
-            if (line.isBlank()) {
+            final String line = words.stream().collect(Collectors.joining(" "));
+            if (StringUtils.isBlank(line)) {
                 continue;
             }
             if (isQuitCommand(line)) {
-                break;
+                exit(0);
+                return;
             }
-            final List<String> words = parseLine(reader, line);
-
-            if (mainOptions.help) {
-                mainCommander.usage();
+            if (shellOptions.help) {
+                shellCommander.usage();
                 continue;
             }
 
-            final ShellCommandsProvider pulsarShellCommandsProvider = getProviderFromArgs(mainCommander, words);
+            final ShellCommandsProvider pulsarShellCommandsProvider = getProviderFromArgs(shellCommander, words);
             if (pulsarShellCommandsProvider == null) {
-                mainCommander.usage();
+                shellCommander.usage();
                 continue;
             }
             String[] argv = extractAndConvertArgs(words);
+            boolean commandOk = false;
             try {
-                pulsarShellCommandsProvider.runCommand(argv);
+                printExecutingCommands(terminal, commandsInfo, false);
+                commandOk = pulsarShellCommandsProvider.runCommand(argv);
             } catch (Throwable t) {
                 t.printStackTrace(terminal.writer());
             } finally {
+                final boolean willExitWithError = mainOptions.exitOnError && !commandOk;
+                if (commandsInfo != null && !willExitWithError) {
+                    commandsInfo.executingCommand = null;
+                    commandsInfo.executedCommands.add(new CommandsInfo.ExecutedCommandInfo(line, commandOk));
+                    printExecutingCommands(terminal, commandsInfo, true);
+                }
                 pulsarShellCommandsProvider.cleanupState(properties);
+
+            }
+            if (mainOptions.exitOnError && !commandOk) {
+                exit(1);
+                return;
             }
         }
     }
 
+    private void printExecutingCommands(Terminal terminal,
+                                        CommandsInfo commandsInfo,
+                                        boolean printExecuted) { // redraws the progress view on the terminal
+        if (commandsInfo == null) { // progress tracking is off (interactive mode or --no-progress)
+            return;
+        }
+        terminal.puts(InfoCmp.Capability.clear_screen); // start each redraw from a clean screen
+        terminal.flush();
+        int index = 1;
+        if (printExecuted) { // list every finished command with its success/failure mark
+            for (CommandsInfo.ExecutedCommandInfo executedCommand : commandsInfo.executedCommands) {
+                String icon = executedCommand.ok ? CHECKMARK : XMARK;
+                final String ansiLog = new AttributedStringBuilder()
+                        .style(LOG_STYLE)
+                        .append(String.format("[%d/%d] %s %s", new Object[]{index++, commandsInfo.totalCommands,
+                                icon, executedCommand.command}))
+                        .toAnsi();
+                output(ansiLog, terminal);
+            }
+        } else {
+            index = commandsInfo.executedCommands.size() + 1; // 1-based position of the command about to run
+        }
+        if (commandsInfo.executingCommand != null) { // a command is currently in flight
+            final String ansiLog = new AttributedStringBuilder()
+                    .style(LOG_STYLE)
+                    .append(String.format("[%d/%d] Executing %s", new Object[]{index,
+                            commandsInfo.totalCommands, commandsInfo.executingCommand}))
+                    .toAnsi();
+            output(ansiLog, terminal);
+        }
+    }
+
     private static ShellCommandsProvider getProviderFromArgs(JCommander mainCommander, List<String> words) {
         final String providerCmd = words.get(0);
         final JCommander commander = mainCommander.getCommands().get(providerCmd);
@@ -184,19 +407,25 @@ public class PulsarShell {
         return (ShellCommandsProvider) commander.getObjects().get(0);
     }
 
-    private static String createPrompt() {
+    private static String createPrompt(String hostname) {
+        final String string = (hostname == null ? "pulsar" : hostname) + ">";
         return new AttributedStringBuilder()
-                .style(AttributedStyle.DEFAULT.foreground(25, 143, 255).background(230, 241, 255))
-                .append("pulsar>")
+                .style(LOG_STYLE)
+                .append(string)
                 .style(AttributedStyle.DEFAULT)
                 .append(" ")
                 .toAnsi();
     }
 
-    private static List<String> parseLine(LineReader reader, String line) {
-        final ParsedLine pl = reader.getParser().parse(line, 0);
-        final List<String> words = pl.words();
-        return words;
+    static List<String> substituteVariables(List<String> line, Map<String, String> vars) { // word by word
+        return line.stream().map(s -> PulsarShell.substituteVariables(s, vars)).collect(Collectors.toList());
+    }
+
+    private static String substituteVariables(String string, Map<String, String> vars) {
+        for (Substitutor stringSubstitutor : SUBSTITUTORS) { // array order; each pass sees the previous output
+            string = stringSubstitutor.replace(string, vars);
+        }
+        return string;
+    }
 
     private static void quit(Terminal terminal) {
@@ -253,4 +482,23 @@ public class PulsarShell {
         providerMap.put(name, provider);
     }
 
+    protected void exit(int exitCode) { // protected hook: subclasses can override to intercept termination
+        System.exit(exitCode);
+    }
+
+    private static boolean filterLine(String line) { // true = keep: neither blank nor a '#' comment line
+        return !StringUtils.isBlank(line) && !line.trim().startsWith("#"); // trim: also skip indented comments
+    }
+
+    private static String getHostFromUrl(String url) { // host part of the URL, or null when unavailable
+        if (url == null) {
+            return null;
+        }
+        try {
+            return URI.create(url).getHost(); // getHost() is itself null when the URI has no host component
+        } catch (IllegalArgumentException iea) { // URI.create rejects syntactically invalid strings
+            return null;
+        }
+    }
+
 }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/shell/ShellCommandsProvider.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/shell/ShellCommandsProvider.java
index e00a8faeba2..a99df1ceaa4 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/shell/ShellCommandsProvider.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/shell/ShellCommandsProvider.java
@@ -74,5 +74,5 @@ public interface ShellCommandsProvider {
      * @param args arguments for the command. Note that the first word of the user command is omitted.
      * @throws Exception if any error occurs. The shell session will not be closed.
      */
-    void runCommand(String[] args) throws Exception;
+    boolean runCommand(String[] args) throws Exception;
 }
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/shell/PulsarShellTest.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/shell/PulsarShellTest.java
index a5e622dcbe0..0a6f4fcdf41 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/shell/PulsarShellTest.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/shell/PulsarShellTest.java
@@ -21,11 +21,15 @@ package org.apache.pulsar.shell;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
-
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.BlockingQueue;
@@ -47,8 +51,6 @@ import org.slf4j.LoggerFactory;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-
-
 public class PulsarShellTest {
 
     private static final Logger log = LoggerFactory.getLogger(PulsarShellTest.class);
@@ -58,7 +60,7 @@ public class PulsarShellTest {
 
     private Topics topics;
 
-    static class MockLineReader extends LineReaderImpl {
+    static class MockLineReader extends LineReaderImpl implements PulsarShell.InteractiveLineReader {
 
         private BlockingQueue<String> commandsQueue = new LinkedBlockingQueue<>();
 
@@ -72,12 +74,64 @@ public class PulsarShellTest {
 
         @Override
         @SneakyThrows
-        public String readLine(String prompt) throws UserInterruptException, EndOfFileException {
+        public String readLine() throws UserInterruptException, EndOfFileException {
             final String cmd = commandsQueue.take();
             log.info("writing command: {}", cmd);
             return cmd;
 
         }
+
+        @Override
+        public List<String> parseLine(String line) {
+            return getParser().parse(line, 0).words();
+        }
+    }
+
+    private static class TestPulsarShell extends PulsarShell {
+
+        private final PulsarAdminBuilder pulsarAdminBuilder;
+        AtomicReference<CmdProduce> cmdProduceHolder = new AtomicReference<>();
+        Integer exitCode;
+
+        public TestPulsarShell(String[] args, Properties props, PulsarAdminBuilder pulsarAdminBuilder) throws IOException {
+            super(args, props);
+            this.pulsarAdminBuilder = pulsarAdminBuilder;
+        }
+
+        @Override
+        protected AdminShell createAdminShell(Properties properties) throws Exception {
+            return new AdminShell(properties) {
+                @Override
+                protected PulsarAdminBuilder createAdminBuilder(Properties properties) {
+                    return pulsarAdminBuilder;
+                }
+            };
+        }
+
+        @Override
+        protected ClientShell createClientShell(Properties properties) {
+            final ClientShell clientShell = new ClientShell(properties);
+            final CmdProduce cmdProduce = mock(CmdProduce.class);
+            cmdProduceHolder.set(cmdProduce);
+            Whitebox.setInternalState(clientShell, "produceCommand", cmdProduceHolder.get());
+            return clientShell;
+        }
+
+        @Override
+        protected void exit(int exitCode) {
+            this.exitCode = exitCode;
+            if (exitCode != 0) {
+                throw new SystemExitCalledException(exitCode);
+            }
+        }
+    }
+
+    private static class SystemExitCalledException extends RuntimeException {
+        private int code;
+
+        public SystemExitCalledException(int code) {
+            this.code = code;
+        }
     }
 
     @BeforeMethod(alwaysRun = true)
@@ -91,8 +145,7 @@ public class PulsarShellTest {
 
 
     @Test
-    public void mainTest() throws Exception{
-        AtomicReference<CmdProduce> cmdProduceHolder = new AtomicReference<>();
+    public void testInteractiveMode() throws Exception{
         Terminal terminal = TerminalBuilder.builder().build();
         final MockLineReader linereader = new MockLineReader(terminal);
 
@@ -101,30 +154,78 @@ public class PulsarShellTest {
         linereader.addCmd("admin topics create my-topic --metadata a=b ");
         linereader.addCmd("client produce -m msg my-topic");
         linereader.addCmd("quit");
-        new PulsarShell(){
-            @Override
-            protected AdminShell createAdminShell(Properties properties) throws Exception {
-                return new AdminShell(properties) {
-                    @Override
-                    protected PulsarAdminBuilder createAdminBuilder(Properties properties) {
-                        return pulsarAdminBuilder;
-                    }
-                };
-            }
+        final TestPulsarShell testPulsarShell = new TestPulsarShell(new String[]{}, props, pulsarAdminBuilder);
+        testPulsarShell.run((a) -> linereader, (a) -> terminal);
+        verify(topics).createNonPartitionedTopic(eq("persistent://public/default/my-topic"), any(Map.class));
+        verify(testPulsarShell.cmdProduceHolder.get()).run();
+        assertEquals((int) testPulsarShell.exitCode, 0);
 
-            @Override
-            protected ClientShell createClientShell(Properties properties) {
-                final ClientShell clientShell = new ClientShell(properties);
-                final Object current = Whitebox.getInternalState(clientShell, "produceCommand");
-                cmdProduceHolder.set(spy((CmdProduce) current));
-                Whitebox.setInternalState(clientShell, "produceCommand", cmdProduceHolder.get());
-                return clientShell;
-            }
+    }
 
-        }.run(props, (a) -> linereader, (a) -> terminal);
+    /**
+     * Runs the shell with {@code -f} pointing at the {@code test-shell-file} resource and verifies that
+     * the topic is created and the (mocked) produce command is run.
+     */
+    @Test
+    public void testFileMode() throws Exception{
+        Terminal terminal = TerminalBuilder.builder().build();
+        final MockLineReader linereader = new MockLineReader(terminal);
+        final Properties props = new Properties();
+        props.setProperty("webServiceUrl", "http://localhost:8080");
+
+        // URL#getFile() returns the URL-encoded path (e.g. "%20" for a space), which is not a valid
+        // file name when the checkout directory contains such characters; resolve through the URI instead.
+        final String shellFile = java.nio.file.Paths.get(Thread.currentThread()
+                .getContextClassLoader().getResource("test-shell-file").toURI()).toString();
+
+        final TestPulsarShell testPulsarShell = new TestPulsarShell(new String[]{"-f", shellFile},
+                props, pulsarAdminBuilder);
+        testPulsarShell.run((a) -> linereader, (a) -> terminal);
         verify(topics).createNonPartitionedTopic(eq("persistent://public/default/my-topic"), any(Map.class));
-        verify(cmdProduceHolder.get()).run();
+        verify(testPulsarShell.cmdProduceHolder.get()).run();
+    }
 
+    /**
+     * Runs the shell with {@code -f} pointing at the {@code test-shell-file-error} resource plus {@code -e}
+     * and verifies that the run ends with exit code 1 after the topic creation, without ever running the
+     * (mocked) produce command.
+     */
+    @Test
+    public void testFileModeExitOnError() throws Exception {
+        Terminal terminal = TerminalBuilder.builder().build();
+        final MockLineReader linereader = new MockLineReader(terminal);
+        final Properties props = new Properties();
+        props.setProperty("webServiceUrl", "http://localhost:8080");
+
+        // URL#getFile() returns the URL-encoded path (e.g. "%20" for a space), which is not a valid
+        // file name when the checkout directory contains such characters; resolve through the URI instead.
+        final String shellFile = java.nio.file.Paths.get(Thread.currentThread()
+                .getContextClassLoader().getResource("test-shell-file-error").toURI()).toString();
+
+        final TestPulsarShell testPulsarShell = new TestPulsarShell(new String[]{"-f", shellFile, "-e"},
+                props, pulsarAdminBuilder);
+        try {
+            testPulsarShell.run((a) -> linereader, (a) -> terminal);
+            // Reaching this line means the shell did not request a non-zero exit.
+            fail();
+        } catch (SystemExitCalledException ex) {
+            assertEquals(ex.code, 1);
+        }
+
+        verify(topics).createNonPartitionedTopic(eq("persistent://public/default/my-topic"), any(Map.class));
+        verify(testPulsarShell.cmdProduceHolder.get(), times(0)).run();
     }
 
+    /**
+     * Checks {@link PulsarShell#substituteVariables} with a single known variable ({@code mytopic}).
+     * Every case runs the command {@code admin topics create <word>} and compares the whole resulting list.
+     */
+    @Test
+    public void testSubstituteVariables() throws Exception {
+        final Map<String, String> variables = new HashMap<>();
+        variables.put("mytopic", "the-topic");
+
+        // Each row: {last word of the input command, last word expected after substitution}.
+        final String[][] cases = {
+                {"${mytopic}", "the-topic"},       // braced reference to a known variable
+                {"\\${mytopic}", "${mytopic}"},    // escaped braced reference
+                {"${MYTOPIC}", "${MYTOPIC}"},      // name differing in case is expected to stay as is
+                {"$mytopic", "the-topic"},         // bare reference to a known variable
+                {"\\$mytopic", "$mytopic"},        // escaped bare reference
+        };
+        for (String[] row : cases) {
+            assertEquals(
+                    PulsarShell.substituteVariables(Arrays.asList("admin", "topics", "create", row[0]), variables),
+                    Arrays.asList("admin", "topics", "create", row[1])
+            );
+        }
+    }
 }
\ No newline at end of file
diff --git a/bin/pulsar-shell b/pulsar-client-tools/src/test/resources/test-shell-file
old mode 100755
new mode 100644
similarity index 58%
copy from bin/pulsar-shell
copy to pulsar-client-tools/src/test/resources/test-shell-file
index 66127265c48..b4dbf8ec514
--- a/bin/pulsar-shell
+++ b/pulsar-client-tools/src/test/resources/test-shell-file
@@ -1,4 +1,3 @@
-#!/usr/bin/env bash
 #
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
@@ -18,23 +17,6 @@
 # under the License.
 #
 
-# need this for relative symlinks
-PRG="$0"
-while [ -h "$PRG" ] ; do
-  ls=`ls -ld "$PRG"`
-  link=`expr "$ls" : '.*-> \(.*\)$'`
-  if expr "$link" : '/.*' > /dev/null; then
-    PRG="$link"
-  else
-    PRG="`dirname "$PRG"`/$link"
-  fi
-done
+admin topics create my-topic --metadata a=b
 
-BINDIR=$(dirname "$PRG")
-export PULSAR_HOME=`cd -P $BINDIR/..;pwd`
-. "$PULSAR_HOME/bin/pulsar-admin-common.sh"
-OPTS="-Dorg.jline.terminal.jansi=false $OPTS"
-
-#Change to PULSAR_HOME to support relative paths
-cd "$PULSAR_HOME"
-exec $JAVA $OPTS org.apache.pulsar.shell.PulsarShell $PULSAR_CLIENT_CONF "$@"
+client produce -m msg my-topic
\ No newline at end of file
diff --git a/bin/pulsar-shell b/pulsar-client-tools/src/test/resources/test-shell-file-error
old mode 100755
new mode 100644
similarity index 58%
copy from bin/pulsar-shell
copy to pulsar-client-tools/src/test/resources/test-shell-file-error
index 66127265c48..6adf8288800
--- a/bin/pulsar-shell
+++ b/pulsar-client-tools/src/test/resources/test-shell-file-error
@@ -1,4 +1,3 @@
-#!/usr/bin/env bash
 #
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
@@ -18,23 +17,6 @@
 # under the License.
 #
 
-# need this for relative symlinks
-PRG="$0"
-while [ -h "$PRG" ] ; do
-  ls=`ls -ld "$PRG"`
-  link=`expr "$ls" : '.*-> \(.*\)$'`
-  if expr "$link" : '/.*' > /dev/null; then
-    PRG="$link"
-  else
-    PRG="`dirname "$PRG"`/$link"
-  fi
-done
-
-BINDIR=$(dirname "$PRG")
-export PULSAR_HOME=`cd -P $BINDIR/..;pwd`
-. "$PULSAR_HOME/bin/pulsar-admin-common.sh"
-OPTS="-Dorg.jline.terminal.jansi=false $OPTS"
-
-#Change to PULSAR_HOME to support relative paths
-cd "$PULSAR_HOME"
-exec $JAVA $OPTS org.apache.pulsar.shell.PulsarShell $PULSAR_CLIENT_CONF "$@"
+admin topics create my-topic --metadata a=b
+admin notexist-command
+client produce -m msg my-topic
\ No newline at end of file