You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2018/10/22 23:48:33 UTC

[2/3] samza git commit: SAMZA-1901: Implementation of Samza SQL Shell

SAMZA-1901: Implementation of Samza SQL Shell

## What changes were proposed in this pull request?
This PR is to implement Samza SQL shell. The document about the shell was attached [here](https://issues.apache.org/jira/browse/SAMZA-1901).

## How was this patch tested?
1. Add unit tests
2. Run the shell with use cases mentioned in the attached document under https://issues.apache.org/jira/browse/SAMZA-1901

Author: Weiqing Yang <ya...@gmail.com>

Reviewers: Srinivasulu Punuru <sp...@linkedin.com>, Aditya Toomula <at...@linkedin.com>

Closes #654 from weiqingy/samza-shell


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/2782818e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/2782818e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/2782818e

Branch: refs/heads/1.0.0
Commit: 2782818eb99231a3e67764a569fe93139494d8fc
Parents: ba90a51
Author: Weiqing Yang <ya...@gmail.com>
Authored: Mon Oct 22 09:51:40 2018 -0700
Committer: Prateek Maheshwari <pm...@apache.org>
Committed: Mon Oct 22 16:47:47 2018 -0700

----------------------------------------------------------------------
 build.gradle                                    |  28 +
 gradle/dependency-versions.gradle               |   2 +
 .../JobNodeConfigurationGenerator.java          |   6 +-
 samza-sql-shell/conf/samza-sql-shell-log4j.xml  |  49 ++
 samza-sql-shell/conf/shell-defaults.conf        |  28 +
 samza-sql-shell/scripts/samza-sql-shell.sh      |  42 +
 .../apache/samza/sql/client/cli/CliCommand.java |  53 ++
 .../samza/sql/client/cli/CliCommandType.java    |  78 ++
 .../samza/sql/client/cli/CliConstants.java      |  57 ++
 .../samza/sql/client/cli/CliEnvironment.java    | 130 +++
 .../samza/sql/client/cli/CliHighlighter.java    |  89 ++
 .../apache/samza/sql/client/cli/CliShell.java   | 821 +++++++++++++++++++
 .../apache/samza/sql/client/cli/CliView.java    |  29 +
 .../org/apache/samza/sql/client/cli/Main.java   | 117 +++
 .../sql/client/cli/QueryResultLogView.java      | 291 +++++++
 .../sql/client/impl/AvroSqlSchemaConverter.java | 112 +++
 .../client/impl/CliLoggingSystemFactory.java    | 117 +++
 .../FileSystemAvroRelSchemaProviderFactory.java |  75 ++
 .../samza/sql/client/impl/SamzaExecutor.java    | 511 ++++++++++++
 .../sql/client/impl/SamzaSqlFieldType.java      |  94 +++
 .../sql/client/impl/SamzaSqlUdfDisplayInfo.java |  71 ++
 .../sql/client/interfaces/ExecutionContext.java |  41 +
 .../client/interfaces/ExecutionException.java   |  40 +
 .../sql/client/interfaces/ExecutionStatus.java  |  30 +
 .../sql/client/interfaces/NonQueryResult.java   |  62 ++
 .../sql/client/interfaces/QueryResult.java      |  49 ++
 .../sql/client/interfaces/SqlExecutor.java      | 171 ++++
 .../sql/client/interfaces/SqlFunction.java      |  55 ++
 .../samza/sql/client/interfaces/SqlSchema.java  |  56 ++
 .../sql/client/interfaces/SqlSchemaBuilder.java |  63 ++
 .../samza/sql/client/util/CliException.java     |  41 +
 .../apache/samza/sql/client/util/CliUtil.java   |  43 +
 .../sql/client/util/RandomAccessQueue.java      |  96 +++
 .../sql/client/impl/SamzaExecutorTest.java      |  79 ++
 .../sql/client/util/RandomAccessQueueTest.java  |  89 ++
 .../src/test/resources/ProfileChangeStream.avsc |  51 ++
 .../sql/runner/SamzaSqlApplicationConfig.java   |   2 +-
 settings.gradle                                 |   1 +
 38 files changed, 3764 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 856bb1d..b6ee4ac 100644
--- a/build.gradle
+++ b/build.gradle
@@ -336,6 +336,34 @@ project(':samza-sql') {
   }
 }
 
