You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2018/10/22 23:48:33 UTC
[2/3] samza git commit: SAMZA-1901: Implementation of Samza SQL Shell
SAMZA-1901: Implementation of Samza SQL Shell
## What changes were proposed in this pull request?
This PR is to implement Samza SQL shell. The document about the shell was attached [here](https://issues.apache.org/jira/browse/SAMZA-1901).
## How was this patch tested?
1. Add unit tests
2. Run the shell with use cases mentioned in the attached document under https://issues.apache.org/jira/browse/SAMZA-1901
Author: Weiqing Yang <ya...@gmail.com>
Reviewers: Srinivasulu Punuru <sp...@linkedin.com>, Aditya Toomula <at...@linkedin.com>
Closes #654 from weiqingy/samza-shell
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/2782818e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/2782818e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/2782818e
Branch: refs/heads/1.0.0
Commit: 2782818eb99231a3e67764a569fe93139494d8fc
Parents: ba90a51
Author: Weiqing Yang <ya...@gmail.com>
Authored: Mon Oct 22 09:51:40 2018 -0700
Committer: Prateek Maheshwari <pm...@apache.org>
Committed: Mon Oct 22 16:47:47 2018 -0700
----------------------------------------------------------------------
build.gradle | 28 +
gradle/dependency-versions.gradle | 2 +
.../JobNodeConfigurationGenerator.java | 6 +-
samza-sql-shell/conf/samza-sql-shell-log4j.xml | 49 ++
samza-sql-shell/conf/shell-defaults.conf | 28 +
samza-sql-shell/scripts/samza-sql-shell.sh | 42 +
.../apache/samza/sql/client/cli/CliCommand.java | 53 ++
.../samza/sql/client/cli/CliCommandType.java | 78 ++
.../samza/sql/client/cli/CliConstants.java | 57 ++
.../samza/sql/client/cli/CliEnvironment.java | 130 +++
.../samza/sql/client/cli/CliHighlighter.java | 89 ++
.../apache/samza/sql/client/cli/CliShell.java | 821 +++++++++++++++++++
.../apache/samza/sql/client/cli/CliView.java | 29 +
.../org/apache/samza/sql/client/cli/Main.java | 117 +++
.../sql/client/cli/QueryResultLogView.java | 291 +++++++
.../sql/client/impl/AvroSqlSchemaConverter.java | 112 +++
.../client/impl/CliLoggingSystemFactory.java | 117 +++
.../FileSystemAvroRelSchemaProviderFactory.java | 75 ++
.../samza/sql/client/impl/SamzaExecutor.java | 511 ++++++++++++
.../sql/client/impl/SamzaSqlFieldType.java | 94 +++
.../sql/client/impl/SamzaSqlUdfDisplayInfo.java | 71 ++
.../sql/client/interfaces/ExecutionContext.java | 41 +
.../client/interfaces/ExecutionException.java | 40 +
.../sql/client/interfaces/ExecutionStatus.java | 30 +
.../sql/client/interfaces/NonQueryResult.java | 62 ++
.../sql/client/interfaces/QueryResult.java | 49 ++
.../sql/client/interfaces/SqlExecutor.java | 171 ++++
.../sql/client/interfaces/SqlFunction.java | 55 ++
.../samza/sql/client/interfaces/SqlSchema.java | 56 ++
.../sql/client/interfaces/SqlSchemaBuilder.java | 63 ++
.../samza/sql/client/util/CliException.java | 41 +
.../apache/samza/sql/client/util/CliUtil.java | 43 +
.../sql/client/util/RandomAccessQueue.java | 96 +++
.../sql/client/impl/SamzaExecutorTest.java | 79 ++
.../sql/client/util/RandomAccessQueueTest.java | 89 ++
.../src/test/resources/ProfileChangeStream.avsc | 51 ++
.../sql/runner/SamzaSqlApplicationConfig.java | 2 +-
settings.gradle | 1 +
38 files changed, 3764 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 856bb1d..b6ee4ac 100644
--- a/build.gradle
+++ b/build.gradle
@@ -336,6 +336,34 @@ project(':samza-sql') {
}
}
+project(':samza-sql-shell') {
+ apply plugin: 'java'
+
+ dependencies {
+ compile project(':samza-sql')
+ compile project(':samza-tools')
+ compile project(":samza-core_$scalaVersion")
+ compile project(':samza-api')
+ compile project(":samza-kafka_$scalaVersion")
+ compile project(':samza-azure')
+ compile "net.java.dev.jna:jna:$jnaVersion"
+ compile "org.jline:jline:$jlineVersion"
+
+ testCompile "junit:junit:$junitVersion"
+ }
+
+ tasks.create(name: "releaseSqlShellTarGz", dependsOn: configurations.archives.artifacts, type: Tar) {
+ into "samza-sql-shell-${version}"
+ compression = Compression.GZIP
+ from(project.file("./scripts")) { into "scripts/" }
+ from(project.file("./conf")) { into "conf/" }
+ from(project(':samza-shell').file("src/main/bash/run-class.sh")) { into "scripts/" }
+ from(configurations.runtime) { into("lib/") }
+ from(configurations.archives.artifacts.files) { into("lib/") }
+ duplicatesStrategy 'exclude'
+ }
+}
+
project(':samza-tools') {
apply plugin: 'java'
http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/gradle/dependency-versions.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle
index a5b4f51..43ceb0a 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -47,4 +47,6 @@
zkClientVersion = "0.8"
zookeeperVersion = "3.4.6"
failsafeVersion = "1.1.0"
+ jlineVersion = "3.8.2"
+ jnaVersion = "4.5.1"
}
http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java b/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java
index 70c9b23..215177b 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java
@@ -70,10 +70,8 @@ import org.slf4j.LoggerFactory;
static Config mergeConfig(Map<String, String> originalConfig, Map<String, String> generatedConfig) {
Map<String, String> mergedConfig = new HashMap<>(generatedConfig);
originalConfig.forEach((k, v) -> {
- if (generatedConfig.containsKey(k) &&
- !Objects.equals(generatedConfig.get(k), v)) {
- LOG.info("Replacing generated config for key: {} value: {} with original config value: {}",
- k, generatedConfig.get(k), v);
+ if (generatedConfig.containsKey(k) && !Objects.equals(generatedConfig.get(k), v)) {
+ LOG.info("Replacing generated config for key: {} value: {} with original config value: {}", k, generatedConfig.get(k), v);
}
mergedConfig.put(k, v);
});
http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql-shell/conf/samza-sql-shell-log4j.xml
----------------------------------------------------------------------
diff --git a/samza-sql-shell/conf/samza-sql-shell-log4j.xml b/samza-sql-shell/conf/samza-sql-shell-log4j.xml
new file mode 100644
index 0000000..924ef93
--- /dev/null
+++ b/samza-sql-shell/conf/samza-sql-shell-log4j.xml
@@ -0,0 +1,49 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+
+-->
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+
+ <appender name="console" class="org.apache.log4j.ConsoleAppender">
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern"
+ value="%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n" />
+ </layout>
+ </appender>
+
+ <appender name="file" class="org.apache.log4j.RollingFileAppender">
+ <param name="append" value="false" />
+ <param name="maxFileSize" value="10MB" />
+ <param name="maxBackupIndex" value="10" />
+ <param name="file" value="${LOG_HOME}/logs/samza-sql-shell.log" />
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern"
+ value="%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n" />
+ </layout>
+ </appender>
+
+ <root>
+ <level value="info" />
+ <appender-ref ref="file" />
+ </root>
+</log4j:configuration>
+
http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql-shell/conf/shell-defaults.conf
----------------------------------------------------------------------
diff --git a/samza-sql-shell/conf/shell-defaults.conf b/samza-sql-shell/conf/shell-defaults.conf
new file mode 100644
index 0000000..0c85e52
--- /dev/null
+++ b/samza-sql-shell/conf/shell-defaults.conf
@@ -0,0 +1,28 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+# Default system properties included when running samza-sql-shell.
+# This is useful for setting default environmental settings.
+
+# Example:
+shell.executor = org.apache.samza.sql.client.impl.SamzaExecutor
+samza.sql.output = compact
+# samza.sql.system.kafka.address = localhost:2181
+# samza.sql.relSchemaProvider.config.schemaDir = /tmp/schemas/
+# samza.sql.ioResolver = config
+# samza.sql.udfResolver = config
http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql-shell/scripts/samza-sql-shell.sh
----------------------------------------------------------------------
diff --git a/samza-sql-shell/scripts/samza-sql-shell.sh b/samza-sql-shell/scripts/samza-sql-shell.sh
new file mode 100755
index 0000000..d9eb6df
--- /dev/null
+++ b/samza-sql-shell/scripts/samza-sql-shell.sh
@@ -0,0 +1,42 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+if [ `uname` == 'Linux' ];
+then
+ base_dir=$(readlink -f $(dirname $0))
+else
+ base_dir=$(dirname $0)
+fi
+
+parent_dir="$(dirname "$base_dir")"
+
+CONF_FILE="-conf $parent_dir/conf/shell-defaults.conf"
+
+if [ "x$LOG_HOME" = "x" ]; then
+ export LOG_HOME=$parent_dir
+fi
+
+if [ "x$LOG4J_OPTS" = "x" ]; then
+ export LOG4J_OPTS="-Dlog4j.configuration=file:$parent_dir/conf/samza-sql-shell-log4j.xml"
+fi
+
+if [ "x$HEAP_OPTS" = "x" ]; then
+ export HEAP_OPTS="-Xmx4G -Xms4G"
+fi
+
+exec $base_dir/run-class.sh $LOG4J_OPTS -DLOG_HOME=$LOG_HOME org.apache.samza.sql.client.cli.Main $CONF_FILE "$@"
http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliCommand.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliCommand.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliCommand.java
new file mode 100755
index 0000000..7c6480b
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliCommand.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.cli;
+
+/**
+ * A shell command containing the command type and its parameter string.
+ */
+class CliCommand {
+  private CliCommandType commandType;
+  // Stays null when the command is created without parameters.
+  private String parameters;
+
+  /**
+   * Creates a command that has no parameters.
+   *
+   * @param cmdType type of the command
+   */
+  public CliCommand(CliCommandType cmdType) {
+    this.commandType = cmdType;
+  }
+
+  /**
+   * Creates a command with parameters.
+   *
+   * @param cmdType type of the command
+   * @param parameters parameter string of the command; may be null
+   */
+  public CliCommand(CliCommandType cmdType, String parameters) {
+    this(cmdType);
+    this.parameters = parameters;
+  }
+
+  public CliCommandType getCommandType() {
+    return commandType;
+  }
+
+  /**
+   * @return the parameter string, or null if none was set
+   */
+  public String getParameters() {
+    return parameters;
+  }
+
+  public void setParameters(String parameters) {
+    this.parameters = parameters;
+  }
+
+  /**
+   * Builds the complete command text.
+   *
+   * @return the command name alone when there are no parameters, otherwise the command name,
+   *         a single space and the parameters
+   */
+  public String getFullCommand() {
+    // Without this guard string concatenation would append the literal text "null".
+    if (parameters == null || parameters.isEmpty()) {
+      return commandType.getCommandName();
+    }
+    return commandType.getCommandName() + CliConstants.SPACE + parameters;
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliCommandType.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliCommandType.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliCommandType.java
new file mode 100755
index 0000000..0cd170e
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliCommandType.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.cli;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Enumerates all commands the shell supports, each with a description and a usage text.
+ */
+enum CliCommandType {
+  SHOW_TABLES("SHOW TABLES", "Shows all available tables.", "Usage: SHOW TABLES <table name>"),
+  SHOW_FUNCTIONS("SHOW FUNCTIONS", "Shows all available UDFs.", "SHOW FUNCTIONS"),
+  DESCRIBE("DESCRIBE", "Describes a table.", "Usage: DESCRIBE <table name>"),
+
+  SELECT("SELECT", "\tExecutes a SQL SELECT query.", "SELECT uses a standard streaming SQL syntax."),
+  EXECUTE("EXECUTE", "\tExecute a sql file.", "EXECUTE <URI of a sql file>"),
+  INSERT_INTO("INSERT INTO", "Executes a SQL INSERT INTO.", "INSERT INTO uses a standard streaming SQL syntax."),
+  LS("LS", "\tLists all background executions.", "LS [execution ID]"),
+  STOP("STOP", "\tStops an execution.", "Usage: STOP <execution ID>"),
+  RM("RM", "\tRemoves an execution from the list.", "Usage: RM <execution ID>"),
+
+  HELP("HELP", "\tDisplays this help message.", "Usage: HELP [command]"),
+  SET("SET", "\tSets a variable.", "Usage: SET VAR=VAL"),
+  CLEAR("CLEAR", "\tClears the screen.", "CLEAR"),
+  EXIT("EXIT", "\tExits the shell.", "Exit"),
+  QUIT("QUIT", "\tQuits the shell.", "QUIT"),
+
+  // Placeholder type; it is excluded from getAllCommands().
+  INVALID_COMMAND("INVALID_COMMAND", "INVALID_COMMAND", "INVALID_COMMAND");
+
+  private final String cmdName;
+  private final String description;
+  private final String usage;
+
+  CliCommandType(String cmdName, String description, String usage) {
+    this.cmdName = cmdName;
+    this.description = description;
+    this.usage = usage;
+  }
+
+  /**
+   * @return a new mutable list holding the names of all commands except INVALID_COMMAND,
+   *         in declaration order
+   */
+  public static List<String> getAllCommands() {
+    List<String> cmds = new ArrayList<>();
+    for (CliCommandType t : CliCommandType.values()) {
+      if (t != INVALID_COMMAND) {
+        cmds.add(t.getCommandName());
+      }
+    }
+    return cmds;
+  }
+
+  public String getCommandName() {
+    return cmdName;
+  }
+
+  public String getDescription() {
+    return description;
+  }
+
+  public String getUsage() {
+    return usage;
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliConstants.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliConstants.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliConstants.java
new file mode 100755
index 0000000..286dc79
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliConstants.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.cli;
+
+/**
+ * Constant definitions for the shell.
+ */
+class CliConstants {
+ public static final String APP_NAME = "Samza SQL Shell";
+ public static final String WINDOW_TITLE = "Samza SQL Shell";
+ public static final String PROMPT_1ST = "Samza SQL";
+ public static final String PROMPT_1ST_END = "> ";
+
+ // All shell environment variables start with this prefix
+ public static final String CONFIG_SHELL_PREFIX = "shell.";
+ // Specifies the executor used by the shell
+ public static final String CONFIG_EXECUTOR = "shell.executor";
+
+ public static final String VERSION = "0.0.1";
+
+
+ public static final String WELCOME_MESSAGE;
+ static {
+ WELCOME_MESSAGE =
+" ___ ___ ___ ___ ___ \n" +
+" / /\\ / /\\ / /\\ /__/\\ / /\\ \n" +
+" / /::\\ / /::\\ / /::| \\ \\:\\ / /::\\ \n"+
+" /__/:/\\:\\ / /:/\\:\\ / /:|:| \\ \\:\\ / /:/\\:\\ \n"+
+" _\\_ \\:\\ \\:\\ / /::\\ \\:\\ / /:/|:|__ \\ \\:\\ / /::\\ \\:\\ \n"+
+" /__/\\ \\:\\ \\:\\ /__/:/\\:\\_\\:\\ /__/:/_|::::\\ ______\\__\\:\\ /__/:/\\:\\_\\:\\ \n"+
+" \\ \\:\\ \\:\\_\\/ \\__\\/ \\:\\/:/ \\__\\/ /~~/:/ \\ \\::::::::/ \\__\\/ \\:\\/:/ \n"+
+" \\ \\:\\_\\:\\ \\__\\::/ / /:/ \\ \\:\\~~~~~ \\__\\::/ \n"+
+" \\ \\:\\/:/ / /:/ / /:/ \\ \\:\\ / /:/ \n"+
+" \\ \\::/ /__/:/ /__/:/ \\ \\:\\ /__/:/ \n"+
+" \\__\\/ \\__\\/ \\__\\/ \\__\\/ \\__\\/ \n\n"+
+"Welcome to Samza SQL shell (V" + VERSION + "). Enter HELP for all commands.\n\n";
+ }
+
+ public static final char SPACE = '\u0020';
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliEnvironment.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliEnvironment.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliEnvironment.java
new file mode 100644
index 0000000..9384169
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliEnvironment.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.cli;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.io.Writer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * CliEnvironment contains the "environment variables" that configure the shell behavior.
+ * The only variable handled here is {@code shell.debug}.
+ */
+public class CliEnvironment {
+  private static final String debugEnvVar = "shell.debug";
+  // The streams that were installed when this class was loaded; used to restore output later.
+  private static PrintStream stdout = System.out;
+  private static PrintStream stderr = System.err;
+  // Never null: every write path normalizes the value.
+  private Boolean debug = false;
+
+  boolean isDebug() {
+    return debug;
+  }
+
+  /**
+   * Sets the debug flag without touching System.out/System.err; call {@link #takeEffect()} to apply it.
+   *
+   * @param debug the new value; null is treated as false so that later unboxing cannot fail
+   */
+  void setDebug(Boolean debug) {
+    this.debug = Boolean.TRUE.equals(debug);
+  }
+
+  /**
+   * @param var Environment variable
+   * @param val Value of the environment variable
+   * @return 0 : succeed
+   * -1: invalid var
+   * -2: invalid val
+   */
+  int setEnvironmentVariable(String var, String val) {
+    if (var == null) {
+      return -1;
+    }
+    // Locale.ROOT keeps case folding independent of the JVM's default locale.
+    switch (var.toLowerCase(java.util.Locale.ROOT)) {
+      case debugEnvVar:
+        if (val == null) {
+          return -2;
+        }
+        String normalized = val.toLowerCase(java.util.Locale.ROOT);
+        if (normalized.equals("true")) {
+          debug = true;
+        } else if (normalized.equals("false")) {
+          debug = false;
+        } else {
+          return -2;
+        }
+        // Switch System.out/System.err to match the new flag.
+        takeEffect();
+        return 0;
+      default:
+        return -1;
+    }
+  }
+
+  // TODO: Separate the values out of the logic part
+  /**
+   * @param var Environment variable
+   * @return the values accepted for the variable, or null if the variable is unknown
+   */
+  List<String> getPossibleValues(String var) {
+    if (var == null) {
+      return null;
+    }
+    switch (var.toLowerCase(java.util.Locale.ROOT)) {
+      case debugEnvVar:
+        List<String> vals = new ArrayList<>();
+        vals.add("true");
+        vals.add("false");
+        return vals;
+      default:
+        return null;
+    }
+  }
+
+  /**
+   * Writes every variable as a {@code name=value} line.
+   *
+   * @param writer destination of the output
+   * @throws IOException if the writer fails
+   */
+  void printAll(Writer writer) throws IOException {
+    writer.write(debugEnvVar);
+    writer.write('=');
+    writer.write(debug.toString());
+    writer.write('\n');
+  }
+
+  private void disableJavaSystemOutAndErr() {
+    PrintStream ps = new PrintStream(new NullOutputStream());
+    System.setOut(ps);
+    System.setErr(ps);
+  }
+
+  private void enableJavaSystemOutAndErr() {
+    System.setOut(stdout);
+    System.setErr(stderr);
+  }
+
+  /**
+   * Applies the current debug flag to System.out and System.err.
+   */
+  void takeEffect() {
+    if (debug) {
+      enableJavaSystemOutAndErr();
+    } else {
+      // We control terminal directly; Forbid any Java System.out and System.err stuff so
+      // any underlying output will not mess up the console
+      disableJavaSystemOutAndErr();
+    }
+  }
+
+  /**
+   * An output stream that discards everything written to it.
+   */
+  private static final class NullOutputStream extends OutputStream {
+    @Override
+    public void close() {
+    }
+
+    @Override
+    public void flush() {
+    }
+
+    @Override
+    public void write(byte[] b) {
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) {
+    }
+
+    @Override
+    public void write(int b) {
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliHighlighter.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliHighlighter.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliHighlighter.java
new file mode 100755
index 0000000..c01ed5c
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliHighlighter.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.cli;
+
+import org.apache.samza.sql.client.util.CliUtil;
+import org.jline.reader.Highlighter;
+import org.jline.reader.LineReader;
+import org.jline.utils.AttributedString;
+import org.jline.utils.AttributedStringBuilder;
+import org.jline.utils.AttributedStyle;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A primitive highlighter: every token of the input that equals a keyword (ignoring case)
+ * is rendered in bold yellow, everything else in the default style.
+ */
+public class CliHighlighter implements Highlighter {
+  private static final List<String> keywords;
+
+  static {
+    keywords = new ArrayList<>();
+    // The buffer is matched one whitespace-delimited token at a time, so multi-word command
+    // names such as "SHOW TABLES" are registered word by word; as whole phrases they could
+    // never equal a single token.
+    for (String command : CliCommandType.getAllCommands()) {
+      for (String word : command.split(" ")) {
+        if (!word.isEmpty() && !keywords.contains(word)) {
+          keywords.add(word);
+        }
+      }
+    }
+    keywords.add("FROM");
+    keywords.add("WHERE");
+  }
+
+  /**
+   * Tells whether a character separates tokens. Character.isSpaceChar alone does not cover
+   * tab and line-break characters, so Character.isWhitespace is consulted as well.
+   */
+  private static boolean isSeparator(char c) {
+    return Character.isSpaceChar(c) || Character.isWhitespace(c);
+  }
+
+  /**
+   * Splits the buffer into alternating runs of separator and non-separator characters.
+   * Concatenating the returned pieces reproduces the buffer exactly.
+   */
+  private static List<String> splitWithSpace(String buffer) {
+    List<String> list = new ArrayList<>();
+    if (CliUtil.isNullOrEmpty(buffer)) {
+      return list;
+    }
+
+    boolean prevIsSpace = isSeparator(buffer.charAt(0));
+    int prevPos = 0;
+    for (int i = 1; i < buffer.length(); ++i) {
+      boolean isSpace = isSeparator(buffer.charAt(i));
+      if (isSpace != prevIsSpace) {
+        // The kind of run changed here: close the previous run and start a new one.
+        list.add(buffer.substring(prevPos, i));
+        prevPos = i;
+        prevIsSpace = isSpace;
+      }
+    }
+    list.add(buffer.substring(prevPos));
+    return list;
+  }
+
+  /**
+   * Builds the styled representation of the current input buffer.
+   *
+   * @param reader the line reader requesting the highlighting (not used)
+   * @param buffer the current input text
+   * @return the buffer text with keywords styled in bold yellow
+   */
+  @Override
+  public AttributedString highlight(LineReader reader, String buffer) {
+    AttributedStringBuilder builder = new AttributedStringBuilder();
+    List<String> tokens = splitWithSpace(buffer);
+
+    for (String token : tokens) {
+      if (isKeyword(token)) {
+        builder.style(AttributedStyle.BOLD.foreground(AttributedStyle.YELLOW))
+            .append(token);
+      } else {
+        builder.style(AttributedStyle.DEFAULT)
+            .append(token);
+      }
+    }
+
+    return builder.toAttributedString();
+  }
+
+  private boolean isKeyword(String token) {
+    for (String keyword : keywords) {
+      if (keyword.equalsIgnoreCase(token)) {
+        return true;
+      }
+    }
+    return false;
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliShell.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliShell.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliShell.java
new file mode 100755
index 0000000..54c7bf6
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliShell.java
@@ -0,0 +1,821 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.cli;
+
+import org.apache.samza.sql.client.interfaces.*;
+import org.apache.samza.sql.client.util.CliException;
+import org.apache.samza.sql.client.util.CliUtil;
+import org.jline.reader.EndOfFileException;
+import org.jline.reader.LineReader;
+import org.jline.reader.LineReaderBuilder;
+import org.jline.reader.UserInterruptException;
+import org.jline.reader.impl.DefaultParser;
+import org.jline.reader.impl.completer.StringsCompleter;
+import org.jline.terminal.Terminal;
+import org.jline.terminal.TerminalBuilder;
+import org.jline.utils.AttributedStringBuilder;
+import org.jline.utils.AttributedStyle;
+import org.jline.utils.InfoCmp;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.*;
+
+/**
+ * The shell UI.
+ */
+class CliShell {
+ private final Terminal terminal;
+ private final PrintWriter writer;
+ private final LineReader lineReader;
+ private final String firstPrompt;
+ private final SqlExecutor executor;
+ private final ExecutionContext exeContext;
+ private CliEnvironment env;
+ private boolean keepRunning = true;
+ private Map<Integer, String> executions = new TreeMap<>();
+
+ /**
+  * Creates the shell: builds the terminal and the line reader, prepares the prompt, applies the
+  * environment and starts the executor with the given context.
+  *
+  * @param executor    the executor that runs the statements
+  * @param environment the shell environment
+  * @param execContext the context handed to the executor
+  * @throws IllegalArgumentException if any argument is null
+  * @throws CliException if the terminal cannot be created
+  */
+ public CliShell(SqlExecutor executor, CliEnvironment environment, ExecutionContext execContext) {
+   if (executor == null || environment == null || execContext == null) {
+     throw new IllegalArgumentException();
+   }
+
+   // Terminal and its writer
+   final Terminal createdTerminal;
+   try {
+     createdTerminal = TerminalBuilder.builder().name(CliConstants.WINDOW_TITLE).build();
+   } catch (IOException ioe) {
+     throw new CliException("Error when creating terminal", ioe);
+   }
+   terminal = createdTerminal;
+   writer = createdTerminal.writer();
+
+   // Line reader
+   final DefaultParser lineParser = new DefaultParser()
+       .eofOnEscapedNewLine(true)
+       .eofOnUnclosedQuote(true);
+   final LineReaderBuilder readerBuilder = LineReaderBuilder.builder()
+       .appName(CliConstants.APP_NAME)
+       .terminal(createdTerminal)
+       .parser(lineParser)
+       .highlighter(new CliHighlighter())
+       .completer(new StringsCompleter(CliCommandType.getAllCommands()));
+   lineReader = readerBuilder.build();
+
+   // Prompt, rendered once as an ANSI string
+   final AttributedStringBuilder promptBuilder = new AttributedStringBuilder()
+       .style(AttributedStyle.DEFAULT.foreground(AttributedStyle.YELLOW))
+       .append(CliConstants.PROMPT_1ST + CliConstants.PROMPT_1ST_END);
+   firstPrompt = promptBuilder.toAnsi();
+
+   // Environment first, then the executor
+   env = environment;
+   env.takeEffect();
+   exeContext = execContext;
+   this.executor = executor;
+   this.executor.start(exeContext);
+ }
+
+ /** @return the terminal this shell reads from and writes to */
+ Terminal getTerminal() {
+   return this.terminal;
+ }
+
+ /** @return the environment this shell was created with */
+ CliEnvironment getEnvironment() {
+   return this.env;
+ }
+
+ /** @return the executor this shell submits statements to */
+ SqlExecutor getExecutor() {
+   return this.executor;
+ }
+
+ /** @return the execution context passed to the executor */
+ ExecutionContext getExeContext() {
+   return this.exeContext;
+ }
+
+ /**
+  * Actually runs the shell. Does not return until the user chooses to exit or an unexpected
+  * failure occurs. On the way out the executor is stopped and the terminal is closed, whatever
+  * the reason for leaving the loop.
+  */
+ public void open() {
+   // Remember we cannot enter alternate screen mode here as there is only one alternate
+   // screen and we need it to show streaming results. Clear the screen instead.
+   clearScreen();
+   writer.write(CliConstants.WELCOME_MESSAGE);
+
+   try {
+     warnIfJnaMissing();
+
+     while (keepRunning) {
+       String line;
+       try {
+         line = lineReader.readLine(firstPrompt);
+       } catch (UserInterruptException e) {
+         // The user interrupted the current input; just prompt again.
+         continue;
+       } catch (EndOfFileException e) {
+         commandQuit();
+         break;
+       }
+
+       if (CliUtil.isNullOrEmpty(line)) {
+         continue;
+       }
+       CliCommand command = parseLine(line);
+       if (command != null) {
+         dispatch(command);
+       }
+     }
+   } catch (Exception e) {
+     writer.print(e.getClass().getSimpleName());
+     writer.print(". ");
+     writer.println(e.getMessage());
+     e.printStackTrace(writer);
+     writer.println();
+     writer.println("We are sorry but SamzaSqlShell has encountered a problem and needs to stop.");
+   } finally {
+     // Runs for errors as well, so the executor is never left running behind a dead shell.
+     shutdown();
+   }
+ }
+
+ /** Warns the user when the JNA classes cannot be loaded from the system class path. */
+ private void warnIfJnaMissing() {
+   try {
+     ClassLoader.getSystemClassLoader().loadClass("com.sun.jna.NativeLibrary");
+   } catch (ClassNotFoundException e) {
+     // Something's wrong. It could be a dumb terminal if neither jna nor jansi lib is there
+     writer.write("Warning: jna.jar does NOT exist. It may lead to a dumb shell or a performance hit.\n");
+   }
+ }
+
+ /** Routes a parsed command to the handler of its command type. */
+ private void dispatch(CliCommand command) {
+   switch (command.getCommandType()) {
+     case CLEAR:
+       commandClear();
+       break;
+
+     case DESCRIBE:
+       commandDescribe(command);
+       break;
+
+     case EXECUTE:
+       commandExecuteFile(command);
+       break;
+
+     case EXIT:
+     case QUIT:
+       commandQuit();
+       break;
+
+     case HELP:
+       commandHelp(command);
+       break;
+
+     case INSERT_INTO:
+       commandInsertInto(command);
+       break;
+
+     case LS:
+       commandLs(command);
+       break;
+
+     case RM:
+       commandRm(command);
+       break;
+
+     case SELECT:
+       commandSelect(command);
+       break;
+
+     case SET:
+       commandSet(command);
+       break;
+
+     case SHOW_FUNCTIONS:
+       commandShowFunctions(command);
+       break;
+
+     case SHOW_TABLES:
+       commandShowTables(command);
+       break;
+
+     case STOP:
+       commandStop(command);
+       break;
+
+     case INVALID_COMMAND:
+       printHelpMessage();
+       break;
+
+     default:
+       // A command type that exists in the enum but has no handler yet.
+       writer.write("UNDER DEVELOPMENT. Command:" + command.getCommandType() + "\n");
+       writer.write("Parameters:" +
+           (CliUtil.isNullOrEmpty(command.getParameters()) ? "NULL" : command.getParameters())
+           + "\n\n");
+       writer.flush();
+   }
+ }
+
+ /** Stops the executor, says goodbye and closes the terminal. */
+ private void shutdown() {
+   writer.write("Cleaning up... ");
+   writer.flush();
+   executor.stop(exeContext);
+
+   writer.write("Done.\nBye.\n\n");
+   writer.flush();
+
+   try {
+     terminal.close();
+   } catch (IOException e) {
+     // Doesn't matter: the shell is going away anyway.
+   }
+ }
+
+ // Handler of the CLEAR command: delegates to clearScreen().
+ private void commandClear() {
+ clearScreen();
+ }
+
+ private void commandDescribe(CliCommand command) {
+ String parameters = command.getParameters();
+ if (CliUtil.isNullOrEmpty(parameters)) {
+ writer.println(command.getCommandType().getUsage());
+ writer.println();
+ writer.flush();
+ return;
+ }
+
+ SqlSchema schema = executor.getTableSchema(exeContext, parameters);
+
+ if (schema == null) {
+ writer.println("Failed to get schema. Error: " + executor.getErrorMsg());
+ } else {
+ writer.println();
+ List<String> lines = formatSchema4Display(schema);
+ for (String line : lines) {
+ writer.println(line);
+ }
+ }
+ writer.println();
+ writer.flush();
+ }
+
+ /**
+  * Handler of the SET command. Without a parameter all environment variables are printed. With
+  * a parameter of the form name=value the variable is set; the text is split at the first '='
+  * only, so the value itself may contain '=' characters, and both sides are trimmed. Anything
+  * else prints the usage.
+  */
+ private void commandSet(CliCommand command) {
+   String param = command.getParameters();
+   if (CliUtil.isNullOrEmpty(param)) {
+     try {
+       env.printAll(writer);
+     } catch (IOException e) {
+       e.printStackTrace(writer);
+     }
+     writer.println();
+     writer.flush();
+     return;
+   }
+
+   int separator = param.indexOf('=');
+   String name = separator < 0 ? "" : param.substring(0, separator).trim();
+   String value = separator < 0 ? "" : param.substring(separator + 1).trim();
+   if (name.isEmpty() || value.isEmpty()) {
+     writer.println(command.getCommandType().getUsage());
+     writer.println();
+     writer.flush();
+     return;
+   }
+
+   // Return codes as used below: 0 = set, -1 = unknown variable, -2 = invalid value.
+   int ret = env.setEnvironmentVariable(name, value);
+   if (ret == 0) {
+     writer.print(name);
+     writer.print(" set to ");
+     writer.println(value);
+   } else if (ret == -1) {
+     writer.print("Unknown variable: ");
+     writer.println(name);
+   } else if (ret == -2) {
+     writer.print("Invalid value: ");
+     writer.println(value);
+     List<String> vals = env.getPossibleValues(name);
+     writer.print("Possible values:");
+     for (String s : vals) {
+       writer.print(CliConstants.SPACE);
+       writer.print(s);
+     }
+     writer.println();
+   }
+
+   writer.println();
+   writer.flush();
+ }
+
+ /**
+  * Handler of the EXECUTE command: submits the statements of the sql file named by the URI
+  * parameter and reports which statements were submitted and which were not.
+  *
+  * A parameter that is not a valid URI, has no path component, or does not name an existing
+  * regular file is reported as "Invalid URI." instead of being passed on.
+  */
+ private void commandExecuteFile(CliCommand command) {
+   String fullCmdStr = command.getFullCommand();
+   String parameters = command.getParameters();
+   if (CliUtil.isNullOrEmpty(parameters)) {
+     writer.println("Usage: execute <fileuri>\n");
+     writer.flush();
+     return;
+   }
+
+   File file = null;
+   try {
+     // getPath() is null for opaque URIs such as "abc:def"; new File(null) would throw a
+     // NullPointerException that terminates the whole shell.
+     String path = new URI(parameters).getPath();
+     if (path != null) {
+       file = new File(path);
+     }
+   } catch (URISyntaxException e) {
+     // Not a URI at all: file stays null and is reported just below.
+   }
+   if (file == null || !file.exists() || file.isDirectory()) {
+     writer.println("Invalid URI.\n");
+     writer.flush();
+     return;
+   }
+
+   NonQueryResult nonQueryResult = executor.executeNonQuery(exeContext, file);
+   if (!nonQueryResult.succeeded()) {
+     writer.print("Execution error: ");
+     writer.println(executor.getErrorMsg());
+     writer.println();
+     writer.flush();
+     return;
+   }
+
+   // Remember the execution so that LS / STOP / RM can refer to it by id.
+   executions.put(nonQueryResult.getExecutionId(), fullCmdStr);
+   List<String> submittedStmts = nonQueryResult.getSubmittedStmts();
+   List<String> nonsubmittedStmts = nonQueryResult.getNonSubmittedStmts();
+
+   writer.println("Sql file submitted. Execution ID: " + nonQueryResult.getExecutionId());
+   writer.println("Submitted statements: \n");
+   if (submittedStmts == null || submittedStmts.isEmpty()) {
+     writer.println("\tNone.");
+   } else {
+     for (String statement : submittedStmts) {
+       writer.print("\t");
+       writer.println(statement);
+     }
+     writer.println();
+   }
+
+   if (nonsubmittedStmts != null && !nonsubmittedStmts.isEmpty()) {
+     writer.println("Statements NOT submitted: \n");
+     for (String statement : nonsubmittedStmts) {
+       writer.print("\t");
+       writer.println(statement);
+     }
+     writer.println();
+   }
+
+   writer.println("Note: All query statements in a sql file are NOT submitted.");
+   writer.println();
+   writer.flush();
+ }
+
+ private void commandInsertInto(CliCommand command) {
+ String fullCmdStr = command.getFullCommand();
+ NonQueryResult result = executor.executeNonQuery(exeContext,
+ Collections.singletonList(fullCmdStr));
+
+ if (result.succeeded()) {
+ writer.print("Execution submitted successfully. Id: ");
+ writer.println(String.valueOf(result.getExecutionId()));
+ executions.put(result.getExecutionId(), fullCmdStr);
+ } else {
+ writer.write("Execution failed to submit. Error: ");
+ writer.println(executor.getErrorMsg());
+ }
+
+ writer.println();
+ writer.flush();
+ }
+
+ /**
+  * Handler of the LS command: lists recorded executions with their id, status and command text.
+  * Without parameters every recorded execution is listed; otherwise only the recorded ids named
+  * in the whitespace-separated parameter list. Tokens that are not known ids are silently
+  * skipped. Long command texts are wrapped over several lines.
+  */
+ private void commandLs(CliCommand command) {
+   List<Integer> execIds = new ArrayList<>();
+   String parameters = command.getParameters();
+   if (CliUtil.isNullOrEmpty(parameters)) {
+     execIds.addAll(executions.keySet());
+   } else {
+     for (String param : parameters.trim().split("\\s+")) {
+       try {
+         Integer id = Integer.valueOf(param);
+         if (executions.containsKey(id)) {
+           execIds.add(id);
+         }
+       } catch (NumberFormatException e) {
+         // Not a number, hence not an id: ls ignores it.
+       }
+     }
+   }
+   if (execIds.isEmpty()) {
+     writer.println();
+     writer.flush();
+     return;
+   }
+
+   execIds.sort(Integer::compareTo);
+
+   final int terminalWidth = terminal.getWidth();
+   final int idWidth = 3;
+   final int statusWidth = 20;
+   // At least one character per line: with a zero or negative width the wrapping loop below
+   // would never advance on a narrow (or size-less) terminal.
+   final int cmdWidth = Math.max(1, terminalWidth - idWidth - statusWidth - 4);
+
+   AttributedStyle oddLineStyle = AttributedStyle.BOLD.foreground(AttributedStyle.BLUE);
+   AttributedStyle evenLineStyle = AttributedStyle.BOLD.foreground(AttributedStyle.CYAN);
+   for (int i = 0; i < execIds.size(); ++i) {
+     Integer id = execIds.get(i);
+     String cmd = executions.get(id);
+     if (cmd == null) {
+       continue;
+     }
+
+     String status = "UNKNOWN";
+     try {
+       ExecutionStatus execStatus = executor.queryExecutionStatus(id);
+       if (execStatus != null) {
+         status = execStatus.name();
+       }
+     } catch (ExecutionException e) {
+       // The status could not be queried: the row shows UNKNOWN.
+     }
+
+     // Rows alternate between two colours so that wrapped commands stay readable.
+     AttributedStyle rowStyle = (i % 2 == 0) ? evenLineStyle : oddLineStyle;
+
+     int cmdStartIdx = 0;
+     int cmdLength = cmd.length();
+     while (cmdStartIdx < cmdLength) {
+       StringBuilder line = new StringBuilder(Math.max(0, terminalWidth));
+       if (cmdStartIdx == 0) {
+         // Id and status appear on the first line of a row only.
+         line.append(CliConstants.SPACE);
+         line.append(id);
+         CliUtil.appendTo(line, 1 + idWidth + 1, CliConstants.SPACE);
+         line.append(status);
+       }
+       CliUtil.appendTo(line, 1 + idWidth + 1 + statusWidth + 1, CliConstants.SPACE);
+
+       int numToWrite = Math.min(cmdWidth, cmdLength - cmdStartIdx);
+       line.append(cmd, cmdStartIdx, cmdStartIdx + numToWrite);
+       cmdStartIdx += numToWrite;
+
+       writer.println(new AttributedStringBuilder().style(rowStyle).append(line.toString()).toAnsi());
+     }
+   }
+   writer.println();
+   writer.flush();
+ }
+
+ /**
+  * Handler of the RM command: removes the recorded executions whose ids are given as a
+  * whitespace-separated list. Unknown ids are reported; an id given more than once is handled
+  * once. An execution that is still running is not removed.
+  */
+ private void commandRm(CliCommand command) {
+   String parameters = command.getParameters();
+   if (CliUtil.isNullOrEmpty(parameters)) {
+     writer.println(command.getCommandType().getUsage());
+     writer.println();
+     writer.flush();
+     return;
+   }
+
+   List<Integer> execIds = new ArrayList<>();
+   // Split on runs of whitespace so that repeated blanks do not produce empty "ids".
+   for (String param : parameters.trim().split("\\s+")) {
+     Integer id = null;
+     try {
+       id = Integer.valueOf(param);
+     } catch (NumberFormatException e) {
+       // Reported just below as an invalid id.
+     }
+     if (id == null || !executions.containsKey(id)) {
+       writer.print("Error: ");
+       writer.print(param);
+       writer.println(" is not a valid id.");
+     } else if (!execIds.contains(id)) {
+       execIds.add(id);
+     }
+   }
+
+   for (Integer id : execIds) {
+     ExecutionStatus status = null;
+     try {
+       status = executor.queryExecutionStatus(id);
+     } catch (ExecutionException e) {
+       // Reported just below as a failure to get the status.
+     }
+     if (status == null) {
+       writer.println(String.format("Error: failed to get execution status for %d. %s",
+           id, executor.getErrorMsg()));
+       continue;
+     }
+     if (status == ExecutionStatus.Running) {
+       writer.println(String.format("Execution %d is still running. Stop it first.", id));
+       continue;
+     }
+     if (executor.removeExecution(exeContext, id)) {
+       writer.println(String.format("Execution %d was removed.", id));
+       executions.remove(id);
+     } else {
+       writer.println(String.format("Error: failed to remove execution %d. %s",
+           id, executor.getErrorMsg()));
+     }
+   }
+   writer.println();
+   writer.flush();
+ }
+
+ // Handler of EXIT / QUIT (also used on end of input): clears the flag tested by the loop in open().
+ private void commandQuit() {
+ keepRunning = false;
+ }
+
+ private void commandSelect(CliCommand command) {
+ QueryResult queryResult = executor.executeQuery(exeContext, command.getFullCommand());
+
+ if (queryResult.succeeded()) {
+ CliView view = new QueryResultLogView();
+ view.open(this, queryResult);
+ executor.stopExecution(exeContext, queryResult.getExecutionId());
+ } else {
+ writer.write("Execution failed. Error: ");
+ writer.println(executor.getErrorMsg());
+ writer.println();
+ writer.flush();
+ }
+ }
+
+ private void commandShowTables(CliCommand command) {
+ List<String> tableNames = executor.listTables(exeContext);
+
+ if (tableNames != null) {
+ for (String tableName : tableNames) {
+ writer.println(tableName);
+ }
+ } else {
+ writer.print("Failed to list tables. Error: ");
+ writer.println(executor.getErrorMsg());
+ }
+ writer.println();
+ writer.flush();
+ }
+
+ private void commandShowFunctions(CliCommand command) {
+ List<SqlFunction> fns = executor.listFunctions(exeContext);
+
+ if (fns != null) {
+ for (SqlFunction fn : fns) {
+ writer.println(fn.toString());
+ }
+ } else {
+ writer.print("Failed to list functions. Error: ");
+ writer.println(executor.getErrorMsg());
+ }
+ writer.println();
+ writer.flush();
+ }
+
+ /**
+  * Handler of the STOP command: asks the executor to stop the recorded executions whose ids are
+  * given as a whitespace-separated list. Unknown ids are reported; an id given more than once
+  * is handled once.
+  */
+ private void commandStop(CliCommand command) {
+   String parameters = command.getParameters();
+   if (CliUtil.isNullOrEmpty(parameters)) {
+     writer.println(command.getCommandType().getUsage());
+     writer.println();
+     writer.flush();
+     return;
+   }
+
+   List<Integer> execIds = new ArrayList<>();
+   // Split on runs of whitespace so that repeated blanks do not produce empty "ids".
+   for (String param : parameters.trim().split("\\s+")) {
+     Integer id = null;
+     try {
+       id = Integer.valueOf(param);
+     } catch (NumberFormatException e) {
+       // Reported just below as an invalid id.
+     }
+     if (id == null || !executions.containsKey(id)) {
+       writer.print("Error: ");
+       writer.print(param);
+       writer.println(" is not a valid id.");
+     } else if (!execIds.contains(id)) {
+       execIds.add(id);
+     }
+   }
+
+   for (Integer id : execIds) {
+     if (executor.stopExecution(exeContext, id)) {
+       writer.println(String.format("Request to stop execution %d was sent.", id));
+     } else {
+       writer.println(String.format("Failed to stop %d: %s", id, executor.getErrorMsg()));
+     }
+   }
+   writer.println();
+   writer.flush();
+ }
+
+ /**
+  * Handler of the HELP command. Without a parameter the general help is printed; otherwise the
+  * usage of the command whose name equals the parameter (compared case-insensitively), or an
+  * "Unknown command" message when there is no such command.
+  */
+ private void commandHelp(CliCommand command) {
+   String parameters = command.getParameters();
+   if (CliUtil.isNullOrEmpty(parameters)) {
+     printHelpMessage();
+     return;
+   }
+
+   // Locale.ROOT keeps the upper-casing independent of the default locale; under e.g. a Turkish
+   // locale "insert into" would otherwise not upper-case to "INSERT INTO".
+   parameters = parameters.trim().toUpperCase(Locale.ROOT);
+   for (CliCommandType cmdType : CliCommandType.values()) {
+     String cmdText = cmdType.getCommandName();
+     if (cmdText.equals(parameters)) {
+       writer.println(cmdType.getUsage());
+       writer.println();
+       writer.flush();
+       return;
+     }
+   }
+
+   writer.print("Unknown command: ");
+   writer.println(parameters);
+   writer.println();
+   writer.flush();
+ }
+
+
+ /**
+  * Turns an input line into a command. The line is trimmed first; the command is the first
+  * command type (in enum order) whose name is a case-insensitive prefix of the line and is
+  * either the whole line or followed by a blank.
+  *
+  * @param line the raw input line
+  * @return null when nothing is left after trimming; otherwise the parsed command, which has
+  *         type INVALID_COMMAND when no command name matches
+  */
+ private CliCommand parseLine(String line) {
+   line = trimCommand(line);
+   if (CliUtil.isNullOrEmpty(line)) {
+     return null;
+   }
+
+   for (CliCommandType cmdType : CliCommandType.values()) {
+     String cmdText = cmdType.getCommandName();
+     int cmdLength = cmdText.length();
+     // regionMatches compares character by character and ignores the default locale, so the
+     // offsets used below always refer to the line as typed.
+     if (!line.regionMatches(true, 0, cmdText, 0, cmdLength)) {
+       continue;
+     }
+     if (line.length() == cmdLength) {
+       return new CliCommand(cmdType);
+     }
+     if (line.charAt(cmdLength) <= CliConstants.SPACE) {
+       String parameter = line.substring(cmdLength).trim();
+       if (!parameter.isEmpty()) {
+         return new CliCommand(cmdType, parameter);
+       }
+     }
+   }
+   return new CliCommand(CliCommandType.INVALID_COMMAND);
+ }
+
+ private void printHelpMessage() {
+ writer.println();
+ AttributedStringBuilder builder = new AttributedStringBuilder();
+ builder.append("The following commands are supported by ")
+ .append(CliConstants.APP_NAME)
+ .append(" at the moment.\n\n");
+
+ for (CliCommandType cmdType : CliCommandType.values()) {
+ if (cmdType == CliCommandType.INVALID_COMMAND)
+ continue;
+
+ String cmdText = cmdType.getCommandName();
+ String cmdDescription = cmdType.getDescription();
+
+ builder.style(AttributedStyle.DEFAULT.bold())
+ .append(cmdText)
+ .append("\t\t")
+ .style(AttributedStyle.DEFAULT)
+ .append(cmdDescription)
+ .append("\n");
+ }
+
+ writer.println(builder.toAnsi());
+ writer.println("HELP <COMMAND> to get help for a specific command.\n");
+ writer.flush();
+ }
+
+ /**
+  * Clears the terminal screen. The terminal is flushed right away so that the screen is cleared
+  * immediately rather than whenever the next flush happens.
+  */
+ private void clearScreen() {
+   terminal.puts(InfoCmp.Capability.clear_screen);
+   terminal.flush();
+ }
+
+ /*
+ Field | Type
+ -------------------------
+ Field1 | Type 1
+ Field2 | VARCHAR(STRING)
+ Field... | VARCHAR(STRING)
+ -------------------------
+ */
+ // Renders a schema as the lines of a two-column table (field name | type name) fitted to the
+ // current terminal width: a header line, a rule, one or more lines per field, and a closing rule.
+ // Values longer than their column are continued on following lines. Returns the single line
+ // "Not enough room." when the terminal is narrower than the two headers plus their padding.
+ private List<String> formatSchema4Display(SqlSchema schema) {
+ final String HEADER_FIELD = "Field";
+ final String HEADER_TYPE = "Type";
+ final char SEPERATOR = '|';
+ final char LINE_SEP = '-';
+
+ int terminalWidth = terminal.getWidth();
+ // Two spaces * 2 plus one SEPERATOR
+ if (terminalWidth < 2 + 2 + 1 + HEADER_FIELD.length() + HEADER_TYPE.length()) {
+ return Collections.singletonList("Not enough room.");
+ }
+
+ // Find the best seperator position for least rows
+ // Candidate positions j start just wide enough for the "Field" header and grow from there.
+ // For each j the total number of display lines is computed; the first j with the smallest
+ // total is kept. The search stops at the first j for which no field name needs wrapping.
+ int seperatorPos = HEADER_FIELD.length() + 2;
+ int minRowNeeded = Integer.MAX_VALUE;
+ int longestLineCharNum = 0;
+ int rowCount = schema.getFieldCount();
+ for (int j = seperatorPos; j < terminalWidth - HEADER_TYPE.length() - 2; ++j) {
+ boolean fieldWrapped = false;
+ int rowNeeded = 0;
+ for (int i = 0; i < rowCount; ++i) {
+ int fieldLen = schema.getFieldName(i).length();
+ int typeLen = schema.getFieldTypeName(i).length();
+ // Lines needed by each column at this position (presumably a rounded-up division - see CliUtil.ceilingDiv).
+ int fieldRowNeeded = CliUtil.ceilingDiv(fieldLen, j - 2);
+ int typeRowNeeded = CliUtil.ceilingDiv(typeLen, terminalWidth - 1 - j - 2);
+
+ rowNeeded += Math.max(fieldRowNeeded, typeRowNeeded);
+ fieldWrapped |= fieldRowNeeded > 1;
+ // NOTE(review): longestLineCharNum is not reset per candidate j, so it reflects every
+ // position tried so far rather than only the one finally chosen - confirm this is intended.
+ if (typeRowNeeded > 1) {
+ longestLineCharNum = terminalWidth;
+ } else {
+ longestLineCharNum = Math.max(longestLineCharNum, j + typeLen + 2 + 1);
+ }
+ }
+ if (rowNeeded < minRowNeeded) {
+ minRowNeeded = rowNeeded;
+ seperatorPos = j;
+ }
+ if (!fieldWrapped)
+ break;
+ }
+
+ // Body lines plus header, two rules and one spare.
+ List<String> lines = new ArrayList<>(minRowNeeded + 4);
+
+ // Header
+ StringBuilder line = new StringBuilder(terminalWidth);
+ line.append(CliConstants.SPACE);
+ line.append(HEADER_FIELD);
+ // Presumably pads the line with spaces up to the separator column - see CliUtil.appendTo.
+ CliUtil.appendTo(line, seperatorPos - 1, CliConstants.SPACE);
+ line.append(SEPERATOR);
+ line.append(CliConstants.SPACE);
+ line.append(HEADER_TYPE);
+ lines.add(line.toString());
+ line = new StringBuilder(terminalWidth);
+ CliUtil.appendTo(line, longestLineCharNum - 1, LINE_SEP);
+ lines.add(line.toString());
+
+ // Body
+ AttributedStyle oddLineStyle = AttributedStyle.DEFAULT.BOLD.foreground(AttributedStyle.BLUE);
+ AttributedStyle evenLineStyle = AttributedStyle.DEFAULT.BOLD.foreground(AttributedStyle.CYAN);
+
+ final int fieldColSize = seperatorPos - 2;
+ final int typeColSize = terminalWidth - seperatorPos - 1 - 2;
+ for (int i = 0; i < rowCount; ++i) {
+ String field = schema.getFieldName(i);
+ String type = schema.getFieldTypeName(i);
+ int fieldLen = field.length();
+ int typeLen = type.length();
+ int fieldStartIdx = 0, typeStartIdx = 0;
+ // One display line per iteration, until both the name and the type are fully written.
+ while (fieldStartIdx < fieldLen || typeStartIdx < typeLen) {
+ line = new StringBuilder(terminalWidth);
+ line.append(CliConstants.SPACE);
+ int numToWrite = Math.min(fieldColSize, fieldLen - fieldStartIdx);
+ if (numToWrite > 0) {
+ line.append(field, fieldStartIdx, fieldStartIdx + numToWrite);
+ fieldStartIdx += numToWrite;
+ }
+ CliUtil.appendTo(line, seperatorPos - 1, CliConstants.SPACE);
+ line.append(SEPERATOR);
+ line.append(CliConstants.SPACE);
+
+ numToWrite = Math.min(typeColSize, typeLen - typeStartIdx);
+ if (numToWrite > 0) {
+ line.append(type, typeStartIdx, typeStartIdx + numToWrite);
+ typeStartIdx += numToWrite;
+ }
+
+ // The style depends on the field index, so all lines of one field share a colour.
+ if (i % 2 == 0) {
+ AttributedStringBuilder attrBuilder = new AttributedStringBuilder().style(evenLineStyle);
+ attrBuilder.append(line.toString());
+ lines.add(attrBuilder.toAnsi());
+ } else {
+ AttributedStringBuilder attrBuilder = new AttributedStringBuilder().style(oddLineStyle);
+ attrBuilder.append(line.toString());
+ lines.add(attrBuilder.toAnsi());
+ }
+ }
+ }
+
+ // Footer
+ line = new StringBuilder(terminalWidth);
+ CliUtil.appendTo(line, longestLineCharNum - 1, LINE_SEP);
+ lines.add(line.toString());
+ return lines;
+ }
+
+ // Trims: leading spaces; trailing spaces and ";"s
+ private String trimCommand(String command) {
+ if (CliUtil.isNullOrEmpty(command))
+ return command;
+
+ int len = command.length();
+ int st = 0;
+
+ while ((st < len) && (command.charAt(st) <= ' ')) {
+ st++;
+ }
+ while ((st < len) && ((command.charAt(len - 1) <= ' ')
+ || command.charAt(len - 1) == ';')) {
+ len--;
+ }
+ return ((st > 0) || (len < command.length())) ? command.substring(st, len) : command;
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliView.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliView.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliView.java
new file mode 100644
index 0000000..de82100
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliView.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.cli;
+
+import org.apache.samza.sql.client.interfaces.QueryResult;
+
+/**
+ * A view that displays the streaming result of a SELECT statement.
+ */
+public interface CliView {
+  /**
+   * Opens the view for a query result.
+   *
+   * @param shell       the shell that opens the view
+   * @param queryResult the result of the query to display
+   */
+  void open(CliShell shell, QueryResult queryResult);
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/Main.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/Main.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/Main.java
new file mode 100755
index 0000000..d85cdae
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/Main.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.cli;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.sql.client.impl.SamzaExecutor;
+import org.apache.samza.sql.client.interfaces.ExecutionContext;
+import org.apache.samza.sql.client.interfaces.SqlExecutor;
+import org.apache.samza.sql.client.util.CliException;
+import org.apache.samza.sql.client.util.CliUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Main entry of the program: reads the optional configuration file given with -conf, creates the
+ * executor and the shell environment from it, and opens the shell.
+ */
+public class Main {
+  private static final Logger LOG = LoggerFactory.getLogger(Main.class);
+
+  /**
+   * Starts the shell.
+   *
+   * @param args command line arguments; "-conf <path>" names the configuration file, any other
+   *             argument is logged and ignored
+   */
+  public static void main(String[] args) {
+    // Get configuration file path
+    String configFilePath = null;
+    for (int i = 0; i < args.length; ++i) {
+      switch (args[i]) {
+        case "-conf":
+          if (i + 1 < args.length) {
+            configFilePath = args[i + 1];
+            i++;
+          }
+          break;
+        default:
+          LOG.warn("Unknown parameter {}", args[i]);
+          break;
+      }
+    }
+
+    SqlExecutor executor = null;
+    CliEnvironment environment = new CliEnvironment();
+    Map<String, String> executorConfig = new HashMap<>();
+
+    if (!CliUtil.isNullOrEmpty(configFilePath)) {
+      LOG.info("Configuration file path is: {}", configFilePath);
+      // try-with-resources: the reader is closed on every path, including failures.
+      try (BufferedReader bufferedReader = new BufferedReader(new FileReader(configFilePath))) {
+        String line;
+        while ((line = bufferedReader.readLine()) != null) {
+          if (line.startsWith("#") || line.startsWith("[")) {
+            continue;
+          }
+          // Split at the first '=' only, so that values may themselves contain '='.
+          // Lines without '=' or with nothing after it are skipped.
+          int separator = line.indexOf('=');
+          if (separator < 0 || separator == line.length() - 1) {
+            continue;
+          }
+          String key = line.substring(0, separator).trim().toLowerCase();
+          String value = line.substring(separator + 1).trim();
+
+          if (!key.startsWith(CliConstants.CONFIG_SHELL_PREFIX)) {
+            // Everything that is not a shell setting is handed to the executor.
+            executorConfig.put(key, value);
+            continue;
+          }
+
+          if (key.equals(CliConstants.CONFIG_EXECUTOR)) {
+            executor = createExecutor(value);
+            continue;
+          }
+
+          // Suppose a shell variable.
+          int result = environment.setEnvironmentVariable(key, value);
+          if (result == -1) { // CliEnvironment doesn't recognize the key.
+            LOG.warn("Unknowing shell environment variable: {}", key);
+          } else if (result == -2) { // Invalid value
+            LOG.warn("Unknowing shell environment value: {}", value);
+          }
+        }
+      } catch (IOException e) {
+        // Best effort: the shell still starts with whatever was read so far.
+        LOG.error("Error in opening and reading the configuration file {}", configFilePath, e);
+      }
+    }
+    if (executor == null) {
+      executor = new SamzaExecutor();
+    }
+
+    CliShell shell = new CliShell(executor, environment, new ExecutionContext(executorConfig));
+    shell.open();
+  }
+
+  /**
+   * Instantiates the executor class named in the configuration through its no-argument
+   * constructor.
+   *
+   * @throws CliException if the class cannot be found or instantiated
+   */
+  private static SqlExecutor createExecutor(String className) {
+    try {
+      Class<?> clazz = Class.forName(className);
+      Constructor<?> ctor = clazz.getConstructor();
+      SqlExecutor created = (SqlExecutor) ctor.newInstance();
+      LOG.info("Sql executor creation succeed. Executor class is: {}", className);
+      return created;
+    } catch (ClassNotFoundException | NoSuchMethodException
+        | IllegalAccessException | InstantiationException | InvocationTargetException e) {
+      throw new CliException(String.format("Failed to create executor %s.", className), e);
+    }
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/QueryResultLogView.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/QueryResultLogView.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/QueryResultLogView.java
new file mode 100644
index 0000000..b1973bc
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/QueryResultLogView.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.cli;
+
+import org.apache.samza.sql.client.interfaces.ExecutionContext;
+import org.apache.samza.sql.client.interfaces.QueryResult;
+import org.apache.samza.sql.client.interfaces.SqlExecutor;
+import org.jline.keymap.BindingReader;
+import org.jline.keymap.KeyMap;
+import org.jline.terminal.Attributes;
+import org.jline.terminal.Terminal;
+import org.jline.utils.AttributedStringBuilder;
+import org.jline.utils.AttributedStyle;
+import org.jline.utils.InfoCmp;
+
+import java.util.EnumSet;
+import java.util.List;
+
+import static org.jline.keymap.KeyMap.ctrl;
+
+
+/**
+ * A scrolling (logging) view of the query result of a streaming SELECT statement.
+ */
+
+public class QueryResultLogView implements CliView {
+ private static final int DEFAULT_REFRESH_INTERVAL = 100; // all intervals are in ms
+
+ private int refreshInterval = DEFAULT_REFRESH_INTERVAL;
+ private int height;
+ private Terminal terminal;
+ private SqlExecutor executor;
+ private ExecutionContext exeContext;
+ private volatile boolean keepRunning = true;
+ private boolean paused = false;
+
+ // Stupid BindingReader doesn't have a real nonblocking mode
+ // Must create a new thread to get user input
+ private Thread inputThread;
+ private BindingReader keyReader;
+
+ public QueryResultLogView() {
+ }
+
+ // -- implementation of CliView -------------------------------------------
+
+ public void open(CliShell shell, QueryResult queryResult) {
+ terminal = shell.getTerminal();
+ executor = shell.getExecutor();
+ exeContext = shell.getExeContext();
+
+ TerminalStatus prevStatus = setupTerminal();
+ try {
+ keyReader = new BindingReader(terminal.reader());
+ inputThread = new InputThread();
+ inputThread.start();
+ while (keepRunning) {
+ try {
+ display();
+ if (keepRunning)
+ Thread.sleep(refreshInterval);
+ } catch (InterruptedException e) {
+ continue;
+ }
+ }
+
+ try {
+ inputThread.join(1 * 1000);
+ } catch (InterruptedException e) {
+ }
+ } finally {
+ restoreTerminal(prevStatus);
+ }
+ if (inputThread.isAlive()) {
+ terminal.writer().println("Warning: input thread hang. Have to kill!");
+ terminal.writer().flush();
+ inputThread.interrupt();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ private void display() {
+ updateTerminalSize();
+ int rowsInBuffer = executor.getRowCount();
+ if (rowsInBuffer <= 0 || paused) {
+ clearStatusBar();
+ drawStatusBar(rowsInBuffer);
+ return;
+ }
+
+ while (rowsInBuffer > 0) {
+ clearStatusBar();
+ int step = 10;
+ List<String[]> lines = executor.consumeQueryResult(exeContext, 0, step - 1);
+ for (String[] line : lines) {
+ for (int i = 0; i < line.length; ++i) {
+ terminal.writer().write(line[i] == null ? "null" : line[i]);
+ terminal.writer().write(i == line.length - 1 ? "\n" : " ");
+ }
+ }
+ terminal.flush();
+ clearStatusBar();
+ drawStatusBar(rowsInBuffer);
+
+ if (!keepRunning || paused)
+ return;
+
+ rowsInBuffer = executor.getRowCount();
+ }
+ }
+
+ private void clearStatusBar() {
+ terminal.puts(InfoCmp.Capability.save_cursor);
+ terminal.puts(InfoCmp.Capability.cursor_address, height - 1, 0);
+ terminal.puts(InfoCmp.Capability.delete_line, height - 1, 0);
+ terminal.puts(InfoCmp.Capability.restore_cursor);
+ }
+
+ private void drawStatusBar(int rowsInBuffer) {
+ terminal.puts(InfoCmp.Capability.save_cursor);
+ terminal.puts(InfoCmp.Capability.cursor_address, height - 1, 0);
+ AttributedStyle statusBarStyle = AttributedStyle.DEFAULT.background(AttributedStyle.WHITE)
+ .foreground(AttributedStyle.BLACK);
+ AttributedStringBuilder attrBuilder = new AttributedStringBuilder()
+ .style(statusBarStyle.bold().italic())
+ .append("Q")
+ .style(statusBarStyle)
+ .append(": Quit ")
+ .style(statusBarStyle.bold().italic())
+ .append("SPACE")
+ .style(statusBarStyle)
+ .append(": Pause/Resume ")
+ .append(String.valueOf(rowsInBuffer) + " rows in buffer ");
+ if (paused) {
+ attrBuilder.style(statusBarStyle.bold().foreground(AttributedStyle.RED).blink())
+ .append("PAUSED");
+ }
+ String statusBarText = attrBuilder.toAnsi();
+ terminal.writer().print(statusBarText);
+ terminal.flush();
+ terminal.puts(InfoCmp.Capability.restore_cursor);
+ }
+
+ private TerminalStatus setupTerminal() {
+ TerminalStatus prevStatus = new TerminalStatus();
+
+ // Signal handlers
+ prevStatus.handler_INT = terminal.handle(Terminal.Signal.INT, this::handleSignal);
+ prevStatus.handler_QUIT = terminal.handle(Terminal.Signal.QUIT, this::handleSignal);
+ prevStatus.handler_TSTP = terminal.handle(Terminal.Signal.TSTP, this::handleSignal);
+ prevStatus.handler_CONT = terminal.handle(Terminal.Signal.CONT, this::handleSignal);
+ prevStatus.handler_WINCH = terminal.handle(Terminal.Signal.WINCH, this::handleSignal);
+
+ // Attributes
+ prevStatus.attributes = terminal.getAttributes();
+ Attributes newAttributes = new Attributes(prevStatus.attributes);
+ // (003, ETX, Ctrl-C, or also 0177, DEL, rubout) Interrupt char‐
+ // acter (INTR). Send a SIGINT signal. Recognized when ISIG is
+ // set, and then not passed as input.
+ newAttributes.setControlChar(Attributes.ControlChar.VINTR, 0);
+ // (034, FS, Ctrl-\) Quit character (QUIT). Send SIGQUIT signal.
+ // Recognized when ISIG is set, and then not passed as input.
+ // newAttributes.setControlChar(Attributes.ControlChar.VQUIT, 0);
+ newAttributes.setControlChar(Attributes.ControlChar.VMIN, 1);
+ newAttributes.setControlChar(Attributes.ControlChar.VTIME, 0);
+ // Enables signals and SIGTTOU signal to the process group of a background
+ // process which tries to write to our terminal
+ newAttributes.setLocalFlags(
+ EnumSet.of(Attributes.LocalFlag.ISIG, Attributes.LocalFlag.TOSTOP), true);
+ // No canonical mode, no echo, and no implementation-defined input processing
+ newAttributes.setLocalFlags(EnumSet.of(
+ Attributes.LocalFlag.ICANON, Attributes.LocalFlag.ECHO,
+ Attributes.LocalFlag.IEXTEN), false);
+ // Input flags
+ newAttributes.setInputFlags(EnumSet.of(
+ Attributes.InputFlag.ICRNL, Attributes.InputFlag.INLCR, Attributes.InputFlag.IXON), false);
+ terminal.setAttributes(newAttributes);
+
+ // Capabilities
+ // tput smcup; use alternate screen
+ terminal.puts(InfoCmp.Capability.enter_ca_mode);
+ terminal.puts(InfoCmp.Capability.cursor_invisible);
+ terminal.puts(InfoCmp.Capability.cursor_home);
+
+ terminal.flush();
+
+ return prevStatus;
+ }
+
+ private void restoreTerminal(TerminalStatus status) {
+ // Signal handlers
+ terminal.handle(Terminal.Signal.INT, status.handler_INT);
+ terminal.handle(Terminal.Signal.QUIT, status.handler_QUIT);
+ terminal.handle(Terminal.Signal.TSTP, status.handler_TSTP);
+ terminal.handle(Terminal.Signal.CONT, status.handler_CONT);
+ terminal.handle(Terminal.Signal.WINCH, status.handler_WINCH);
+
+ // Attributes
+ terminal.setAttributes(status.attributes);
+
+ // Capability
+ terminal.puts(InfoCmp.Capability.exit_ca_mode);
+ terminal.puts(InfoCmp.Capability.cursor_visible);
+ }
+
+ private void handleSignal(Terminal.Signal signal) {
+ switch (signal) {
+ case INT:
+ case QUIT:
+ keepRunning = false;
+ break;
+ case TSTP:
+ paused = true;
+ break;
+ case CONT:
+ paused = false;
+ break;
+ case WINCH:
+ updateTerminalSize();
+ break;
+ }
+ }
+
+ private void updateTerminalSize() {
+ terminal.flush();
+ height = terminal.getHeight();
+ }
+
+ private KeyMap<Action> bindActionKey() {
+ KeyMap<Action> keyMap = new KeyMap<>();
+ keyMap.bind(Action.QUIT, "Q", "q", ctrl('c'));
+ keyMap.bind(Action.SPACE, " ");
+
+ return keyMap;
+ }
+
+ public enum Action {
+ QUIT,
+ SPACE
+ }
+
+ private static class TerminalStatus {
+ Terminal.SignalHandler handler_INT;
+ Terminal.SignalHandler handler_QUIT;
+ Terminal.SignalHandler handler_TSTP;
+ Terminal.SignalHandler handler_CONT;
+ Terminal.SignalHandler handler_WINCH;
+
+ Attributes attributes;
+ }
+
+ private class InputThread extends Thread {
+ public InputThread() {
+ }
+
+ public void run() {
+ KeyMap<Action> keyMap = bindActionKey();
+
+ Action action = keyReader.readBinding(keyMap, null, true);
+ while (action != null && keepRunning) {
+ switch (action) {
+ case QUIT:
+ keepRunning = false;
+ return;
+ case SPACE:
+ paused = !paused;
+ break;
+ }
+ action = keyReader.readBinding(keyMap, null, true);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/AvroSqlSchemaConverter.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/AvroSqlSchemaConverter.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/AvroSqlSchemaConverter.java
new file mode 100644
index 0000000..42dd285
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/AvroSqlSchemaConverter.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.impl;
+
+import com.google.common.base.Joiner;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.avro.Schema;
+import org.apache.samza.SamzaException;
+import org.apache.samza.sql.client.interfaces.SqlSchema;
+import org.apache.samza.sql.client.interfaces.SqlSchemaBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Schema converter which converts Avro schema to Samza Sql schema.
+ *
+ * <p>Primitive Avro types map to a primitive type name; ARRAY, MAP and RECORD are rendered as
+ * {@code ARRAY(...)}, {@code MAP(...)} and {@code STRUCT(...)}. ENUM and FIXED are reported as
+ * STRING.
+ */
+public class AvroSqlSchemaConverter {
+  private static final Logger LOG = LoggerFactory.getLogger(AvroSqlSchemaConverter.class);
+
+  /**
+   * Parses an Avro schema and converts its fields to a Samza Sql schema.
+   *
+   * @param schema the Avro schema as a JSON string; its fields are read with
+   *               {@code Schema.getFields()}
+   * @return a schema with one column per field of the Avro schema
+   */
+  public static SqlSchema convertAvroToSamzaSqlSchema(String schema) {
+    // Schema.Parser replaces the deprecated static Schema.parse(String).
+    Schema avroSchema = new Schema.Parser().parse(schema);
+    return getSchema(avroSchema.getFields());
+  }
+
+  /** Builds a Sql schema with one column (name and type name) per Avro field. */
+  private static SqlSchema getSchema(List<Schema.Field> fields) {
+    SqlSchemaBuilder schemaBuilder = SqlSchemaBuilder.builder();
+    for (Schema.Field field : fields) {
+      schemaBuilder.addField(field.name(), getColumnTypeName(getFieldType(field.schema())));
+    }
+    return schemaBuilder.toSchema();
+  }
+
+  /** Renders a field type as the type name shown to the user, recursing into nested types. */
+  private static String getColumnTypeName(SamzaSqlFieldType fieldType) {
+    if (fieldType.isPrimitiveField()) {
+      return fieldType.getTypeName().toString();
+    } else if (fieldType.getTypeName() == SamzaSqlFieldType.TypeName.MAP) {
+      return String.format("MAP(%s)", getColumnTypeName(fieldType.getValueType()));
+    } else if (fieldType.getTypeName() == SamzaSqlFieldType.TypeName.ARRAY) {
+      return String.format("ARRAY(%s)", getColumnTypeName(fieldType.getElementType()));
+    } else {
+      // Everything else is rendered from its row schema as "name type, name type, ...".
+      SqlSchema schema = fieldType.getRowSchema();
+      List<String> fieldTypes = IntStream.range(0, schema.getFieldCount())
+          .mapToObj(i -> schema.getFieldName(i) + " " + schema.getFieldTypeName(i))
+          .collect(Collectors.toList());
+      String rowSchemaValue = Joiner.on(", ").join(fieldTypes);
+      return String.format("STRUCT(%s)", rowSchemaValue);
+    }
+  }
+
+  /**
+   * Maps an Avro schema to the corresponding Samza Sql field type.
+   *
+   * @throws SamzaException if the Avro type has no mapping
+   */
+  private static SamzaSqlFieldType getFieldType(org.apache.avro.Schema schema) {
+    switch (schema.getType()) {
+      case ARRAY:
+        return SamzaSqlFieldType.createArrayFieldType(getFieldType(schema.getElementType()));
+      case BOOLEAN:
+        return SamzaSqlFieldType.createPrimitiveFieldType(SamzaSqlFieldType.TypeName.BOOLEAN);
+      case DOUBLE:
+        return SamzaSqlFieldType.createPrimitiveFieldType(SamzaSqlFieldType.TypeName.DOUBLE);
+      case FLOAT:
+        return SamzaSqlFieldType.createPrimitiveFieldType(SamzaSqlFieldType.TypeName.FLOAT);
+      case ENUM:
+        return SamzaSqlFieldType.createPrimitiveFieldType(SamzaSqlFieldType.TypeName.STRING);
+      case UNION: {
+        // NOTE: We only support Union types when they are used for representing Nullable fields in Avro
+        List<org.apache.avro.Schema> types = schema.getTypes();
+        if (types.size() == 2) {
+          if (types.get(0).getType() == org.apache.avro.Schema.Type.NULL) {
+            return getFieldType(types.get(1));
+          } else if ((types.get(1).getType() == org.apache.avro.Schema.Type.NULL)) {
+            return getFieldType(types.get(0));
+          }
+        }
+        // Any other union used to fall through into the FIXED case and was reported as STRING.
+        // The result is kept, but the fallback is now explicit and leaves a trace in the log.
+        LOG.warn("Unsupported union type {}; treating it as STRING", schema);
+        return SamzaSqlFieldType.createPrimitiveFieldType(SamzaSqlFieldType.TypeName.STRING);
+      }
+      case FIXED:
+        return SamzaSqlFieldType.createPrimitiveFieldType(SamzaSqlFieldType.TypeName.STRING);
+      case STRING:
+        return SamzaSqlFieldType.createPrimitiveFieldType(SamzaSqlFieldType.TypeName.STRING);
+      case BYTES:
+        return SamzaSqlFieldType.createPrimitiveFieldType(SamzaSqlFieldType.TypeName.BYTES);
+      case INT:
+        return SamzaSqlFieldType.createPrimitiveFieldType(SamzaSqlFieldType.TypeName.INT32);
+      case LONG:
+        return SamzaSqlFieldType.createPrimitiveFieldType(SamzaSqlFieldType.TypeName.INT64);
+      case RECORD:
+        return SamzaSqlFieldType.createRowFieldType(getSchema(schema.getFields()));
+      case MAP:
+        return SamzaSqlFieldType.createMapFieldType(getFieldType(schema.getValueType()));
+      default:
+        String msg = String.format("Field Type %s is not supported", schema.getType());
+        LOG.error(msg);
+        throw new SamzaException(msg);
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/CliLoggingSystemFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/CliLoggingSystemFactory.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/CliLoggingSystemFactory.java
new file mode 100644
index 0000000..49e051a
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/CliLoggingSystemFactory.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.impl;
+
+import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * System factory of Samza Sql Shell which needs to provide Consumer, Producer and Admin.
+ *
+ * <p>Only the producer and the admin are provided; asking for a consumer throws
+ * {@link UnsupportedOperationException}. The producer logs every outgoing message and hands it to
+ * {@code SamzaExecutor.saveOutputMessage}.
+ */
+public class CliLoggingSystemFactory implements SystemFactory {
+
+  private static final Logger LOG = LoggerFactory.getLogger(CliLoggingSystemFactory.class);
+  // Shared by all producers created by this factory; numbers the logged messages.
+  private static AtomicInteger messageCounter = new AtomicInteger(0);
+
+  @Override
+  public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry) {
+    return new CliLoggingSystemFactory.LoggingSystemProducer();
+  }
+
+  @Override
+  public SystemAdmin getAdmin(String systemName, Config config) {
+    return new CliLoggingSystemFactory.SimpleSystemAdmin(config);
+  }
+
+  /** Minimal admin: every stream has a single partition 0 and no known offsets. */
+  private static class SimpleSystemAdmin implements SystemAdmin {
+
+    public SimpleSystemAdmin(Config config) {
+    }
+
+    /**
+     * Returns a map with the same keys as {@code offsets}, each mapped to {@code null}.
+     */
+    @Override
+    public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets) {
+      // Collectors.toMap accepts neither a null value mapper nor null values (it throws a
+      // NullPointerException for any non-empty input), so the map is filled by hand.
+      Map<SystemStreamPartition, String> offsetsAfter = new HashMap<>();
+      for (SystemStreamPartition partition : offsets.keySet()) {
+        offsetsAfter.put(partition, null);
+      }
+      return offsetsAfter;
+    }
+
+    /**
+     * Returns, for every requested stream, metadata with one partition 0 whose
+     * partition metadata is built from three {@code null} arguments.
+     */
+    @Override
+    public Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames) {
+      return streamNames.stream()
+          .collect(Collectors.toMap(Function.identity(), streamName -> new SystemStreamMetadata(streamName,
+              Collections.singletonMap(new Partition(0),
+                  new SystemStreamMetadata.SystemStreamPartitionMetadata(null, null, null)))));
+    }
+
+    /** Compares offsets as strings; a {@code null} offset sorts before any non-null offset. */
+    @Override
+    public Integer offsetComparator(String offset1, String offset2) {
+      if (offset1 == null) {
+        return offset2 == null ? 0 : -1;
+      } else if (offset2 == null) {
+        return 1;
+      }
+      return offset1.compareTo(offset2);
+    }
+  }
+
+  /**
+   * Producer which logs each message and forwards it to the executor.
+   * Static because it only uses static state of the factory.
+   */
+  private static class LoggingSystemProducer implements SystemProducer {
+
+    @Override
+    public void start() {
+    }
+
+    @Override
+    public void stop() {
+    }
+
+    @Override
+    public void register(String source) {
+      LOG.info("Registering source {}", source);
+    }
+
+    @Override
+    public void send(String source, OutgoingMessageEnvelope envelope) {
+      LOG.info("Message {} :", messageCounter.incrementAndGet());
+      // NOTE(review): the payload is cast to byte[] and decoded with the platform default
+      // charset - confirm which encoding the upstream serializer produces.
+      String msg = String.format("OutputStream:%s Key:%s Value:%s", envelope.getSystemStream(), envelope.getKey(),
+          new String((byte[]) envelope.getMessage()));
+      LOG.info(msg);
+
+      SamzaExecutor.saveOutputMessage(envelope);
+    }
+
+    @Override
+    public void flush(String source) {
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/2782818e/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/FileSystemAvroRelSchemaProviderFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/FileSystemAvroRelSchemaProviderFactory.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/FileSystemAvroRelSchemaProviderFactory.java
new file mode 100644
index 0000000..8d0b12f
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/FileSystemAvroRelSchemaProviderFactory.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.impl;
+
+import org.apache.avro.Schema;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.avro.AvroRelSchemaProvider;
+import org.apache.samza.sql.avro.AvroTypeFactoryImpl;
+import org.apache.samza.sql.interfaces.RelSchemaProvider;
+import org.apache.samza.sql.interfaces.RelSchemaProviderFactory;
+import org.apache.samza.system.SystemStream;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Relational schemas provider which reads system schema from a given directory.
+ *
+ * <p>The directory is taken from the {@link #CFG_SCHEMA_DIR} config entry and the schema of a
+ * stream is read from the file {@code <stream name>.avsc} in that directory.
+ */
+public class FileSystemAvroRelSchemaProviderFactory implements RelSchemaProviderFactory {
+
+  /** Config key holding the directory which contains the {@code .avsc} schema files. */
+  public static final String CFG_SCHEMA_DIR = "schemaDir";
+
+  @Override
+  public RelSchemaProvider create(SystemStream systemStream, Config config) {
+    return new FileSystemAvroRelSchemaProvider(systemStream, config);
+  }
+
+  /**
+   * Provider bound to one system stream.
+   * Static because it needs nothing from the enclosing factory instance.
+   */
+  private static class FileSystemAvroRelSchemaProvider implements AvroRelSchemaProvider {
+    private final SystemStream systemStream;
+    private final String schemaDir;
+
+    public FileSystemAvroRelSchemaProvider(SystemStream systemStream, Config config) {
+      this.systemStream = systemStream;
+      this.schemaDir = config.get(CFG_SCHEMA_DIR);
+    }
+
+    /**
+     * Reads the Avro schema of this provider's stream and converts it to a relational type.
+     */
+    @Override
+    public RelDataType getRelationalSchema() {
+      String schemaStr = this.getSchema(this.systemStream);
+      // Schema.Parser replaces the deprecated static Schema.parse(String).
+      Schema schema = new Schema.Parser().parse(schemaStr);
+      AvroTypeFactoryImpl avroTypeFactory = new AvroTypeFactoryImpl();
+      return avroTypeFactory.createType(schema);
+    }
+
+    /**
+     * Reads {@code <stream name>.avsc} from the schema directory.
+     *
+     * @param systemStream the stream whose name selects the schema file
+     * @return the parsed schema rendered back to a string
+     * @throws SamzaException if the schema file cannot be read
+     */
+    @Override
+    public String getSchema(SystemStream systemStream) {
+      String fileName = String.format("%s.avsc", systemStream.getStream());
+      File file = new File(schemaDir, fileName);
+      try {
+        return new Schema.Parser().parse(file).toString();
+      } catch (IOException e) {
+        // Name the file so that a missing or unreadable schema can be located.
+        throw new SamzaException(String.format("Failed to read Avro schema file %s", file), e);
+      }
+    }
+  }
+}