You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sr...@apache.org on 2018/10/22 16:51:51 UTC

[1/2] samza git commit: SAMZA-1901: Implementation of Samza SQL Shell

Repository: samza
Updated Branches:
  refs/heads/master fd6f1ed25 -> e203db2aa


http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java
new file mode 100755
index 0000000..7c2ca32
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.impl;
+
+import com.google.common.base.Joiner;
+import kafka.utils.ZkUtils;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+import org.I0Itec.zkclient.exception.ZkTimeoutException;
+import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.*;
+import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
+import org.apache.samza.job.ApplicationStatus;
+import org.apache.samza.serializers.StringSerdeFactory;
+import org.apache.samza.sql.avro.AvroRelSchemaProvider;
+import org.apache.samza.sql.client.interfaces.*;
+import org.apache.samza.sql.client.util.RandomAccessQueue;
+import org.apache.samza.sql.dsl.SamzaSqlDslConverter;
+import org.apache.samza.sql.dsl.SamzaSqlDslConverterFactory;
+import org.apache.samza.sql.fn.FlattenUdf;
+import org.apache.samza.sql.fn.RegexMatchUdf;
+import org.apache.samza.sql.impl.ConfigBasedIOResolverFactory;
+import org.apache.samza.sql.impl.ConfigBasedUdfResolver;
+import org.apache.samza.sql.interfaces.RelSchemaProvider;
+import org.apache.samza.sql.interfaces.RelSchemaProviderFactory;
+import org.apache.samza.sql.interfaces.SqlIOConfig;
+import org.apache.samza.sql.interfaces.SqlIOResolver;
+import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
+import org.apache.samza.sql.runner.SamzaSqlApplicationRunner;
+import org.apache.samza.sql.testutil.JsonUtil;
+import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.kafka.KafkaSystemFactory;
+import org.apache.samza.tools.avro.AvroSchemaGenRelConverterFactory;
+import org.apache.samza.tools.avro.AvroSerDeFactory;
+import org.apache.samza.tools.json.JsonRelConverterFactory;
+import org.apache.samza.tools.schemas.ProfileChangeEvent;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConversions;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+
+/**
+ * Samza implementation of Executor for Samza SQL Shell.
+ */
+public class SamzaExecutor implements SqlExecutor {
+  private static final Logger LOG = LoggerFactory.getLogger(SamzaExecutor.class);
+
+  private static final String SAMZA_SYSTEM_LOG = "log";
+  private static final String SAMZA_SYSTEM_KAFKA = "kafka";
+  private static final String SAMZA_SQL_OUTPUT = "samza.sql.output";
+  private static final String SAMZA_SQL_SYSTEM_KAFKA_ADDRESS = "samza.sql.system.kafka.address";
+  private static final String DEFAULT_SERVER_ADDRESS = "localhost:2181";
+
+  // The maximum number of rows of data we keep when user pauses the display view and data accumulates.
+  private static final int RANDOM_ACCESS_QUEUE_CAPACITY = 5000;
+  private static final int DEFAULT_ZOOKEEPER_CLIENT_TIMEOUT = 20000;
+
+  private static RandomAccessQueue<OutgoingMessageEnvelope> outputData =
+          new RandomAccessQueue<>(OutgoingMessageEnvelope.class, RANDOM_ACCESS_QUEUE_CAPACITY);
+  private static AtomicInteger execIdSeq = new AtomicInteger(0);
+  private Map<Integer, SamzaSqlApplicationRunner> executions = new HashMap<>();
+  private String lastErrorMsg = "";
+
+  // -- implementation of SqlExecutor ------------------------------------------
+
+  @Override
+  public void start(ExecutionContext context) {
+  }
+
+  @Override
+  public void stop(ExecutionContext context) {
+    Iterator<Integer> iter = executions.keySet().iterator();
+    while (iter.hasNext()) {
+      stopExecution(context, iter.next());
+      iter.remove();
+    }
+    outputData.clear();
+  }
+
+  @Override
+  public List<String> listTables(ExecutionContext context) {
+    /**
+     * TODO: currently Shell can only talk to Kafka system, but we should use a general way
+     *       to connect to different systems.
+     */
+    lastErrorMsg = "";
+    String address = context.getConfigMap().getOrDefault(SAMZA_SQL_SYSTEM_KAFKA_ADDRESS, DEFAULT_SERVER_ADDRESS);
+    List<String> tables = null;
+    try {
+      ZkUtils zkUtils = new ZkUtils(new ZkClient(address, DEFAULT_ZOOKEEPER_CLIENT_TIMEOUT),
+          new ZkConnection(address), false);
+      tables = JavaConversions.seqAsJavaList(zkUtils.getAllTopics())
+        .stream()
+        .map(x -> SAMZA_SYSTEM_KAFKA + "." + x)
+        .collect(Collectors.toList());
+    } catch (ZkTimeoutException ex) {
+      lastErrorMsg = ex.toString();
+      LOG.error(lastErrorMsg);
+    }
+    return tables;
+  }
+
+  @Override
+  public SqlSchema getTableSchema(ExecutionContext context, String tableName) {
+    /**
+     *  currently Shell works only for systems that has Avro schemas
+     */
+    lastErrorMsg = "";
+    int execId = execIdSeq.incrementAndGet();
+    Map<String, String> staticConfigs = fetchSamzaSqlConfig(execId, context);
+    Config samzaSqlConfig = new MapConfig(staticConfigs);
+    SqlSchema sqlSchema = null;
+    try {
+      SqlIOResolver ioResolver = SamzaSqlApplicationConfig.createIOResolver(samzaSqlConfig);
+      SqlIOConfig sourceInfo = ioResolver.fetchSourceInfo(tableName);
+      RelSchemaProvider schemaProvider =
+              SamzaSqlApplicationConfig.initializePlugin("RelSchemaProvider", sourceInfo.getRelSchemaProviderName(),
+                      samzaSqlConfig, SamzaSqlApplicationConfig.CFG_FMT_REL_SCHEMA_PROVIDER_DOMAIN,
+                      (o, c) -> ((RelSchemaProviderFactory) o).create(sourceInfo.getSystemStream(), c));
+      AvroRelSchemaProvider avroSchemaProvider = (AvroRelSchemaProvider) schemaProvider;
+      String schema = avroSchemaProvider.getSchema(sourceInfo.getSystemStream());
+      sqlSchema = AvroSqlSchemaConverter.convertAvroToSamzaSqlSchema(schema);
+    } catch (SamzaException ex) {
+      lastErrorMsg = ex.toString();
+      LOG.error(lastErrorMsg);
+    }
+    return sqlSchema;
+  }
+
+  @Override
+  public QueryResult executeQuery(ExecutionContext context, String statement) {
+    lastErrorMsg = "";
+    outputData.clear();
+
+    int execId = execIdSeq.incrementAndGet();
+    Map<String, String> staticConfigs = fetchSamzaSqlConfig(execId, context);
+    List<String> sqlStmts = formatSqlStmts(Collections.singletonList(statement));
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
+
+    SamzaSqlApplicationRunner runner;
+    try {
+      runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
+      runner.run();
+    } catch (SamzaException ex) {
+      lastErrorMsg = ex.toString();
+      LOG.error(lastErrorMsg);
+      return new QueryResult(execId, null, false);
+    }
+    executions.put(execId, runner);
+    LOG.debug("Executing sql. Id ", execId);
+
+    return new QueryResult(execId, generateResultSchema(new MapConfig(staticConfigs)), true);
+  }
+
+  @Override
+  public int getRowCount() {
+    return outputData.getSize();
+  }
+
+  @Override
+  public List<String[]> retrieveQueryResult(ExecutionContext context, int startRow, int endRow) {
+    List<String[]> results = new ArrayList<>();
+    for (OutgoingMessageEnvelope row : outputData.get(startRow, endRow)) {
+      results.add(getFormattedRow(context, row));
+    }
+    return results;
+  }
+
+  @Override
+  public List<String[]> consumeQueryResult(ExecutionContext context, int startRow, int endRow) {
+    List<String[]> results = new ArrayList<>();
+    for (OutgoingMessageEnvelope row : outputData.consume(startRow, endRow)) {
+      results.add(getFormattedRow(context, row));
+    }
+    return results;
+  }
+
+  @Override
+  public NonQueryResult executeNonQuery(ExecutionContext context, File sqlFile) {
+    lastErrorMsg = "";
+
+    LOG.info("Sql file path: " + sqlFile.getPath());
+    List<String> executedStmts = new ArrayList<>();
+    try {
+      executedStmts = Files.lines(Paths.get(sqlFile.getPath())).collect(Collectors.toList());
+    } catch (IOException e) {
+      lastErrorMsg = String.format("Unable to parse the sql file %s. %s", sqlFile.getPath(), e.toString());
+      LOG.error(lastErrorMsg);
+      return new NonQueryResult(-1, false);
+    }
+    LOG.info("Sql statements in Sql file: " + executedStmts.toString());
+
+    List<String> submittedStmts = new ArrayList<>();
+    List<String> nonSubmittedStmts = new ArrayList<>();
+    validateExecutedStmts(executedStmts, submittedStmts, nonSubmittedStmts);
+    if (submittedStmts.isEmpty()) {
+      lastErrorMsg = "Nothing to execute. Note: SELECT statements are ignored.";
+      LOG.warn("Nothing to execute. Statements in the Sql file: {}", nonSubmittedStmts);
+      return new NonQueryResult(-1, false);
+    }
+    NonQueryResult result = executeNonQuery(context, submittedStmts);
+    return new NonQueryResult(result.getExecutionId(), result.succeeded(), submittedStmts, nonSubmittedStmts);
+  }
+
+  @Override
+  public NonQueryResult executeNonQuery(ExecutionContext context, List<String> statement) {
+    lastErrorMsg = "";
+
+    int execId = execIdSeq.incrementAndGet();
+    Map<String, String> staticConfigs = fetchSamzaSqlConfig(execId, context);
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(formatSqlStmts(statement)));
+
+    SamzaSqlApplicationRunner runner;
+    try {
+      runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
+      runner.run();
+    } catch (SamzaException ex) {
+      lastErrorMsg = ex.toString();
+      LOG.error(lastErrorMsg);
+      return new NonQueryResult(execId, false);
+    }
+    executions.put(execId, runner);
+    LOG.debug("Executing sql. Id ", execId);
+
+    return new NonQueryResult(execId, true);
+  }
+
+  @Override
+  public boolean stopExecution(ExecutionContext context, int exeId) {
+    lastErrorMsg = "";
+
+    SamzaSqlApplicationRunner runner = executions.get(exeId);
+    if (runner != null) {
+      LOG.debug("Stopping execution ", exeId);
+
+      try {
+        runner.kill();
+      } catch (SamzaException ex) {
+        lastErrorMsg = ex.toString();
+        LOG.debug(lastErrorMsg);
+        return false;
+      }
+
+      try {
+        Thread.sleep(500); // wait for a second
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+
+      return true;
+    } else {
+      lastErrorMsg = "Trying to stop a non-existing SQL execution " + exeId;
+      LOG.warn(lastErrorMsg);
+      return false;
+    }
+  }
+
+  @Override
+  public boolean removeExecution(ExecutionContext context, int exeId) {
+    lastErrorMsg = "";
+
+    SamzaSqlApplicationRunner runner = executions.get(exeId);
+    if (runner != null) {
+      if (runner.status().getStatusCode().equals(ApplicationStatus.StatusCode.Running)) {
+        lastErrorMsg = "Trying to remove a ongoing execution " + exeId;
+        LOG.error(lastErrorMsg);
+        return false;
+      }
+      executions.remove(exeId);
+      LOG.debug("Stopping execution ", exeId);
+      return true;
+    } else {
+      lastErrorMsg = "Trying to remove a non-existing SQL execution " + exeId;
+      LOG.warn(lastErrorMsg);
+      return false;
+    }
+  }
+
+  @Override
+  public ExecutionStatus queryExecutionStatus(int execId) {
+    SamzaSqlApplicationRunner runner = executions.get(execId);
+    if (runner == null) {
+      return null;
+    }
+    return queryExecutionStatus(runner);
+  }
+
+  @Override
+  public String getErrorMsg() {
+    return lastErrorMsg;
+  }
+
+  @Override
+  public List<SqlFunction> listFunctions(ExecutionContext context) {
+    /**
+     * TODO: currently the Shell only shows some UDFs supported by Samza internally. We may need to require UDFs
+     *       to provide a function of getting their "SamzaSqlUdfDisplayInfo", then we can get the UDF information from
+     *       SamzaSqlApplicationConfig.udfResolver(or SamzaSqlApplicationConfig.udfMetadata) instead of registering
+     *       UDFs one by one as below.
+     */
+    List<SqlFunction> udfs = new ArrayList<>();
+    udfs.add(new SamzaSqlUdfDisplayInfo("RegexMatch", "Matches the string to the regex",
+            Arrays.asList(SamzaSqlFieldType.createPrimitiveFieldType(SamzaSqlFieldType.TypeName.STRING),
+                    SamzaSqlFieldType.createPrimitiveFieldType(SamzaSqlFieldType.TypeName.STRING)),
+            SamzaSqlFieldType.createPrimitiveFieldType(SamzaSqlFieldType.TypeName.BOOLEAN)));
+
+    return udfs;
+  }
+
+  static void saveOutputMessage(OutgoingMessageEnvelope messageEnvelope) {
+    outputData.add(messageEnvelope);
+  }
+
+  static Map<String, String> fetchSamzaSqlConfig(int execId, ExecutionContext executionContext) {
+    HashMap<String, String> staticConfigs = new HashMap<>();
+
+    staticConfigs.put(JobConfig.JOB_NAME(), "sql-job-" + execId);
+    staticConfigs.put(JobConfig.PROCESSOR_ID(), String.valueOf(execId));
+    staticConfigs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
+    staticConfigs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName());
+
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_IO_RESOLVER, "config");
+    String configIOResolverDomain =
+        String.format(SamzaSqlApplicationConfig.CFG_FMT_SOURCE_RESOLVER_DOMAIN, "config");
+    staticConfigs.put(configIOResolverDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
+        ConfigBasedIOResolverFactory.class.getName());
+
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_UDF_RESOLVER, "config");
+    String configUdfResolverDomain = String.format(SamzaSqlApplicationConfig.CFG_FMT_UDF_RESOLVER_DOMAIN, "config");
+    staticConfigs.put(configUdfResolverDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
+        ConfigBasedUdfResolver.class.getName());
+    staticConfigs.put(configUdfResolverDomain + ConfigBasedUdfResolver.CFG_UDF_CLASSES,
+        Joiner.on(",").join(RegexMatchUdf.class.getName(), FlattenUdf.class.getName()));
+
+    staticConfigs.put("serializers.registry.string.class", StringSerdeFactory.class.getName());
+    staticConfigs.put("serializers.registry.avro.class", AvroSerDeFactory.class.getName());
+    staticConfigs.put(AvroSerDeFactory.CFG_AVRO_SCHEMA, ProfileChangeEvent.SCHEMA$.toString());
+
+    String kafkaSystemConfigPrefix =
+        String.format(ConfigBasedIOResolverFactory.CFG_FMT_SAMZA_PREFIX, SAMZA_SYSTEM_KAFKA);
+    String avroSamzaSqlConfigPrefix = configIOResolverDomain + String.format("%s.", SAMZA_SYSTEM_KAFKA);
+    staticConfigs.put(kafkaSystemConfigPrefix + "samza.factory", KafkaSystemFactory.class.getName());
+    staticConfigs.put(kafkaSystemConfigPrefix + "samza.key.serde", "string");
+    staticConfigs.put(kafkaSystemConfigPrefix + "samza.msg.serde", "avro");
+    staticConfigs.put(kafkaSystemConfigPrefix + "consumer.zookeeper.connect", "localhost:2181");
+    staticConfigs.put(kafkaSystemConfigPrefix + "producer.bootstrap.servers", "localhost:9092");
+
+    staticConfigs.put(kafkaSystemConfigPrefix + "samza.offset.reset", "true");
+    staticConfigs.put(kafkaSystemConfigPrefix + "samza.offset.default", "oldest");
+
+    staticConfigs.put(avroSamzaSqlConfigPrefix + SqlIOConfig.CFG_SAMZA_REL_CONVERTER, "avro");
+    staticConfigs.put(avroSamzaSqlConfigPrefix + SqlIOConfig.CFG_REL_SCHEMA_PROVIDER, "config");
+
+    String logSystemConfigPrefix =
+        String.format(ConfigBasedIOResolverFactory.CFG_FMT_SAMZA_PREFIX, SAMZA_SYSTEM_LOG);
+    String logSamzaSqlConfigPrefix = configIOResolverDomain + String.format("%s.", SAMZA_SYSTEM_LOG);
+    staticConfigs.put(logSystemConfigPrefix + "samza.factory", CliLoggingSystemFactory.class.getName());
+    staticConfigs.put(logSamzaSqlConfigPrefix + SqlIOConfig.CFG_SAMZA_REL_CONVERTER, "json");
+    staticConfigs.put(logSamzaSqlConfigPrefix + SqlIOConfig.CFG_REL_SCHEMA_PROVIDER, "config");
+
+    String avroSamzaToRelMsgConverterDomain =
+        String.format(SamzaSqlApplicationConfig.CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN, "avro");
+
+    staticConfigs.put(avroSamzaToRelMsgConverterDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
+        AvroSchemaGenRelConverterFactory.class.getName());
+
+    String jsonSamzaToRelMsgConverterDomain =
+        String.format(SamzaSqlApplicationConfig.CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN, "json");
+
+    staticConfigs.put(jsonSamzaToRelMsgConverterDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
+        JsonRelConverterFactory.class.getName());
+
+    String configAvroRelSchemaProviderDomain =
+        String.format(SamzaSqlApplicationConfig.CFG_FMT_REL_SCHEMA_PROVIDER_DOMAIN, "config");
+    staticConfigs.put(configAvroRelSchemaProviderDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
+        FileSystemAvroRelSchemaProviderFactory.class.getName());
+
+    staticConfigs.put(
+        configAvroRelSchemaProviderDomain + FileSystemAvroRelSchemaProviderFactory.CFG_SCHEMA_DIR,
+        "/tmp/schemas/");
+
+    /* TODO: we need to validate and read configurations from shell-defaults.conf (aka. "executionContext"),
+     *       and update their value if they've been included in staticConfigs. We could handle these logic
+     *       Shell level, or in Executor level.
+     */
+    staticConfigs.putAll(executionContext.getConfigMap());
+
+    return staticConfigs;
+  }
+
+  private List<String> formatSqlStmts(List<String> statements) {
+    return statements.stream().map(sql -> {
+      if (!sql.toLowerCase().startsWith("insert")) {
+        String formattedSql = String.format("insert into log.outputStream %s", sql);
+        LOG.debug("Sql formatted. ", sql, formattedSql);
+        return formattedSql;
+      } else {
+        return sql;
+      }
+    }).collect(Collectors.toList());
+  }
+
+  private void validateExecutedStmts(List<String> statements, List<String> submittedStmts,
+                                     List<String> nonSubmittedStmts) {
+    for (String sql : statements) {
+      if (sql.isEmpty()) {
+        continue;
+      }
+      if (!sql.toLowerCase().startsWith("insert")) {
+        nonSubmittedStmts.add(sql);
+      } else {
+        submittedStmts.add(sql);
+      }
+    }
+  }
+
+  SqlSchema generateResultSchema(Config config) {
+    SamzaSqlDslConverter converter = (SamzaSqlDslConverter) new SamzaSqlDslConverterFactory().create(config);
+    RelRoot relRoot = converter.convertDsl("").iterator().next();
+
+    List<String> colNames = new ArrayList<>();
+    List<String> colTypeNames = new ArrayList<>();
+    for (RelDataTypeField dataTypeField : relRoot.validatedRowType.getFieldList()) {
+      colNames.add(dataTypeField.getName());
+      colTypeNames.add(dataTypeField.getType().toString());
+    }
+
+    return new SqlSchema(colNames, colTypeNames);
+  }
+
+  private String[] getFormattedRow(ExecutionContext context, OutgoingMessageEnvelope row) {
+    String[] formattedRow = new String[1];
+    String outputFormat = context.getConfigMap().get(SAMZA_SQL_OUTPUT);
+    if (outputFormat == null || !outputFormat.equalsIgnoreCase(MessageFormat.PRETTY.toString())) {
+      formattedRow[0] = getCompressedFormat(row);
+    } else {
+      formattedRow[0] = getPrettyFormat(row);
+    }
+    return formattedRow;
+  }
+
+  private ExecutionStatus queryExecutionStatus(SamzaSqlApplicationRunner runner) {
+    lastErrorMsg = "";
+    switch (runner.status().getStatusCode()) {
+      case New:
+        return ExecutionStatus.New;
+      case Running:
+        return ExecutionStatus.Running;
+      case SuccessfulFinish:
+        return ExecutionStatus.SuccessfulFinish;
+      case UnsuccessfulFinish:
+        return ExecutionStatus.UnsuccessfulFinish;
+      default:
+        lastErrorMsg = String.format("Unsupported execution status %s",
+                runner.status().getStatusCode().toString());
+        return null;
+    }
+  }
+
+  private String getPrettyFormat(OutgoingMessageEnvelope envelope) {
+    String value = new String((byte[]) envelope.getMessage());
+    ObjectMapper mapper = new ObjectMapper();
+    String formattedValue;
+    try {
+      Object json = mapper.readValue(value, Object.class);
+      formattedValue = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(json);
+    } catch (IOException e) {
+      formattedValue = value;
+      LOG.error("Error while formatting json", e);
+    }
+    return formattedValue;
+  }
+
+  private String getCompressedFormat(OutgoingMessageEnvelope envelope) {
+    return new String((byte[]) envelope.getMessage());
+  }
+
+  private enum MessageFormat {
+    PRETTY,
+    COMPACT
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaSqlFieldType.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaSqlFieldType.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaSqlFieldType.java
new file mode 100644
index 0000000..ed11a53
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaSqlFieldType.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.impl;
+
+import org.apache.samza.sql.client.interfaces.SqlSchema;
+
+/**
+ * Types of Samza Sql fields.
+ */
+public class SamzaSqlFieldType {
+
+  private TypeName typeName;
+  private SamzaSqlFieldType elementType;
+  private SamzaSqlFieldType valueType;
+  private SqlSchema rowSchema;
+
+  private SamzaSqlFieldType(TypeName typeName, SamzaSqlFieldType elementType, SamzaSqlFieldType valueType, SqlSchema rowSchema) {
+    this.typeName = typeName;
+    this.elementType = elementType;
+    this.valueType = valueType;
+    this.rowSchema = rowSchema;
+  }
+
+  public static SamzaSqlFieldType createPrimitiveFieldType(TypeName typeName) {
+    return new SamzaSqlFieldType(typeName, null, null, null);
+  }
+
+  public static SamzaSqlFieldType createArrayFieldType(SamzaSqlFieldType elementType) {
+    return new SamzaSqlFieldType(TypeName.ARRAY, elementType, null, null);
+  }
+
+  public static SamzaSqlFieldType createMapFieldType(SamzaSqlFieldType valueType) {
+    return new SamzaSqlFieldType(TypeName.MAP, null, valueType, null);
+  }
+
+  public static SamzaSqlFieldType createRowFieldType(SqlSchema rowSchema) {
+    return new SamzaSqlFieldType(TypeName.ROW, null, null, rowSchema);
+  }
+
+  public boolean isPrimitiveField() {
+    return typeName != TypeName.ARRAY && typeName != TypeName.MAP && typeName != TypeName.ROW;
+  }
+
+  public TypeName getTypeName() {
+    return typeName;
+  }
+
+  public SamzaSqlFieldType getElementType() {
+    return elementType;
+  }
+
+  public SamzaSqlFieldType getValueType() {
+    return valueType;
+  }
+
+  public SqlSchema getRowSchema() {
+    return rowSchema;
+  }
+
+  public enum TypeName {
+    BYTE, // One-byte signed integer.
+    INT16, // two-byte signed integer.
+    INT32, // four-byte signed integer.
+    INT64, // eight-byte signed integer.
+    DECIMAL, // Decimal integer
+    FLOAT,
+    DOUBLE,
+    STRING, // String.
+    DATETIME, // Date and time.
+    BOOLEAN, // Boolean.
+    BYTES, // Byte array.
+    ARRAY,
+    MAP,
+    ROW, // The field is itself a nested row.
+    ANY
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaSqlUdfDisplayInfo.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaSqlUdfDisplayInfo.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaSqlUdfDisplayInfo.java
new file mode 100644
index 0000000..2e89978
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaSqlUdfDisplayInfo.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.impl;
+
+import com.google.common.base.Joiner;
+import org.apache.samza.sql.client.interfaces.SqlFunction;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * UDF information displayer
+ */
+public class SamzaSqlUdfDisplayInfo implements SqlFunction {
+
+  private String name;
+
+  private String description;
+
+  private List<SamzaSqlFieldType> argumentTypes;
+
+  private SamzaSqlFieldType returnType;
+
+  public SamzaSqlUdfDisplayInfo(String name, String description, List<SamzaSqlFieldType> argumentTypes,
+                                SamzaSqlFieldType returnType) {
+    this.name = name;
+    this.description = description;
+    this.argumentTypes = argumentTypes;
+    this.returnType = returnType;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public String getDescription() {
+    return description;
+  }
+
+  public List<String> getArgumentTypes() {
+    return argumentTypes.stream().map(x -> x.getTypeName().toString()).collect(Collectors.toList());
+  }
+
+  public String getReturnType() {
+    return returnType.getTypeName().toString();
+  }
+
+  public String toString() {
+    List<String> argumentTypeNames =
+            argumentTypes.stream().map(x -> x.getTypeName().toString()).collect(Collectors.toList());
+    String args = Joiner.on(", ").join(argumentTypeNames);
+    return String.format("%s(%s) returns <%s> : %s", name, args, returnType.getTypeName().toString(), description);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/ExecutionContext.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/ExecutionContext.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/ExecutionContext.java
new file mode 100644
index 0000000..14dfa64
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/ExecutionContext.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.interfaces;
+
+import java.util.Map;
+
+/**
+ * Whenever the shell calls the executor to execute a SQL statement, an object of ExecutionContext is passed.
+ */
+public class ExecutionContext {
+  private Map<String, String> m_configs;
+
+  public ExecutionContext(Map<String, String> config) {
+    m_configs = config;
+  }
+
+  /**
+  * @return The Map storing all configuration pairs. Note that the set map is the same as the one used by
+  * ExecutionContext, so changes to the map are reflected in ExecutionContext, and vice-versa.
+  */
+  public Map<String, String> getConfigMap() {
+    return m_configs;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/ExecutionException.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/ExecutionException.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/ExecutionException.java
new file mode 100644
index 0000000..5991922
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/ExecutionException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.interfaces;
+
+/**
+ * An executor shall throw an ExecutionException when it encounters an unrecoverable error.
+ */
+public class ExecutionException extends RuntimeException {
+  public ExecutionException() {
+  }
+
+  public ExecutionException(String message) {
+    super(message);
+  }
+
+  public ExecutionException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public ExecutionException(Throwable cause) {
+    super(cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/ExecutionStatus.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/ExecutionStatus.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/ExecutionStatus.java
new file mode 100644
index 0000000..77ee888
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/ExecutionStatus.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.interfaces;
+
+/**
+ * Status of the execution of a SQL statement.
+ */
+public enum ExecutionStatus {
+  New,
+  Running,
+  SuccessfulFinish,
+  UnsuccessfulFinish
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/NonQueryResult.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/NonQueryResult.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/NonQueryResult.java
new file mode 100644
index 0000000..0173ad1
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/NonQueryResult.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.interfaces;
+
+import java.util.List;
+
+/**
+ * Result of a non-query SQL statement or SQL file containing multiple non-query statements.
+ */
+public class NonQueryResult {
+  private int execId; // execution ID of the statement(s) submitted
+  private boolean success; // whether the statement(s) submitted successfully
+
+  // When user submits a batch of SQL statements, only the non-query ones will be submitted
+  private List<String> submittedStmts;
+  private List<String> nonSubmittedStmts;
+
+  public NonQueryResult(int execId, boolean success) {
+    this.execId = execId;
+    this.success = success;
+  }
+
+  public NonQueryResult(int execId, boolean success, List<String> submittedStmts, List<String> nonSubmittedStmts) {
+    this.execId = execId;
+    this.success = success;
+    this.submittedStmts = submittedStmts;
+    this.nonSubmittedStmts = nonSubmittedStmts;
+  }
+
+  public int getExecutionId() {
+    return execId;
+  }
+
+  public boolean succeeded() {
+    return success;
+  }
+
+  public List<String> getSubmittedStmts() {
+    return submittedStmts;
+  }
+
+  public List<String> getNonSubmittedStmts() {
+    return nonSubmittedStmts;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/QueryResult.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/QueryResult.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/QueryResult.java
new file mode 100644
index 0000000..6f54557
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/QueryResult.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.interfaces;
+
+/**
+ * Execution result of a SELECT statement. It doesn't contain data though.
+ */
+public class QueryResult {
+  private int execId; // execution ID of the statement(s) submitted
+  private boolean success; // whether the statement(s) submitted successfully
+  private SqlSchema schema; // The schema of the data coming from the query
+
+  public QueryResult(int execId, SqlSchema schema, Boolean success) {
+    if (success && schema == null)
+      throw new IllegalArgumentException();
+    this.execId = execId;
+    this.schema = schema;
+    this.success = success;
+  }
+
+  public int getExecutionId() {
+    return execId;
+  }
+
+  public SqlSchema getSchema() {
+    return schema;
+  }
+
+  public boolean succeeded() {
+    return success;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlExecutor.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlExecutor.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlExecutor.java
new file mode 100644
index 0000000..9d528f6
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlExecutor.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.interfaces;
+
+
+import java.io.File;
+import java.util.List;
+
+
+/**
+ * Conventions:
+ * <p>
+ * Implementations shall report UNRECOVERABLE EXCEPTIONS by throwing
+ * ExecutionExceptions, though SqlExecutor doesn't enforce this by as we don't believe in
+ * Java checked exceptions. Report errors by returning values as indicated by each
+ * function and preparing for the subsequent getErrorMsg call.
+ * <p>
+ * Each execution (both query and non-query shall return an non-negative execution ID(execId).
+ * Negative execution IDs are reserved for error handling.
+ * <p>
+ * User shall be able to query the status of an execution even after it's finished, so the
+ * executor shall keep record of all the execution unless being asked to remove them (
+ * when removeExecution is called.)
+ * <p>
+ * IMPORTANT: An executor shall support two ways of supplying data:
+ * 1. Say user selects profiles of users who visited LinkedIn in the last 5 mins. There could
+ * be millions of rows, but the UI only need to show a small fraction. That's retrieveQueryResult,
+ * accepting a row range (startRow and endRow). Note that UI may ask for the same data over and over,
+ * like when user switches from page 1 to page 2 and data stream changes at the same time, the two
+ * pages may actually have overlapped or even same data.
+ * <p>
+ * 2. Say user wants to see clicks on a LinkedIn page of certain person from now on. In this mode
+ * consumeQueryResult shall be used. UI can keep asking for new rows, and once the rows are consumed,
+ * it's no longer necessary for the executor to keep them. If lots of rows come in, the UI may be only
+ * interested in the last certain rows (as it's in a logview mode), so all data older can be dropped.
+ */
+public interface SqlExecutor {
+  /**
+   * SqlExecutor shall be ready to accept all other calls after start() is called.
+   * However, it shall NOT store the ExecutionContext for future use, as each
+   * call will be given an ExecutionContext which may differ from this one.
+   *
+   * @param context The ExecutionContext at the time of the call.
+   */
+  public void start(ExecutionContext context);
+
+  /**
+   * Indicates no further calls will be made thus it's safe for the executor to clean up.
+   *
+   * @param context The ExecutionContext at the time of the call.
+   */
+  public void stop(ExecutionContext context);
+
+  /**
+   * @param context The ExecutionContext at the time of the call.
+   * @return null if an error occurs. Prepare for subsequent getErrorMsg call.
+   * an empty list indicates no tables found.
+   */
+  public List<String> listTables(ExecutionContext context);
+
+  /**
+   * @param context   The ExecutionContext at the time of the call.
+   * @param tableName Name of the table to get the schema for.
+   * @return null if an error occurs. Prepare for subsequent getErrorMsg call.
+   */
+  public SqlSchema getTableSchema(ExecutionContext context, String tableName);
+
+  /**
+   * @param context   The ExecutionContext at the time of the call.
+   * @param statement statement to execute
+   * @return The query result.
+   */
+  public QueryResult executeQuery(ExecutionContext context, String statement);
+
+
+  /**
+   * @return how many rows available for reading.
+   */
+  public int getRowCount();
+
+  /**
+   * Row starts at 0. Executor shall keep the data retrieved.
+   * For now we get strings for display but we might want strong typed values.
+   *
+   * @param context  The ExecutionContext at the time of the call.
+   * @param startRow Start row index (inclusive)
+   * @param endRow   End row index (inclusive)
+   * @return A list of row data represented by a String array.
+   */
+  public List<String[]> retrieveQueryResult(ExecutionContext context, int startRow, int endRow);
+
+
+  /**
+   * Consumes rows from query result. Executor shall drop them, as "consume" indicates.
+   * ALL data before endRow (inclusive, including data before startRow) shall be deleted.
+   *
+   * @param context  The ExecutionContext at the time of the call.
+   * @param startRow Start row index (inclusive)
+   * @param endRow   End row index (inclusive)
+   * @return available data between startRow and endRow (both are inclusive)
+   */
+  public List<String[]> consumeQueryResult(ExecutionContext context, int startRow, int endRow);
+
+  /**
+   * Executes all the NON-QUERY statements in the sqlFile.
+   * Query statements are ignored as it won't make sense.
+   *
+   * @param context The ExecutionContext at the time of the call.
+   * @param file    A File object to read statements from.
+   * @return Execution result.
+   */
+  public NonQueryResult executeNonQuery(ExecutionContext context, File file);
+
+  /**
+   * @param context    The ExecutionContext at the time of the call.
+   * @param statements A list of non-query sql statements.
+   * @return Execution result.
+   */
+  public NonQueryResult executeNonQuery(ExecutionContext context, List<String> statements);
+
+  /**
+   * @param context The ExecutionContext at the time of the call.
+   * @param exeId   Execution ID.
+   * @return Whether the operation suceeded or not.
+   */
+  public boolean stopExecution(ExecutionContext context, int exeId);
+
+
+  /**
+   * Removing an ongoing execution shall result in an error. Stop it first.
+   *
+   * @param context The ExecutionContext at the time of the call
+   * @param exeId   Execution ID.
+   * @return Whether the operation succeeded or not.
+   */
+  public boolean removeExecution(ExecutionContext context, int exeId);
+
+  /**
+   * @param execId Execution ID.
+   * @return ExecutionStatus.
+   */
+  public ExecutionStatus queryExecutionStatus(int execId);
+
+  /**
+   * @return The last error message of last function call.
+   */
+  public String getErrorMsg();
+
+  /**
+   * @param context The ExecutionContext at the time of the call.
+   * @return A list of SqlFunction.
+   */
+  List<SqlFunction> listFunctions(ExecutionContext context);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlFunction.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlFunction.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlFunction.java
new file mode 100644
index 0000000..3ac602c
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlFunction.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.interfaces;
+
+import java.util.List;
+
+/**
+ * Represents a SQL function.
+ */
+public interface SqlFunction {
+  /**
+   * Gets the name of the function.
+   * @return name of the function
+   */
+  public String getName();
+
+  /**
+   * Gets the description of the function.
+   * @return description of the function.
+   */
+  public String getDescription();
+
+  /**
+   * Gets the argument types of the function as a List.
+   * @return A list containing the type names of the arguments.
+   */
+  public List<String> getArgumentTypes();
+
+  /**
+   * Gets the return type of the function.
+   * @return return type name
+   */
+  public String getReturnType();
+
+  /**
+   * Don't forget to implement toString()
+   */
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlSchema.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlSchema.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlSchema.java
new file mode 100644
index 0000000..7dcabdb
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlSchema.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.interfaces;
+
+
+import java.util.List;
+
+/**
+ * A primitive representation of SQL schema which is just for display purpose.
+ */
+public class SqlSchema {
+  private String[] names; // field names
+  private String[] typeNames; // names of field type
+
+  public SqlSchema(List<String> colNames, List<String> colTypeNames) {
+    if (colNames == null || colNames.size() == 0
+            || colTypeNames == null || colTypeNames.size() == 0
+            || colNames.size() != colTypeNames.size())
+      throw new IllegalArgumentException();
+
+    names = new String[colNames.size()];
+    names = colNames.toArray(names);
+
+    typeNames = new String[colTypeNames.size()];
+    typeNames = colTypeNames.toArray(typeNames);
+  }
+
+  public int getFieldCount() {
+    return names.length;
+  }
+
+  public String getFieldName(int colIdx) {
+    return names[colIdx];
+  }
+
+  public String getFieldTypeName(int colIdx) {
+    return typeNames[colIdx];
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlSchemaBuilder.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlSchemaBuilder.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlSchemaBuilder.java
new file mode 100644
index 0000000..84e8a0e
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlSchemaBuilder.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.interfaces;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Convenient class for building a SqlSchema object.
+ */
+public class SqlSchemaBuilder {
+  private List<String> names = new ArrayList<>();
+  private List<String> typeNames = new ArrayList<>();
+
+  private SqlSchemaBuilder() {
+  }
+
+  public static SqlSchemaBuilder builder() {
+    return new SqlSchemaBuilder();
+  }
+
+  public SqlSchemaBuilder addField(String name, String fieldType) {
+    if (name == null || name.isEmpty() || fieldType == null)
+      throw new IllegalArgumentException();
+
+    names.add(name);
+    typeNames.add(fieldType);
+    return this;
+  }
+
+  public SqlSchemaBuilder appendFields(List<String> names, List<String> typeNames) {
+    if (names == null || names.size() == 0
+            || typeNames == null || typeNames.size() == 0
+            || names.size() != typeNames.size())
+      throw new IllegalArgumentException();
+
+    this.names.addAll(names);
+    this.typeNames.addAll(typeNames);
+
+    return this;
+  }
+
+  public SqlSchema toSchema() {
+    return new SqlSchema(names, typeNames);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/util/CliException.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/util/CliException.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/util/CliException.java
new file mode 100755
index 0000000..743ff44
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/util/CliException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.util;
+
+/**
+ * The exception used by the shell for unrecoverable errors.
+ */
+public class CliException extends RuntimeException {
+  public CliException() {
+
+  }
+
+  public CliException(String message) {
+    super(message);
+  }
+
+  public CliException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public CliException(Throwable cause) {
+    super(cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/util/CliUtil.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/util/CliUtil.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/util/CliUtil.java
new file mode 100755
index 0000000..03fe1b1
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/util/CliUtil.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.util;
+
+/**
+ * Convenient utility class with static methods.
+ */
+public class CliUtil {
+  public static boolean isNullOrEmpty(String str) {
+    return str == null || str.isEmpty();
+  }
+
+  public static int ceilingDiv(int x, int y) {
+    if (x < 0 || y <= 0)
+      throw new IllegalArgumentException();
+
+    return x / y + (x % y == 0 ? 0 : 1);
+  }
+
+  public static StringBuilder appendTo(StringBuilder builder, int toPos, char c) {
+    for (int i = builder.length(); i <= toPos; ++i) {
+      builder.append(c);
+    }
+    return builder;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/util/RandomAccessQueue.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/util/RandomAccessQueue.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/util/RandomAccessQueue.java
new file mode 100644
index 0000000..789d64c
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/util/RandomAccessQueue.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.util;
+
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A queue that supports random access and consumption.
+ * @param <T> Element type
+ */
+public class RandomAccessQueue<T> {
+  private T[] buffer;
+  private int capacity;
+  private int size;
+  private int head;
+
+  public RandomAccessQueue(Class<T> t, int capacity) {
+    this.capacity = capacity;
+    head = 0;
+    size = 0;
+
+    @SuppressWarnings("unchecked") final T[] b = (T[]) Array.newInstance(t, capacity);
+    buffer = b;
+  }
+
+  public synchronized List<T> get(int start, int end) {
+    int lowerBound = Math.max(start, 0);
+    int upperBound = Math.min(end, size - 1);
+    List<T> rets = new ArrayList<>();
+    for (int i = lowerBound; i <= upperBound; i++) {
+      rets.add(buffer[(head + i) % capacity]);
+    }
+    return rets;
+  }
+
+  public synchronized T get(int index) {
+    if (index >= 0 && index < size) {
+      return buffer[(head + index) % capacity];
+    }
+    throw new CliException("OutOfBoundaryError");
+  }
+
+  public synchronized void add(T t) {
+    if (size >= capacity) {
+      buffer[head] = t;
+      head = (head + 1) % capacity;
+    } else {
+      int pos = (head + size) % capacity;
+      buffer[pos] = t;
+      size++;
+    }
+  }
+
+  /*
+   * Remove all element before 'end', and return elements between 'start' and 'end'
+   */
+  public synchronized List<T> consume(int start, int end) {
+    List<T> rets = get(start, end);
+    int upperBound = Math.min(end, size - 1);
+    head = (end + 1) % capacity;
+    size -= (upperBound + 1);
+    return rets;
+  }
+
+  public synchronized int getHead() {
+    return head;
+  }
+
+  public synchronized int getSize() {
+    return size;
+  }
+
+  public synchronized void clear() {
+    head = 0;
+    size = 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql-shell/src/test/java/org/apache/samza/sql/client/impl/SamzaExecutorTest.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/test/java/org/apache/samza/sql/client/impl/SamzaExecutorTest.java b/samza-sql-shell/src/test/java/org/apache/samza/sql/client/impl/SamzaExecutorTest.java
new file mode 100644
index 0000000..18fe4b7
--- /dev/null
+++ b/samza-sql-shell/src/test/java/org/apache/samza/sql/client/impl/SamzaExecutorTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.impl;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.sql.client.interfaces.ExecutionContext;
+import org.apache.samza.sql.client.interfaces.SqlSchema;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.samza.sql.client.impl.SamzaExecutor.*;
+import static org.apache.samza.sql.runner.SamzaSqlApplicationConfig.*;
+
+
+public class SamzaExecutorTest {
+    private SamzaExecutor m_executor = new SamzaExecutor();
+
+    @Test
+    public void testGetTableSchema() {
+        ExecutionContext context = getExecutionContext();
+        SqlSchema ts = m_executor.getTableSchema(context, "kafka.ProfileChangeStream");
+
+        Assert.assertEquals("Name", ts.getFieldName(0));
+        Assert.assertEquals("NewCompany", ts.getFieldName(1));
+        Assert.assertEquals("OldCompany", ts.getFieldName(2));
+        Assert.assertEquals("ProfileChangeTimestamp", ts.getFieldName(3));
+        Assert.assertEquals("STRING", ts.getFieldTypeName(0));
+        Assert.assertEquals("STRING", ts.getFieldTypeName(1));
+        Assert.assertEquals("STRING", ts.getFieldTypeName(2));
+        Assert.assertEquals("INT64", ts.getFieldTypeName(3));
+    }
+
+    @Test
+    public void testGenerateResultSchema() {
+        ExecutionContext context = getExecutionContext();
+        Map<String, String> mapConf = fetchSamzaSqlConfig(1, context);
+        SqlSchema ts = m_executor.generateResultSchema(new MapConfig(mapConf));
+
+        Assert.assertEquals("__key__", ts.getFieldName(0));
+        Assert.assertEquals("Name", ts.getFieldName(1));
+        Assert.assertEquals("NewCompany", ts.getFieldName(2));
+        Assert.assertEquals("OldCompany", ts.getFieldName(3));
+        Assert.assertEquals("ProfileChangeTimestamp", ts.getFieldName(4));
+        Assert.assertEquals("VARCHAR", ts.getFieldTypeName(0));
+        Assert.assertEquals("VARCHAR", ts.getFieldTypeName(1));
+        Assert.assertEquals("VARCHAR", ts.getFieldTypeName(2));
+        Assert.assertEquals("VARCHAR", ts.getFieldTypeName(3));
+        Assert.assertEquals("BIGINT", ts.getFieldTypeName(4));
+    }
+
+    private ExecutionContext getExecutionContext() {
+        ClassLoader classLoader = getClass().getClassLoader();
+        File file = new File(classLoader.getResource("ProfileChangeStream.avsc").getFile());
+        Map<String, String> mapConf = new HashMap<>();
+        mapConf.put("samza.sql.relSchemaProvider.config.schemaDir", file.getParent());
+        mapConf.put(CFG_SQL_STMT, "insert into log.outputStream select * from kafka.ProfileChangeStream");
+        return new ExecutionContext(mapConf);
+    }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql-shell/src/test/java/org/apache/samza/sql/client/util/RandomAccessQueueTest.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/test/java/org/apache/samza/sql/client/util/RandomAccessQueueTest.java b/samza-sql-shell/src/test/java/org/apache/samza/sql/client/util/RandomAccessQueueTest.java
new file mode 100644
index 0000000..fe7a19a
--- /dev/null
+++ b/samza-sql-shell/src/test/java/org/apache/samza/sql/client/util/RandomAccessQueueTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.util;
+
+import java.util.List;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class RandomAccessQueueTest {
+  private RandomAccessQueue m_queue;
+  public RandomAccessQueueTest() {
+    m_queue = new RandomAccessQueue<>(Integer.class, 5);
+  }
+
+  @Test
+  public void testAddAndGetElement() {
+    m_queue.clear();
+    for (int i = 0; i < 4; i++) {
+      m_queue.add(i);
+    }
+    Assert.assertEquals(0, m_queue.getHead());
+    Assert.assertEquals(4, m_queue.getSize());
+    Assert.assertEquals(0, m_queue.get(0));
+    Assert.assertEquals(3, m_queue.get(3));
+
+    for (int i = 0; i < 3; i++) {
+      m_queue.add(4 + i);
+    }
+    int head = m_queue.getHead();
+    Assert.assertEquals(2, head);
+    Assert.assertEquals(5, m_queue.getSize());
+    Assert.assertEquals(2, m_queue.get(0));
+    Assert.assertEquals(3, m_queue.get(1));
+    Assert.assertEquals(4, m_queue.get(2));
+    Assert.assertEquals(5, m_queue.get(3));
+    Assert.assertEquals(6, m_queue.get(4));
+  }
+
+  @Test
+  public void testGetRange() {
+    m_queue.clear();
+    for (int i = 0; i < 4; i++) {
+      m_queue.add(i); // 0, 1, 2, 3
+    }
+    List<Integer> rets = m_queue.get(-1, 9);
+    Assert.assertEquals(4, rets.size());
+    Assert.assertEquals(0, m_queue.get(0));
+    Assert.assertEquals(3, m_queue.get(3));
+
+    for (int i = 0; i <= 2; i++) {
+      m_queue.add(4 + i);
+    }
+    rets = m_queue.get(0, 4);
+    Assert.assertEquals(2, rets.get(0).intValue());
+    Assert.assertEquals(3, rets.get(1).intValue());
+    Assert.assertEquals(4, rets.get(2).intValue());
+    Assert.assertEquals(5, rets.get(3).intValue());
+    Assert.assertEquals(6, rets.get(4).intValue());
+  }
+
+  @Test
+  public void testConsume() {
+    m_queue.clear();
+    for (int i = 0; i < 4; i++) {
+      m_queue.add(i); // 0, 1, 2, 3
+    }
+    List<Integer> rets = m_queue.consume(1, 2);
+    Assert.assertEquals(1, m_queue.getSize());
+    Assert.assertEquals(3, m_queue.getHead());
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql-shell/src/test/resources/ProfileChangeStream.avsc
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/test/resources/ProfileChangeStream.avsc b/samza-sql-shell/src/test/resources/ProfileChangeStream.avsc
new file mode 100644
index 0000000..5c1e49d
--- /dev/null
+++ b/samza-sql-shell/src/test/resources/ProfileChangeStream.avsc
@@ -0,0 +1,51 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+{
+    "name": "ProfileChangeEvent",
+    "version" : 1,
+    "namespace": "com.linkedin.samza.tools.avro",
+    "type": "record",
+    "fields": [
+        {
+            "name": "Name",
+            "doc": "Name of the profile.",
+            "type": ["null", "string"],
+            "default":null
+        },
+        {
+            "name": "NewCompany",
+            "doc": "Name of the new company the person joined.",
+            "type": ["null", "string"],
+            "default":null
+        },
+        {
+            "name": "OldCompany",
+            "doc": "Name of the old company the person was working.",
+            "type": ["null", "string"],
+            "default":null
+        },
+        {
+            "name": "ProfileChangeTimestamp",
+            "doc": "Time at which the profile was changed.",
+            "type": ["null", "long"],
+            "default":null
+        }
+    ]
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
index 17df373..745c934 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
@@ -143,7 +143,7 @@ public class SamzaSqlApplicationConfig {
     });
   }
 
-  private static <T> T initializePlugin(String pluginName, String plugin, Config staticConfig,
+  public static <T> T initializePlugin(String pluginName, String plugin, Config staticConfig,
       String pluginDomainFormat, BiFunction<Object, Config, T> factoryInvoker) {
     String pluginDomain = String.format(pluginDomainFormat, plugin);
     Config pluginConfig = staticConfig.subset(pluginDomain);

http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/settings.gradle
----------------------------------------------------------------------
diff --git a/settings.gradle b/settings.gradle
index 853a7bb..5b9df5c 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -27,6 +27,7 @@ include \
   'samza-rest',
   'samza-shell',
   'samza-sql',
+  'samza-sql-shell',
   'samza-tools'
 
 def scalaModules = [


[2/2] samza git commit: SAMZA-1901: Implementation of Samza SQL Shell

Posted by sr...@apache.org.
SAMZA-1901: Implementation of Samza SQL Shell

## What changes were proposed in this pull request?
This PR is to implement Samza SQL shell. The document about the shell was attached [here](https://issues.apache.org/jira/browse/SAMZA-1901).

## How was this patch tested?
1. Add unit tests
2. Run the shell with use cases mentioned in the attached document under https://issues.apache.org/jira/browse/SAMZA-1901

Author: Weiqing Yang <ya...@gmail.com>

Reviewers: Srinivasulu Punuru <sp...@linkedin.com>, Aditya Toomula <at...@linkedin.com>

Closes #654 from weiqingy/samza-shell


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/e203db2a
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/e203db2a
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/e203db2a

Branch: refs/heads/master
Commit: e203db2aa29614c15ff0ee1593bf41c79b3b5c42
Parents: fd6f1ed
Author: Weiqing Yang <ya...@gmail.com>
Authored: Mon Oct 22 09:51:40 2018 -0700
Committer: Srinivasulu Punuru <sp...@linkedin.com>
Committed: Mon Oct 22 09:51:40 2018 -0700

----------------------------------------------------------------------
 build.gradle                                    |  28 +
 gradle/dependency-versions.gradle               |   2 +
 .../JobNodeConfigurationGenerator.java          |   6 +-
 samza-sql-shell/conf/samza-sql-shell-log4j.xml  |  49 ++
 samza-sql-shell/conf/shell-defaults.conf        |  28 +
 samza-sql-shell/scripts/samza-sql-shell.sh      |  42 +
 .../apache/samza/sql/client/cli/CliCommand.java |  53 ++
 .../samza/sql/client/cli/CliCommandType.java    |  78 ++
 .../samza/sql/client/cli/CliConstants.java      |  57 ++
 .../samza/sql/client/cli/CliEnvironment.java    | 130 +++
 .../samza/sql/client/cli/CliHighlighter.java    |  89 ++
 .../apache/samza/sql/client/cli/CliShell.java   | 821 +++++++++++++++++++
 .../apache/samza/sql/client/cli/CliView.java    |  29 +
 .../org/apache/samza/sql/client/cli/Main.java   | 117 +++
 .../sql/client/cli/QueryResultLogView.java      | 291 +++++++
 .../sql/client/impl/AvroSqlSchemaConverter.java | 112 +++
 .../client/impl/CliLoggingSystemFactory.java    | 117 +++
 .../FileSystemAvroRelSchemaProviderFactory.java |  75 ++
 .../samza/sql/client/impl/SamzaExecutor.java    | 511 ++++++++++++
 .../sql/client/impl/SamzaSqlFieldType.java      |  94 +++
 .../sql/client/impl/SamzaSqlUdfDisplayInfo.java |  71 ++
 .../sql/client/interfaces/ExecutionContext.java |  41 +
 .../client/interfaces/ExecutionException.java   |  40 +
 .../sql/client/interfaces/ExecutionStatus.java  |  30 +
 .../sql/client/interfaces/NonQueryResult.java   |  62 ++
 .../sql/client/interfaces/QueryResult.java      |  49 ++
 .../sql/client/interfaces/SqlExecutor.java      | 171 ++++
 .../sql/client/interfaces/SqlFunction.java      |  55 ++
 .../samza/sql/client/interfaces/SqlSchema.java  |  56 ++
 .../sql/client/interfaces/SqlSchemaBuilder.java |  63 ++
 .../samza/sql/client/util/CliException.java     |  41 +
 .../apache/samza/sql/client/util/CliUtil.java   |  43 +
 .../sql/client/util/RandomAccessQueue.java      |  96 +++
 .../sql/client/impl/SamzaExecutorTest.java      |  79 ++
 .../sql/client/util/RandomAccessQueueTest.java  |  89 ++
 .../src/test/resources/ProfileChangeStream.avsc |  51 ++
 .../sql/runner/SamzaSqlApplicationConfig.java   |   2 +-
 settings.gradle                                 |   1 +
 38 files changed, 3764 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 856bb1d..b6ee4ac 100644
--- a/build.gradle
+++ b/build.gradle
@@ -336,6 +336,34 @@ project(':samza-sql') {
   }
 }
 
+project(':samza-sql-shell') {
+  apply plugin: 'java'
+
+  dependencies {
+    compile project(':samza-sql')
+    compile project(':samza-tools')
+    compile project(":samza-core_$scalaVersion")
+    compile project(':samza-api')
+    compile project(":samza-kafka_$scalaVersion")
+    compile project(':samza-azure')
+    compile "net.java.dev.jna:jna:$jnaVersion"
+    compile "org.jline:jline:$jlineVersion"
+
+    testCompile "junit:junit:$junitVersion"
+  }
+
+  tasks.create(name: "releaseSqlShellTarGz", dependsOn: configurations.archives.artifacts, type: Tar) {
+    into "samza-sql-shell-${version}"
+    compression = Compression.GZIP
+    from(project.file("./scripts")) { into "scripts/" }
+    from(project.file("./conf")) { into "conf/" }
+    from(project(':samza-shell').file("src/main/bash/run-class.sh")) { into "scripts/" }
+    from(configurations.runtime) { into("lib/") }
+    from(configurations.archives.artifacts.files) { into("lib/") }
+    duplicatesStrategy 'exclude'
+  }
+}
+
 project(':samza-tools') {
   apply plugin: 'java'
 

http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/gradle/dependency-versions.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle
index a5b4f51..43ceb0a 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -47,4 +47,6 @@
   zkClientVersion = "0.8"
   zookeeperVersion = "3.4.6"
   failsafeVersion = "1.1.0"
+  jlineVersion = "3.8.2"
+  jnaVersion = "4.5.1"
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java b/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java
index 70c9b23..215177b 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java
@@ -70,10 +70,8 @@ import org.slf4j.LoggerFactory;
   static Config mergeConfig(Map<String, String> originalConfig, Map<String, String> generatedConfig) {
     Map<String, String> mergedConfig = new HashMap<>(generatedConfig);
     originalConfig.forEach((k, v) -> {
-        if (generatedConfig.containsKey(k) &&
-            !Objects.equals(generatedConfig.get(k), v)) {
-          LOG.info("Replacing generated config for key: {} value: {} with original config value: {}",
-              k, generatedConfig.get(k), v);
+        if (generatedConfig.containsKey(k) && !Objects.equals(generatedConfig.get(k), v)) {
+          LOG.info("Replacing generated config for key: {} value: {} with original config value: {}", k, generatedConfig.get(k), v);
         }
         mergedConfig.put(k, v);
       });

http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql-shell/conf/samza-sql-shell-log4j.xml
----------------------------------------------------------------------
diff --git a/samza-sql-shell/conf/samza-sql-shell-log4j.xml b/samza-sql-shell/conf/samza-sql-shell-log4j.xml
new file mode 100644
index 0000000..924ef93
--- /dev/null
+++ b/samza-sql-shell/conf/samza-sql-shell-log4j.xml
@@ -0,0 +1,49 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+-->
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+
+  <appender name="console" class="org.apache.log4j.ConsoleAppender">
+    <layout class="org.apache.log4j.PatternLayout">
+      <param name="ConversionPattern"
+             value="%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n" />
+    </layout>
+  </appender>
+
+  <appender name="file" class="org.apache.log4j.RollingFileAppender">
+    <param name="append" value="false" />
+    <param name="maxFileSize" value="10MB" />
+    <param name="maxBackupIndex" value="10" />
+    <param name="file" value="${LOG_HOME}/logs/samza-sql-shell.log" />
+    <layout class="org.apache.log4j.PatternLayout">
+      <param name="ConversionPattern"
+             value="%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n" />
+    </layout>
+  </appender>
+
+  <root>
+    <level value="info" />
+    <appender-ref ref="file" />
+  </root>
+</log4j:configuration>
+

http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql-shell/conf/shell-defaults.conf
----------------------------------------------------------------------
diff --git a/samza-sql-shell/conf/shell-defaults.conf b/samza-sql-shell/conf/shell-defaults.conf
new file mode 100644
index 0000000..0c85e52
--- /dev/null
+++ b/samza-sql-shell/conf/shell-defaults.conf
@@ -0,0 +1,28 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+# Default system properties included when running samza-sql-shell.
+# This is useful for setting default environmental settings.
+
+# Example:
+shell.executor = org.apache.samza.sql.client.impl.SamzaExecutor
+samza.sql.output = compact
+# samza.sql.system.kafka.address = localhost:2181
+# samza.sql.relSchemaProvider.config.schemaDir = /tmp/schemas/
+# samza.sql.ioResolver = config
+# samza.sql.udfResolver = config

http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql-shell/scripts/samza-sql-shell.sh
----------------------------------------------------------------------
diff --git a/samza-sql-shell/scripts/samza-sql-shell.sh b/samza-sql-shell/scripts/samza-sql-shell.sh
new file mode 100755
index 0000000..d9eb6df
--- /dev/null
+++ b/samza-sql-shell/scripts/samza-sql-shell.sh
@@ -0,0 +1,42 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+if [ `uname` == 'Linux' ];
+then
+  base_dir=$(readlink -f $(dirname $0))
+else
+  base_dir=$(dirname $0)
+fi
+
+parent_dir="$(dirname "$base_dir")"
+
+CONF_FILE="-conf $parent_dir/conf/shell-defaults.conf"
+
+if [ "x$LOG_HOME" = "x" ]; then
+    export LOG_HOME=$parent_dir
+fi
+
+if [ "x$LOG4J_OPTS" = "x" ]; then
+    export LOG4J_OPTS="-Dlog4j.configuration=file:$parent_dir/conf/samza-sql-shell-log4j.xml"
+fi
+
+if [ "x$HEAP_OPTS" = "x" ]; then
+    export HEAP_OPTS="-Xmx4G -Xms4G"
+fi
+
+exec $base_dir/run-class.sh $LOG4J_OPTS -DLOG_HOME=$LOG_HOME org.apache.samza.sql.client.cli.Main $CONF_FILE "$@"

http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliCommand.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliCommand.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliCommand.java
new file mode 100755
index 0000000..7c6480b
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliCommand.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.cli;
+
+/**
+ * A shell command containing command name and parameters.
+ */
+class CliCommand {
+  private CliCommandType commandType;
+  private String parameters;
+
+  public CliCommand(CliCommandType cmdType) {
+    this.commandType = cmdType;
+  }
+
+  public CliCommand(CliCommandType cmdType, String parameters) {
+    this(cmdType);
+    this.parameters = parameters;
+  }
+
+  public CliCommandType getCommandType() {
+    return commandType;
+  }
+
+  public String getParameters() {
+    return parameters;
+  }
+
+  public void setParameters(String parameters) {
+    this.parameters = parameters;
+  }
+
+  public String getFullCommand() {
+    return commandType.getCommandName() + CliConstants.SPACE + parameters;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliCommandType.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliCommandType.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliCommandType.java
new file mode 100755
index 0000000..0cd170e
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliCommandType.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.cli;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Enum all the commands we now support along with descriptions.
+ */
+enum CliCommandType {
+  SHOW_TABLES("SHOW TABLES", "Shows all available tables.", "Usage: SHOW TABLES <table name>"),
+  SHOW_FUNCTIONS("SHOW FUNCTIONS", "Shows all available UDFs.", "SHOW FUNCTION"),
+  DESCRIBE("DESCRIBE", "Describes a table.", "Usage: DESCRIBE <table name>"),
+
+  SELECT("SELECT", "\tExecutes a SQL SELECT query.", "SELECT uses a standard streaming SQL syntax."),
+  EXECUTE("EXECUTE", "\tExecute a sql file.", "EXECUTE <URI of a sql file>"),
+  INSERT_INTO("INSERT INTO", "Executes a SQL INSERT INTO.", "INSERT INTO uses a standard streaming SQL syntax."),
+  LS("LS", "\tLists all background executions.", "LS [execution ID]"),
+  STOP("STOP", "\tStops an execution.", "Usage: STOP <execution ID>"),
+  RM("RM", "\tRemoves an execution from the list.", "Usage: RM <execution ID>"),
+
+  HELP("HELP", "\tDisplays this help message.", "Usage: HELP [command]"),
+  SET("SET", "\tSets a variable.", "Usage: SET VAR=VAL"),
+  CLEAR("CLEAR", "\tClears the screen.", "CLEAR"),
+  EXIT("EXIT", "\tExits the shell.", "Exit"),
+  QUIT("QUIT", "\tQuits the shell.", "QUIT"),
+
+  INVALID_COMMAND("INVALID_COMMAND", "INVALID_COMMAND", "INVALID_COMMAND");
+
+  private final String cmdName;
+  private final String description;
+  private final String usage;
+
+  CliCommandType(String cmdName, String description, String usage) {
+    this.cmdName = cmdName;
+    this.description = description;
+    this.usage = usage;
+  }
+
+  public static List<String> getAllCommands() {
+    List<String> cmds = new ArrayList<String>();
+    for (CliCommandType t : CliCommandType.values()) {
+      if (t != INVALID_COMMAND)
+        cmds.add(t.getCommandName());
+    }
+    return cmds;
+  }
+
+  public String getCommandName() {
+    return cmdName;
+  }
+
+  public String getDescription() {
+    return description;
+  }
+
+  public String getUsage() {
+    return usage;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliConstants.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliConstants.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliConstants.java
new file mode 100755
index 0000000..286dc79
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliConstants.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.cli;
+
+/**
+ * Constant definitions for the shell.
+ */
+class CliConstants {
+  public static final String APP_NAME = "Samza SQL Shell";
+  public static final String WINDOW_TITLE = "Samza SQL Shell";
+  public static final String PROMPT_1ST = "Samza SQL";
+  public static final String PROMPT_1ST_END = "> ";
+
+  // All shell environment variables starts with the prefix
+  public static final String CONFIG_SHELL_PREFIX = "shell.";
+  // Specifies the executor used by the shell
+  public static final String CONFIG_EXECUTOR = "shell.executor";
+
+  public static final String VERSION = "0.0.1";
+
+
+  public static final String WELCOME_MESSAGE;
+  static {
+        WELCOME_MESSAGE =
+"      ___           ___           ___           ___           ___ \n" +
+"     /  /\\         /  /\\         /  /\\         /__/\\         /  /\\ \n" +
+"    /  /::\\       /  /::\\       /  /::|        \\  \\:\\       /  /::\\ \n"+
+"   /__/:/\\:\\     /  /:/\\:\\     /  /:|:|         \\  \\:\\     /  /:/\\:\\ \n"+
+"  _\\_ \\:\\ \\:\\   /  /::\\ \\:\\   /  /:/|:|__        \\  \\:\\   /  /::\\ \\:\\ \n"+
+" /__/\\ \\:\\ \\:\\ /__/:/\\:\\_\\:\\ /__/:/_|::::\\  ______\\__\\:\\ /__/:/\\:\\_\\:\\ \n"+
+" \\  \\:\\ \\:\\_\\/ \\__\\/  \\:\\/:/ \\__\\/  /~~/:/ \\  \\::::::::/ \\__\\/  \\:\\/:/ \n"+
+"  \\  \\:\\_\\:\\        \\__\\::/        /  /:/   \\  \\:\\~~~~~       \\__\\::/ \n"+
+"   \\  \\:\\/:/        /  /:/        /  /:/     \\  \\:\\           /  /:/ \n"+
+"    \\  \\::/        /__/:/        /__/:/       \\  \\:\\         /__/:/ \n"+
+"     \\__\\/         \\__\\/         \\__\\/         \\__\\/         \\__\\/  \n\n"+
+"Welcome to Samza SQL shell (V" + VERSION + "). Enter HELP for all commands.\n\n";
+  }
+
+  public static final char SPACE = '\u0020';
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliEnvironment.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliEnvironment.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliEnvironment.java
new file mode 100644
index 0000000..9384169
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliEnvironment.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.cli;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.io.Writer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * CliEnvironment contains "environment variables" that configures the shell behavior.
+ */
+public class CliEnvironment {
+  private static final String debugEnvVar = "shell.debug";
+  private static PrintStream stdout = System.out;
+  private static PrintStream stderr = System.err;
+  private Boolean debug = false;
+
+  boolean isDebug() {
+    return debug;
+  }
+
+  void setDebug(Boolean debug) {
+    this.debug = debug;
+  }
+
+  /**
+   * @param var Environment variable
+   * @param val Value of the environment variable
+   * @return 0 : succeed
+   * -1: invalid var
+   * -2: invalid val
+   */
+  int setEnvironmentVariable(String var, String val) {
+    switch (var.toLowerCase()) {
+      case debugEnvVar:
+        val = val.toLowerCase();
+        if (val.equals("true")) {
+          debug = true;
+          enableJavaSystemOutAndErr();
+        } else if (val.equals("false")) {
+          debug = false;
+          disableJavaSystemOutAndErr();
+        } else
+          return -2;
+        break;
+      default:
+        return -1;
+    }
+
+    return 0;
+  }
+
+  // TODO: Separate the values out of the logic part
+  List<String> getPossibleValues(String var) {
+    List<String> vals = new ArrayList<>();
+    switch (var.toLowerCase()) {
+      case debugEnvVar:
+        vals.add("true");
+        vals.add("false");
+        return vals;
+      default:
+        return null;
+    }
+  }
+
+  void printAll(Writer writer) throws IOException {
+    writer.write(debugEnvVar);
+    writer.write('=');
+    writer.write(debug.toString());
+    writer.write('\n');
+  }
+
+  private void disableJavaSystemOutAndErr() {
+    PrintStream ps = new PrintStream(new NullOutputStream());
+    System.setOut(ps);
+    System.setErr(ps);
+  }
+
+  private void enableJavaSystemOutAndErr() {
+    System.setOut(stdout);
+    System.setErr(stderr);
+  }
+
+  void takeEffect() {
+    if (debug) {
+      enableJavaSystemOutAndErr();
+    } else {
+      // We control terminal directly; Forbid any Java System.out and System.err stuff so
+      // any underlying output will not mess up the console
+      disableJavaSystemOutAndErr();
+    }
+  }
+
+  private class NullOutputStream extends OutputStream {
+    public void close() {
+    }
+
+    public void flush() {
+    }
+
+    public void write(byte[] b) {
+    }
+
+    public void write(byte[] b, int off, int len) {
+    }
+
+    public void write(int b) {
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliHighlighter.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliHighlighter.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliHighlighter.java
new file mode 100755
index 0000000..c01ed5c
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliHighlighter.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.cli;
+
+import org.apache.samza.sql.client.util.CliUtil;
+import org.jline.reader.Highlighter;
+import org.jline.reader.LineReader;
+import org.jline.utils.AttributedString;
+import org.jline.utils.AttributedStringBuilder;
+import org.jline.utils.AttributedStyle;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A primitive highlighter.
+ */
+public class CliHighlighter implements Highlighter {
+  private static final List<String> keywords;
+
+  static {
+    keywords = CliCommandType.getAllCommands();
+    keywords.add("FROM");
+    keywords.add("WHERE");
+  }
+
+  private static List<String> splitWithSpace(String buffer) {
+    List<String> list = new ArrayList<String>();
+    if (CliUtil.isNullOrEmpty(buffer))
+      return list;
+
+    boolean prevIsSpace = Character.isSpaceChar(buffer.charAt(0));
+    int prevPos = 0;
+    for (int i = 1; i < buffer.length(); ++i) {
+      char c = buffer.charAt(i);
+      boolean isSpace = Character.isSpaceChar(c);
+      if (isSpace != prevIsSpace) {
+        list.add(buffer.substring(prevPos, i));
+        prevPos = i;
+        prevIsSpace = isSpace;
+      }
+    }
+    list.add(buffer.substring(prevPos));
+    return list;
+  }
+
+  public AttributedString highlight(LineReader reader, String buffer) {
+    AttributedStringBuilder builder = new AttributedStringBuilder();
+    List<String> tokens = splitWithSpace(buffer);
+
+    for (String token : tokens) {
+      if (isKeyword(token)) {
+        builder.style(AttributedStyle.BOLD.foreground(AttributedStyle.YELLOW))
+                .append(token);
+      } else {
+        builder.style(AttributedStyle.DEFAULT)
+                .append(token);
+      }
+    }
+
+    return builder.toAttributedString();
+  }
+
+  private boolean isKeyword(String token) {
+    for (String keyword : keywords) {
+      if (keyword.compareToIgnoreCase(token) == 0)
+        return true;
+    }
+    return false;
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliShell.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliShell.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliShell.java
new file mode 100755
index 0000000..54c7bf6
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliShell.java
@@ -0,0 +1,821 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.cli;
+
+import org.apache.samza.sql.client.interfaces.*;
+import org.apache.samza.sql.client.util.CliException;
+import org.apache.samza.sql.client.util.CliUtil;
+import org.jline.reader.EndOfFileException;
+import org.jline.reader.LineReader;
+import org.jline.reader.LineReaderBuilder;
+import org.jline.reader.UserInterruptException;
+import org.jline.reader.impl.DefaultParser;
+import org.jline.reader.impl.completer.StringsCompleter;
+import org.jline.terminal.Terminal;
+import org.jline.terminal.TerminalBuilder;
+import org.jline.utils.AttributedStringBuilder;
+import org.jline.utils.AttributedStyle;
+import org.jline.utils.InfoCmp;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.*;
+
+/**
+ * The shell UI.
+ */
+class CliShell {
+  private final Terminal terminal;
+  private final PrintWriter writer;
+  private final LineReader lineReader;
+  private final String firstPrompt;
+  private final SqlExecutor executor;
+  private final ExecutionContext exeContext;
+  private CliEnvironment env;
+  private boolean keepRunning = true;
+  private Map<Integer, String> executions = new TreeMap<>();
+
+  public CliShell(SqlExecutor executor, CliEnvironment environment, ExecutionContext execContext) {
+    if (executor == null || environment == null || execContext == null) {
+      throw new IllegalArgumentException();
+    }
+
+    // Terminal
+    try {
+      terminal = TerminalBuilder.builder()
+              .name(CliConstants.WINDOW_TITLE)
+              .build();
+    } catch (IOException e) {
+      throw new CliException("Error when creating terminal", e);
+    }
+
+    // Terminal writer
+    writer = terminal.writer();
+
+    // LineReader
+    final DefaultParser parser = new DefaultParser()
+            .eofOnEscapedNewLine(true)
+            .eofOnUnclosedQuote(true);
+    lineReader = LineReaderBuilder.builder()
+            .appName(CliConstants.APP_NAME)
+            .terminal(terminal)
+            .parser(parser)
+            .highlighter(new CliHighlighter())
+            .completer(new StringsCompleter(CliCommandType.getAllCommands()))
+            .build();
+
+    // Command Prompt
+    firstPrompt = new AttributedStringBuilder()
+            .style(AttributedStyle.DEFAULT.foreground(AttributedStyle.YELLOW))
+            .append(CliConstants.PROMPT_1ST + CliConstants.PROMPT_1ST_END)
+            .toAnsi();
+
+    // Execution context and executor
+    env = environment;
+    env.takeEffect();
+    exeContext = execContext;
+    this.executor = executor;
+    this.executor.start(exeContext);
+  }
+
+  Terminal getTerminal() {
+    return terminal;
+  }
+
+  CliEnvironment getEnvironment() {
+    return env;
+  }
+
+  SqlExecutor getExecutor() {
+    return executor;
+  }
+
+  ExecutionContext getExeContext() {
+    return exeContext;
+  }
+
+  /**
+   * Actually run the shell. Does not return until user choose to exit.
+   */
+  public void open() {
+    // Remember we cannot enter alternate screen mode here as there is only one alternate
+    // screen and we need it to show streaming results. Clear the screen instead.
+    clearScreen();
+    writer.write(CliConstants.WELCOME_MESSAGE);
+
+    try {
+      // Check if jna.jar exists in class path
+      try {
+        ClassLoader.getSystemClassLoader().loadClass("com.sun.jna.NativeLibrary");
+      } catch (ClassNotFoundException e) {
+        // Something's wrong. It could be a dumb terminal if neither jna nor jansi lib is there
+        writer.write("Warning: jna.jar does NOT exist. It may lead to a dumb shell or a performance hit.\n");
+      }
+
+      while (keepRunning) {
+        String line;
+        try {
+          line = lineReader.readLine(firstPrompt);
+        } catch (UserInterruptException e) {
+          continue;
+        } catch (EndOfFileException e) {
+          commandQuit();
+          break;
+        }
+
+        if (!CliUtil.isNullOrEmpty(line)) {
+          CliCommand command = parseLine(line);
+          if (command == null)
+            continue;
+
+          switch (command.getCommandType()) {
+            case CLEAR:
+              commandClear();
+              break;
+
+            case DESCRIBE:
+              commandDescribe(command);
+              break;
+
+            case EXECUTE:
+              commandExecuteFile(command);
+              break;
+
+            case EXIT:
+            case QUIT:
+              commandQuit();
+              break;
+
+            case HELP:
+              commandHelp(command);
+              break;
+
+            case INSERT_INTO:
+              commandInsertInto(command);
+              break;
+
+            case LS:
+              commandLs(command);
+              break;
+
+            case RM:
+              commandRm(command);
+              break;
+
+            case SELECT:
+              commandSelect(command);
+              break;
+
+            case SET:
+              commandSet(command);
+              break;
+
+            case SHOW_FUNCTIONS:
+              commandShowFunctions(command);
+              break;
+
+            case SHOW_TABLES:
+              commandShowTables(command);
+              break;
+
+            case STOP:
+              commandStop(command);
+              break;
+
+            case INVALID_COMMAND:
+              printHelpMessage();
+              break;
+
+            default:
+              writer.write("UNDER DEVELOPEMENT. Command:" + command.getCommandType() + "\n");
+              writer.write("Parameters:" +
+                      (CliUtil.isNullOrEmpty(command.getParameters()) ? "NULL" : command.getParameters())
+                      + "\n\n");
+              writer.flush();
+          }
+        }
+      }
+    } catch (Exception e) {
+      writer.print(e.getClass().getSimpleName());
+      writer.print(". ");
+      writer.println(e.getMessage());
+      e.printStackTrace(writer);
+      writer.println();
+      writer.println("We are sorry but SamzaSqlShell has encountered a problem and needs to stop.");
+    }
+
+    writer.write("Cleaning up... ");
+    writer.flush();
+    executor.stop(exeContext);
+
+    writer.write("Done.\nBye.\n\n");
+    writer.flush();
+
+    try {
+      terminal.close();
+    } catch (IOException e) {
+      // Doesn't matter
+    }
+  }
+
+  private void commandClear() {
+    clearScreen();
+  }
+
+  private void commandDescribe(CliCommand command) {
+    String parameters = command.getParameters();
+    if (CliUtil.isNullOrEmpty(parameters)) {
+      writer.println(command.getCommandType().getUsage());
+      writer.println();
+      writer.flush();
+      return;
+    }
+
+    SqlSchema schema = executor.getTableSchema(exeContext, parameters);
+
+    if (schema == null) {
+      writer.println("Failed to get schema. Error: " + executor.getErrorMsg());
+    } else {
+      writer.println();
+      List<String> lines = formatSchema4Display(schema);
+      for (String line : lines) {
+        writer.println(line);
+      }
+    }
+    writer.println();
+    writer.flush();
+  }
+
+  private void commandSet(CliCommand command) {
+    String param = command.getParameters();
+    if (CliUtil.isNullOrEmpty(param)) {
+      try {
+        env.printAll(writer);
+      } catch (IOException e) {
+        e.printStackTrace(writer);
+      }
+      writer.println();
+      writer.flush();
+      return;
+    }
+    String[] params = param.split("=");
+    if (params.length != 2) {
+      writer.println(command.getCommandType().getUsage());
+      writer.println();
+      writer.flush();
+      return;
+    }
+
+    int ret = env.setEnvironmentVariable(params[0], params[1]);
+    if (ret == 0) {
+      writer.print(params[0]);
+      writer.print(" set to ");
+      writer.println(params[1]);
+    } else if (ret == -1) {
+      writer.print("Unknow variable: ");
+      writer.println(params[0]);
+    } else if (ret == -2) {
+      writer.print("Invalid value: ");
+      writer.println(params[1]);
+      List<String> vals = env.getPossibleValues(params[0]);
+      writer.print("Possible values:");
+      for (String s : vals) {
+        writer.print(CliConstants.SPACE);
+        writer.print(s);
+      }
+      writer.println();
+    }
+
+    writer.println();
+    writer.flush();
+  }
+
+  private void commandExecuteFile(CliCommand command) {
+    String fullCmdStr = command.getFullCommand();
+    String parameters = command.getParameters();
+    if (CliUtil.isNullOrEmpty(parameters)) {
+      writer.println("Usage: execute <fileuri>\n");
+      writer.flush();
+      return;
+    }
+    URI uri = null;
+    boolean valid = false;
+    File file = null;
+    try {
+      uri = new URI(parameters);
+      file = new File(uri.getPath());
+      valid = file.exists() && !file.isDirectory();
+    } catch (URISyntaxException e) {
+    }
+    if (!valid) {
+      writer.println("Invalid URI.\n");
+      writer.flush();
+      return;
+    }
+
+    NonQueryResult nonQueryResult = executor.executeNonQuery(exeContext, file);
+    if (!nonQueryResult.succeeded()) {
+      writer.print("Execution error: ");
+      writer.println(executor.getErrorMsg());
+      writer.println();
+      writer.flush();
+      return;
+    }
+
+    executions.put(nonQueryResult.getExecutionId(), fullCmdStr);
+    List<String> submittedStmts = nonQueryResult.getSubmittedStmts();
+    List<String> nonsubmittedStmts = nonQueryResult.getNonSubmittedStmts();
+
+    writer.println("Sql file submitted. Execution ID: " + nonQueryResult.getExecutionId());
+    writer.println("Submitted statements: \n");
+    if (submittedStmts == null || submittedStmts.size() == 0) {
+      writer.println("\tNone.");
+    } else {
+      for (String statement : submittedStmts) {
+        writer.print("\t");
+        writer.println(statement);
+      }
+      writer.println();
+    }
+
+    if (nonsubmittedStmts != null && nonsubmittedStmts.size() != 0) {
+      writer.println("Statements NOT submitted: \n");
+      for (String statement : nonsubmittedStmts) {
+        writer.print("\t");
+        writer.println(statement);
+      }
+      writer.println();
+    }
+
+    writer.println("Note: All query statements in a sql file are NOT submitted.");
+    writer.println();
+    writer.flush();
+  }
+
+  private void commandInsertInto(CliCommand command) {
+    String fullCmdStr = command.getFullCommand();
+    NonQueryResult result = executor.executeNonQuery(exeContext,
+            Collections.singletonList(fullCmdStr));
+
+    if (result.succeeded()) {
+      writer.print("Execution submitted successfully. Id: ");
+      writer.println(String.valueOf(result.getExecutionId()));
+      executions.put(result.getExecutionId(), fullCmdStr);
+    } else {
+      writer.write("Execution failed to submit. Error: ");
+      writer.println(executor.getErrorMsg());
+    }
+
+    writer.println();
+    writer.flush();
+  }
+
+  private void commandLs(CliCommand command) {
+    List<Integer> execIds = new ArrayList<>();
+    String parameters = command.getParameters();
+    if (CliUtil.isNullOrEmpty(parameters)) {
+      execIds.addAll(executions.keySet());
+    } else {
+      String[] params = parameters.split("\u0020");
+      for (String param : params) {
+        Integer id = null;
+        try {
+          id = Integer.valueOf(param);
+        } catch (NumberFormatException e) {
+        }
+        if (id != null && executions.containsKey(id)) {
+          execIds.add(id);
+        }
+      }
+    }
+    if (execIds.size() == 0) {
+      writer.println();
+      return;
+    }
+
+    execIds.sort(Integer::compareTo);
+
+    final int terminalWidth = terminal.getWidth();
+    final int ID_WIDTH = 3;
+    final int STATUS_WIDTH = 20;
+    final int CMD_WIDTH = terminalWidth - ID_WIDTH - STATUS_WIDTH - 4;
+
+    AttributedStyle oddLineStyle = AttributedStyle.DEFAULT.BOLD.foreground(AttributedStyle.BLUE);
+    AttributedStyle evenLineStyle = AttributedStyle.DEFAULT.BOLD.foreground(AttributedStyle.CYAN);
+    for (int i = 0; i < execIds.size(); ++i) {
+      Integer id = execIds.get(i);
+      String cmd = executions.get(id);
+      if (cmd == null)
+        continue;
+
+      String status = "UNKNOWN";
+      try {
+        ExecutionStatus execStatus = executor.queryExecutionStatus(id);
+        if (execStatus != null)
+          status = execStatus.name();
+      } catch (ExecutionException e) {
+      }
+
+      int cmdStartIdx = 0;
+      int cmdLength = cmd.length();
+      StringBuilder line;
+      while (cmdStartIdx < cmdLength) {
+        line = new StringBuilder(terminalWidth);
+        if (cmdStartIdx == 0) {
+          line.append(CliConstants.SPACE);
+          line.append(id);
+          CliUtil.appendTo(line, 1 + ID_WIDTH + 1, CliConstants.SPACE);
+          line.append(status);
+        }
+        CliUtil.appendTo(line, 1 + ID_WIDTH + 1 + STATUS_WIDTH + 1, CliConstants.SPACE);
+
+        int numToWrite = Math.min(CMD_WIDTH, cmdLength - cmdStartIdx);
+        if (numToWrite > 0) {
+          line.append(cmd, cmdStartIdx, cmdStartIdx + numToWrite);
+          cmdStartIdx += numToWrite;
+        }
+
+        if (i % 2 == 0) {
+          AttributedStringBuilder attrBuilder = new AttributedStringBuilder().style(evenLineStyle);
+          attrBuilder.append(line.toString());
+          writer.println(attrBuilder.toAnsi());
+        } else {
+          AttributedStringBuilder attrBuilder = new AttributedStringBuilder().style(oddLineStyle);
+          attrBuilder.append(line.toString());
+          writer.println(attrBuilder.toAnsi());
+        }
+      }
+    }
+    writer.println();
+    writer.flush();
+  }
+
+  private void commandRm(CliCommand command) {
+    String parameters = command.getParameters();
+    if (CliUtil.isNullOrEmpty(parameters)) {
+      writer.println(command.getCommandType().getUsage());
+      writer.println();
+      writer.flush();
+      return;
+    }
+
+    List<Integer> execIds = new ArrayList<>();
+    String[] params = parameters.split("\u0020");
+    for (String param : params) {
+      Integer id = null;
+      try {
+        id = Integer.valueOf(param);
+      } catch (NumberFormatException e) {
+      }
+      if (id == null || !executions.containsKey(id)) {
+        writer.print("Error: ");
+        writer.print(param);
+        writer.println(" is not a valid id.");
+      } else {
+        execIds.add(id);
+      }
+    }
+
+    for (Integer id : execIds) {
+      ExecutionStatus status = null;
+      try {
+        status = executor.queryExecutionStatus(id);
+      } catch (ExecutionException e) {
+      }
+      if (status == null) {
+        writer.println(String.format("Error: failed to get execution status for %d. %s",
+                id, executor.getErrorMsg()));
+        continue;
+      }
+      if (status == ExecutionStatus.Running) {
+        writer.println(String.format("Execution %d is still running. Stop it first.", id));
+        continue;
+      }
+      if (executor.removeExecution(exeContext, id)) {
+        writer.println(String.format("Execution %d was removed.", id));
+        executions.remove(id);
+      } else {
+        writer.println(String.format("Error: failed to remove execution %d. %s",
+                id, executor.getErrorMsg()));
+      }
+
+    }
+    writer.println();
+    writer.flush();
+  }
+
+  private void commandQuit() {
+    keepRunning = false;
+  }
+
+  private void commandSelect(CliCommand command) {
+    QueryResult queryResult = executor.executeQuery(exeContext, command.getFullCommand());
+
+    if (queryResult.succeeded()) {
+      CliView view = new QueryResultLogView();
+      view.open(this, queryResult);
+      executor.stopExecution(exeContext, queryResult.getExecutionId());
+    } else {
+      writer.write("Execution failed. Error: ");
+      writer.println(executor.getErrorMsg());
+      writer.println();
+      writer.flush();
+    }
+  }
+
+  private void commandShowTables(CliCommand command) {
+    List<String> tableNames = executor.listTables(exeContext);
+
+    if (tableNames != null) {
+      for (String tableName : tableNames) {
+        writer.println(tableName);
+      }
+    } else {
+      writer.print("Failed to list tables. Error: ");
+      writer.println(executor.getErrorMsg());
+    }
+    writer.println();
+    writer.flush();
+  }
+
+  private void commandShowFunctions(CliCommand command) {
+    List<SqlFunction> fns = executor.listFunctions(exeContext);
+
+    if (fns != null) {
+      for (SqlFunction fn : fns) {
+        writer.println(fn.toString());
+      }
+    } else {
+      writer.print("Failed to list functions. Error: ");
+      writer.println(executor.getErrorMsg());
+    }
+    writer.println();
+    writer.flush();
+  }
+
+  private void commandStop(CliCommand command) {
+    String parameters = command.getParameters();
+    if (CliUtil.isNullOrEmpty(parameters)) {
+      writer.println(command.getCommandType().getUsage());
+      writer.println();
+      writer.flush();
+      return;
+    }
+
+    List<Integer> execIds = new ArrayList<>();
+    String[] params = parameters.split("\u0020");
+    for (String param : params) {
+      Integer id = null;
+      try {
+        id = Integer.valueOf(param);
+      } catch (NumberFormatException e) {
+      }
+      if (id == null || !executions.containsKey(id)) {
+        writer.print("Error: ");
+        writer.print(param);
+        writer.println(" is not a valid id.");
+      } else {
+        execIds.add(id);
+      }
+    }
+
+    for (Integer id : execIds) {
+      if (executor.stopExecution(exeContext, id)) {
+        writer.println(String.format("Request to stop execution %d was sent.", id));
+      } else {
+        writer.println(String.format("Failed to stop %d: %s", id, executor.getErrorMsg()));
+      }
+    }
+    writer.println();
+    writer.flush();
+  }
+
+  private void commandHelp(CliCommand command) {
+    String parameters = command.getParameters();
+    if (CliUtil.isNullOrEmpty(parameters)) {
+      printHelpMessage();
+      return;
+    }
+
+    parameters = parameters.trim().toUpperCase();
+    for (CliCommandType cmdType : CliCommandType.values()) {
+      String cmdText = cmdType.getCommandName();
+      if (cmdText.equals(parameters)) {
+        writer.println(cmdType.getUsage());
+        writer.println();
+        writer.flush();
+        return;
+      }
+    }
+
+    writer.print("Unknown command: ");
+    writer.println(parameters);
+    writer.println();
+    writer.flush();
+  }
+
+
+  private CliCommand parseLine(String line) {
+    line = trimCommand(line);
+    if (CliUtil.isNullOrEmpty(line))
+      return null;
+
+    String upperCaseLine = line.toUpperCase();
+    for (CliCommandType cmdType : CliCommandType.values()) {
+      String cmdText = cmdType.getCommandName();
+      if (upperCaseLine.startsWith(cmdText)) {
+        if (upperCaseLine.length() == cmdText.length())
+          return new CliCommand(cmdType);
+        else if (upperCaseLine.charAt(cmdText.length()) <= CliConstants.SPACE) {
+          String parameter = line.substring(cmdText.length()).trim();
+          if (!parameter.isEmpty())
+            return new CliCommand(cmdType, parameter);
+        }
+      }
+    }
+    return new CliCommand(CliCommandType.INVALID_COMMAND);
+  }
+
+  private void printHelpMessage() {
+    writer.println();
+    AttributedStringBuilder builder = new AttributedStringBuilder();
+    builder.append("The following commands are supported by ")
+            .append(CliConstants.APP_NAME)
+            .append(" at the moment.\n\n");
+
+    for (CliCommandType cmdType : CliCommandType.values()) {
+      if (cmdType == CliCommandType.INVALID_COMMAND)
+        continue;
+
+      String cmdText = cmdType.getCommandName();
+      String cmdDescription = cmdType.getDescription();
+
+      builder.style(AttributedStyle.DEFAULT.bold())
+              .append(cmdText)
+              .append("\t\t")
+              .style(AttributedStyle.DEFAULT)
+              .append(cmdDescription)
+              .append("\n");
+    }
+
+    writer.println(builder.toAnsi());
+    writer.println("HELP <COMMAND> to get help for a specific command.\n");
+    writer.flush();
+  }
+
+  private void clearScreen() {
+    terminal.puts(InfoCmp.Capability.clear_screen);
+  }
+
+  /*
+      Field    | Type
+      -------------------------
+      Field1   | Type 1
+      Field2   | VARCHAR(STRING)
+      Field... | VARCHAR(STRING)
+      -------------------------
+  */
+  private List<String> formatSchema4Display(SqlSchema schema) {
+    final String HEADER_FIELD = "Field";
+    final String HEADER_TYPE = "Type";
+    final char SEPERATOR = '|';
+    final char LINE_SEP = '-';
+
+    int terminalWidth = terminal.getWidth();
+    // Two spaces * 2 plus one SEPERATOR
+    if (terminalWidth < 2 + 2 + 1 + HEADER_FIELD.length() + HEADER_TYPE.length()) {
+      return Collections.singletonList("Not enough room.");
+    }
+
+    // Find the best seperator position for least rows
+    int seperatorPos = HEADER_FIELD.length() + 2;
+    int minRowNeeded = Integer.MAX_VALUE;
+    int longestLineCharNum = 0;
+    int rowCount = schema.getFieldCount();
+    for (int j = seperatorPos; j < terminalWidth - HEADER_TYPE.length() - 2; ++j) {
+      boolean fieldWrapped = false;
+      int rowNeeded = 0;
+      for (int i = 0; i < rowCount; ++i) {
+        int fieldLen = schema.getFieldName(i).length();
+        int typeLen = schema.getFieldTypeName(i).length();
+        int fieldRowNeeded = CliUtil.ceilingDiv(fieldLen, j - 2);
+        int typeRowNeeded = CliUtil.ceilingDiv(typeLen, terminalWidth - 1 - j - 2);
+
+        rowNeeded += Math.max(fieldRowNeeded, typeRowNeeded);
+        fieldWrapped |= fieldRowNeeded > 1;
+        if (typeRowNeeded > 1) {
+          longestLineCharNum = terminalWidth;
+        } else {
+          longestLineCharNum = Math.max(longestLineCharNum, j + typeLen + 2 + 1);
+        }
+      }
+      if (rowNeeded < minRowNeeded) {
+        minRowNeeded = rowNeeded;
+        seperatorPos = j;
+      }
+      if (!fieldWrapped)
+        break;
+    }
+
+    List<String> lines = new ArrayList<>(minRowNeeded + 4);
+
+    // Header
+    StringBuilder line = new StringBuilder(terminalWidth);
+    line.append(CliConstants.SPACE);
+    line.append(HEADER_FIELD);
+    CliUtil.appendTo(line, seperatorPos - 1, CliConstants.SPACE);
+    line.append(SEPERATOR);
+    line.append(CliConstants.SPACE);
+    line.append(HEADER_TYPE);
+    lines.add(line.toString());
+    line = new StringBuilder(terminalWidth);
+    CliUtil.appendTo(line, longestLineCharNum - 1, LINE_SEP);
+    lines.add(line.toString());
+
+    // Body
+    AttributedStyle oddLineStyle = AttributedStyle.DEFAULT.BOLD.foreground(AttributedStyle.BLUE);
+    AttributedStyle evenLineStyle = AttributedStyle.DEFAULT.BOLD.foreground(AttributedStyle.CYAN);
+
+    final int fieldColSize = seperatorPos - 2;
+    final int typeColSize = terminalWidth - seperatorPos - 1 - 2;
+    for (int i = 0; i < rowCount; ++i) {
+      String field = schema.getFieldName(i);
+      String type = schema.getFieldTypeName(i);
+      int fieldLen = field.length();
+      int typeLen = type.length();
+      int fieldStartIdx = 0, typeStartIdx = 0;
+      while (fieldStartIdx < fieldLen || typeStartIdx < typeLen) {
+        line = new StringBuilder(terminalWidth);
+        line.append(CliConstants.SPACE);
+        int numToWrite = Math.min(fieldColSize, fieldLen - fieldStartIdx);
+        if (numToWrite > 0) {
+          line.append(field, fieldStartIdx, fieldStartIdx + numToWrite);
+          fieldStartIdx += numToWrite;
+        }
+        CliUtil.appendTo(line, seperatorPos - 1, CliConstants.SPACE);
+        line.append(SEPERATOR);
+        line.append(CliConstants.SPACE);
+
+        numToWrite = Math.min(typeColSize, typeLen - typeStartIdx);
+        if (numToWrite > 0) {
+          line.append(type, typeStartIdx, typeStartIdx + numToWrite);
+          typeStartIdx += numToWrite;
+        }
+
+        if (i % 2 == 0) {
+          AttributedStringBuilder attrBuilder = new AttributedStringBuilder().style(evenLineStyle);
+          attrBuilder.append(line.toString());
+          lines.add(attrBuilder.toAnsi());
+        } else {
+          AttributedStringBuilder attrBuilder = new AttributedStringBuilder().style(oddLineStyle);
+          attrBuilder.append(line.toString());
+          lines.add(attrBuilder.toAnsi());
+        }
+      }
+    }
+
+    // Footer
+    line = new StringBuilder(terminalWidth);
+    CliUtil.appendTo(line, longestLineCharNum - 1, LINE_SEP);
+    lines.add(line.toString());
+    return lines;
+  }
+
+  // Trims: leading spaces; trailing spaces and ";"s
+  private String trimCommand(String command) {
+    if (CliUtil.isNullOrEmpty(command))
+      return command;
+
+    int len = command.length();
+    int st = 0;
+
+    while ((st < len) && (command.charAt(st) <= ' ')) {
+      st++;
+    }
+    while ((st < len) && ((command.charAt(len - 1) <= ' ')
+            || command.charAt(len - 1) == ';')) {
+      len--;
+    }
+    return ((st > 0) || (len < command.length())) ? command.substring(st, len) : command;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliView.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliView.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliView.java
new file mode 100644
index 0000000..de82100
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliView.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.cli;
+
+import org.apache.samza.sql.client.interfaces.QueryResult;
+
+/**
+ * For displaying the streaming result of a SELECT statement.
+ */
+public interface CliView {
+  public void open(CliShell shell, QueryResult queryResult);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/Main.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/Main.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/Main.java
new file mode 100755
index 0000000..d85cdae
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/Main.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.cli;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.sql.client.impl.SamzaExecutor;
+import org.apache.samza.sql.client.interfaces.ExecutionContext;
+import org.apache.samza.sql.client.interfaces.SqlExecutor;
+import org.apache.samza.sql.client.util.CliException;
+import org.apache.samza.sql.client.util.CliUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Main entry of the program.
+ */
+public class Main {
+    private static final Logger LOG = LoggerFactory.getLogger(Main.class);
+
+    public static void main(String[] args) {
+      // Get configuration file path
+      String configFilePath = null;
+      for(int i = 0; i < args.length; ++i) {
+        switch(args[i]) {
+          case "-conf":
+            if(i + 1 < args.length) {
+              configFilePath = args[i + 1];
+              i++;
+            }
+            break;
+          default:
+            LOG.warn("Unknown parameter {}", args[i]);
+            break;
+        }
+      }
+
+      SqlExecutor executor = null;
+      CliEnvironment environment = new CliEnvironment();
+      Map<String, String> executorConfig = new HashMap<>();
+
+      if(!CliUtil.isNullOrEmpty(configFilePath)) {
+        LOG.info("Configuration file path is: {}", configFilePath);
+        try {
+          FileReader fileReader = new FileReader(configFilePath);
+          BufferedReader bufferedReader = new BufferedReader(fileReader);
+          String line;
+          while ((line = bufferedReader.readLine()) != null) {
+            if (line.startsWith("#") || line.startsWith("[")) {
+              continue;
+            }
+            String[] strs = line.split("=");
+            if (strs.length != 2) {
+              continue;
+            }
+            String key = strs[0].trim().toLowerCase();
+            String value = strs[1].trim();
+            if(key.startsWith(CliConstants.CONFIG_SHELL_PREFIX)) {
+              if(key.equals(CliConstants.CONFIG_EXECUTOR)) {
+                try {
+                  Class<?> clazz = Class.forName(value);
+                  Constructor<?> ctor = clazz.getConstructor();
+                  executor = (SqlExecutor) ctor.newInstance();
+                  LOG.info("Sql executor creation succeed. Executor class is: {}", value);
+                } catch (ClassNotFoundException | NoSuchMethodException
+                    | IllegalAccessException | InstantiationException | InvocationTargetException e) {
+                  throw new CliException(String.format("Failed to create executor %s.", value), e);
+                }
+                continue;
+              }
+
+              // Suppose a shell variable.
+              int result = environment.setEnvironmentVariable(key, value);
+              if(result == -1) { // CliEnvironment doesn't recognize the key.
+                LOG.warn("Unknowing shell environment variable: {}", key);
+              } else if(result == -2) { // Invalid value
+                LOG.warn("Unknowing shell environment value: {}", value);
+              }
+            } else {
+              executorConfig.put(key, value);
+            }
+          }
+        } catch (IOException e) {
+          LOG.error("Error in opening and reading the configuration file {}", e.toString());
+        }
+      }
+      if(executor == null) {
+        executor = new SamzaExecutor();
+      }
+
+      CliShell shell = new CliShell(executor, environment, new ExecutionContext(executorConfig));
+      shell.open();
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/QueryResultLogView.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/QueryResultLogView.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/QueryResultLogView.java
new file mode 100644
index 0000000..b1973bc
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/QueryResultLogView.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.cli;
+
+import org.apache.samza.sql.client.interfaces.ExecutionContext;
+import org.apache.samza.sql.client.interfaces.QueryResult;
+import org.apache.samza.sql.client.interfaces.SqlExecutor;
+import org.jline.keymap.BindingReader;
+import org.jline.keymap.KeyMap;
+import org.jline.terminal.Attributes;
+import org.jline.terminal.Terminal;
+import org.jline.utils.AttributedStringBuilder;
+import org.jline.utils.AttributedStyle;
+import org.jline.utils.InfoCmp;
+
+import java.util.EnumSet;
+import java.util.List;
+
+import static org.jline.keymap.KeyMap.ctrl;
+
+
+/**
+ * A scrolling (logging) view of the query result of a streaming SELECT statement.
+ */
+
+public class QueryResultLogView implements CliView {
+  private static final int DEFAULT_REFRESH_INTERVAL = 100; // all intervals are in ms
+
+  private int refreshInterval = DEFAULT_REFRESH_INTERVAL;
+  private int height;
+  private Terminal terminal;
+  private SqlExecutor executor;
+  private ExecutionContext exeContext;
+  private volatile boolean keepRunning = true;
+  private boolean paused = false;
+
+  // Stupid BindingReader doesn't have a real nonblocking mode
+  // Must create a new thread to get user input
+  private Thread inputThread;
+  private BindingReader keyReader;
+
+  public QueryResultLogView() {
+  }
+
+  // -- implementation of CliView -------------------------------------------
+
+  public void open(CliShell shell, QueryResult queryResult) {
+    terminal = shell.getTerminal();
+    executor = shell.getExecutor();
+    exeContext = shell.getExeContext();
+
+    TerminalStatus prevStatus = setupTerminal();
+    try {
+      keyReader = new BindingReader(terminal.reader());
+      inputThread = new InputThread();
+      inputThread.start();
+      while (keepRunning) {
+        try {
+          display();
+          if (keepRunning)
+            Thread.sleep(refreshInterval);
+        } catch (InterruptedException e) {
+          continue;
+        }
+      }
+
+      try {
+        inputThread.join(1 * 1000);
+      } catch (InterruptedException e) {
+      }
+    } finally {
+      restoreTerminal(prevStatus);
+    }
+    if (inputThread.isAlive()) {
+      terminal.writer().println("Warning: input thread hang. Have to kill!");
+      terminal.writer().flush();
+      inputThread.interrupt();
+    }
+  }
+
+  // ------------------------------------------------------------------------
+
+  private void display() {
+    updateTerminalSize();
+    int rowsInBuffer = executor.getRowCount();
+    if (rowsInBuffer <= 0 || paused) {
+      clearStatusBar();
+      drawStatusBar(rowsInBuffer);
+      return;
+    }
+
+    while (rowsInBuffer > 0) {
+      clearStatusBar();
+      int step = 10;
+      List<String[]> lines = executor.consumeQueryResult(exeContext, 0, step - 1);
+      for (String[] line : lines) {
+        for (int i = 0; i < line.length; ++i) {
+          terminal.writer().write(line[i] == null ? "null" : line[i]);
+          terminal.writer().write(i == line.length - 1 ? "\n" : " ");
+        }
+      }
+      terminal.flush();
+      clearStatusBar();
+      drawStatusBar(rowsInBuffer);
+
+      if (!keepRunning || paused)
+        return;
+
+      rowsInBuffer = executor.getRowCount();
+    }
+  }
+
+  private void clearStatusBar() {
+    terminal.puts(InfoCmp.Capability.save_cursor);
+    terminal.puts(InfoCmp.Capability.cursor_address, height - 1, 0);
+    terminal.puts(InfoCmp.Capability.delete_line, height - 1, 0);
+    terminal.puts(InfoCmp.Capability.restore_cursor);
+  }
+
+  private void drawStatusBar(int rowsInBuffer) {
+    terminal.puts(InfoCmp.Capability.save_cursor);
+    terminal.puts(InfoCmp.Capability.cursor_address, height - 1, 0);
+    AttributedStyle statusBarStyle = AttributedStyle.DEFAULT.background(AttributedStyle.WHITE)
+            .foreground(AttributedStyle.BLACK);
+    AttributedStringBuilder attrBuilder = new AttributedStringBuilder()
+            .style(statusBarStyle.bold().italic())
+            .append("Q")
+            .style(statusBarStyle)
+            .append(": Quit     ")
+            .style(statusBarStyle.bold().italic())
+            .append("SPACE")
+            .style(statusBarStyle)
+            .append(": Pause/Resume     ")
+            .append(String.valueOf(rowsInBuffer) + " rows in buffer     ");
+    if (paused) {
+      attrBuilder.style(statusBarStyle.bold().foreground(AttributedStyle.RED).blink())
+              .append("PAUSED");
+    }
+    String statusBarText = attrBuilder.toAnsi();
+    terminal.writer().print(statusBarText);
+    terminal.flush();
+    terminal.puts(InfoCmp.Capability.restore_cursor);
+  }
+
+  private TerminalStatus setupTerminal() {
+    TerminalStatus prevStatus = new TerminalStatus();
+
+    // Signal handlers
+    prevStatus.handler_INT = terminal.handle(Terminal.Signal.INT, this::handleSignal);
+    prevStatus.handler_QUIT = terminal.handle(Terminal.Signal.QUIT, this::handleSignal);
+    prevStatus.handler_TSTP = terminal.handle(Terminal.Signal.TSTP, this::handleSignal);
+    prevStatus.handler_CONT = terminal.handle(Terminal.Signal.CONT, this::handleSignal);
+    prevStatus.handler_WINCH = terminal.handle(Terminal.Signal.WINCH, this::handleSignal);
+
+    // Attributes
+    prevStatus.attributes = terminal.getAttributes();
+    Attributes newAttributes = new Attributes(prevStatus.attributes);
+    // (003, ETX, Ctrl-C, or also 0177, DEL, rubout) Interrupt char‐
+    // acter (INTR).  Send a SIGINT signal.  Recognized when ISIG is
+    // set, and then not passed as input.
+    newAttributes.setControlChar(Attributes.ControlChar.VINTR, 0);
+    // (034, FS, Ctrl-\) Quit character (QUIT).  Send SIGQUIT signal.
+    // Recognized when ISIG is set, and then not passed as input.
+    // newAttributes.setControlChar(Attributes.ControlChar.VQUIT, 0);
+    newAttributes.setControlChar(Attributes.ControlChar.VMIN, 1);
+    newAttributes.setControlChar(Attributes.ControlChar.VTIME, 0);
+    // Enables signals and SIGTTOU signal to the process group of a background
+    // process which tries to write to our terminal
+    newAttributes.setLocalFlags(
+            EnumSet.of(Attributes.LocalFlag.ISIG, Attributes.LocalFlag.TOSTOP), true);
+    // No canonical mode, no echo, and no implementation-defined input processing
+    newAttributes.setLocalFlags(EnumSet.of(
+            Attributes.LocalFlag.ICANON, Attributes.LocalFlag.ECHO,
+            Attributes.LocalFlag.IEXTEN), false);
+    // Input flags
+    newAttributes.setInputFlags(EnumSet.of(
+            Attributes.InputFlag.ICRNL, Attributes.InputFlag.INLCR, Attributes.InputFlag.IXON), false);
+    terminal.setAttributes(newAttributes);
+
+    // Capabilities
+    // tput smcup; use alternate screen
+    terminal.puts(InfoCmp.Capability.enter_ca_mode);
+    terminal.puts(InfoCmp.Capability.cursor_invisible);
+    terminal.puts(InfoCmp.Capability.cursor_home);
+
+    terminal.flush();
+
+    return prevStatus;
+  }
+
+  private void restoreTerminal(TerminalStatus status) {
+    // Signal handlers
+    terminal.handle(Terminal.Signal.INT, status.handler_INT);
+    terminal.handle(Terminal.Signal.QUIT, status.handler_QUIT);
+    terminal.handle(Terminal.Signal.TSTP, status.handler_TSTP);
+    terminal.handle(Terminal.Signal.CONT, status.handler_CONT);
+    terminal.handle(Terminal.Signal.WINCH, status.handler_WINCH);
+
+    // Attributes
+    terminal.setAttributes(status.attributes);
+
+    // Capability
+    terminal.puts(InfoCmp.Capability.exit_ca_mode);
+    terminal.puts(InfoCmp.Capability.cursor_visible);
+  }
+
+  private void handleSignal(Terminal.Signal signal) {
+    switch (signal) {
+      case INT:
+      case QUIT:
+        keepRunning = false;
+        break;
+      case TSTP:
+        paused = true;
+        break;
+      case CONT:
+        paused = false;
+        break;
+      case WINCH:
+        updateTerminalSize();
+        break;
+    }
+  }
+
+  private void updateTerminalSize() {
+    terminal.flush();
+    height = terminal.getHeight();
+  }
+
+  private KeyMap<Action> bindActionKey() {
+    KeyMap<Action> keyMap = new KeyMap<>();
+    keyMap.bind(Action.QUIT, "Q", "q", ctrl('c'));
+    keyMap.bind(Action.SPACE, " ");
+
+    return keyMap;
+  }
+
+  public enum Action {
+    QUIT,
+    SPACE
+  }
+
+  private static class TerminalStatus {
+    Terminal.SignalHandler handler_INT;
+    Terminal.SignalHandler handler_QUIT;
+    Terminal.SignalHandler handler_TSTP;
+    Terminal.SignalHandler handler_CONT;
+    Terminal.SignalHandler handler_WINCH;
+
+    Attributes attributes;
+  }
+
+  private class InputThread extends Thread {
+    public InputThread() {
+    }
+
+    public void run() {
+      KeyMap<Action> keyMap = bindActionKey();
+
+      Action action = keyReader.readBinding(keyMap, null, true);
+      while (action != null && keepRunning) {
+        switch (action) {
+          case QUIT:
+            keepRunning = false;
+            return;
+          case SPACE:
+            paused = !paused;
+            break;
+        }
+        action = keyReader.readBinding(keyMap, null, true);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/AvroSqlSchemaConverter.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/AvroSqlSchemaConverter.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/AvroSqlSchemaConverter.java
new file mode 100644
index 0000000..42dd285
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/AvroSqlSchemaConverter.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.impl;
+
+import com.google.common.base.Joiner;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.avro.Schema;
+import org.apache.samza.SamzaException;
+import org.apache.samza.sql.client.interfaces.SqlSchema;
+import org.apache.samza.sql.client.interfaces.SqlSchemaBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Schema converter which converts Avro schema to Samza Sql schema
+ */
+public class AvroSqlSchemaConverter {
+  private static final Logger LOG = LoggerFactory.getLogger(AvroSqlSchemaConverter.class);
+
+  public static SqlSchema convertAvroToSamzaSqlSchema(String schema) {
+    Schema avroSchema = Schema.parse(schema);
+    return getSchema(avroSchema.getFields());
+  }
+
+  private static SqlSchema getSchema(List<Schema.Field> fields) {
+    SqlSchemaBuilder schemaBuilder = SqlSchemaBuilder.builder();
+    for (Schema.Field field : fields) {
+      schemaBuilder.addField(field.name(), getColumnTypeName(getFieldType(field.schema())));
+    }
+    return schemaBuilder.toSchema();
+  }
+
+  private static String getColumnTypeName(SamzaSqlFieldType fieldType) {
+    if (fieldType.isPrimitiveField()) {
+      return fieldType.getTypeName().toString();
+    } else if (fieldType.getTypeName() == SamzaSqlFieldType.TypeName.MAP) {
+      return String.format("MAP(%s)", getColumnTypeName(fieldType.getValueType()));
+    } else if (fieldType.getTypeName() == SamzaSqlFieldType.TypeName.ARRAY) {
+      return String.format("ARRAY(%s)", getColumnTypeName(fieldType.getElementType()));
+    } else {
+      SqlSchema schema = fieldType.getRowSchema();
+      List<String> fieldTypes = IntStream.range(0, schema.getFieldCount())
+          .mapToObj(i -> schema.getFieldName(i) + " " + schema.getFieldTypeName(i))
+          .collect(Collectors.toList());
+      String rowSchemaValue = Joiner.on(", ").join(fieldTypes);
+      return String.format("STRUCT(%s)", rowSchemaValue);
+    }
+  }
+
+  private static SamzaSqlFieldType getFieldType(org.apache.avro.Schema schema) {
+    switch (schema.getType()) {
+      case ARRAY:
+        return SamzaSqlFieldType.createArrayFieldType(getFieldType(schema.getElementType()));
+      case BOOLEAN:
+        return SamzaSqlFieldType.createPrimitiveFieldType(SamzaSqlFieldType.TypeName.BOOLEAN);
+      case DOUBLE:
+        return SamzaSqlFieldType.createPrimitiveFieldType(SamzaSqlFieldType.TypeName.DOUBLE);
+      case FLOAT:
+        return SamzaSqlFieldType.createPrimitiveFieldType(SamzaSqlFieldType.TypeName.FLOAT);
+      case ENUM:
+        return SamzaSqlFieldType.createPrimitiveFieldType(SamzaSqlFieldType.TypeName.STRING);
+      case UNION:
+        // NOTE: We only support Union types when they are used for representing Nullable fields in Avro
+        List<org.apache.avro.Schema> types = schema.getTypes();
+        if (types.size() == 2) {
+          if (types.get(0).getType() == org.apache.avro.Schema.Type.NULL) {
+            return getFieldType(types.get(1));
+          } else if ((types.get(1).getType() == org.apache.avro.Schema.Type.NULL)) {
+            return getFieldType(types.get(0));
+          }
+        }
+      case FIXED:
+        return SamzaSqlFieldType.createPrimitiveFieldType(SamzaSqlFieldType.TypeName.STRING);
+      case STRING:
+        return SamzaSqlFieldType.createPrimitiveFieldType(SamzaSqlFieldType.TypeName.STRING);
+      case BYTES:
+        return SamzaSqlFieldType.createPrimitiveFieldType(SamzaSqlFieldType.TypeName.BYTES);
+      case INT:
+        return SamzaSqlFieldType.createPrimitiveFieldType(SamzaSqlFieldType.TypeName.INT32);
+      case LONG:
+        return SamzaSqlFieldType.createPrimitiveFieldType(SamzaSqlFieldType.TypeName.INT64);
+      case RECORD:
+        return SamzaSqlFieldType.createRowFieldType(getSchema(schema.getFields()));
+      case MAP:
+        return SamzaSqlFieldType.createMapFieldType(getFieldType(schema.getValueType()));
+      default:
+        String msg = String.format("Field Type %s is not supported", schema.getType());
+        LOG.error(msg);
+        throw new SamzaException(msg);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/CliLoggingSystemFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/CliLoggingSystemFactory.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/CliLoggingSystemFactory.java
new file mode 100644
index 0000000..49e051a
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/CliLoggingSystemFactory.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.impl;
+
+import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * System factory of Samza Sql Shell which needs to provide Consumer, Producer and Admin
+ */
+public class CliLoggingSystemFactory implements SystemFactory {
+
+  private static final Logger LOG = LoggerFactory.getLogger(CliLoggingSystemFactory.class);
+  private static AtomicInteger messageCounter = new AtomicInteger(0);
+
+  @Override
+  public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry) {
+    return new CliLoggingSystemFactory.LoggingSystemProducer();
+  }
+
+  @Override
+  public SystemAdmin getAdmin(String systemName, Config config) {
+    return new CliLoggingSystemFactory.SimpleSystemAdmin(config);
+  }
+
+  private static class SimpleSystemAdmin implements SystemAdmin {
+
+    public SimpleSystemAdmin(Config config) {
+    }
+
+    @Override
+    public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets) {
+      return offsets.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, null));
+    }
+
+    @Override
+    public Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames) {
+      return streamNames.stream()
+              .collect(Collectors.toMap(Function.identity(), streamName -> new SystemStreamMetadata(streamName,
+                      Collections.singletonMap(new Partition(0),
+                              new SystemStreamMetadata.SystemStreamPartitionMetadata(null, null, null)))));
+    }
+
+    @Override
+    public Integer offsetComparator(String offset1, String offset2) {
+      if (offset1 == null) {
+        return offset2 == null ? 0 : -1;
+      } else if (offset2 == null) {
+        return 1;
+      }
+      return offset1.compareTo(offset2);
+    }
+  }
+
+  private class LoggingSystemProducer implements SystemProducer {
+
+    @Override
+    public void start() {
+    }
+
+    @Override
+    public void stop() {
+    }
+
+    @Override
+    public void register(String source) {
+      LOG.info("Registering source" + source);
+    }
+
+    @Override
+    public void send(String source, OutgoingMessageEnvelope envelope) {
+      LOG.info(String.format(String.format("Message %d :", messageCounter.incrementAndGet())));
+      String msg = String.format("OutputStream:%s Key:%s Value:%s", envelope.getSystemStream(), envelope.getKey(),
+              new String((byte[]) envelope.getMessage()));
+      LOG.info(msg);
+
+      SamzaExecutor.saveOutputMessage(envelope);
+    }
+
+    @Override
+    public void flush(String source) {
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/FileSystemAvroRelSchemaProviderFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/FileSystemAvroRelSchemaProviderFactory.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/FileSystemAvroRelSchemaProviderFactory.java
new file mode 100644
index 0000000..8d0b12f
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/FileSystemAvroRelSchemaProviderFactory.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.impl;
+
+import org.apache.avro.Schema;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.avro.AvroRelSchemaProvider;
+import org.apache.samza.sql.avro.AvroTypeFactoryImpl;
+import org.apache.samza.sql.interfaces.RelSchemaProvider;
+import org.apache.samza.sql.interfaces.RelSchemaProviderFactory;
+import org.apache.samza.system.SystemStream;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Relational schemas provider which reads system schema from a given directory
+ */
+public class FileSystemAvroRelSchemaProviderFactory implements RelSchemaProviderFactory {
+
+  public static final String CFG_SCHEMA_DIR = "schemaDir";
+
+  @Override
+  public RelSchemaProvider create(SystemStream systemStream, Config config) {
+    return new FileSystemAvroRelSchemaProvider(systemStream, config);
+  }
+
+  private class FileSystemAvroRelSchemaProvider implements AvroRelSchemaProvider {
+    private final SystemStream systemStream;
+    private final String schemaDir;
+
+    public FileSystemAvroRelSchemaProvider(SystemStream systemStream, Config config) {
+      this.systemStream = systemStream;
+      this.schemaDir = config.get(CFG_SCHEMA_DIR);
+    }
+
+    @Override
+    public RelDataType getRelationalSchema() {
+      String schemaStr = this.getSchema(this.systemStream);
+      Schema schema = Schema.parse(schemaStr);
+      AvroTypeFactoryImpl avroTypeFactory = new AvroTypeFactoryImpl();
+      return avroTypeFactory.createType(schema);
+    }
+
+    @Override
+    public String getSchema(SystemStream systemStream) {
+      String fileName = String.format("%s.avsc", systemStream.getStream());
+      File file = new File(schemaDir, fileName);
+      try {
+        return Schema.parse(file).toString();
+      } catch (IOException e) {
+        throw new SamzaException(e);
+      }
+    }
+  }
+}