+project(':samza-sql-shell') {
+  apply plugin: 'java'
+
+  dependencies {
+    compile project(':samza-sql')
+    compile project(':samza-tools')
+    compile project(":samza-core_$scalaVersion")
+    compile project(':samza-api')
+    compile project(":samza-kafka_$scalaVersion")
+    compile project(':samza-azure')
+    compile "net.java.dev.jna:jna:$jnaVersion"
+    compile "org.jline:jline:$jlineVersion"
+
+    testCompile "junit:junit:$junitVersion"
+  }
+
+  tasks.create(name: "releaseSqlShellTarGz", dependsOn: configurations.archives.artifacts, type: Tar) {
+    into "samza-sql-shell-${version}"
+    compression = Compression.GZIP
+    from(project.file("./scripts")) { into "scripts/" }
+    from(project.file("./conf")) { into "conf/" }
+    from(project(':samza-shell').file("src/main/bash/run-class.sh")) { into "scripts/" }
+    from(configurations.runtime) { into("lib/") }
+    from(configurations.archives.artifacts.files) { into("lib/") }
+    duplicatesStrategy 'exclude'
+  }
+}
+
 project(':samza-tools') {
   apply plugin: 'java'
 

http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/gradle/dependency-versions.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle
index a5b4f51..43ceb0a 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -47,4 +47,6 @@
   zkClientVersion = "0.8"
   zookeeperVersion = "3.4.6"
   failsafeVersion = "1.1.0"
+  jlineVersion = "3.8.2"
+  jnaVersion = "4.5.1"
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java b/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java
index 70c9b23..215177b 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java
@@ -70,10 +70,8 @@ import org.slf4j.LoggerFactory;
   static Config mergeConfig(Map<String, String> originalConfig, Map<String, String> generatedConfig) {
     Map<String, String> mergedConfig = new HashMap<>(generatedConfig);
     originalConfig.forEach((k, v) -> {
-        if (generatedConfig.containsKey(k) &&
-            !Objects.equals(generatedConfig.get(k), v)) {
-          LOG.info("Replacing generated config for key: {} value: {} with original config value: {}",
-              k, generatedConfig.get(k), v);
+        if (generatedConfig.containsKey(k) && !Objects.equals(generatedConfig.get(k), v)) {
+          LOG.info("Replacing generated config for key: {} value: {} with original config value: {}", k, generatedConfig.get(k), v);
         }
         mergedConfig.put(k, v);
       });

http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql-shell/conf/samza-sql-shell-log4j.xml
----------------------------------------------------------------------
diff --git a/samza-sql-shell/conf/samza-sql-shell-log4j.xml b/samza-sql-shell/conf/samza-sql-shell-log4j.xml
new file mode 100644
index 0000000..924ef93
--- /dev/null
+++ b/samza-sql-shell/conf/samza-sql-shell-log4j.xml
@@ -0,0 +1,49 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+-->
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+
+  <appender name="console" class="org.apache.log4j.ConsoleAppender">
+    <layout class="org.apache.log4j.PatternLayout">
+      <param name="ConversionPattern"
+             value="%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n" />
+    </layout>
+  </appender>
+
+  <appender name="file" class="org.apache.log4j.RollingFileAppender">
+    <param name="append" value="false" />
+    <param name="maxFileSize" value="10MB" />
+    <param name="maxBackupIndex" value="10" />
+    <param name="file" value="${LOG_HOME}/logs/samza-sql-shell.log" />
+    <layout class="org.apache.log4j.PatternLayout">
+      <param name="ConversionPattern"
+             value="%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n" />
+    </layout>
+  </appender>
+
+  <root>
+    <level value="info" />
+    <appender-ref ref="file" />
+  </root>
+</log4j:configuration>
+

http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql-shell/conf/shell-defaults.conf
----------------------------------------------------------------------
diff --git a/samza-sql-shell/conf/shell-defaults.conf b/samza-sql-shell/conf/shell-defaults.conf
new file mode 100644
index 0000000..0c85e52
--- /dev/null
+++ b/samza-sql-shell/conf/shell-defaults.conf
@@ -0,0 +1,28 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+# Default system properties included when running samza-sql-shell.
+# This is useful for setting default environmental settings.
+
+# Example:
+shell.executor = org.apache.samza.sql.client.impl.SamzaExecutor
+samza.sql.output = compact
+# samza.sql.system.kafka.address = localhost:2181
+# samza.sql.relSchemaProvider.config.schemaDir = /tmp/schemas/
+# samza.sql.ioResolver = config
+# samza.sql.udfResolver = config

http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql-shell/scripts/samza-sql-shell.sh
----------------------------------------------------------------------
diff --git a/samza-sql-shell/scripts/samza-sql-shell.sh b/samza-sql-shell/scripts/samza-sql-shell.sh
new file mode 100755
index 0000000..d9eb6df
--- /dev/null
+++ b/samza-sql-shell/scripts/samza-sql-shell.sh
@@ -0,0 +1,42 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+if [ `uname` == 'Linux' ];
+then
+  base_dir=$(readlink -f $(dirname $0))
+else
+  base_dir=$(dirname $0)
+fi
+
+parent_dir="$(dirname "$base_dir")"
+
+CONF_FILE="-conf $parent_dir/conf/shell-defaults.conf"
+
+if [ "x$LOG_HOME" = "x" ]; then
+    export LOG_HOME=$parent_dir
+fi
+
+if [ "x$LOG4J_OPTS" = "x" ]; then
+    export LOG4J_OPTS="-Dlog4j.configuration=file:$parent_dir/conf/samza-sql-shell-log4j.xml"
+fi
+
+if [ "x$HEAP_OPTS" = "x" ]; then
+    export HEAP_OPTS="-Xmx4G -Xms4G"
+fi
+
+exec $base_dir/run-class.sh $LOG4J_OPTS -DLOG_HOME=$LOG_HOME org.apache.samza.sql.client.cli.Main $CONF_FILE "$@"

http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliCommand.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliCommand.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliCommand.java
new file mode 100755
index 0000000..7c6480b
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliCommand.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.cli;
+
+/**
+ * A shell command containing command name and parameters.
+ */
+class CliCommand {
+  private CliCommandType commandType;
+  private String parameters;
+
+  public CliCommand(CliCommandType cmdType) {
+    this.commandType = cmdType;
+  }
+
+  public CliCommand(CliCommandType cmdType, String parameters) {
+    this(cmdType);
+    this.parameters = parameters;
+  }
+
+  public CliCommandType getCommandType() {
+    return commandType;
+  }
+
+  public String getParameters() {
+    return parameters;
+  }
+
+  public void setParameters(String parameters) {
+    this.parameters = parameters;
+  }
+
+  public String getFullCommand() {
+    return commandType.getCommandName() + CliConstants.SPACE + parameters;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliCommandType.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliCommandType.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliCommandType.java
new file mode 100755
index 0000000..0cd170e
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliCommandType.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.cli;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Enum all the commands we now support along with descriptions.
+ */
+enum CliCommandType {
+  SHOW_TABLES("SHOW TABLES", "Shows all available tables.", "Usage: SHOW TABLES <table name>"),
+  SHOW_FUNCTIONS("SHOW FUNCTIONS", "Shows all available UDFs.", "SHOW FUNCTION"),
+  DESCRIBE("DESCRIBE", "Describes a table.", "Usage: DESCRIBE <table name>"),
+
+  SELECT("SELECT", "\tExecutes a SQL SELECT query.", "SELECT uses a standard streaming SQL syntax."),
+  EXECUTE("EXECUTE", "\tExecute a sql file.", "EXECUTE <URI of a sql file>"),
+  INSERT_INTO("INSERT INTO", "Executes a SQL INSERT INTO.", "INSERT INTO uses a standard streaming SQL syntax."),
+  LS("LS", "\tLists all background executions.", "LS [execution ID]"),
+  STOP("STOP", "\tStops an execution.", "Usage: STOP <execution ID>"),
+  RM("RM", "\tRemoves an execution from the list.", "Usage: RM <execution ID>"),
+
+  HELP("HELP", "\tDisplays this help message.", "Usage: HELP [command]"),
+  SET("SET", "\tSets a variable.", "Usage: SET VAR=VAL"),
+  CLEAR("CLEAR", "\tClears the screen.", "CLEAR"),
+  EXIT("EXIT", "\tExits the shell.", "Exit"),
+  QUIT("QUIT", "\tQuits the shell.", "QUIT"),
+
+  INVALID_COMMAND("INVALID_COMMAND", "INVALID_COMMAND", "INVALID_COMMAND");
+
+  private final String cmdName;
+  private final String description;
+  private final String usage;
+
+  CliCommandType(String cmdName, String description, String usage) {
+    this.cmdName = cmdName;
+    this.description = description;
+    this.usage = usage;
+  }
+
+  public static List<String> getAllCommands() {
+    List<String> cmds = new ArrayList<String>();
+    for (CliCommandType t : CliCommandType.values()) {
+      if (t != INVALID_COMMAND)
+        cmds.add(t.getCommandName());
+    }
+    return cmds;
+  }
+
+  public String getCommandName() {
+    return cmdName;
+  }
+
+  public String getDescription() {
+    return description;
+  }
+
+  public String getUsage() {
+    return usage;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliConstants.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliConstants.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliConstants.java
new file mode 100755
index 0000000..286dc79
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliConstants.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.cli;
+
+/**
+ * Constant definitions for the shell.
+ */
+class CliConstants {
+  public static final String APP_NAME = "Samza SQL Shell";
+  public static final String WINDOW_TITLE = "Samza SQL Shell";
+  public static final String PROMPT_1ST = "Samza SQL";
+  public static final String PROMPT_1ST_END = "> ";
+
+  // All shell environment variables starts with the prefix
+  public static final String CONFIG_SHELL_PREFIX = "shell.";
+  // Specifies the executor used by the shell
+  public static final String CONFIG_EXECUTOR = "shell.executor";
+
+  public static final String VERSION = "0.0.1";
+
+
+  public static final String WELCOME_MESSAGE;
+  static {
+        WELCOME_MESSAGE =
+"      ___           ___           ___           ___           ___ \n" +
+"     /  /\\         /  /\\         /  /\\         /__/\\         /  /\\ \n" +
+"    /  /::\\       /  /::\\       /  /::|        \\  \\:\\       /  /::\\ \n"+
+"   /__/:/\\:\\     /  /:/\\:\\     /  /:|:|         \\  \\:\\     /  /:/\\:\\ \n"+
+"  _\\_ \\:\\ \\:\\   /  /::\\ \\:\\   /  /:/|:|__        \\  \\:\\   /  /::\\ \\:\\ \n"+
+" /__/\\ \\:\\ \\:\\ /__/:/\\:\\_\\:\\ /__/:/_|::::\\  ______\\__\\:\\ /__/:/\\:\\_\\:\\ \n"+
+" \\  \\:\\ \\:\\_\\/ \\__\\/  \\:\\/:/ \\__\\/  /~~/:/ \\  \\::::::::/ \\__\\/  \\:\\/:/ \n"+
+"  \\  \\:\\_\\:\\        \\__\\::/        /  /:/   \\  \\:\\~~~~~       \\__\\::/ \n"+
+"   \\  \\:\\/:/        /  /:/        /  /:/     \\  \\:\\           /  /:/ \n"+
+"    \\  \\::/        /__/:/        /__/:/       \\  \\:\\         /__/:/ \n"+
+"     \\__\\/         \\__\\/         \\__\\/         \\__\\/         \\__\\/  \n\n"+
+"Welcome to Samza SQL shell (V" + VERSION + "). Enter HELP for all commands.\n\n";
+  }
+
+  public static final char SPACE = '\u0020';
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliEnvironment.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliEnvironment.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliEnvironment.java
new file mode 100644
index 0000000..9384169
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliEnvironment.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.cli;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.io.Writer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * CliEnvironment contains "environment variables" that configures the shell behavior.
+ */
+public class CliEnvironment {
+  private static final String debugEnvVar = "shell.debug";
+  private static PrintStream stdout = System.out;
+  private static PrintStream stderr = System.err;
+  private Boolean debug = false;
+
+  boolean isDebug() {
+    return debug;
+  }
+
+  void setDebug(Boolean debug) {
+    this.debug = debug;
+  }
+
+  /**
+   * @param var Environment variable
+   * @param val Value of the environment variable
+   * @return 0 : succeed
+   * -1: invalid var
+   * -2: invalid val
+   */
+  int setEnvironmentVariable(String var, String val) {
+    switch (var.toLowerCase()) {
+      case debugEnvVar:
+        val = val.toLowerCase();
+        if (val.equals("true")) {
+          debug = true;
+          enableJavaSystemOutAndErr();
+        } else if (val.equals("false")) {
+          debug = false;
+          disableJavaSystemOutAndErr();
+        } else
+          return -2;
+        break;
+      default:
+        return -1;
+    }
+
+    return 0;
+  }
+
+  // TODO: Separate the values out of the logic part
+  List<String> getPossibleValues(String var) {
+    List<String> vals = new ArrayList<>();
+    switch (var.toLowerCase()) {
+      case debugEnvVar:
+        vals.add("true");
+        vals.add("false");
+        return vals;
+      default:
+        return null;
+    }
+  }
+
+  void printAll(Writer writer) throws IOException {
+    writer.write(debugEnvVar);
+    writer.write('=');
+    writer.write(debug.toString());
+    writer.write('\n');
+  }
+
+  private void disableJavaSystemOutAndErr() {
+    PrintStream ps = new PrintStream(new NullOutputStream());
+    System.setOut(ps);
+    System.setErr(ps);
+  }
+
+  private void enableJavaSystemOutAndErr() {
+    System.setOut(stdout);
+    System.setErr(stderr);
+  }
+
+  void takeEffect() {
+    if (debug) {
+      enableJavaSystemOutAndErr();
+    } else {
+      // We control terminal directly; Forbid any Java System.out and System.err stuff so
+      // any underlying output will not mess up the console
+      disableJavaSystemOutAndErr();
+    }
+  }
+
+  private class NullOutputStream extends OutputStream {
+    public void close() {
+    }
+
+    public void flush() {
+    }
+
+    public void write(byte[] b) {
+    }
+
+    public void write(byte[] b, int off, int len) {
+    }
+
+    public void write(int b) {
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliHighlighter.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliHighlighter.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliHighlighter.java
new file mode 100755
index 0000000..c01ed5c
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliHighlighter.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.cli;
+
+import org.apache.samza.sql.client.util.CliUtil;
+import org.jline.reader.Highlighter;
+import org.jline.reader.LineReader;
+import org.jline.utils.AttributedString;
+import org.jline.utils.AttributedStringBuilder;
+import org.jline.utils.AttributedStyle;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A primitive highlighter.
+ */
+public class CliHighlighter implements Highlighter {
+  private static final List<String> keywords;
+
+  static {
+    keywords = CliCommandType.getAllCommands();
+    keywords.add("FROM");
+    keywords.add("WHERE");
+  }
+
+  private static List<String> splitWithSpace(String buffer) {
+    List<String> list = new ArrayList<String>();
+    if (CliUtil.isNullOrEmpty(buffer))
+      return list;
+
+    boolean prevIsSpace = Character.isSpaceChar(buffer.charAt(0));
+    int prevPos = 0;
+    for (int i = 1; i < buffer.length(); ++i) {
+      char c = buffer.charAt(i);
+      boolean isSpace = Character.isSpaceChar(c);
+      if (isSpace != prevIsSpace) {
+        list.add(buffer.substring(prevPos, i));
+        prevPos = i;
+        prevIsSpace = isSpace;
+      }
+    }
+    list.add(buffer.substring(prevPos));
+    return list;
+  }
+
+  public AttributedString highlight(LineReader reader, String buffer) {
+    AttributedStringBuilder builder = new AttributedStringBuilder();
+    List<String> tokens = splitWithSpace(buffer);
+
+    for (String token : tokens) {
+      if (isKeyword(token)) {
+        builder.style(AttributedStyle.BOLD.foreground(AttributedStyle.YELLOW))
+                .append(token);
+      } else {
+        builder.style(AttributedStyle.DEFAULT)
+                .append(token);
+      }
+    }
+
+    return builder.toAttributedString();
+  }
+
+  private boolean isKeyword(String token) {
+    for (String keyword : keywords) {
+      if (keyword.compareToIgnoreCase(token) == 0)
+        return true;
+    }
+    return false;
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliShell.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliShell.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliShell.java
new file mode 100755
index 0000000..54c7bf6
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliShell.java
@@ -0,0 +1,821 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.cli;
+
+import org.apache.samza.sql.client.interfaces.*;
+import org.apache.samza.sql.client.util.CliException;
+import org.apache.samza.sql.client.util.CliUtil;
+import org.jline.reader.EndOfFileException;
+import org.jline.reader.LineReader;
+import org.jline.reader.LineReaderBuilder;
+import org.jline.reader.UserInterruptException;
+import org.jline.reader.impl.DefaultParser;
+import org.jline.reader.impl.completer.StringsCompleter;
+import org.jline.terminal.Terminal;
+import org.jline.terminal.TerminalBuilder;
+import org.jline.utils.AttributedStringBuilder;
+import org.jline.utils.AttributedStyle;
+import org.jline.utils.InfoCmp;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.*;
+
+/**
+ * The shell UI.
+ */
+class CliShell {
+  private final Terminal terminal;
+  private final PrintWriter writer;
+  private final LineReader lineReader;
+  private final String firstPrompt;
+  private final SqlExecutor executor;
+  private final ExecutionContext exeContext;
+  private CliEnvironment env;
+  private boolean keepRunning = true;
+  private Map<Integer, String> executions = new TreeMap<>();
+
+  public CliShell(SqlExecutor executor, CliEnvironment environment, ExecutionContext execContext) {
+    if (executor == null || environment == null || execContext == null) {
+      throw new IllegalArgumentException();
+    }
+
+    // Terminal
+    try {
+      terminal = TerminalBuilder.builder()
+              .name(CliConstants.WINDOW_TITLE)
+              .build();
+    } catch (IOException e) {
+      throw new CliException("Error when creating terminal", e);
+    }
+
+    // Terminal writer
+    writer = terminal.writer();
+
+    // LineReader
+    final DefaultParser parser = new DefaultParser()
+            .eofOnEscapedNewLine(true)
+            .eofOnUnclosedQuote(true);
+    lineReader = LineReaderBuilder.builder()
+            .appName(CliConstants.APP_NAME)
+            .terminal(terminal)
+            .parser(parser)
+            .highlighter(new CliHighlighter())
+            .completer(new StringsCompleter(CliCommandType.getAllCommands()))
+            .build();
+
+    // Command Prompt
+    firstPrompt = new AttributedStringBuilder()
+            .style(AttributedStyle.DEFAULT.foreground(AttributedStyle.YELLOW))
+            .append(CliConstants.PROMPT_1ST + CliConstants.PROMPT_1ST_END)
+            .toAnsi();
+
+    // Execution context and executor
+    env = environment;
+    env.takeEffect();
+    exeContext = execContext;
+    this.executor = executor;
+    this.executor.start(exeContext);
+  }
+
+  Terminal getTerminal() {
+    return terminal;
+  }
+
+  CliEnvironment getEnvironment() {
+    return env;
+  }
+
+  SqlExecutor getExecutor() {
+    return executor;
+  }
+
+  ExecutionContext getExeContext() {
+    return exeContext;
+  }
+
+  /**
+   * Actually run the shell. Does not return until user choose to exit.
+   */
+  public void open() {
+    // Remember we cannot enter alternate screen mode here as there is only one alternate
+    // screen and we need it to show streaming results. Clear the screen instead.
+    clearScreen();
+    writer.write(CliConstants.WELCOME_MESSAGE);
+
+    try {
+      // Check if jna.jar exists in class path
+      try {
+        ClassLoader.getSystemClassLoader().loadClass("com.sun.jna.NativeLibrary");
+      } catch (ClassNotFoundException e) {
+        // Something's wrong. It could be a dumb terminal if neither jna nor jansi lib is there
+        writer.write("Warning: jna.jar does NOT exist. It may lead to a dumb shell or a performance hit.\n");
+      }
+
+      while (keepRunning) {
+        String line;
+        try {
+          line = lineReader.readLine(firstPrompt);
+        } catch (UserInterruptException e) {
+          continue;
+        } catch (EndOfFileException e) {
+          commandQuit();
+          break;
+        }
+
+        if (!CliUtil.isNullOrEmpty(line)) {
+          CliCommand command = parseLine(line);
+          if (command == null)
+            continue;
+
+          switch (command.getCommandType()) {
+            case CLEAR:
+              commandClear();
+              break;
+
+            case DESCRIBE:
+              commandDescribe(command);
+              break;
+
+            case EXECUTE:
+              commandExecuteFile(command);
+              break;
+
+            case EXIT:
+            case QUIT:
+              commandQuit();
+              break;
+
+            case HELP:
+              commandHelp(command);
+              break;
+
+            case INSERT_INTO:
+              commandInsertInto(command);
+              break;
+
+            case LS:
+              commandLs(command);
+              break;
+
+            case RM:
+              commandRm(command);
+              break;
+
+            case SELECT:
+              commandSelect(command);
+              break;
+
+            case SET:
+              commandSet(command);
+              break;
+
+            case SHOW_FUNCTIONS:
+              commandShowFunctions(command);
+              break;
+
+            case SHOW_TABLES:
+              commandShowTables(command);
+              break;
+
+            case STOP:
+              commandStop(command);
+              break;
+
+            case INVALID_COMMAND:
+              printHelpMessage();
+              break;
+
+            default:
+              writer.write("UNDER DEVELOPEMENT. Command:" + command.getCommandType() + "\n");
+              writer.write("Parameters:" +
+                      (CliUtil.isNullOrEmpty(command.getParameters()) ? "NULL" : command.getParameters())
+                      + "\n\n");
+              writer.flush();
+          }
+        }
+      }
+    } catch (Exception e) {
+      writer.print(e.getClass().getSimpleName());
+      writer.print(". ");
+      writer.println(e.getMessage());
+      e.printStackTrace(writer);
+      writer.println();
+      writer.println("We are sorry but SamzaSqlShell has encountered a problem and needs to stop.");
+    }
+
+    writer.write("Cleaning up... ");
+    writer.flush();
+    executor.stop(exeContext);
+
+    writer.write("Done.\nBye.\n\n");
+    writer.flush();
+
+    try {
+      terminal.close();
+    } catch (IOException e) {
+      // Doesn't matter
+    }
+  }
+
+  private void commandClear() {
+    clearScreen();
+  }
+
+  private void commandDescribe(CliCommand command) {
+    String parameters = command.getParameters();
+    if (CliUtil.isNullOrEmpty(parameters)) {
+      writer.println(command.getCommandType().getUsage());
+      writer.println();
+      writer.flush();
+      return;
+    }
+
+    SqlSchema schema = executor.getTableSchema(exeContext, parameters);
+
+    if (schema == null) {
+      writer.println("Failed to get schema. Error: " + executor.getErrorMsg());
+    } else {
+      writer.println();
+      List<String> lines = formatSchema4Display(schema);
+      for (String line : lines) {
+        writer.println(line);
+      }
+    }
+    writer.println();
+    writer.flush();
+  }
+
+  private void commandSet(CliCommand command) {
+    String param = command.getParameters();
+    if (CliUtil.isNullOrEmpty(param)) {
+      try {
+        env.printAll(writer);
+      } catch (IOException e) {
+        e.printStackTrace(writer);
+      }
+      writer.println();
+      writer.flush();
+      return;
+    }
+    String[] params = param.split("=");
+    if (params.length != 2) {
+      writer.println(command.getCommandType().getUsage());
+      writer.println();
+      writer.flush();
+      return;
+    }
+
+    int ret = env.setEnvironmentVariable(params[0], params[1]);
+    if (ret == 0) {
+      writer.print(params[0]);
+      writer.print(" set to ");
+      writer.println(params[1]);
+    } else if (ret == -1) {
+      writer.print("Unknow variable: ");
+      writer.println(params[0]);
+    } else if (ret == -2) {
+      writer.print("Invalid value: ");
+      writer.println(params[1]);
+      List<String> vals = env.getPossibleValues(params[0]);
+      writer.print("Possible values:");
+      for (String s : vals) {
+        writer.print(CliConstants.SPACE);
+        writer.print(s);
+      }
+      writer.println();
+    }
+
+    writer.println();
+    writer.flush();
+  }
+
+  private void commandExecuteFile(CliCommand command) {
+    String fullCmdStr = command.getFullCommand();
+    String parameters = command.getParameters();
+    if (CliUtil.isNullOrEmpty(parameters)) {
+      writer.println("Usage: execute <fileuri>\n");
+      writer.flush();
+      return;
+    }
+    URI uri = null;
+    boolean valid = false;
+    File file = null;
+    try {
+      uri = new URI(parameters);
+      file = new File(uri.getPath());
+      valid = file.exists() && !file.isDirectory();
+    } catch (URISyntaxException e) {
+    }
+    if (!valid) {
+      writer.println("Invalid URI.\n");
+      writer.flush();
+      return;
+    }
+
+    NonQueryResult nonQueryResult = executor.executeNonQuery(exeContext, file);
+    if (!nonQueryResult.succeeded()) {
+      writer.print("Execution error: ");
+      writer.println(executor.getErrorMsg());
+      writer.println();
+      writer.flush();
+      return;
+    }
+
+    executions.put(nonQueryResult.getExecutionId(), fullCmdStr);
+    List<String> submittedStmts = nonQueryResult.getSubmittedStmts();
+    List<String> nonsubmittedStmts = nonQueryResult.getNonSubmittedStmts();
+
+    writer.println("Sql file submitted. Execution ID: " + nonQueryResult.getExecutionId());
+    writer.println("Submitted statements: \n");
+    if (submittedStmts == null || submittedStmts.size() == 0) {
+      writer.println("\tNone.");
+    } else {
+      for (String statement : submittedStmts) {
+        writer.print("\t");
+        writer.println(statement);
+      }
+      writer.println();
+    }
+
+    if (nonsubmittedStmts != null && nonsubmittedStmts.size() != 0) {
+      writer.println("Statements NOT submitted: \n");
+      for (String statement : nonsubmittedStmts) {
+        writer.print("\t");
+        writer.println(statement);
+      }
+      writer.println();
+    }
+
+    writer.println("Note: All query statements in a sql file are NOT submitted.");
+    writer.println();
+    writer.flush();
+  }
+
+  private void commandInsertInto(CliCommand command) {
+    String fullCmdStr = command.getFullCommand();
+    NonQueryResult result = executor.executeNonQuery(exeContext,
+            Collections.singletonList(fullCmdStr));
+
+    if (result.succeeded()) {
+      writer.print("Execution submitted successfully. Id: ");
+      writer.println(String.valueOf(result.getExecutionId()));
+      executions.put(result.getExecutionId(), fullCmdStr);
+    } else {
+      writer.write("Execution failed to submit. Error: ");
+      writer.println(executor.getErrorMsg());
+    }
+
+    writer.println();
+    writer.flush();
+  }
+
+  private void commandLs(CliCommand command) {
+    List<Integer> execIds = new ArrayList<>();
+    String parameters = command.getParameters();
+    if (CliUtil.isNullOrEmpty(parameters)) {
+      execIds.addAll(executions.keySet());
+    } else {
+      String[] params = parameters.split("\u0020");
+      for (String param : params) {
+        Integer id = null;
+        try {
+          id = Integer.valueOf(param);
+        } catch (NumberFormatException e) {
+        }
+        if (id != null && executions.containsKey(id)) {
+          execIds.add(id);
+        }
+      }
+    }
+    if (execIds.size() == 0) {
+      writer.println();
+      return;
+    }
+
+    execIds.sort(Integer::compareTo);
+
+    final int terminalWidth = terminal.getWidth();
+    final int ID_WIDTH = 3;
+    final int STATUS_WIDTH = 20;
+    final int CMD_WIDTH = terminalWidth - ID_WIDTH - STATUS_WIDTH - 4;
+
+    AttributedStyle oddLineStyle = AttributedStyle.DEFAULT.BOLD.foreground(AttributedStyle.BLUE);
+    AttributedStyle evenLineStyle = AttributedStyle.DEFAULT.BOLD.foreground(AttributedStyle.CYAN);
+    for (int i = 0; i < execIds.size(); ++i) {
+      Integer id = execIds.get(i);
+      String cmd = executions.get(id);
+      if (cmd == null)
+        continue;
+
+      String status = "UNKNOWN";
+      try {
+        ExecutionStatus execStatus = executor.queryExecutionStatus(id);
+        if (execStatus != null)
+          status = execStatus.name();
+      } catch (ExecutionException e) {
+      }
+
+      int cmdStartIdx = 0;
+      int cmdLength = cmd.length();
+      StringBuilder line;
+      while (cmdStartIdx < cmdLength) {
+        line = new StringBuilder(terminalWidth);
+        if (cmdStartIdx == 0) {
+          line.append(CliConstants.SPACE);
+          line.append(id);
+          CliUtil.appendTo(line, 1 + ID_WIDTH + 1, CliConstants.SPACE);
+          line.append(status);
+        }
+        CliUtil.appendTo(line, 1 + ID_WIDTH + 1 + STATUS_WIDTH + 1, CliConstants.SPACE);
+
+        int numToWrite = Math.min(CMD_WIDTH, cmdLength - cmdStartIdx);
+        if (numToWrite > 0) {
+          line.append(cmd, cmdStartIdx, cmdStartIdx + numToWrite);
+          cmdStartIdx += numToWrite;
+        }
+
+        if (i % 2 == 0) {
+          AttributedStringBuilder attrBuilder = new AttributedStringBuilder().style(evenLineStyle);
+          attrBuilder.append(line.toString());
+          writer.println(attrBuilder.toAnsi());
+        } else {
+          AttributedStringBuilder attrBuilder = new AttributedStringBuilder().style(oddLineStyle);
+          attrBuilder.append(line.toString());
+          writer.println(attrBuilder.toAnsi());
+        }
+      }
+    }
+    writer.println();
+    writer.flush();
+  }
+
+  private void commandRm(CliCommand command) {
+    String parameters = command.getParameters();
+    if (CliUtil.isNullOrEmpty(parameters)) {
+      writer.println(command.getCommandType().getUsage());
+      writer.println();
+      writer.flush();
+      return;
+    }
+
+    List<Integer> execIds = new ArrayList<>();
+    String[] params = parameters.split("\u0020");
+    for (String param : params) {
+      Integer id = null;
+      try {
+        id = Integer.valueOf(param);
+      } catch (NumberFormatException e) {
+      }
+      if (id == null || !executions.containsKey(id)) {
+        writer.print("Error: ");
+        writer.print(param);
+        writer.println(" is not a valid id.");
+      } else {
+        execIds.add(id);
+      }
+    }
+
+    for (Integer id : execIds) {
+      ExecutionStatus status = null;
+      try {
+        status = executor.queryExecutionStatus(id);
+      } catch (ExecutionException e) {
+      }
+      if (status == null) {
+        writer.println(String.format("Error: failed to get execution status for %d. %s",
+                id, executor.getErrorMsg()));
+        continue;
+      }
+      if (status == ExecutionStatus.Running) {
+        writer.println(String.format("Execution %d is still running. Stop it first.", id));
+        continue;
+      }
+      if (executor.removeExecution(exeContext, id)) {
+        writer.println(String.format("Execution %d was removed.", id));
+        executions.remove(id);
+      } else {
+        writer.println(String.format("Error: failed to remove execution %d. %s",
+                id, executor.getErrorMsg()));
+      }
+
+    }
+    writer.println();
+    writer.flush();
+  }
+
+  private void commandQuit() {
+    keepRunning = false;
+  }
+
+  private void commandSelect(CliCommand command) {
+    QueryResult queryResult = executor.executeQuery(exeContext, command.getFullCommand());
+
+    if (queryResult.succeeded()) {
+      CliView view = new QueryResultLogView();
+      view.open(this, queryResult);
+      executor.stopExecution(exeContext, queryResult.getExecutionId());
+    } else {
+      writer.write("Execution failed. Error: ");
+      writer.println(executor.getErrorMsg());
+      writer.println();
+      writer.flush();
+    }
+  }
+
+  private void commandShowTables(CliCommand command) {
+    List<String> tableNames = executor.listTables(exeContext);
+
+    if (tableNames != null) {
+      for (String tableName : tableNames) {
+        writer.println(tableName);
+      }
+    } else {
+      writer.print("Failed to list tables. Error: ");
+      writer.println(executor.getErrorMsg());
+    }
+    writer.println();
+    writer.flush();
+  }
+
+  private void commandShowFunctions(CliCommand command) {
+    List<SqlFunction> fns = executor.listFunctions(exeContext);
+
+    if (fns != null) {
+      for (SqlFunction fn : fns) {
+        writer.println(fn.toString());
+      }
+    } else {
+      writer.print("Failed to list functions. Error: ");
+      writer.println(executor.getErrorMsg());
+    }
+    writer.println();
+    writer.flush();
+  }
+
+  private void commandStop(CliCommand command) {
+    String parameters = command.getParameters();
+    if (CliUtil.isNullOrEmpty(parameters)) {
+      writer.println(command.getCommandType().getUsage());
+      writer.println();
+      writer.flush();
+      return;
+    }
+
+    List<Integer> execIds = new ArrayList<>();
+    String[] params = parameters.split("\u0020");
+    for (String param : params) {
+      Integer id = null;
+      try {
+        id = Integer.valueOf(param);
+      } catch (NumberFormatException e) {
+      }
+      if (id == null || !executions.containsKey(id)) {
+        writer.print("Error: ");
+        writer.print(param);
+        writer.println(" is not a valid id.");
+      } else {
+        execIds.add(id);
+      }
+    }
+
+    for (Integer id : execIds) {
+      if (executor.stopExecution(exeContext, id)) {
+        writer.println(String.format("Request to stop execution %d was sent.", id));
+      } else {
+        writer.println(String.format("Failed to stop %d: %s", id, executor.getErrorMsg()));
+      }
+    }
+    writer.println();
+    writer.flush();
+  }
+
+  private void commandHelp(CliCommand command) {
+    String parameters = command.getParameters();
+    if (CliUtil.isNullOrEmpty(parameters)) {
+      printHelpMessage();
+      return;
+    }
+
+    parameters = parameters.trim().toUpperCase();
+    for (CliCommandType cmdType : CliCommandType.values()) {
+      String cmdText = cmdType.getCommandName();
+      if (cmdText.equals(parameters)) {
+        writer.println(cmdType.getUsage());
+        writer.println();
+        writer.flush();
+        return;
+      }
+    }
+
+    writer.print("Unknown command: ");
+    writer.println(parameters);
+    writer.println();
+    writer.flush();
+  }
+
+
+  private CliCommand parseLine(String line) {
+    line = trimCommand(line);
+    if (CliUtil.isNullOrEmpty(line))
+      return null;
+
+    String upperCaseLine = line.toUpperCase();
+    for (CliCommandType cmdType : CliCommandType.values()) {
+      String cmdText = cmdType.getCommandName();
+      if (upperCaseLine.startsWith(cmdText)) {
+        if (upperCaseLine.length() == cmdText.length())
+          return new CliCommand(cmdType);
+        else if (upperCaseLine.charAt(cmdText.length()) <= CliConstants.SPACE) {
+          String parameter = line.substring(cmdText.length()).trim();
+          if (!parameter.isEmpty())
+            return new CliCommand(cmdType, parameter);
+        }
+      }
+    }
+    return new CliCommand(CliCommandType.INVALID_COMMAND);
+  }
+
+  private void printHelpMessage() {
+    writer.println();
+    AttributedStringBuilder builder = new AttributedStringBuilder();
+    builder.append("The following commands are supported by ")
+            .append(CliConstants.APP_NAME)
+            .append(" at the moment.\n\n");
+
+    for (CliCommandType cmdType : CliCommandType.values()) {
+      if (cmdType == CliCommandType.INVALID_COMMAND)
+        continue;
+
+      String cmdText = cmdType.getCommandName();
+      String cmdDescription = cmdType.getDescription();
+
+      builder.style(AttributedStyle.DEFAULT.bold())
+              .append(cmdText)
+              .append("\t\t")
+              .style(AttributedStyle.DEFAULT)
+              .append(cmdDescription)
+              .append("\n");
+    }
+
+    writer.println(builder.toAnsi());
+    writer.println("HELP <COMMAND> to get help for a specific command.\n");
+    writer.flush();
+  }
+
+  private void clearScreen() {
+    terminal.puts(InfoCmp.Capability.clear_screen);
+  }
+
+  /*
+      Field    | Type
+      -------------------------
+      Field1   | Type 1
+      Field2   | VARCHAR(STRING)
+      Field... | VARCHAR(STRING)
+      -------------------------
+  */
+  private List<String> formatSchema4Display(SqlSchema schema) {
+    final String HEADER_FIELD = "Field";
+    final String HEADER_TYPE = "Type";
+    final char SEPERATOR = '|';
+    final char LINE_SEP = '-';
+
+    int terminalWidth = terminal.getWidth();
+    // Two spaces * 2 plus one SEPERATOR
+    if (terminalWidth < 2 + 2 + 1 + HEADER_FIELD.length() + HEADER_TYPE.length()) {
+      return Collections.singletonList("Not enough room.");
+    }
+
+    // Find the best seperator position for least rows
+    int seperatorPos = HEADER_FIELD.length() + 2;
+    int minRowNeeded = Integer.MAX_VALUE;
+    int longestLineCharNum = 0;
+    int rowCount = schema.getFieldCount();
+    for (int j = seperatorPos; j < terminalWidth - HEADER_TYPE.length() - 2; ++j) {
+      boolean fieldWrapped = false;
+      int rowNeeded = 0;
+      for (int i = 0; i < rowCount; ++i) {
+        int fieldLen = schema.getFieldName(i).length();
+        int typeLen = schema.getFieldTypeName(i).length();
+        int fieldRowNeeded = CliUtil.ceilingDiv(fieldLen, j - 2);
+        int typeRowNeeded = CliUtil.ceilingDiv(typeLen, terminalWidth - 1 - j - 2);
+
+        rowNeeded += Math.max(fieldRowNeeded, typeRowNeeded);
+        fieldWrapped |= fieldRowNeeded > 1;
+        if (typeRowNeeded > 1) {
+          longestLineCharNum = terminalWidth;
+        } else {
+          longestLineCharNum = Math.max(longestLineCharNum, j + typeLen + 2 + 1);
+        }
+      }
+      if (rowNeeded < minRowNeeded) {
+        minRowNeeded = rowNeeded;
+        seperatorPos = j;
+      }
+      if (!fieldWrapped)
+        break;
+    }
+
+    List<String> lines = new ArrayList<>(minRowNeeded + 4);
+
+    // Header
+    StringBuilder line = new StringBuilder(terminalWidth);
+    line.append(CliConstants.SPACE);
+    line.append(HEADER_FIELD);
+    CliUtil.appendTo(line, seperatorPos - 1, CliConstants.SPACE);
+    line.append(SEPERATOR);
+    line.append(CliConstants.SPACE);
+    line.append(HEADER_TYPE);
+    lines.add(line.toString());
+    line = new StringBuilder(terminalWidth);
+    CliUtil.appendTo(line, longestLineCharNum - 1, LINE_SEP);
+    lines.add(line.toString());
+
+    // Body
+    AttributedStyle oddLineStyle = AttributedStyle.DEFAULT.BOLD.foreground(AttributedStyle.BLUE);
+    AttributedStyle evenLineStyle = AttributedStyle.DEFAULT.BOLD.foreground(AttributedStyle.CYAN);
+
+    final int fieldColSize = seperatorPos - 2;
+    final int typeColSize = terminalWidth - seperatorPos - 1 - 2;
+    for (int i = 0; i < rowCount; ++i) {
+      String field = schema.getFieldName(i);
+      String type = schema.getFieldTypeName(i);
+      int fieldLen = field.length();
+      int typeLen = type.length();
+      int fieldStartIdx = 0, typeStartIdx = 0;
+      while (fieldStartIdx < fieldLen || typeStartIdx < typeLen) {
+        line = new StringBuilder(terminalWidth);
+        line.append(CliConstants.SPACE);
+        int numToWrite = Math.min(fieldColSize, fieldLen - fieldStartIdx);
+        if (numToWrite > 0) {
+          line.append(field, fieldStartIdx, fieldStartIdx + numToWrite);
+          fieldStartIdx += numToWrite;
+        }
+        CliUtil.appendTo(line, seperatorPos - 1, CliConstants.SPACE);
+        line.append(SEPERATOR);
+        line.append(CliConstants.SPACE);
+
+        numToWrite = Math.min(typeColSize, typeLen - typeStartIdx);
+        if (numToWrite > 0) {
+          line.append(type, typeStartIdx, typeStartIdx + numToWrite);
+          typeStartIdx += numToWrite;
+        }
+
+        if (i % 2 == 0) {
+          AttributedStringBuilder attrBuilder = new AttributedStringBuilder().style(evenLineStyle);
+          attrBuilder.append(line.toString());
+          lines.add(attrBuilder.toAnsi());
+        } else {
+          AttributedStringBuilder attrBuilder = new AttributedStringBuilder().style(oddLineStyle);
+          attrBuilder.append(line.toString());
+          lines.add(attrBuilder.toAnsi());
+        }
+      }
+    }
+
+    // Footer
+    line = new StringBuilder(terminalWidth);
+    CliUtil.appendTo(line, longestLineCharNum - 1, LINE_SEP);
+    lines.add(line.toString());
+    return lines;
+  }
+
+  // Trims: leading spaces; trailing spaces and ";"s
+  private String trimCommand(String command) {
+    if (CliUtil.isNullOrEmpty(command))
+      return command;
+
+    int len = command.length();
+    int st = 0;
+
+    while ((st < len) && (command.charAt(st) <= ' ')) {
+      st++;
+    }
+    while ((st < len) && ((command.charAt(len - 1) <= ' ')
+            || command.charAt(len - 1) == ';')) {
+      len--;
+    }
+    return ((st > 0) || (len < command.length())) ? command.substring(st, len) : command;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliView.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliView.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliView.java
new file mode 100644
index 0000000..de82100
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliView.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.cli;
+
+import org.apache.samza.sql.client.interfaces.QueryResult;
+
+/**
+ * For displaying the streaming result of a SELECT statement.
+ */
+public interface CliView {
+  public void open(CliShell shell, QueryResult queryResult);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/Main.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/Main.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/Main.java
new file mode 100755
index 0000000..d85cdae
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/Main.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.cli;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.sql.client.impl.SamzaExecutor;
+import org.apache.samza.sql.client.interfaces.ExecutionContext;
+import org.apache.samza.sql.client.interfaces.SqlExecutor;
+import org.apache.samza.sql.client.util.CliException;
+import org.apache.samza.sql.client.util.CliUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Main entry of the program.
+ */
+public class Main {
+    private static final Logger LOG = LoggerFactory.getLogger(Main.class);
+
+    public static void main(String[] args) {
+      // Get configuration file path
+      String configFilePath = null;
+      for(int i = 0; i < args.length; ++i) {
+        switch(args[i]) {
+          case "-conf":
+            if(i + 1 < args.length) {
+              configFilePath = args[i + 1];
+              i++;
+            }
+            break;
+          default:
+            LOG.warn("Unknown parameter {}", args[i]);
+            break;
+        }
+      }
+
+      SqlExecutor executor = null;
+      CliEnvironment environment = new CliEnvironment();
+      Map<String, String> executorConfig = new HashMap<>();
+
+      if(!CliUtil.isNullOrEmpty(configFilePath)) {
+        LOG.info("Configuration file path is: {}", configFilePath);
+        try {
+          FileReader fileReader = new FileReader(configFilePath);
+          BufferedReader bufferedReader = new BufferedReader(fileReader);
+          String line;
+          while ((line = bufferedReader.readLine()) != null) {
+            if (line.startsWith("#") || line.startsWith("[")) {
+              continue;
+            }
+            String[] strs = line.split("=");
+            if (strs.length != 2) {
+              continue;
+            }
+            String key = strs[0].trim().toLowerCase();
+            String value = strs[1].trim();
+            if(key.startsWith(CliConstants.CONFIG_SHELL_PREFIX)) {
+              if(key.equals(CliConstants.CONFIG_EXECUTOR)) {
+                try {
+                  Class<?> clazz = Class.forName(value);
+                  Constructor<?> ctor = clazz.getConstructor();
+                  executor = (SqlExecutor) ctor.newInstance();
+                  LOG.info("Sql executor creation succeed. Executor class is: {}", value);
+                } catch (ClassNotFoundException | NoSuchMethodException
+                    | IllegalAccessException | InstantiationException | InvocationTargetException e) {
+                  throw new CliException(String.format("Failed to create executor %s.", value), e);
+                }
+                continue;
+              }
+
+              // Suppose a shell variable.
+              int result = environment.setEnvironmentVariable(key, value);
+              if(result == -1) { // CliEnvironment doesn't recognize the key.
+                LOG.warn("Unknowing shell environment variable: {}", key);
+              } else if(result == -2) { // Invalid value
+                LOG.warn("Unknowing shell environment value: {}", value);
+              }
+            } else {
+              executorConfig.put(key, value);
+            }
+          }
+        } catch (IOException e) {
+          LOG.error("Error in opening and reading the configuration file {}", e.toString());
+        }
+      }
+      if(executor == null) {
+        executor = new SamzaExecutor();
+      }
+
+      CliShell shell = new CliShell(executor, environment, new ExecutionContext(executorConfig));
+      shell.open();
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/QueryResultLogView.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/QueryResultLogView.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/QueryResultLogView.java
new file mode 100644
index 0000000..b1973bc
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/QueryResultLogView.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.cli;
+
+import org.apache.samza.sql.client.interfaces.ExecutionContext;
+import org.apache.samza.sql.client.interfaces.QueryResult;
+import org.apache.samza.sql.client.interfaces.SqlExecutor;
+import org.jline.keymap.BindingReader;
+import org.jline.keymap.KeyMap;
+import org.jline.terminal.Attributes;
+import org.jline.terminal.Terminal;
+import org.jline.utils.AttributedStringBuilder;
+import org.jline.utils.AttributedStyle;
+import org.jline.utils.InfoCmp;
+
+import java.util.EnumSet;
+import java.util.List;
+
+import static org.jline.keymap.KeyMap.ctrl;
+
+
+/**
+ * A scrolling (logging) view of the query result of a streaming SELECT statement.
+ */
+
+public class QueryResultLogView implements CliView {
+  private static final int DEFAULT_REFRESH_INTERVAL = 100; // all intervals are in ms
+
+  private int refreshInterval = DEFAULT_REFRESH_INTERVAL;
+  private int height;
+  private Terminal terminal;
+  private SqlExecutor executor;
+  private ExecutionContext exeContext;
+  private volatile boolean keepRunning = true;
+  private boolean paused = false;
+
+  // Stupid BindingReader doesn't have a real nonblocking mode
+  // Must create a new thread to get user input
+  private Thread inputThread;
+  private BindingReader keyReader;
+
+  public QueryResultLogView() {
+  }
+
+  // -- implementation of CliView -------------------------------------------
+
+  public void open(CliShell shell, QueryResult queryResult) {
+    terminal = shell.getTerminal();
+    executor = shell.getExecutor();
+    exeContext = shell.getExeContext();
+
+    TerminalStatus prevStatus = setupTerminal();
+    try {
+      keyReader = new BindingReader(terminal.reader());
+      inputThread = new InputThread();
+      inputThread.start();
+      while (keepRunning) {
+        try {
+          display();
+          if (keepRunning)
+            Thread.sleep(refreshInterval);
+        } catch (InterruptedException e) {
+          continue;
+        }
+      }
+
+      try {
+        inputThread.join(1 * 1000);
+      } catch (InterruptedException e) {
+      }
+    } finally {
+      restoreTerminal(prevStatus);
+    }
+    if (inputThread.isAlive()) {
+      terminal.writer().println("Warning: input thread hang. Have to kill!");
+      terminal.writer().flush();
+      inputThread.interrupt();
+    }
+  }
+
+  // ------------------------------------------------------------------------
+
+  private void display() {
+    updateTerminalSize();
+    int rowsInBuffer = executor.getRowCount();
+    if (rowsInBuffer <= 0 || paused) {
+      clearStatusBar();
+      drawStatusBar(rowsInBuffer);
+      return;
+    }
+
+    while (rowsInBuffer > 0) {
+      clearStatusBar();
+      int step = 10;
+      List<String[]> lines = executor.consumeQueryResult(exeContext, 0, step - 1);
+      for (String[] line : lines) {
+        for (int i = 0; i < line.length; ++i) {
+          terminal.writer().write(line[i] == null ? "null" : line[i]);
+          terminal.writer().write(i == line.length - 1 ? "\n" : " ");
+        }
+      }
+      terminal.flush();
+      clearStatusBar();
+      drawStatusBar(rowsInBuffer);
+
+      if (!keepRunning || paused)
+        return;
+
+      rowsInBuffer = executor.getRowCount();
+    }
+  }
+
+  private void clearStatusBar() {
+    terminal.puts(InfoCmp.Capability.save_cursor);
+    terminal.puts(InfoCmp.Capability.cursor_address, height - 1, 0);
+    terminal.puts(InfoCmp.Capability.delete_line, height - 1, 0);
+    terminal.puts(InfoCmp.Capability.restore_cursor);
+  }
+
+  private void drawStatusBar(int rowsInBuffer) {
+    terminal.puts(InfoCmp.Capability.save_cursor);
+    terminal.puts(InfoCmp.Capability.cursor_address, height - 1, 0);
+    AttributedStyle statusBarStyle = AttributedStyle.DEFAULT.background(AttributedStyle.WHITE)
+            .foreground(AttributedStyle.BLACK);
+    AttributedStringBuilder attrBuilder = new AttributedStringBuilder()
+            .style(statusBarStyle.bold().italic())
+            .append("Q")
+            .style(statusBarStyle)
+            .append(": Quit     ")
+            .style(statusBarStyle.bold().italic())
+            .append("SPACE")
+            .style(statusBarStyle)
+            .append(": Pause/Resume     ")
+            .append(String.valueOf(rowsInBuffer) + " rows in buffer     ");
+    if (paused) {
+      attrBuilder.style(statusBarStyle.bold().foreground(AttributedStyle.RED).blink())
+              .append("PAUSED");
+    }
+    String statusBarText = attrBuilder.toAnsi();
+    terminal.writer().print(statusBarText);
+    terminal.flush();
+    terminal.puts(InfoCmp.Capability.restore_cursor);
+  }
+
+  private TerminalStatus setupTerminal() {
+    TerminalStatus prevStatus = new TerminalStatus();
+
+    // Signal handlers
+    prevStatus.handler_INT = terminal.handle(Terminal.Signal.INT, this::handleSignal);
+    prevStatus.handler_QUIT = terminal.handle(Terminal.Signal.QUIT, this::handleSignal);
+    prevStatus.handler_TSTP = terminal.handle(Terminal.Signal.TSTP, this::handleSignal);
+    prevStatus.handler_CONT = terminal.handle(Terminal.Signal.CONT, this::handleSignal);
+    prevStatus.handler_WINCH = terminal.handle(Terminal.Signal.WINCH, this::handleSignal);
+
+    // Attributes
+    prevStatus.attributes = terminal.getAttributes();
+    Attributes newAttributes = new Attributes(prevStatus.attributes);
+    // (003, ETX, Ctrl-C, or also 0177, DEL, rubout) Interrupt char‐
+    // acter (INTR).  Send a SIGINT signal.  Recognized when ISIG is
+    // set, and then not passed as input.
+    newAttributes.setControlChar(Attributes.ControlChar.VINTR, 0);
+    // (034, FS, Ctrl-\) Quit character (QUIT).  Send SIGQUIT signal.
+    // Recognized when ISIG is set, and then not passed as input.
+    // newAttributes.setControlChar(Attributes.ControlChar.VQUIT, 0);
+    newAttributes.setControlChar(Attributes.ControlChar.VMIN, 1);
+    newAttributes.setControlChar(Attributes.ControlChar.VTIME, 0);
+    // Enables signals and SIGTTOU signal to the process group of a background
+    // process which tries to write to our terminal
+    newAttributes.setLocalFlags(
+            EnumSet.of(Attributes.LocalFlag.ISIG, Attributes.LocalFlag.TOSTOP), true);
+    // No canonical mode, no echo, and no implementation-defined input processing
+    newAttributes.setLocalFlags(EnumSet.of(
+            Attributes.LocalFlag.ICANON, Attributes.LocalFlag.ECHO,
+            Attributes.LocalFlag.IEXTEN), false);
+    // Input flags
+    newAttributes.setInputFlags(EnumSet.of(
+            Attributes.InputFlag.ICRNL, Attributes.InputFlag.INLCR, Attributes.InputFlag.IXON), false);
+    terminal.setAttributes(newAttributes);
+
+    // Capabilities
+    // tput smcup; use alternate screen
+    terminal.puts(InfoCmp.Capability.enter_ca_mode);
+    terminal.puts(InfoCmp.Capability.cursor_invisible);
+    terminal.puts(InfoCmp.Capability.cursor_home);
+
+    terminal.flush();
+
+    return prevStatus;
+  }
+
+  private void restoreTerminal(TerminalStatus status) {
+    // Signal handlers
+    terminal.handle(Terminal.Signal.INT, status.handler_INT);
+    terminal.handle(Terminal.Signal.QUIT, status.handler_QUIT);
+    terminal.handle(Terminal.Signal.TSTP, status.handler_TSTP);
+    terminal.handle(Terminal.Signal.CONT, status.handler_CONT);
+    terminal.handle(Terminal.Signal.WINCH, status.handler_WINCH);
+
+    // Attributes
+    terminal.setAttributes(status.attributes);
+
+    // Capability
+    terminal.puts(InfoCmp.Capability.exit_ca_mode);
+    terminal.puts(InfoCmp.Capability.cursor_visible);
+  }
+
+  private void handleSignal(Terminal.Signal signal) {
+    switch (signal) {
+      case INT:
+      case QUIT:
+        keepRunning = false;
+        break;
+      case TSTP:
+        paused = true;
+        break;
+      case CONT:
+        paused = false;
+        break;
+      case WINCH:
+        updateTerminalSize();
+        break;
+    }
+  }
+
+  private void updateTerminalSize() {
+    terminal.flush();
+    height = terminal.getHeight();
+  }
+
+  private KeyMap<Action> bindActionKey() {
+    KeyMap<Action> keyMap = new KeyMap<>();
+    keyMap.bind(Action.QUIT, "Q", "q", ctrl('c'));
+    keyMap.bind(Action.SPACE, " ");
+
+    return keyMap;
+  }
+
+  public enum Action {
+    QUIT,
+    SPACE
+  }
+
+  private static class TerminalStatus {
+    Terminal.SignalHandler handler_INT;
+    Terminal.SignalHandler handler_QUIT;
+    Terminal.SignalHandler handler_TSTP;
+    Terminal.SignalHandler handler_CONT;
+    Terminal.SignalHandler handler_WINCH;
+
+    Attributes attributes;
+  }
+
+  private class InputThread extends Thread {
+    public InputThread() {
+    }
+
+    public void run() {
+      KeyMap<Action> keyMap = bindActionKey();
+
+      Action action = keyReader.readBinding(keyMap, null, true);
+      while (action != null && keepRunning) {
+        switch (action) {
+          case QUIT:
+            keepRunning = false;
+            return;
+          case SPACE:
+            paused = !paused;
+            break;
+        }
+        action = keyReader.readBinding(keyMap, null, true);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/AvroSqlSchemaConverter.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/AvroSqlSchemaConverter.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/AvroSqlSchemaConverter.java
new file mode 100644
index 0000000..42dd285
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/AvroSqlSchemaConverter.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.impl;
+
+import com.google.common.base.Joiner;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.avro.Schema;
+import org.apache.samza.SamzaException;
+import org.apache.samza.sql.client.interfaces.SqlSchema;
+import org.apache.samza.sql.client.interfaces.SqlSchemaBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Schema converter which converts Avro schema to Samza Sql schema
+ */
+public class AvroSqlSchemaConverter {
+  private static final Logger LOG = LoggerFactory.getLogger(AvroSqlSchemaConverter.class);
+
+  public static SqlSchema convertAvroToSamzaSqlSchema(String schema) {
+    Schema avroSchema = Schema.parse(schema);
+    return getSchema(avroSchema.getFields());
+  }
+
+  private static SqlSchema getSchema(List<Schema.Field> fields) {
+    SqlSchemaBuilder schemaBuilder = SqlSchemaBuilder.builder();
+    for (Schema.Field field : fields) {
+      schemaBuilder.addField(field.name(), getColumnTypeName(getFieldType(field.schema())));
+    }
+    return schemaBuilder.toSchema();
+  }
+
+  private static String getColumnTypeName(SamzaSqlFieldType fieldType) {
+    if (fieldType.isPrimitiveField()) {
+      return fieldType.getTypeName().toString();
+    } else if (fieldType.getTypeName() == SamzaSqlFieldType.TypeName.MAP) {
+      return String.format("MAP(%s)", getColumnTypeName(fieldType.getValueType()));
+    } else if (fieldType.getTypeName() == SamzaSqlFieldType.TypeName.ARRAY) {
+      return String.format("ARRAY(%s)", getColumnTypeName(fieldType.getElementType()));
+    } else {
+      SqlSchema schema = fieldType.getRowSchema();
+      List<String> fieldTypes = IntStream.range(0, schema.getFieldCount())
+          .mapToObj(i -> schema.getFieldName(i) + " " + schema.getFieldTypeName(i))
+          .collect(Collectors.toList());
+      String rowSchemaValue = Joiner.on(", ").join(fieldTypes);
+      return String.format("STRUCT(%s)", rowSchemaValue);
+    }
+  }
+
+  private static SamzaSqlFieldType getFieldType(org.apache.avro.Schema schema) {
+    switch (schema.getType()) {
+      case ARRAY:
+        return SamzaSqlFieldType.createArrayFieldType(getFieldType(schema.getElementType()));
+      case BOOLEAN:
+        return SamzaSqlFieldType.createPrimitiveFieldType(SamzaSqlFieldType.TypeName.BOOLEAN);
+      case DOUBLE:
+        return SamzaSqlFieldType.createPrimitiveFieldType(SamzaSqlFieldType.TypeName.DOUBLE);
+      case FLOAT:
+        return SamzaSqlFieldType.createPrimitiveFieldType(SamzaSqlFieldType.TypeName.FLOAT);
+      case ENUM:
+        return SamzaSqlFieldType.createPrimitiveFieldType(SamzaSqlFieldType.TypeName.STRING);
+      case UNION:
+        // NOTE: We only support Union types when they are used for representing Nullable fields in Avro
+        List<org.apache.avro.Schema> types = schema.getTypes();
+        if (types.size() == 2) {
+          if (types.get(0).getType() == org.apache.avro.Schema.Type.NULL) {
+            return getFieldType(types.get(1));
+          } else if ((types.get(1).getType() == org.apache.avro.Schema.Type.NULL)) {
+            return getFieldType(types.get(0));
+          }
+        }
+      case FIXED:
+        return SamzaSqlFieldType.createPrimitiveFieldType(SamzaSqlFieldType.TypeName.STRING);
+      case STRING:
+        return SamzaSqlFieldType.createPrimitiveFieldType(SamzaSqlFieldType.TypeName.STRING);
+      case BYTES:
+        return SamzaSqlFieldType.createPrimitiveFieldType(SamzaSqlFieldType.TypeName.BYTES);
+      case INT:
+        return SamzaSqlFieldType.createPrimitiveFieldType(SamzaSqlFieldType.TypeName.INT32);
+      case LONG:
+        return SamzaSqlFieldType.createPrimitiveFieldType(SamzaSqlFieldType.TypeName.INT64);
+      case RECORD:
+        return SamzaSqlFieldType.createRowFieldType(getSchema(schema.getFields()));
+      case MAP:
+        return SamzaSqlFieldType.createMapFieldType(getFieldType(schema.getValueType()));
+      default:
+        String msg = String.format("Field Type %s is not supported", schema.getType());
+        LOG.error(msg);
+        throw new SamzaException(msg);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/CliLoggingSystemFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/CliLoggingSystemFactory.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/CliLoggingSystemFactory.java
new file mode 100644
index 0000000..49e051a
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/CliLoggingSystemFactory.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.impl;
+
+import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * System factory of Samza Sql Shell which needs to provide Consumer, Producer and Admin
+ */
+public class CliLoggingSystemFactory implements SystemFactory {
+
+  private static final Logger LOG = LoggerFactory.getLogger(CliLoggingSystemFactory.class);
+  private static AtomicInteger messageCounter = new AtomicInteger(0);
+
+  @Override
+  public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry) {
+    return new CliLoggingSystemFactory.LoggingSystemProducer();
+  }
+
+  @Override
+  public SystemAdmin getAdmin(String systemName, Config config) {
+    return new CliLoggingSystemFactory.SimpleSystemAdmin(config);
+  }
+
+  private static class SimpleSystemAdmin implements SystemAdmin {
+
+    public SimpleSystemAdmin(Config config) {
+    }
+
+    @Override
+    public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets) {
+      return offsets.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, null));
+    }
+
+    @Override
+    public Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames) {
+      return streamNames.stream()
+              .collect(Collectors.toMap(Function.identity(), streamName -> new SystemStreamMetadata(streamName,
+                      Collections.singletonMap(new Partition(0),
+                              new SystemStreamMetadata.SystemStreamPartitionMetadata(null, null, null)))));
+    }
+
+    @Override
+    public Integer offsetComparator(String offset1, String offset2) {
+      if (offset1 == null) {
+        return offset2 == null ? 0 : -1;
+      } else if (offset2 == null) {
+        return 1;
+      }
+      return offset1.compareTo(offset2);
+    }
+  }
+
+  private class LoggingSystemProducer implements SystemProducer {
+
+    @Override
+    public void start() {
+    }
+
+    @Override
+    public void stop() {
+    }
+
+    @Override
+    public void register(String source) {
+      LOG.info("Registering source" + source);
+    }
+
+    @Override
+    public void send(String source, OutgoingMessageEnvelope envelope) {
+      LOG.info(String.format(String.format("Message %d :", messageCounter.incrementAndGet())));
+      String msg = String.format("OutputStream:%s Key:%s Value:%s", envelope.getSystemStream(), envelope.getKey(),
+              new String((byte[]) envelope.getMessage()));
+      LOG.info(msg);
+
+      SamzaExecutor.saveOutputMessage(envelope);
+    }
+
+    @Override
+    public void flush(String source) {
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/FileSystemAvroRelSchemaProviderFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/FileSystemAvroRelSchemaProviderFactory.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/FileSystemAvroRelSchemaProviderFactory.java
new file mode 100644
index 0000000..8d0b12f
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/FileSystemAvroRelSchemaProviderFactory.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.impl;
+
+import org.apache.avro.Schema;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.avro.AvroRelSchemaProvider;
+import org.apache.samza.sql.avro.AvroTypeFactoryImpl;
+import org.apache.samza.sql.interfaces.RelSchemaProvider;
+import org.apache.samza.sql.interfaces.RelSchemaProviderFactory;
+import org.apache.samza.system.SystemStream;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Relational schemas provider which reads system schema from a given directory
+ */
+public class FileSystemAvroRelSchemaProviderFactory implements RelSchemaProviderFactory {
+
+  public static final String CFG_SCHEMA_DIR = "schemaDir";
+
+  @Override
+  public RelSchemaProvider create(SystemStream systemStream, Config config) {
+    return new FileSystemAvroRelSchemaProvider(systemStream, config);
+  }
+
+  private class FileSystemAvroRelSchemaProvider implements AvroRelSchemaProvider {
+    private final SystemStream systemStream;
+    private final String schemaDir;
+
+    public FileSystemAvroRelSchemaProvider(SystemStream systemStream, Config config) {
+      this.systemStream = systemStream;
+      this.schemaDir = config.get(CFG_SCHEMA_DIR);
+    }
+
+    @Override
+    public RelDataType getRelationalSchema() {
+      String schemaStr = this.getSchema(this.systemStream);
+      Schema schema = Schema.parse(schemaStr);
+      AvroTypeFactoryImpl avroTypeFactory = new AvroTypeFactoryImpl();
+      return avroTypeFactory.createType(schema);
+    }
+
+    @Override
+    public String getSchema(SystemStream systemStream) {
+      String fileName = String.format("%s.avsc", systemStream.getStream());
+      File file = new File(schemaDir, fileName);
+      try {
+        return Schema.parse(file).toString();
+      } catch (IOException e) {
+        throw new SamzaException(e);
+      }
+    }
+  }
+}