You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sr...@apache.org on 2018/10/22 16:51:51 UTC
[1/2] samza git commit: SAMZA-1901: Implementation of Samza SQL Shell
Repository: samza
Updated Branches:
refs/heads/master fd6f1ed25 -> e203db2aa
http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java
new file mode 100755
index 0000000..7c2ca32
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.impl;
+
+import com.google.common.base.Joiner;
+import kafka.utils.ZkUtils;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+import org.I0Itec.zkclient.exception.ZkTimeoutException;
+import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.*;
+import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
+import org.apache.samza.job.ApplicationStatus;
+import org.apache.samza.serializers.StringSerdeFactory;
+import org.apache.samza.sql.avro.AvroRelSchemaProvider;
+import org.apache.samza.sql.client.interfaces.*;
+import org.apache.samza.sql.client.util.RandomAccessQueue;
+import org.apache.samza.sql.dsl.SamzaSqlDslConverter;
+import org.apache.samza.sql.dsl.SamzaSqlDslConverterFactory;
+import org.apache.samza.sql.fn.FlattenUdf;
+import org.apache.samza.sql.fn.RegexMatchUdf;
+import org.apache.samza.sql.impl.ConfigBasedIOResolverFactory;
+import org.apache.samza.sql.impl.ConfigBasedUdfResolver;
+import org.apache.samza.sql.interfaces.RelSchemaProvider;
+import org.apache.samza.sql.interfaces.RelSchemaProviderFactory;
+import org.apache.samza.sql.interfaces.SqlIOConfig;
+import org.apache.samza.sql.interfaces.SqlIOResolver;
+import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
+import org.apache.samza.sql.runner.SamzaSqlApplicationRunner;
+import org.apache.samza.sql.testutil.JsonUtil;
+import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.kafka.KafkaSystemFactory;
+import org.apache.samza.tools.avro.AvroSchemaGenRelConverterFactory;
+import org.apache.samza.tools.avro.AvroSerDeFactory;
+import org.apache.samza.tools.json.JsonRelConverterFactory;
+import org.apache.samza.tools.schemas.ProfileChangeEvent;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConversions;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+
+/**
+ * Samza implementation of Executor for Samza SQL Shell.
+ */
+public class SamzaExecutor implements SqlExecutor {
+ private static final Logger LOG = LoggerFactory.getLogger(SamzaExecutor.class);
+
+ private static final String SAMZA_SYSTEM_LOG = "log";
+ private static final String SAMZA_SYSTEM_KAFKA = "kafka";
+ private static final String SAMZA_SQL_OUTPUT = "samza.sql.output";
+ private static final String SAMZA_SQL_SYSTEM_KAFKA_ADDRESS = "samza.sql.system.kafka.address";
+ private static final String DEFAULT_SERVER_ADDRESS = "localhost:2181";
+
+ // The maximum number of rows of data we keep when user pauses the display view and data accumulates.
+ private static final int RANDOM_ACCESS_QUEUE_CAPACITY = 5000;
+ private static final int DEFAULT_ZOOKEEPER_CLIENT_TIMEOUT = 20000;
+
+ private static RandomAccessQueue<OutgoingMessageEnvelope> outputData =
+ new RandomAccessQueue<>(OutgoingMessageEnvelope.class, RANDOM_ACCESS_QUEUE_CAPACITY);
+ private static AtomicInteger execIdSeq = new AtomicInteger(0);
+ private Map<Integer, SamzaSqlApplicationRunner> executions = new HashMap<>();
+ private String lastErrorMsg = "";
+
+ // -- implementation of SqlExecutor ------------------------------------------
+
+ @Override
+ public void start(ExecutionContext context) {
+ }
+
+ @Override
+ public void stop(ExecutionContext context) {
+ Iterator<Integer> iter = executions.keySet().iterator();
+ while (iter.hasNext()) {
+ stopExecution(context, iter.next());
+ iter.remove();
+ }
+ outputData.clear();
+ }
+
+ @Override
+ public List<String> listTables(ExecutionContext context) {
+ /**
+ * TODO: currently Shell can only talk to Kafka system, but we should use a general way
+ * to connect to different systems.
+ */
+ lastErrorMsg = "";
+ String address = context.getConfigMap().getOrDefault(SAMZA_SQL_SYSTEM_KAFKA_ADDRESS, DEFAULT_SERVER_ADDRESS);
+ List<String> tables = null;
+ try {
+ ZkUtils zkUtils = new ZkUtils(new ZkClient(address, DEFAULT_ZOOKEEPER_CLIENT_TIMEOUT),
+ new ZkConnection(address), false);
+ tables = JavaConversions.seqAsJavaList(zkUtils.getAllTopics())
+ .stream()
+ .map(x -> SAMZA_SYSTEM_KAFKA + "." + x)
+ .collect(Collectors.toList());
+ } catch (ZkTimeoutException ex) {
+ lastErrorMsg = ex.toString();
+ LOG.error(lastErrorMsg);
+ }
+ return tables;
+ }
+
+ @Override
+ public SqlSchema getTableSchema(ExecutionContext context, String tableName) {
+ /**
+ * currently Shell works only for systems that has Avro schemas
+ */
+ lastErrorMsg = "";
+ int execId = execIdSeq.incrementAndGet();
+ Map<String, String> staticConfigs = fetchSamzaSqlConfig(execId, context);
+ Config samzaSqlConfig = new MapConfig(staticConfigs);
+ SqlSchema sqlSchema = null;
+ try {
+ SqlIOResolver ioResolver = SamzaSqlApplicationConfig.createIOResolver(samzaSqlConfig);
+ SqlIOConfig sourceInfo = ioResolver.fetchSourceInfo(tableName);
+ RelSchemaProvider schemaProvider =
+ SamzaSqlApplicationConfig.initializePlugin("RelSchemaProvider", sourceInfo.getRelSchemaProviderName(),
+ samzaSqlConfig, SamzaSqlApplicationConfig.CFG_FMT_REL_SCHEMA_PROVIDER_DOMAIN,
+ (o, c) -> ((RelSchemaProviderFactory) o).create(sourceInfo.getSystemStream(), c));
+ AvroRelSchemaProvider avroSchemaProvider = (AvroRelSchemaProvider) schemaProvider;
+ String schema = avroSchemaProvider.getSchema(sourceInfo.getSystemStream());
+ sqlSchema = AvroSqlSchemaConverter.convertAvroToSamzaSqlSchema(schema);
+ } catch (SamzaException ex) {
+ lastErrorMsg = ex.toString();
+ LOG.error(lastErrorMsg);
+ }
+ return sqlSchema;
+ }
+
+ @Override
+ public QueryResult executeQuery(ExecutionContext context, String statement) {
+ lastErrorMsg = "";
+ outputData.clear();
+
+ int execId = execIdSeq.incrementAndGet();
+ Map<String, String> staticConfigs = fetchSamzaSqlConfig(execId, context);
+ List<String> sqlStmts = formatSqlStmts(Collections.singletonList(statement));
+ staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
+
+ SamzaSqlApplicationRunner runner;
+ try {
+ runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
+ runner.run();
+ } catch (SamzaException ex) {
+ lastErrorMsg = ex.toString();
+ LOG.error(lastErrorMsg);
+ return new QueryResult(execId, null, false);
+ }
+ executions.put(execId, runner);
+ LOG.debug("Executing sql. Id ", execId);
+
+ return new QueryResult(execId, generateResultSchema(new MapConfig(staticConfigs)), true);
+ }
+
+ @Override
+ public int getRowCount() {
+ return outputData.getSize();
+ }
+
+ @Override
+ public List<String[]> retrieveQueryResult(ExecutionContext context, int startRow, int endRow) {
+ List<String[]> results = new ArrayList<>();
+ for (OutgoingMessageEnvelope row : outputData.get(startRow, endRow)) {
+ results.add(getFormattedRow(context, row));
+ }
+ return results;
+ }
+
+ @Override
+ public List<String[]> consumeQueryResult(ExecutionContext context, int startRow, int endRow) {
+ List<String[]> results = new ArrayList<>();
+ for (OutgoingMessageEnvelope row : outputData.consume(startRow, endRow)) {
+ results.add(getFormattedRow(context, row));
+ }
+ return results;
+ }
+
+ @Override
+ public NonQueryResult executeNonQuery(ExecutionContext context, File sqlFile) {
+ lastErrorMsg = "";
+
+ LOG.info("Sql file path: " + sqlFile.getPath());
+ List<String> executedStmts = new ArrayList<>();
+ try {
+ executedStmts = Files.lines(Paths.get(sqlFile.getPath())).collect(Collectors.toList());
+ } catch (IOException e) {
+ lastErrorMsg = String.format("Unable to parse the sql file %s. %s", sqlFile.getPath(), e.toString());
+ LOG.error(lastErrorMsg);
+ return new NonQueryResult(-1, false);
+ }
+ LOG.info("Sql statements in Sql file: " + executedStmts.toString());
+
+ List<String> submittedStmts = new ArrayList<>();
+ List<String> nonSubmittedStmts = new ArrayList<>();
+ validateExecutedStmts(executedStmts, submittedStmts, nonSubmittedStmts);
+ if (submittedStmts.isEmpty()) {
+ lastErrorMsg = "Nothing to execute. Note: SELECT statements are ignored.";
+ LOG.warn("Nothing to execute. Statements in the Sql file: {}", nonSubmittedStmts);
+ return new NonQueryResult(-1, false);
+ }
+ NonQueryResult result = executeNonQuery(context, submittedStmts);
+ return new NonQueryResult(result.getExecutionId(), result.succeeded(), submittedStmts, nonSubmittedStmts);
+ }
+
+ @Override
+ public NonQueryResult executeNonQuery(ExecutionContext context, List<String> statement) {
+ lastErrorMsg = "";
+
+ int execId = execIdSeq.incrementAndGet();
+ Map<String, String> staticConfigs = fetchSamzaSqlConfig(execId, context);
+ staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(formatSqlStmts(statement)));
+
+ SamzaSqlApplicationRunner runner;
+ try {
+ runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
+ runner.run();
+ } catch (SamzaException ex) {
+ lastErrorMsg = ex.toString();
+ LOG.error(lastErrorMsg);
+ return new NonQueryResult(execId, false);
+ }
+ executions.put(execId, runner);
+ LOG.debug("Executing sql. Id ", execId);
+
+ return new NonQueryResult(execId, true);
+ }
+
+ @Override
+ public boolean stopExecution(ExecutionContext context, int exeId) {
+ lastErrorMsg = "";
+
+ SamzaSqlApplicationRunner runner = executions.get(exeId);
+ if (runner != null) {
+ LOG.debug("Stopping execution ", exeId);
+
+ try {
+ runner.kill();
+ } catch (SamzaException ex) {
+ lastErrorMsg = ex.toString();
+ LOG.debug(lastErrorMsg);
+ return false;
+ }
+
+ try {
+ Thread.sleep(500); // wait for a second
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ return true;
+ } else {
+ lastErrorMsg = "Trying to stop a non-existing SQL execution " + exeId;
+ LOG.warn(lastErrorMsg);
+ return false;
+ }
+ }
+
+ @Override
+ public boolean removeExecution(ExecutionContext context, int exeId) {
+ lastErrorMsg = "";
+
+ SamzaSqlApplicationRunner runner = executions.get(exeId);
+ if (runner != null) {
+ if (runner.status().getStatusCode().equals(ApplicationStatus.StatusCode.Running)) {
+ lastErrorMsg = "Trying to remove a ongoing execution " + exeId;
+ LOG.error(lastErrorMsg);
+ return false;
+ }
+ executions.remove(exeId);
+ LOG.debug("Stopping execution ", exeId);
+ return true;
+ } else {
+ lastErrorMsg = "Trying to remove a non-existing SQL execution " + exeId;
+ LOG.warn(lastErrorMsg);
+ return false;
+ }
+ }
+
+ @Override
+ public ExecutionStatus queryExecutionStatus(int execId) {
+ SamzaSqlApplicationRunner runner = executions.get(execId);
+ if (runner == null) {
+ return null;
+ }
+ return queryExecutionStatus(runner);
+ }
+
+ @Override
+ public String getErrorMsg() {
+ return lastErrorMsg;
+ }
+
+ @Override
+ public List<SqlFunction> listFunctions(ExecutionContext context) {
+ /**
+ * TODO: currently the Shell only shows some UDFs supported by Samza internally. We may need to require UDFs
+ * to provide a function of getting their "SamzaSqlUdfDisplayInfo", then we can get the UDF information from
+ * SamzaSqlApplicationConfig.udfResolver(or SamzaSqlApplicationConfig.udfMetadata) instead of registering
+ * UDFs one by one as below.
+ */
+ List<SqlFunction> udfs = new ArrayList<>();
+ udfs.add(new SamzaSqlUdfDisplayInfo("RegexMatch", "Matches the string to the regex",
+ Arrays.asList(SamzaSqlFieldType.createPrimitiveFieldType(SamzaSqlFieldType.TypeName.STRING),
+ SamzaSqlFieldType.createPrimitiveFieldType(SamzaSqlFieldType.TypeName.STRING)),
+ SamzaSqlFieldType.createPrimitiveFieldType(SamzaSqlFieldType.TypeName.BOOLEAN)));
+
+ return udfs;
+ }
+
+ static void saveOutputMessage(OutgoingMessageEnvelope messageEnvelope) {
+ outputData.add(messageEnvelope);
+ }
+
+ static Map<String, String> fetchSamzaSqlConfig(int execId, ExecutionContext executionContext) {
+ HashMap<String, String> staticConfigs = new HashMap<>();
+
+ staticConfigs.put(JobConfig.JOB_NAME(), "sql-job-" + execId);
+ staticConfigs.put(JobConfig.PROCESSOR_ID(), String.valueOf(execId));
+ staticConfigs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
+ staticConfigs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName());
+
+ staticConfigs.put(SamzaSqlApplicationConfig.CFG_IO_RESOLVER, "config");
+ String configIOResolverDomain =
+ String.format(SamzaSqlApplicationConfig.CFG_FMT_SOURCE_RESOLVER_DOMAIN, "config");
+ staticConfigs.put(configIOResolverDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
+ ConfigBasedIOResolverFactory.class.getName());
+
+ staticConfigs.put(SamzaSqlApplicationConfig.CFG_UDF_RESOLVER, "config");
+ String configUdfResolverDomain = String.format(SamzaSqlApplicationConfig.CFG_FMT_UDF_RESOLVER_DOMAIN, "config");
+ staticConfigs.put(configUdfResolverDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
+ ConfigBasedUdfResolver.class.getName());
+ staticConfigs.put(configUdfResolverDomain + ConfigBasedUdfResolver.CFG_UDF_CLASSES,
+ Joiner.on(",").join(RegexMatchUdf.class.getName(), FlattenUdf.class.getName()));
+
+ staticConfigs.put("serializers.registry.string.class", StringSerdeFactory.class.getName());
+ staticConfigs.put("serializers.registry.avro.class", AvroSerDeFactory.class.getName());
+ staticConfigs.put(AvroSerDeFactory.CFG_AVRO_SCHEMA, ProfileChangeEvent.SCHEMA$.toString());
+
+ String kafkaSystemConfigPrefix =
+ String.format(ConfigBasedIOResolverFactory.CFG_FMT_SAMZA_PREFIX, SAMZA_SYSTEM_KAFKA);
+ String avroSamzaSqlConfigPrefix = configIOResolverDomain + String.format("%s.", SAMZA_SYSTEM_KAFKA);
+ staticConfigs.put(kafkaSystemConfigPrefix + "samza.factory", KafkaSystemFactory.class.getName());
+ staticConfigs.put(kafkaSystemConfigPrefix + "samza.key.serde", "string");
+ staticConfigs.put(kafkaSystemConfigPrefix + "samza.msg.serde", "avro");
+ staticConfigs.put(kafkaSystemConfigPrefix + "consumer.zookeeper.connect", "localhost:2181");
+ staticConfigs.put(kafkaSystemConfigPrefix + "producer.bootstrap.servers", "localhost:9092");
+
+ staticConfigs.put(kafkaSystemConfigPrefix + "samza.offset.reset", "true");
+ staticConfigs.put(kafkaSystemConfigPrefix + "samza.offset.default", "oldest");
+
+ staticConfigs.put(avroSamzaSqlConfigPrefix + SqlIOConfig.CFG_SAMZA_REL_CONVERTER, "avro");
+ staticConfigs.put(avroSamzaSqlConfigPrefix + SqlIOConfig.CFG_REL_SCHEMA_PROVIDER, "config");
+
+ String logSystemConfigPrefix =
+ String.format(ConfigBasedIOResolverFactory.CFG_FMT_SAMZA_PREFIX, SAMZA_SYSTEM_LOG);
+ String logSamzaSqlConfigPrefix = configIOResolverDomain + String.format("%s.", SAMZA_SYSTEM_LOG);
+ staticConfigs.put(logSystemConfigPrefix + "samza.factory", CliLoggingSystemFactory.class.getName());
+ staticConfigs.put(logSamzaSqlConfigPrefix + SqlIOConfig.CFG_SAMZA_REL_CONVERTER, "json");
+ staticConfigs.put(logSamzaSqlConfigPrefix + SqlIOConfig.CFG_REL_SCHEMA_PROVIDER, "config");
+
+ String avroSamzaToRelMsgConverterDomain =
+ String.format(SamzaSqlApplicationConfig.CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN, "avro");
+
+ staticConfigs.put(avroSamzaToRelMsgConverterDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
+ AvroSchemaGenRelConverterFactory.class.getName());
+
+ String jsonSamzaToRelMsgConverterDomain =
+ String.format(SamzaSqlApplicationConfig.CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN, "json");
+
+ staticConfigs.put(jsonSamzaToRelMsgConverterDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
+ JsonRelConverterFactory.class.getName());
+
+ String configAvroRelSchemaProviderDomain =
+ String.format(SamzaSqlApplicationConfig.CFG_FMT_REL_SCHEMA_PROVIDER_DOMAIN, "config");
+ staticConfigs.put(configAvroRelSchemaProviderDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
+ FileSystemAvroRelSchemaProviderFactory.class.getName());
+
+ staticConfigs.put(
+ configAvroRelSchemaProviderDomain + FileSystemAvroRelSchemaProviderFactory.CFG_SCHEMA_DIR,
+ "/tmp/schemas/");
+
+ /* TODO: we need to validate and read configurations from shell-defaults.conf (aka. "executionContext"),
+ * and update their value if they've been included in staticConfigs. We could handle these logic
+ * Shell level, or in Executor level.
+ */
+ staticConfigs.putAll(executionContext.getConfigMap());
+
+ return staticConfigs;
+ }
+
+ private List<String> formatSqlStmts(List<String> statements) {
+ return statements.stream().map(sql -> {
+ if (!sql.toLowerCase().startsWith("insert")) {
+ String formattedSql = String.format("insert into log.outputStream %s", sql);
+ LOG.debug("Sql formatted. ", sql, formattedSql);
+ return formattedSql;
+ } else {
+ return sql;
+ }
+ }).collect(Collectors.toList());
+ }
+
+ private void validateExecutedStmts(List<String> statements, List<String> submittedStmts,
+ List<String> nonSubmittedStmts) {
+ for (String sql : statements) {
+ if (sql.isEmpty()) {
+ continue;
+ }
+ if (!sql.toLowerCase().startsWith("insert")) {
+ nonSubmittedStmts.add(sql);
+ } else {
+ submittedStmts.add(sql);
+ }
+ }
+ }
+
+ SqlSchema generateResultSchema(Config config) {
+ SamzaSqlDslConverter converter = (SamzaSqlDslConverter) new SamzaSqlDslConverterFactory().create(config);
+ RelRoot relRoot = converter.convertDsl("").iterator().next();
+
+ List<String> colNames = new ArrayList<>();
+ List<String> colTypeNames = new ArrayList<>();
+ for (RelDataTypeField dataTypeField : relRoot.validatedRowType.getFieldList()) {
+ colNames.add(dataTypeField.getName());
+ colTypeNames.add(dataTypeField.getType().toString());
+ }
+
+ return new SqlSchema(colNames, colTypeNames);
+ }
+
+ private String[] getFormattedRow(ExecutionContext context, OutgoingMessageEnvelope row) {
+ String[] formattedRow = new String[1];
+ String outputFormat = context.getConfigMap().get(SAMZA_SQL_OUTPUT);
+ if (outputFormat == null || !outputFormat.equalsIgnoreCase(MessageFormat.PRETTY.toString())) {
+ formattedRow[0] = getCompressedFormat(row);
+ } else {
+ formattedRow[0] = getPrettyFormat(row);
+ }
+ return formattedRow;
+ }
+
+ private ExecutionStatus queryExecutionStatus(SamzaSqlApplicationRunner runner) {
+ lastErrorMsg = "";
+ switch (runner.status().getStatusCode()) {
+ case New:
+ return ExecutionStatus.New;
+ case Running:
+ return ExecutionStatus.Running;
+ case SuccessfulFinish:
+ return ExecutionStatus.SuccessfulFinish;
+ case UnsuccessfulFinish:
+ return ExecutionStatus.UnsuccessfulFinish;
+ default:
+ lastErrorMsg = String.format("Unsupported execution status %s",
+ runner.status().getStatusCode().toString());
+ return null;
+ }
+ }
+
+ private String getPrettyFormat(OutgoingMessageEnvelope envelope) {
+ String value = new String((byte[]) envelope.getMessage());
+ ObjectMapper mapper = new ObjectMapper();
+ String formattedValue;
+ try {
+ Object json = mapper.readValue(value, Object.class);
+ formattedValue = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(json);
+ } catch (IOException e) {
+ formattedValue = value;
+ LOG.error("Error while formatting json", e);
+ }
+ return formattedValue;
+ }
+
+ private String getCompressedFormat(OutgoingMessageEnvelope envelope) {
+ return new String((byte[]) envelope.getMessage());
+ }
+
+ private enum MessageFormat {
+ PRETTY,
+ COMPACT
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaSqlFieldType.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaSqlFieldType.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaSqlFieldType.java
new file mode 100644
index 0000000..ed11a53
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaSqlFieldType.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.impl;
+
+import org.apache.samza.sql.client.interfaces.SqlSchema;
+
+/**
+ * Types of Samza Sql fields.
+ */
+public class SamzaSqlFieldType {
+
+ private TypeName typeName;
+ private SamzaSqlFieldType elementType;
+ private SamzaSqlFieldType valueType;
+ private SqlSchema rowSchema;
+
+ private SamzaSqlFieldType(TypeName typeName, SamzaSqlFieldType elementType, SamzaSqlFieldType valueType, SqlSchema rowSchema) {
+ this.typeName = typeName;
+ this.elementType = elementType;
+ this.valueType = valueType;
+ this.rowSchema = rowSchema;
+ }
+
+ public static SamzaSqlFieldType createPrimitiveFieldType(TypeName typeName) {
+ return new SamzaSqlFieldType(typeName, null, null, null);
+ }
+
+ public static SamzaSqlFieldType createArrayFieldType(SamzaSqlFieldType elementType) {
+ return new SamzaSqlFieldType(TypeName.ARRAY, elementType, null, null);
+ }
+
+ public static SamzaSqlFieldType createMapFieldType(SamzaSqlFieldType valueType) {
+ return new SamzaSqlFieldType(TypeName.MAP, null, valueType, null);
+ }
+
+ public static SamzaSqlFieldType createRowFieldType(SqlSchema rowSchema) {
+ return new SamzaSqlFieldType(TypeName.ROW, null, null, rowSchema);
+ }
+
+ public boolean isPrimitiveField() {
+ return typeName != TypeName.ARRAY && typeName != TypeName.MAP && typeName != TypeName.ROW;
+ }
+
+ public TypeName getTypeName() {
+ return typeName;
+ }
+
+ public SamzaSqlFieldType getElementType() {
+ return elementType;
+ }
+
+ public SamzaSqlFieldType getValueType() {
+ return valueType;
+ }
+
+ public SqlSchema getRowSchema() {
+ return rowSchema;
+ }
+
+ public enum TypeName {
+ BYTE, // One-byte signed integer.
+ INT16, // two-byte signed integer.
+ INT32, // four-byte signed integer.
+ INT64, // eight-byte signed integer.
+ DECIMAL, // Decimal integer
+ FLOAT,
+ DOUBLE,
+ STRING, // String.
+ DATETIME, // Date and time.
+ BOOLEAN, // Boolean.
+ BYTES, // Byte array.
+ ARRAY,
+ MAP,
+ ROW, // The field is itself a nested row.
+ ANY
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaSqlUdfDisplayInfo.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaSqlUdfDisplayInfo.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaSqlUdfDisplayInfo.java
new file mode 100644
index 0000000..2e89978
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaSqlUdfDisplayInfo.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.impl;
+
+import com.google.common.base.Joiner;
+import org.apache.samza.sql.client.interfaces.SqlFunction;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * UDF information displayer
+ */
+public class SamzaSqlUdfDisplayInfo implements SqlFunction {
+
+ private String name;
+
+ private String description;
+
+ private List<SamzaSqlFieldType> argumentTypes;
+
+ private SamzaSqlFieldType returnType;
+
+ public SamzaSqlUdfDisplayInfo(String name, String description, List<SamzaSqlFieldType> argumentTypes,
+ SamzaSqlFieldType returnType) {
+ this.name = name;
+ this.description = description;
+ this.argumentTypes = argumentTypes;
+ this.returnType = returnType;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public String getDescription() {
+ return description;
+ }
+
+ public List<String> getArgumentTypes() {
+ return argumentTypes.stream().map(x -> x.getTypeName().toString()).collect(Collectors.toList());
+ }
+
+ public String getReturnType() {
+ return returnType.getTypeName().toString();
+ }
+
+ public String toString() {
+ List<String> argumentTypeNames =
+ argumentTypes.stream().map(x -> x.getTypeName().toString()).collect(Collectors.toList());
+ String args = Joiner.on(", ").join(argumentTypeNames);
+ return String.format("%s(%s) returns <%s> : %s", name, args, returnType.getTypeName().toString(), description);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/ExecutionContext.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/ExecutionContext.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/ExecutionContext.java
new file mode 100644
index 0000000..14dfa64
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/ExecutionContext.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.interfaces;
+
+import java.util.Map;
+
+/**
+ * Whenever the shell calls the executor to execute a SQL statement, an object of ExecutionContext is passed.
+ */
+public class ExecutionContext {
+ private Map<String, String> m_configs;
+
+ public ExecutionContext(Map<String, String> config) {
+ m_configs = config;
+ }
+
+ /**
+ * @return The Map storing all configuration pairs. Note that the set map is the same as the one used by
+ * ExecutionContext, so changes to the map are reflected in ExecutionContext, and vice-versa.
+ */
+ public Map<String, String> getConfigMap() {
+ return m_configs;
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/ExecutionException.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/ExecutionException.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/ExecutionException.java
new file mode 100644
index 0000000..5991922
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/ExecutionException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.interfaces;
+
+/**
+ * An executor shall throw an ExecutionException when it encounters an unrecoverable error.
+ */
+public class ExecutionException extends RuntimeException {
+ public ExecutionException() {
+ }
+
+ public ExecutionException(String message) {
+ super(message);
+ }
+
+ public ExecutionException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public ExecutionException(Throwable cause) {
+ super(cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/ExecutionStatus.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/ExecutionStatus.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/ExecutionStatus.java
new file mode 100644
index 0000000..77ee888
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/ExecutionStatus.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.interfaces;
+
+/**
+ * Status of the execution of a SQL statement.
+ */
+public enum ExecutionStatus {
+ New,
+ Running,
+ SuccessfulFinish,
+ UnsuccessfulFinish
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/NonQueryResult.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/NonQueryResult.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/NonQueryResult.java
new file mode 100644
index 0000000..0173ad1
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/NonQueryResult.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.interfaces;
+
+import java.util.List;
+
+/**
+ * Result of a non-query SQL statement or SQL file containing multiple non-query statements.
+ */
+public class NonQueryResult {
+ private int execId; // execution ID of the statement(s) submitted
+ private boolean success; // whether the statement(s) submitted successfully
+
+ // When user submits a batch of SQL statements, only the non-query ones will be submitted
+ private List<String> submittedStmts;
+ private List<String> nonSubmittedStmts;
+
+ public NonQueryResult(int execId, boolean success) {
+ this.execId = execId;
+ this.success = success;
+ }
+
+ public NonQueryResult(int execId, boolean success, List<String> submittedStmts, List<String> nonSubmittedStmts) {
+ this.execId = execId;
+ this.success = success;
+ this.submittedStmts = submittedStmts;
+ this.nonSubmittedStmts = nonSubmittedStmts;
+ }
+
+ public int getExecutionId() {
+ return execId;
+ }
+
+ public boolean succeeded() {
+ return success;
+ }
+
+ public List<String> getSubmittedStmts() {
+ return submittedStmts;
+ }
+
+ public List<String> getNonSubmittedStmts() {
+ return nonSubmittedStmts;
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/QueryResult.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/QueryResult.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/QueryResult.java
new file mode 100644
index 0000000..6f54557
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/QueryResult.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.interfaces;
+
+/**
+ * Execution result of a SELECT statement. It doesn't contain data though.
+ */
+public class QueryResult {
+ private int execId; // execution ID of the statement(s) submitted
+ private boolean success; // whether the statement(s) submitted successfully
+ private SqlSchema schema; // The schema of the data coming from the query
+
+ public QueryResult(int execId, SqlSchema schema, Boolean success) {
+ if (success && schema == null)
+ throw new IllegalArgumentException();
+ this.execId = execId;
+ this.schema = schema;
+ this.success = success;
+ }
+
+ public int getExecutionId() {
+ return execId;
+ }
+
+ public SqlSchema getSchema() {
+ return schema;
+ }
+
+ public boolean succeeded() {
+ return success;
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlExecutor.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlExecutor.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlExecutor.java
new file mode 100644
index 0000000..9d528f6
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlExecutor.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.interfaces;
+
+
+import java.io.File;
+import java.util.List;
+
+
+/**
+ * Conventions:
+ * <p>
+ * Implementations shall report UNRECOVERABLE EXCEPTIONS by throwing
+ * ExecutionExceptions, though SqlExecutor doesn't enforce this by as we don't believe in
+ * Java checked exceptions. Report errors by returning values as indicated by each
+ * function and preparing for the subsequent getErrorMsg call.
+ * <p>
+ * Each execution (both query and non-query shall return an non-negative execution ID(execId).
+ * Negative execution IDs are reserved for error handling.
+ * <p>
+ * User shall be able to query the status of an execution even after it's finished, so the
+ * executor shall keep record of all the execution unless being asked to remove them (
+ * when removeExecution is called.)
+ * <p>
+ * IMPORTANT: An executor shall support two ways of supplying data:
+ * 1. Say user selects profiles of users who visited LinkedIn in the last 5 mins. There could
+ * be millions of rows, but the UI only need to show a small fraction. That's retrieveQueryResult,
+ * accepting a row range (startRow and endRow). Note that UI may ask for the same data over and over,
+ * like when user switches from page 1 to page 2 and data stream changes at the same time, the two
+ * pages may actually have overlapped or even same data.
+ * <p>
+ * 2. Say user wants to see clicks on a LinkedIn page of certain person from now on. In this mode
+ * consumeQueryResult shall be used. UI can keep asking for new rows, and once the rows are consumed,
+ * it's no longer necessary for the executor to keep them. If lots of rows come in, the UI may be only
+ * interested in the last certain rows (as it's in a logview mode), so all data older can be dropped.
+ */
+public interface SqlExecutor {
+ /**
+ * SqlExecutor shall be ready to accept all other calls after start() is called.
+ * However, it shall NOT store the ExecutionContext for future use, as each
+ * call will be given an ExecutionContext which may differ from this one.
+ *
+ * @param context The ExecutionContext at the time of the call.
+ */
+ public void start(ExecutionContext context);
+
+ /**
+ * Indicates no further calls will be made thus it's safe for the executor to clean up.
+ *
+ * @param context The ExecutionContext at the time of the call.
+ */
+ public void stop(ExecutionContext context);
+
+ /**
+ * @param context The ExecutionContext at the time of the call.
+ * @return null if an error occurs. Prepare for subsequent getErrorMsg call.
+ * an empty list indicates no tables found.
+ */
+ public List<String> listTables(ExecutionContext context);
+
+ /**
+ * @param context The ExecutionContext at the time of the call.
+ * @param tableName Name of the table to get the schema for.
+ * @return null if an error occurs. Prepare for subsequent getErrorMsg call.
+ */
+ public SqlSchema getTableSchema(ExecutionContext context, String tableName);
+
+ /**
+ * @param context The ExecutionContext at the time of the call.
+ * @param statement statement to execute
+ * @return The query result.
+ */
+ public QueryResult executeQuery(ExecutionContext context, String statement);
+
+
+ /**
+ * @return how many rows available for reading.
+ */
+ public int getRowCount();
+
+ /**
+ * Row starts at 0. Executor shall keep the data retrieved.
+ * For now we get strings for display but we might want strong typed values.
+ *
+ * @param context The ExecutionContext at the time of the call.
+ * @param startRow Start row index (inclusive)
+ * @param endRow End row index (inclusive)
+ * @return A list of row data represented by a String array.
+ */
+ public List<String[]> retrieveQueryResult(ExecutionContext context, int startRow, int endRow);
+
+
+ /**
+ * Consumes rows from query result. Executor shall drop them, as "consume" indicates.
+ * ALL data before endRow (inclusive, including data before startRow) shall be deleted.
+ *
+ * @param context The ExecutionContext at the time of the call.
+ * @param startRow Start row index (inclusive)
+ * @param endRow End row index (inclusive)
+ * @return available data between startRow and endRow (both are inclusive)
+ */
+ public List<String[]> consumeQueryResult(ExecutionContext context, int startRow, int endRow);
+
+ /**
+ * Executes all the NON-QUERY statements in the sqlFile.
+ * Query statements are ignored as it won't make sense.
+ *
+ * @param context The ExecutionContext at the time of the call.
+ * @param file A File object to read statements from.
+ * @return Execution result.
+ */
+ public NonQueryResult executeNonQuery(ExecutionContext context, File file);
+
+ /**
+ * @param context The ExecutionContext at the time of the call.
+ * @param statements A list of non-query sql statements.
+ * @return Execution result.
+ */
+ public NonQueryResult executeNonQuery(ExecutionContext context, List<String> statements);
+
+ /**
+ * @param context The ExecutionContext at the time of the call.
+ * @param exeId Execution ID.
+ * @return Whether the operation suceeded or not.
+ */
+ public boolean stopExecution(ExecutionContext context, int exeId);
+
+
+ /**
+ * Removing an ongoing execution shall result in an error. Stop it first.
+ *
+ * @param context The ExecutionContext at the time of the call
+ * @param exeId Execution ID.
+ * @return Whether the operation succeeded or not.
+ */
+ public boolean removeExecution(ExecutionContext context, int exeId);
+
+ /**
+ * @param execId Execution ID.
+ * @return ExecutionStatus.
+ */
+ public ExecutionStatus queryExecutionStatus(int execId);
+
+ /**
+ * @return The last error message of last function call.
+ */
+ public String getErrorMsg();
+
+ /**
+ * @param context The ExecutionContext at the time of the call.
+ * @return A list of SqlFunction.
+ */
+ List<SqlFunction> listFunctions(ExecutionContext context);
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlFunction.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlFunction.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlFunction.java
new file mode 100644
index 0000000..3ac602c
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlFunction.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.interfaces;
+
+import java.util.List;
+
+/**
+ * Represents a SQL function.
+ */
+public interface SqlFunction {
+ /**
+ * Gets the name of the function.
+ * @return name of the function
+ */
+ public String getName();
+
+ /**
+ * Gets the description of the function.
+ * @return description of the function.
+ */
+ public String getDescription();
+
+ /**
+ * Gets the argument types of the function as a List.
+ * @return A list containing the type names of the arguments.
+ */
+ public List<String> getArgumentTypes();
+
+ /**
+ * Gets the return type of the function.
+ * @return return type name
+ */
+ public String getReturnType();
+
+ /**
+ * Don't forget to implement toString()
+ */
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlSchema.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlSchema.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlSchema.java
new file mode 100644
index 0000000..7dcabdb
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlSchema.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.interfaces;
+
+
+import java.util.List;
+
+/**
+ * A primitive representation of SQL schema which is just for display purpose.
+ */
+public class SqlSchema {
+ private String[] names; // field names
+ private String[] typeNames; // names of field type
+
+ public SqlSchema(List<String> colNames, List<String> colTypeNames) {
+ if (colNames == null || colNames.size() == 0
+ || colTypeNames == null || colTypeNames.size() == 0
+ || colNames.size() != colTypeNames.size())
+ throw new IllegalArgumentException();
+
+ names = new String[colNames.size()];
+ names = colNames.toArray(names);
+
+ typeNames = new String[colTypeNames.size()];
+ typeNames = colTypeNames.toArray(typeNames);
+ }
+
+ public int getFieldCount() {
+ return names.length;
+ }
+
+ public String getFieldName(int colIdx) {
+ return names[colIdx];
+ }
+
+ public String getFieldTypeName(int colIdx) {
+ return typeNames[colIdx];
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlSchemaBuilder.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlSchemaBuilder.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlSchemaBuilder.java
new file mode 100644
index 0000000..84e8a0e
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/interfaces/SqlSchemaBuilder.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.interfaces;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Convenient class for building a SqlSchema object.
+ */
+public class SqlSchemaBuilder {
+ private List<String> names = new ArrayList<>();
+ private List<String> typeNames = new ArrayList<>();
+
+ private SqlSchemaBuilder() {
+ }
+
+ public static SqlSchemaBuilder builder() {
+ return new SqlSchemaBuilder();
+ }
+
+ public SqlSchemaBuilder addField(String name, String fieldType) {
+ if (name == null || name.isEmpty() || fieldType == null)
+ throw new IllegalArgumentException();
+
+ names.add(name);
+ typeNames.add(fieldType);
+ return this;
+ }
+
+ public SqlSchemaBuilder appendFields(List<String> names, List<String> typeNames) {
+ if (names == null || names.size() == 0
+ || typeNames == null || typeNames.size() == 0
+ || names.size() != typeNames.size())
+ throw new IllegalArgumentException();
+
+ this.names.addAll(names);
+ this.typeNames.addAll(typeNames);
+
+ return this;
+ }
+
+ public SqlSchema toSchema() {
+ return new SqlSchema(names, typeNames);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/util/CliException.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/util/CliException.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/util/CliException.java
new file mode 100755
index 0000000..743ff44
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/util/CliException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.util;
+
+/**
+ * The exception used by the shell for unrecoverable errors.
+ */
+public class CliException extends RuntimeException {
+ public CliException() {
+
+ }
+
+ public CliException(String message) {
+ super(message);
+ }
+
+ public CliException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public CliException(Throwable cause) {
+ super(cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/util/CliUtil.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/util/CliUtil.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/util/CliUtil.java
new file mode 100755
index 0000000..03fe1b1
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/util/CliUtil.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.util;
+
+/**
+ * Convenient utility class with static methods.
+ */
+public class CliUtil {
+ public static boolean isNullOrEmpty(String str) {
+ return str == null || str.isEmpty();
+ }
+
+ public static int ceilingDiv(int x, int y) {
+ if (x < 0 || y <= 0)
+ throw new IllegalArgumentException();
+
+ return x / y + (x % y == 0 ? 0 : 1);
+ }
+
+ public static StringBuilder appendTo(StringBuilder builder, int toPos, char c) {
+ for (int i = builder.length(); i <= toPos; ++i) {
+ builder.append(c);
+ }
+ return builder;
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/util/RandomAccessQueue.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/util/RandomAccessQueue.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/util/RandomAccessQueue.java
new file mode 100644
index 0000000..789d64c
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/util/RandomAccessQueue.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.util;
+
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A queue that supports random access and consumption.
+ * @param <T> Element type
+ */
+public class RandomAccessQueue<T> {
+ private T[] buffer;
+ private int capacity;
+ private int size;
+ private int head;
+
+ public RandomAccessQueue(Class<T> t, int capacity) {
+ this.capacity = capacity;
+ head = 0;
+ size = 0;
+
+ @SuppressWarnings("unchecked") final T[] b = (T[]) Array.newInstance(t, capacity);
+ buffer = b;
+ }
+
+ public synchronized List<T> get(int start, int end) {
+ int lowerBound = Math.max(start, 0);
+ int upperBound = Math.min(end, size - 1);
+ List<T> rets = new ArrayList<>();
+ for (int i = lowerBound; i <= upperBound; i++) {
+ rets.add(buffer[(head + i) % capacity]);
+ }
+ return rets;
+ }
+
+ public synchronized T get(int index) {
+ if (index >= 0 && index < size) {
+ return buffer[(head + index) % capacity];
+ }
+ throw new CliException("OutOfBoundaryError");
+ }
+
+ public synchronized void add(T t) {
+ if (size >= capacity) {
+ buffer[head] = t;
+ head = (head + 1) % capacity;
+ } else {
+ int pos = (head + size) % capacity;
+ buffer[pos] = t;
+ size++;
+ }
+ }
+
+ /*
+ * Remove all element before 'end', and return elements between 'start' and 'end'
+ */
+ public synchronized List<T> consume(int start, int end) {
+ List<T> rets = get(start, end);
+ int upperBound = Math.min(end, size - 1);
+ head = (end + 1) % capacity;
+ size -= (upperBound + 1);
+ return rets;
+ }
+
+ public synchronized int getHead() {
+ return head;
+ }
+
+ public synchronized int getSize() {
+ return size;
+ }
+
+ public synchronized void clear() {
+ head = 0;
+ size = 0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql-shell/src/test/java/org/apache/samza/sql/client/impl/SamzaExecutorTest.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/test/java/org/apache/samza/sql/client/impl/SamzaExecutorTest.java b/samza-sql-shell/src/test/java/org/apache/samza/sql/client/impl/SamzaExecutorTest.java
new file mode 100644
index 0000000..18fe4b7
--- /dev/null
+++ b/samza-sql-shell/src/test/java/org/apache/samza/sql/client/impl/SamzaExecutorTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.impl;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.sql.client.interfaces.ExecutionContext;
+import org.apache.samza.sql.client.interfaces.SqlSchema;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.samza.sql.client.impl.SamzaExecutor.*;
+import static org.apache.samza.sql.runner.SamzaSqlApplicationConfig.*;
+
+
+public class SamzaExecutorTest {
+ private SamzaExecutor m_executor = new SamzaExecutor();
+
+ @Test
+ public void testGetTableSchema() {
+ ExecutionContext context = getExecutionContext();
+ SqlSchema ts = m_executor.getTableSchema(context, "kafka.ProfileChangeStream");
+
+ Assert.assertEquals("Name", ts.getFieldName(0));
+ Assert.assertEquals("NewCompany", ts.getFieldName(1));
+ Assert.assertEquals("OldCompany", ts.getFieldName(2));
+ Assert.assertEquals("ProfileChangeTimestamp", ts.getFieldName(3));
+ Assert.assertEquals("STRING", ts.getFieldTypeName(0));
+ Assert.assertEquals("STRING", ts.getFieldTypeName(1));
+ Assert.assertEquals("STRING", ts.getFieldTypeName(2));
+ Assert.assertEquals("INT64", ts.getFieldTypeName(3));
+ }
+
+ @Test
+ public void testGenerateResultSchema() {
+ ExecutionContext context = getExecutionContext();
+ Map<String, String> mapConf = fetchSamzaSqlConfig(1, context);
+ SqlSchema ts = m_executor.generateResultSchema(new MapConfig(mapConf));
+
+ Assert.assertEquals("__key__", ts.getFieldName(0));
+ Assert.assertEquals("Name", ts.getFieldName(1));
+ Assert.assertEquals("NewCompany", ts.getFieldName(2));
+ Assert.assertEquals("OldCompany", ts.getFieldName(3));
+ Assert.assertEquals("ProfileChangeTimestamp", ts.getFieldName(4));
+ Assert.assertEquals("VARCHAR", ts.getFieldTypeName(0));
+ Assert.assertEquals("VARCHAR", ts.getFieldTypeName(1));
+ Assert.assertEquals("VARCHAR", ts.getFieldTypeName(2));
+ Assert.assertEquals("VARCHAR", ts.getFieldTypeName(3));
+ Assert.assertEquals("BIGINT", ts.getFieldTypeName(4));
+ }
+
+ private ExecutionContext getExecutionContext() {
+ ClassLoader classLoader = getClass().getClassLoader();
+ File file = new File(classLoader.getResource("ProfileChangeStream.avsc").getFile());
+ Map<String, String> mapConf = new HashMap<>();
+ mapConf.put("samza.sql.relSchemaProvider.config.schemaDir", file.getParent());
+ mapConf.put(CFG_SQL_STMT, "insert into log.outputStream select * from kafka.ProfileChangeStream");
+ return new ExecutionContext(mapConf);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql-shell/src/test/java/org/apache/samza/sql/client/util/RandomAccessQueueTest.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/test/java/org/apache/samza/sql/client/util/RandomAccessQueueTest.java b/samza-sql-shell/src/test/java/org/apache/samza/sql/client/util/RandomAccessQueueTest.java
new file mode 100644
index 0000000..fe7a19a
--- /dev/null
+++ b/samza-sql-shell/src/test/java/org/apache/samza/sql/client/util/RandomAccessQueueTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.util;
+
+import java.util.List;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class RandomAccessQueueTest {
+ private RandomAccessQueue m_queue;
+ public RandomAccessQueueTest() {
+ m_queue = new RandomAccessQueue<>(Integer.class, 5);
+ }
+
+ @Test
+ public void testAddAndGetElement() {
+ m_queue.clear();
+ for (int i = 0; i < 4; i++) {
+ m_queue.add(i);
+ }
+ Assert.assertEquals(0, m_queue.getHead());
+ Assert.assertEquals(4, m_queue.getSize());
+ Assert.assertEquals(0, m_queue.get(0));
+ Assert.assertEquals(3, m_queue.get(3));
+
+ for (int i = 0; i < 3; i++) {
+ m_queue.add(4 + i);
+ }
+ int head = m_queue.getHead();
+ Assert.assertEquals(2, head);
+ Assert.assertEquals(5, m_queue.getSize());
+ Assert.assertEquals(2, m_queue.get(0));
+ Assert.assertEquals(3, m_queue.get(1));
+ Assert.assertEquals(4, m_queue.get(2));
+ Assert.assertEquals(5, m_queue.get(3));
+ Assert.assertEquals(6, m_queue.get(4));
+ }
+
+ @Test
+ public void testGetRange() {
+ m_queue.clear();
+ for (int i = 0; i < 4; i++) {
+ m_queue.add(i); // 0, 1, 2, 3
+ }
+ List<Integer> rets = m_queue.get(-1, 9);
+ Assert.assertEquals(4, rets.size());
+ Assert.assertEquals(0, m_queue.get(0));
+ Assert.assertEquals(3, m_queue.get(3));
+
+ for (int i = 0; i <= 2; i++) {
+ m_queue.add(4 + i);
+ }
+ rets = m_queue.get(0, 4);
+ Assert.assertEquals(2, rets.get(0).intValue());
+ Assert.assertEquals(3, rets.get(1).intValue());
+ Assert.assertEquals(4, rets.get(2).intValue());
+ Assert.assertEquals(5, rets.get(3).intValue());
+ Assert.assertEquals(6, rets.get(4).intValue());
+ }
+
+ @Test
+ public void testConsume() {
+ m_queue.clear();
+ for (int i = 0; i < 4; i++) {
+ m_queue.add(i); // 0, 1, 2, 3
+ }
+ List<Integer> rets = m_queue.consume(1, 2);
+ Assert.assertEquals(1, m_queue.getSize());
+ Assert.assertEquals(3, m_queue.getHead());
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql-shell/src/test/resources/ProfileChangeStream.avsc
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/test/resources/ProfileChangeStream.avsc b/samza-sql-shell/src/test/resources/ProfileChangeStream.avsc
new file mode 100644
index 0000000..5c1e49d
--- /dev/null
+++ b/samza-sql-shell/src/test/resources/ProfileChangeStream.avsc
@@ -0,0 +1,51 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+{
+ "name": "ProfileChangeEvent",
+ "version" : 1,
+ "namespace": "com.linkedin.samza.tools.avro",
+ "type": "record",
+ "fields": [
+ {
+ "name": "Name",
+ "doc": "Name of the profile.",
+ "type": ["null", "string"],
+ "default":null
+ },
+ {
+ "name": "NewCompany",
+ "doc": "Name of the new company the person joined.",
+ "type": ["null", "string"],
+ "default":null
+ },
+ {
+ "name": "OldCompany",
+ "doc": "Name of the old company the person was working.",
+ "type": ["null", "string"],
+ "default":null
+ },
+ {
+ "name": "ProfileChangeTimestamp",
+ "doc": "Time at which the profile was changed.",
+ "type": ["null", "long"],
+ "default":null
+ }
+ ]
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
index 17df373..745c934 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
@@ -143,7 +143,7 @@ public class SamzaSqlApplicationConfig {
});
}
- private static <T> T initializePlugin(String pluginName, String plugin, Config staticConfig,
+ public static <T> T initializePlugin(String pluginName, String plugin, Config staticConfig,
String pluginDomainFormat, BiFunction<Object, Config, T> factoryInvoker) {
String pluginDomain = String.format(pluginDomainFormat, plugin);
Config pluginConfig = staticConfig.subset(pluginDomain);
http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/settings.gradle
----------------------------------------------------------------------
diff --git a/settings.gradle b/settings.gradle
index 853a7bb..5b9df5c 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -27,6 +27,7 @@ include \
'samza-rest',
'samza-shell',
'samza-sql',
+ 'samza-sql-shell',
'samza-tools'
def scalaModules = [
[2/2] samza git commit: SAMZA-1901: Implementation of Samza SQL Shell
Posted by sr...@apache.org.
SAMZA-1901: Implementation of Samza SQL Shell
## What changes were proposed in this pull request?
This PR is to implement Samza SQL shell. The document about the shell was attached [here](https://issues.apache.org/jira/browse/SAMZA-1901).
## How was this patch tested?
1. Add unit tests
2. Run the shell with use cases mentioned in the attached document under https://issues.apache.org/jira/browse/SAMZA-1901
Author: Weiqing Yang <ya...@gmail.com>
Reviewers: Srinivasulu Punuru <sp...@linkedin.com>, Aditya Toomula <at...@linkedin.com>
Closes #654 from weiqingy/samza-shell
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/e203db2a
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/e203db2a
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/e203db2a
Branch: refs/heads/master
Commit: e203db2aa29614c15ff0ee1593bf41c79b3b5c42
Parents: fd6f1ed
Author: Weiqing Yang <ya...@gmail.com>
Authored: Mon Oct 22 09:51:40 2018 -0700
Committer: Srinivasulu Punuru <sp...@linkedin.com>
Committed: Mon Oct 22 09:51:40 2018 -0700
----------------------------------------------------------------------
build.gradle | 28 +
gradle/dependency-versions.gradle | 2 +
.../JobNodeConfigurationGenerator.java | 6 +-
samza-sql-shell/conf/samza-sql-shell-log4j.xml | 49 ++
samza-sql-shell/conf/shell-defaults.conf | 28 +
samza-sql-shell/scripts/samza-sql-shell.sh | 42 +
.../apache/samza/sql/client/cli/CliCommand.java | 53 ++
.../samza/sql/client/cli/CliCommandType.java | 78 ++
.../samza/sql/client/cli/CliConstants.java | 57 ++
.../samza/sql/client/cli/CliEnvironment.java | 130 +++
.../samza/sql/client/cli/CliHighlighter.java | 89 ++
.../apache/samza/sql/client/cli/CliShell.java | 821 +++++++++++++++++++
.../apache/samza/sql/client/cli/CliView.java | 29 +
.../org/apache/samza/sql/client/cli/Main.java | 117 +++
.../sql/client/cli/QueryResultLogView.java | 291 +++++++
.../sql/client/impl/AvroSqlSchemaConverter.java | 112 +++
.../client/impl/CliLoggingSystemFactory.java | 117 +++
.../FileSystemAvroRelSchemaProviderFactory.java | 75 ++
.../samza/sql/client/impl/SamzaExecutor.java | 511 ++++++++++++
.../sql/client/impl/SamzaSqlFieldType.java | 94 +++
.../sql/client/impl/SamzaSqlUdfDisplayInfo.java | 71 ++
.../sql/client/interfaces/ExecutionContext.java | 41 +
.../client/interfaces/ExecutionException.java | 40 +
.../sql/client/interfaces/ExecutionStatus.java | 30 +
.../sql/client/interfaces/NonQueryResult.java | 62 ++
.../sql/client/interfaces/QueryResult.java | 49 ++
.../sql/client/interfaces/SqlExecutor.java | 171 ++++
.../sql/client/interfaces/SqlFunction.java | 55 ++
.../samza/sql/client/interfaces/SqlSchema.java | 56 ++
.../sql/client/interfaces/SqlSchemaBuilder.java | 63 ++
.../samza/sql/client/util/CliException.java | 41 +
.../apache/samza/sql/client/util/CliUtil.java | 43 +
.../sql/client/util/RandomAccessQueue.java | 96 +++
.../sql/client/impl/SamzaExecutorTest.java | 79 ++
.../sql/client/util/RandomAccessQueueTest.java | 89 ++
.../src/test/resources/ProfileChangeStream.avsc | 51 ++
.../sql/runner/SamzaSqlApplicationConfig.java | 2 +-
settings.gradle | 1 +
38 files changed, 3764 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 856bb1d..b6ee4ac 100644
--- a/build.gradle
+++ b/build.gradle
@@ -336,6 +336,34 @@ project(':samza-sql') {
}
}
+project(':samza-sql-shell') {
+ apply plugin: 'java'
+
+ dependencies {
+ compile project(':samza-sql')
+ compile project(':samza-tools')
+ compile project(":samza-core_$scalaVersion")
+ compile project(':samza-api')
+ compile project(":samza-kafka_$scalaVersion")
+ compile project(':samza-azure')
+ compile "net.java.dev.jna:jna:$jnaVersion"
+ compile "org.jline:jline:$jlineVersion"
+
+ testCompile "junit:junit:$junitVersion"
+ }
+
+ tasks.create(name: "releaseSqlShellTarGz", dependsOn: configurations.archives.artifacts, type: Tar) {
+ into "samza-sql-shell-${version}"
+ compression = Compression.GZIP
+ from(project.file("./scripts")) { into "scripts/" }
+ from(project.file("./conf")) { into "conf/" }
+ from(project(':samza-shell').file("src/main/bash/run-class.sh")) { into "scripts/" }
+ from(configurations.runtime) { into("lib/") }
+ from(configurations.archives.artifacts.files) { into("lib/") }
+ duplicatesStrategy 'exclude'
+ }
+}
+
project(':samza-tools') {
apply plugin: 'java'
http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/gradle/dependency-versions.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle
index a5b4f51..43ceb0a 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -47,4 +47,6 @@
zkClientVersion = "0.8"
zookeeperVersion = "3.4.6"
failsafeVersion = "1.1.0"
+ jlineVersion = "3.8.2"
+ jnaVersion = "4.5.1"
}
http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java b/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java
index 70c9b23..215177b 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java
@@ -70,10 +70,8 @@ import org.slf4j.LoggerFactory;
static Config mergeConfig(Map<String, String> originalConfig, Map<String, String> generatedConfig) {
Map<String, String> mergedConfig = new HashMap<>(generatedConfig);
originalConfig.forEach((k, v) -> {
- if (generatedConfig.containsKey(k) &&
- !Objects.equals(generatedConfig.get(k), v)) {
- LOG.info("Replacing generated config for key: {} value: {} with original config value: {}",
- k, generatedConfig.get(k), v);
+ if (generatedConfig.containsKey(k) && !Objects.equals(generatedConfig.get(k), v)) {
+ LOG.info("Replacing generated config for key: {} value: {} with original config value: {}", k, generatedConfig.get(k), v);
}
mergedConfig.put(k, v);
});
http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql-shell/conf/samza-sql-shell-log4j.xml
----------------------------------------------------------------------
diff --git a/samza-sql-shell/conf/samza-sql-shell-log4j.xml b/samza-sql-shell/conf/samza-sql-shell-log4j.xml
new file mode 100644
index 0000000..924ef93
--- /dev/null
+++ b/samza-sql-shell/conf/samza-sql-shell-log4j.xml
@@ -0,0 +1,49 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+
+-->
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+
+ <appender name="console" class="org.apache.log4j.ConsoleAppender">
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern"
+ value="%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n" />
+ </layout>
+ </appender>
+
+ <appender name="file" class="org.apache.log4j.RollingFileAppender">
+ <param name="append" value="false" />
+ <param name="maxFileSize" value="10MB" />
+ <param name="maxBackupIndex" value="10" />
+ <param name="file" value="${LOG_HOME}/logs/samza-sql-shell.log" />
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern"
+ value="%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n" />
+ </layout>
+ </appender>
+
+ <root>
+ <level value="info" />
+ <appender-ref ref="file" />
+ </root>
+</log4j:configuration>
+
http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql-shell/conf/shell-defaults.conf
----------------------------------------------------------------------
diff --git a/samza-sql-shell/conf/shell-defaults.conf b/samza-sql-shell/conf/shell-defaults.conf
new file mode 100644
index 0000000..0c85e52
--- /dev/null
+++ b/samza-sql-shell/conf/shell-defaults.conf
@@ -0,0 +1,28 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+# Default system properties included when running samza-sql-shell.
+# This is useful for setting default environmental settings.
+
+# Example:
+shell.executor = org.apache.samza.sql.client.impl.SamzaExecutor
+samza.sql.output = compact
+# samza.sql.system.kafka.address = localhost:2181
+# samza.sql.relSchemaProvider.config.schemaDir = /tmp/schemas/
+# samza.sql.ioResolver = config
+# samza.sql.udfResolver = config
http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql-shell/scripts/samza-sql-shell.sh
----------------------------------------------------------------------
diff --git a/samza-sql-shell/scripts/samza-sql-shell.sh b/samza-sql-shell/scripts/samza-sql-shell.sh
new file mode 100755
index 0000000..d9eb6df
--- /dev/null
+++ b/samza-sql-shell/scripts/samza-sql-shell.sh
@@ -0,0 +1,42 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+if [ `uname` == 'Linux' ];
+then
+ base_dir=$(readlink -f $(dirname $0))
+else
+ base_dir=$(dirname $0)
+fi
+
+parent_dir="$(dirname "$base_dir")"
+
+CONF_FILE="-conf $parent_dir/conf/shell-defaults.conf"
+
+if [ "x$LOG_HOME" = "x" ]; then
+ export LOG_HOME=$parent_dir
+fi
+
+if [ "x$LOG4J_OPTS" = "x" ]; then
+ export LOG4J_OPTS="-Dlog4j.configuration=file:$parent_dir/conf/samza-sql-shell-log4j.xml"
+fi
+
+if [ "x$HEAP_OPTS" = "x" ]; then
+ export HEAP_OPTS="-Xmx4G -Xms4G"
+fi
+
+exec $base_dir/run-class.sh $LOG4J_OPTS -DLOG_HOME=$LOG_HOME org.apache.samza.sql.client.cli.Main $CONF_FILE "$@"
http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliCommand.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliCommand.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliCommand.java
new file mode 100755
index 0000000..7c6480b
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliCommand.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.cli;
+
+/**
+ * A shell command containing command name and parameters.
+ */
+class CliCommand {
+ private CliCommandType commandType;
+ private String parameters;
+
+ public CliCommand(CliCommandType cmdType) {
+ this.commandType = cmdType;
+ }
+
+ public CliCommand(CliCommandType cmdType, String parameters) {
+ this(cmdType);
+ this.parameters = parameters;
+ }
+
+ public CliCommandType getCommandType() {
+ return commandType;
+ }
+
+ public String getParameters() {
+ return parameters;
+ }
+
+ public void setParameters(String parameters) {
+ this.parameters = parameters;
+ }
+
+ public String getFullCommand() {
+ return commandType.getCommandName() + CliConstants.SPACE + parameters;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliCommandType.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliCommandType.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliCommandType.java
new file mode 100755
index 0000000..0cd170e
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliCommandType.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.cli;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Enum all the commands we now support along with descriptions.
+ */
+enum CliCommandType {
+ SHOW_TABLES("SHOW TABLES", "Shows all available tables.", "Usage: SHOW TABLES <table name>"),
+ SHOW_FUNCTIONS("SHOW FUNCTIONS", "Shows all available UDFs.", "SHOW FUNCTION"),
+ DESCRIBE("DESCRIBE", "Describes a table.", "Usage: DESCRIBE <table name>"),
+
+ SELECT("SELECT", "\tExecutes a SQL SELECT query.", "SELECT uses a standard streaming SQL syntax."),
+ EXECUTE("EXECUTE", "\tExecute a sql file.", "EXECUTE <URI of a sql file>"),
+ INSERT_INTO("INSERT INTO", "Executes a SQL INSERT INTO.", "INSERT INTO uses a standard streaming SQL syntax."),
+ LS("LS", "\tLists all background executions.", "LS [execution ID]"),
+ STOP("STOP", "\tStops an execution.", "Usage: STOP <execution ID>"),
+ RM("RM", "\tRemoves an execution from the list.", "Usage: RM <execution ID>"),
+
+ HELP("HELP", "\tDisplays this help message.", "Usage: HELP [command]"),
+ SET("SET", "\tSets a variable.", "Usage: SET VAR=VAL"),
+ CLEAR("CLEAR", "\tClears the screen.", "CLEAR"),
+ EXIT("EXIT", "\tExits the shell.", "Exit"),
+ QUIT("QUIT", "\tQuits the shell.", "QUIT"),
+
+ INVALID_COMMAND("INVALID_COMMAND", "INVALID_COMMAND", "INVALID_COMMAND");
+
+ private final String cmdName;
+ private final String description;
+ private final String usage;
+
+ CliCommandType(String cmdName, String description, String usage) {
+ this.cmdName = cmdName;
+ this.description = description;
+ this.usage = usage;
+ }
+
+ public static List<String> getAllCommands() {
+ List<String> cmds = new ArrayList<String>();
+ for (CliCommandType t : CliCommandType.values()) {
+ if (t != INVALID_COMMAND)
+ cmds.add(t.getCommandName());
+ }
+ return cmds;
+ }
+
+ public String getCommandName() {
+ return cmdName;
+ }
+
+ public String getDescription() {
+ return description;
+ }
+
+ public String getUsage() {
+ return usage;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliConstants.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliConstants.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliConstants.java
new file mode 100755
index 0000000..286dc79
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliConstants.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.cli;
+
+/**
+ * Constant definitions for the shell.
+ */
+class CliConstants {
+ public static final String APP_NAME = "Samza SQL Shell";
+ public static final String WINDOW_TITLE = "Samza SQL Shell";
+ public static final String PROMPT_1ST = "Samza SQL";
+ public static final String PROMPT_1ST_END = "> ";
+
+ // All shell environment variables starts with the prefix
+ public static final String CONFIG_SHELL_PREFIX = "shell.";
+ // Specifies the executor used by the shell
+ public static final String CONFIG_EXECUTOR = "shell.executor";
+
+ public static final String VERSION = "0.0.1";
+
+
+ public static final String WELCOME_MESSAGE;
+ static {
+ WELCOME_MESSAGE =
+" ___ ___ ___ ___ ___ \n" +
+" / /\\ / /\\ / /\\ /__/\\ / /\\ \n" +
+" / /::\\ / /::\\ / /::| \\ \\:\\ / /::\\ \n"+
+" /__/:/\\:\\ / /:/\\:\\ / /:|:| \\ \\:\\ / /:/\\:\\ \n"+
+" _\\_ \\:\\ \\:\\ / /::\\ \\:\\ / /:/|:|__ \\ \\:\\ / /::\\ \\:\\ \n"+
+" /__/\\ \\:\\ \\:\\ /__/:/\\:\\_\\:\\ /__/:/_|::::\\ ______\\__\\:\\ /__/:/\\:\\_\\:\\ \n"+
+" \\ \\:\\ \\:\\_\\/ \\__\\/ \\:\\/:/ \\__\\/ /~~/:/ \\ \\::::::::/ \\__\\/ \\:\\/:/ \n"+
+" \\ \\:\\_\\:\\ \\__\\::/ / /:/ \\ \\:\\~~~~~ \\__\\::/ \n"+
+" \\ \\:\\/:/ / /:/ / /:/ \\ \\:\\ / /:/ \n"+
+" \\ \\::/ /__/:/ /__/:/ \\ \\:\\ /__/:/ \n"+
+" \\__\\/ \\__\\/ \\__\\/ \\__\\/ \\__\\/ \n\n"+
+"Welcome to Samza SQL shell (V" + VERSION + "). Enter HELP for all commands.\n\n";
+ }
+
+ public static final char SPACE = '\u0020';
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliEnvironment.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliEnvironment.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliEnvironment.java
new file mode 100644
index 0000000..9384169
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliEnvironment.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.cli;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.io.Writer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * CliEnvironment contains "environment variables" that configures the shell behavior.
+ */
+public class CliEnvironment {
+ private static final String debugEnvVar = "shell.debug";
+ private static PrintStream stdout = System.out;
+ private static PrintStream stderr = System.err;
+ private Boolean debug = false;
+
+ boolean isDebug() {
+ return debug;
+ }
+
+ void setDebug(Boolean debug) {
+ this.debug = debug;
+ }
+
+ /**
+ * @param var Environment variable
+ * @param val Value of the environment variable
+ * @return 0 : succeed
+ * -1: invalid var
+ * -2: invalid val
+ */
+ int setEnvironmentVariable(String var, String val) {
+ switch (var.toLowerCase()) {
+ case debugEnvVar:
+ val = val.toLowerCase();
+ if (val.equals("true")) {
+ debug = true;
+ enableJavaSystemOutAndErr();
+ } else if (val.equals("false")) {
+ debug = false;
+ disableJavaSystemOutAndErr();
+ } else
+ return -2;
+ break;
+ default:
+ return -1;
+ }
+
+ return 0;
+ }
+
+ // TODO: Separate the values out of the logic part
+ List<String> getPossibleValues(String var) {
+ List<String> vals = new ArrayList<>();
+ switch (var.toLowerCase()) {
+ case debugEnvVar:
+ vals.add("true");
+ vals.add("false");
+ return vals;
+ default:
+ return null;
+ }
+ }
+
+ void printAll(Writer writer) throws IOException {
+ writer.write(debugEnvVar);
+ writer.write('=');
+ writer.write(debug.toString());
+ writer.write('\n');
+ }
+
+ private void disableJavaSystemOutAndErr() {
+ PrintStream ps = new PrintStream(new NullOutputStream());
+ System.setOut(ps);
+ System.setErr(ps);
+ }
+
+ private void enableJavaSystemOutAndErr() {
+ System.setOut(stdout);
+ System.setErr(stderr);
+ }
+
+ void takeEffect() {
+ if (debug) {
+ enableJavaSystemOutAndErr();
+ } else {
+ // We control terminal directly; Forbid any Java System.out and System.err stuff so
+ // any underlying output will not mess up the console
+ disableJavaSystemOutAndErr();
+ }
+ }
+
+ private class NullOutputStream extends OutputStream {
+ public void close() {
+ }
+
+ public void flush() {
+ }
+
+ public void write(byte[] b) {
+ }
+
+ public void write(byte[] b, int off, int len) {
+ }
+
+ public void write(int b) {
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliHighlighter.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliHighlighter.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliHighlighter.java
new file mode 100755
index 0000000..c01ed5c
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliHighlighter.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.cli;
+
+import org.apache.samza.sql.client.util.CliUtil;
+import org.jline.reader.Highlighter;
+import org.jline.reader.LineReader;
+import org.jline.utils.AttributedString;
+import org.jline.utils.AttributedStringBuilder;
+import org.jline.utils.AttributedStyle;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A primitive highlighter.
+ */
+public class CliHighlighter implements Highlighter {
+ private static final List<String> keywords;
+
+ static {
+ keywords = CliCommandType.getAllCommands();
+ keywords.add("FROM");
+ keywords.add("WHERE");
+ }
+
+ private static List<String> splitWithSpace(String buffer) {
+ List<String> list = new ArrayList<String>();
+ if (CliUtil.isNullOrEmpty(buffer))
+ return list;
+
+ boolean prevIsSpace = Character.isSpaceChar(buffer.charAt(0));
+ int prevPos = 0;
+ for (int i = 1; i < buffer.length(); ++i) {
+ char c = buffer.charAt(i);
+ boolean isSpace = Character.isSpaceChar(c);
+ if (isSpace != prevIsSpace) {
+ list.add(buffer.substring(prevPos, i));
+ prevPos = i;
+ prevIsSpace = isSpace;
+ }
+ }
+ list.add(buffer.substring(prevPos));
+ return list;
+ }
+
+ public AttributedString highlight(LineReader reader, String buffer) {
+ AttributedStringBuilder builder = new AttributedStringBuilder();
+ List<String> tokens = splitWithSpace(buffer);
+
+ for (String token : tokens) {
+ if (isKeyword(token)) {
+ builder.style(AttributedStyle.BOLD.foreground(AttributedStyle.YELLOW))
+ .append(token);
+ } else {
+ builder.style(AttributedStyle.DEFAULT)
+ .append(token);
+ }
+ }
+
+ return builder.toAttributedString();
+ }
+
+ private boolean isKeyword(String token) {
+ for (String keyword : keywords) {
+ if (keyword.compareToIgnoreCase(token) == 0)
+ return true;
+ }
+ return false;
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliShell.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliShell.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliShell.java
new file mode 100755
index 0000000..54c7bf6
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliShell.java
@@ -0,0 +1,821 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.cli;
+
+import org.apache.samza.sql.client.interfaces.*;
+import org.apache.samza.sql.client.util.CliException;
+import org.apache.samza.sql.client.util.CliUtil;
+import org.jline.reader.EndOfFileException;
+import org.jline.reader.LineReader;
+import org.jline.reader.LineReaderBuilder;
+import org.jline.reader.UserInterruptException;
+import org.jline.reader.impl.DefaultParser;
+import org.jline.reader.impl.completer.StringsCompleter;
+import org.jline.terminal.Terminal;
+import org.jline.terminal.TerminalBuilder;
+import org.jline.utils.AttributedStringBuilder;
+import org.jline.utils.AttributedStyle;
+import org.jline.utils.InfoCmp;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.*;
+
+/**
+ * The shell UI.
+ */
+class CliShell {
+ private final Terminal terminal;
+ private final PrintWriter writer;
+ private final LineReader lineReader;
+ private final String firstPrompt;
+ private final SqlExecutor executor;
+ private final ExecutionContext exeContext;
+ private CliEnvironment env;
+ private boolean keepRunning = true;
+ private Map<Integer, String> executions = new TreeMap<>();
+
+ public CliShell(SqlExecutor executor, CliEnvironment environment, ExecutionContext execContext) {
+ if (executor == null || environment == null || execContext == null) {
+ throw new IllegalArgumentException();
+ }
+
+ // Terminal
+ try {
+ terminal = TerminalBuilder.builder()
+ .name(CliConstants.WINDOW_TITLE)
+ .build();
+ } catch (IOException e) {
+ throw new CliException("Error when creating terminal", e);
+ }
+
+ // Terminal writer
+ writer = terminal.writer();
+
+ // LineReader
+ final DefaultParser parser = new DefaultParser()
+ .eofOnEscapedNewLine(true)
+ .eofOnUnclosedQuote(true);
+ lineReader = LineReaderBuilder.builder()
+ .appName(CliConstants.APP_NAME)
+ .terminal(terminal)
+ .parser(parser)
+ .highlighter(new CliHighlighter())
+ .completer(new StringsCompleter(CliCommandType.getAllCommands()))
+ .build();
+
+ // Command Prompt
+ firstPrompt = new AttributedStringBuilder()
+ .style(AttributedStyle.DEFAULT.foreground(AttributedStyle.YELLOW))
+ .append(CliConstants.PROMPT_1ST + CliConstants.PROMPT_1ST_END)
+ .toAnsi();
+
+ // Execution context and executor
+ env = environment;
+ env.takeEffect();
+ exeContext = execContext;
+ this.executor = executor;
+ this.executor.start(exeContext);
+ }
+
+ Terminal getTerminal() {
+ return terminal;
+ }
+
+ CliEnvironment getEnvironment() {
+ return env;
+ }
+
+ SqlExecutor getExecutor() {
+ return executor;
+ }
+
+ ExecutionContext getExeContext() {
+ return exeContext;
+ }
+
+ /**
+ * Actually run the shell. Does not return until user choose to exit.
+ */
+ public void open() {
+ // Remember we cannot enter alternate screen mode here as there is only one alternate
+ // screen and we need it to show streaming results. Clear the screen instead.
+ clearScreen();
+ writer.write(CliConstants.WELCOME_MESSAGE);
+
+ try {
+ // Check if jna.jar exists in class path
+ try {
+ ClassLoader.getSystemClassLoader().loadClass("com.sun.jna.NativeLibrary");
+ } catch (ClassNotFoundException e) {
+ // Something's wrong. It could be a dumb terminal if neither jna nor jansi lib is there
+ writer.write("Warning: jna.jar does NOT exist. It may lead to a dumb shell or a performance hit.\n");
+ }
+
+ while (keepRunning) {
+ String line;
+ try {
+ line = lineReader.readLine(firstPrompt);
+ } catch (UserInterruptException e) {
+ continue;
+ } catch (EndOfFileException e) {
+ commandQuit();
+ break;
+ }
+
+ if (!CliUtil.isNullOrEmpty(line)) {
+ CliCommand command = parseLine(line);
+ if (command == null)
+ continue;
+
+ switch (command.getCommandType()) {
+ case CLEAR:
+ commandClear();
+ break;
+
+ case DESCRIBE:
+ commandDescribe(command);
+ break;
+
+ case EXECUTE:
+ commandExecuteFile(command);
+ break;
+
+ case EXIT:
+ case QUIT:
+ commandQuit();
+ break;
+
+ case HELP:
+ commandHelp(command);
+ break;
+
+ case INSERT_INTO:
+ commandInsertInto(command);
+ break;
+
+ case LS:
+ commandLs(command);
+ break;
+
+ case RM:
+ commandRm(command);
+ break;
+
+ case SELECT:
+ commandSelect(command);
+ break;
+
+ case SET:
+ commandSet(command);
+ break;
+
+ case SHOW_FUNCTIONS:
+ commandShowFunctions(command);
+ break;
+
+ case SHOW_TABLES:
+ commandShowTables(command);
+ break;
+
+ case STOP:
+ commandStop(command);
+ break;
+
+ case INVALID_COMMAND:
+ printHelpMessage();
+ break;
+
+ default:
+ writer.write("UNDER DEVELOPEMENT. Command:" + command.getCommandType() + "\n");
+ writer.write("Parameters:" +
+ (CliUtil.isNullOrEmpty(command.getParameters()) ? "NULL" : command.getParameters())
+ + "\n\n");
+ writer.flush();
+ }
+ }
+ }
+ } catch (Exception e) {
+ writer.print(e.getClass().getSimpleName());
+ writer.print(". ");
+ writer.println(e.getMessage());
+ e.printStackTrace(writer);
+ writer.println();
+ writer.println("We are sorry but SamzaSqlShell has encountered a problem and needs to stop.");
+ }
+
+ writer.write("Cleaning up... ");
+ writer.flush();
+ executor.stop(exeContext);
+
+ writer.write("Done.\nBye.\n\n");
+ writer.flush();
+
+ try {
+ terminal.close();
+ } catch (IOException e) {
+ // Doesn't matter
+ }
+ }
+
+ private void commandClear() {
+ clearScreen();
+ }
+
+ private void commandDescribe(CliCommand command) {
+ String parameters = command.getParameters();
+ if (CliUtil.isNullOrEmpty(parameters)) {
+ writer.println(command.getCommandType().getUsage());
+ writer.println();
+ writer.flush();
+ return;
+ }
+
+ SqlSchema schema = executor.getTableSchema(exeContext, parameters);
+
+ if (schema == null) {
+ writer.println("Failed to get schema. Error: " + executor.getErrorMsg());
+ } else {
+ writer.println();
+ List<String> lines = formatSchema4Display(schema);
+ for (String line : lines) {
+ writer.println(line);
+ }
+ }
+ writer.println();
+ writer.flush();
+ }
+
+ private void commandSet(CliCommand command) {
+ String param = command.getParameters();
+ if (CliUtil.isNullOrEmpty(param)) {
+ try {
+ env.printAll(writer);
+ } catch (IOException e) {
+ e.printStackTrace(writer);
+ }
+ writer.println();
+ writer.flush();
+ return;
+ }
+ String[] params = param.split("=");
+ if (params.length != 2) {
+ writer.println(command.getCommandType().getUsage());
+ writer.println();
+ writer.flush();
+ return;
+ }
+
+ int ret = env.setEnvironmentVariable(params[0], params[1]);
+ if (ret == 0) {
+ writer.print(params[0]);
+ writer.print(" set to ");
+ writer.println(params[1]);
+ } else if (ret == -1) {
+ writer.print("Unknow variable: ");
+ writer.println(params[0]);
+ } else if (ret == -2) {
+ writer.print("Invalid value: ");
+ writer.println(params[1]);
+ List<String> vals = env.getPossibleValues(params[0]);
+ writer.print("Possible values:");
+ for (String s : vals) {
+ writer.print(CliConstants.SPACE);
+ writer.print(s);
+ }
+ writer.println();
+ }
+
+ writer.println();
+ writer.flush();
+ }
+
+ private void commandExecuteFile(CliCommand command) {
+ String fullCmdStr = command.getFullCommand();
+ String parameters = command.getParameters();
+ if (CliUtil.isNullOrEmpty(parameters)) {
+ writer.println("Usage: execute <fileuri>\n");
+ writer.flush();
+ return;
+ }
+ URI uri = null;
+ boolean valid = false;
+ File file = null;
+ try {
+ uri = new URI(parameters);
+ file = new File(uri.getPath());
+ valid = file.exists() && !file.isDirectory();
+ } catch (URISyntaxException e) {
+ }
+ if (!valid) {
+ writer.println("Invalid URI.\n");
+ writer.flush();
+ return;
+ }
+
+ NonQueryResult nonQueryResult = executor.executeNonQuery(exeContext, file);
+ if (!nonQueryResult.succeeded()) {
+ writer.print("Execution error: ");
+ writer.println(executor.getErrorMsg());
+ writer.println();
+ writer.flush();
+ return;
+ }
+
+ executions.put(nonQueryResult.getExecutionId(), fullCmdStr);
+ List<String> submittedStmts = nonQueryResult.getSubmittedStmts();
+ List<String> nonsubmittedStmts = nonQueryResult.getNonSubmittedStmts();
+
+ writer.println("Sql file submitted. Execution ID: " + nonQueryResult.getExecutionId());
+ writer.println("Submitted statements: \n");
+ if (submittedStmts == null || submittedStmts.size() == 0) {
+ writer.println("\tNone.");
+ } else {
+ for (String statement : submittedStmts) {
+ writer.print("\t");
+ writer.println(statement);
+ }
+ writer.println();
+ }
+
+ if (nonsubmittedStmts != null && nonsubmittedStmts.size() != 0) {
+ writer.println("Statements NOT submitted: \n");
+ for (String statement : nonsubmittedStmts) {
+ writer.print("\t");
+ writer.println(statement);
+ }
+ writer.println();
+ }
+
+ writer.println("Note: All query statements in a sql file are NOT submitted.");
+ writer.println();
+ writer.flush();
+ }
+
+ private void commandInsertInto(CliCommand command) {
+ String fullCmdStr = command.getFullCommand();
+ NonQueryResult result = executor.executeNonQuery(exeContext,
+ Collections.singletonList(fullCmdStr));
+
+ if (result.succeeded()) {
+ writer.print("Execution submitted successfully. Id: ");
+ writer.println(String.valueOf(result.getExecutionId()));
+ executions.put(result.getExecutionId(), fullCmdStr);
+ } else {
+ writer.write("Execution failed to submit. Error: ");
+ writer.println(executor.getErrorMsg());
+ }
+
+ writer.println();
+ writer.flush();
+ }
+
+ private void commandLs(CliCommand command) {
+ List<Integer> execIds = new ArrayList<>();
+ String parameters = command.getParameters();
+ if (CliUtil.isNullOrEmpty(parameters)) {
+ execIds.addAll(executions.keySet());
+ } else {
+ String[] params = parameters.split("\u0020");
+ for (String param : params) {
+ Integer id = null;
+ try {
+ id = Integer.valueOf(param);
+ } catch (NumberFormatException e) {
+ }
+ if (id != null && executions.containsKey(id)) {
+ execIds.add(id);
+ }
+ }
+ }
+ if (execIds.size() == 0) {
+ writer.println();
+ return;
+ }
+
+ execIds.sort(Integer::compareTo);
+
+ final int terminalWidth = terminal.getWidth();
+ final int ID_WIDTH = 3;
+ final int STATUS_WIDTH = 20;
+ final int CMD_WIDTH = terminalWidth - ID_WIDTH - STATUS_WIDTH - 4;
+
+ AttributedStyle oddLineStyle = AttributedStyle.DEFAULT.BOLD.foreground(AttributedStyle.BLUE);
+ AttributedStyle evenLineStyle = AttributedStyle.DEFAULT.BOLD.foreground(AttributedStyle.CYAN);
+ for (int i = 0; i < execIds.size(); ++i) {
+ Integer id = execIds.get(i);
+ String cmd = executions.get(id);
+ if (cmd == null)
+ continue;
+
+ String status = "UNKNOWN";
+ try {
+ ExecutionStatus execStatus = executor.queryExecutionStatus(id);
+ if (execStatus != null)
+ status = execStatus.name();
+ } catch (ExecutionException e) {
+ }
+
+ int cmdStartIdx = 0;
+ int cmdLength = cmd.length();
+ StringBuilder line;
+ while (cmdStartIdx < cmdLength) {
+ line = new StringBuilder(terminalWidth);
+ if (cmdStartIdx == 0) {
+ line.append(CliConstants.SPACE);
+ line.append(id);
+ CliUtil.appendTo(line, 1 + ID_WIDTH + 1, CliConstants.SPACE);
+ line.append(status);
+ }
+ CliUtil.appendTo(line, 1 + ID_WIDTH + 1 + STATUS_WIDTH + 1, CliConstants.SPACE);
+
+ int numToWrite = Math.min(CMD_WIDTH, cmdLength - cmdStartIdx);
+ if (numToWrite > 0) {
+ line.append(cmd, cmdStartIdx, cmdStartIdx + numToWrite);
+ cmdStartIdx += numToWrite;
+ }
+
+ if (i % 2 == 0) {
+ AttributedStringBuilder attrBuilder = new AttributedStringBuilder().style(evenLineStyle);
+ attrBuilder.append(line.toString());
+ writer.println(attrBuilder.toAnsi());
+ } else {
+ AttributedStringBuilder attrBuilder = new AttributedStringBuilder().style(oddLineStyle);
+ attrBuilder.append(line.toString());
+ writer.println(attrBuilder.toAnsi());
+ }
+ }
+ }
+ writer.println();
+ writer.flush();
+ }
+
+ private void commandRm(CliCommand command) {
+ String parameters = command.getParameters();
+ if (CliUtil.isNullOrEmpty(parameters)) {
+ writer.println(command.getCommandType().getUsage());
+ writer.println();
+ writer.flush();
+ return;
+ }
+
+ List<Integer> execIds = new ArrayList<>();
+ String[] params = parameters.split("\u0020");
+ for (String param : params) {
+ Integer id = null;
+ try {
+ id = Integer.valueOf(param);
+ } catch (NumberFormatException e) {
+ }
+ if (id == null || !executions.containsKey(id)) {
+ writer.print("Error: ");
+ writer.print(param);
+ writer.println(" is not a valid id.");
+ } else {
+ execIds.add(id);
+ }
+ }
+
+ for (Integer id : execIds) {
+ ExecutionStatus status = null;
+ try {
+ status = executor.queryExecutionStatus(id);
+ } catch (ExecutionException e) {
+ }
+ if (status == null) {
+ writer.println(String.format("Error: failed to get execution status for %d. %s",
+ id, executor.getErrorMsg()));
+ continue;
+ }
+ if (status == ExecutionStatus.Running) {
+ writer.println(String.format("Execution %d is still running. Stop it first.", id));
+ continue;
+ }
+ if (executor.removeExecution(exeContext, id)) {
+ writer.println(String.format("Execution %d was removed.", id));
+ executions.remove(id);
+ } else {
+ writer.println(String.format("Error: failed to remove execution %d. %s",
+ id, executor.getErrorMsg()));
+ }
+
+ }
+ writer.println();
+ writer.flush();
+ }
+
+ private void commandQuit() {
+ keepRunning = false;
+ }
+
+ private void commandSelect(CliCommand command) {
+ QueryResult queryResult = executor.executeQuery(exeContext, command.getFullCommand());
+
+ if (queryResult.succeeded()) {
+ CliView view = new QueryResultLogView();
+ view.open(this, queryResult);
+ executor.stopExecution(exeContext, queryResult.getExecutionId());
+ } else {
+ writer.write("Execution failed. Error: ");
+ writer.println(executor.getErrorMsg());
+ writer.println();
+ writer.flush();
+ }
+ }
+
+ private void commandShowTables(CliCommand command) {
+ List<String> tableNames = executor.listTables(exeContext);
+
+ if (tableNames != null) {
+ for (String tableName : tableNames) {
+ writer.println(tableName);
+ }
+ } else {
+ writer.print("Failed to list tables. Error: ");
+ writer.println(executor.getErrorMsg());
+ }
+ writer.println();
+ writer.flush();
+ }
+
+ private void commandShowFunctions(CliCommand command) {
+ List<SqlFunction> fns = executor.listFunctions(exeContext);
+
+ if (fns != null) {
+ for (SqlFunction fn : fns) {
+ writer.println(fn.toString());
+ }
+ } else {
+ writer.print("Failed to list functions. Error: ");
+ writer.println(executor.getErrorMsg());
+ }
+ writer.println();
+ writer.flush();
+ }
+
+ private void commandStop(CliCommand command) {
+ String parameters = command.getParameters();
+ if (CliUtil.isNullOrEmpty(parameters)) {
+ writer.println(command.getCommandType().getUsage());
+ writer.println();
+ writer.flush();
+ return;
+ }
+
+ List<Integer> execIds = new ArrayList<>();
+ String[] params = parameters.split("\u0020");
+ for (String param : params) {
+ Integer id = null;
+ try {
+ id = Integer.valueOf(param);
+ } catch (NumberFormatException e) {
+ }
+ if (id == null || !executions.containsKey(id)) {
+ writer.print("Error: ");
+ writer.print(param);
+ writer.println(" is not a valid id.");
+ } else {
+ execIds.add(id);
+ }
+ }
+
+ for (Integer id : execIds) {
+ if (executor.stopExecution(exeContext, id)) {
+ writer.println(String.format("Request to stop execution %d was sent.", id));
+ } else {
+ writer.println(String.format("Failed to stop %d: %s", id, executor.getErrorMsg()));
+ }
+ }
+ writer.println();
+ writer.flush();
+ }
+
+ private void commandHelp(CliCommand command) {
+ String parameters = command.getParameters();
+ if (CliUtil.isNullOrEmpty(parameters)) {
+ printHelpMessage();
+ return;
+ }
+
+ parameters = parameters.trim().toUpperCase();
+ for (CliCommandType cmdType : CliCommandType.values()) {
+ String cmdText = cmdType.getCommandName();
+ if (cmdText.equals(parameters)) {
+ writer.println(cmdType.getUsage());
+ writer.println();
+ writer.flush();
+ return;
+ }
+ }
+
+ writer.print("Unknown command: ");
+ writer.println(parameters);
+ writer.println();
+ writer.flush();
+ }
+
+
+ private CliCommand parseLine(String line) {
+ line = trimCommand(line);
+ if (CliUtil.isNullOrEmpty(line))
+ return null;
+
+ String upperCaseLine = line.toUpperCase();
+ for (CliCommandType cmdType : CliCommandType.values()) {
+ String cmdText = cmdType.getCommandName();
+ if (upperCaseLine.startsWith(cmdText)) {
+ if (upperCaseLine.length() == cmdText.length())
+ return new CliCommand(cmdType);
+ else if (upperCaseLine.charAt(cmdText.length()) <= CliConstants.SPACE) {
+ String parameter = line.substring(cmdText.length()).trim();
+ if (!parameter.isEmpty())
+ return new CliCommand(cmdType, parameter);
+ }
+ }
+ }
+ return new CliCommand(CliCommandType.INVALID_COMMAND);
+ }
+
+ private void printHelpMessage() {
+ writer.println();
+ AttributedStringBuilder builder = new AttributedStringBuilder();
+ builder.append("The following commands are supported by ")
+ .append(CliConstants.APP_NAME)
+ .append(" at the moment.\n\n");
+
+ for (CliCommandType cmdType : CliCommandType.values()) {
+ if (cmdType == CliCommandType.INVALID_COMMAND)
+ continue;
+
+ String cmdText = cmdType.getCommandName();
+ String cmdDescription = cmdType.getDescription();
+
+ builder.style(AttributedStyle.DEFAULT.bold())
+ .append(cmdText)
+ .append("\t\t")
+ .style(AttributedStyle.DEFAULT)
+ .append(cmdDescription)
+ .append("\n");
+ }
+
+ writer.println(builder.toAnsi());
+ writer.println("HELP <COMMAND> to get help for a specific command.\n");
+ writer.flush();
+ }
+
+ private void clearScreen() {
+ terminal.puts(InfoCmp.Capability.clear_screen);
+ }
+
+ /*
+ Field | Type
+ -------------------------
+ Field1 | Type 1
+ Field2 | VARCHAR(STRING)
+ Field... | VARCHAR(STRING)
+ -------------------------
+ */
+ private List<String> formatSchema4Display(SqlSchema schema) {
+ final String HEADER_FIELD = "Field";
+ final String HEADER_TYPE = "Type";
+ final char SEPERATOR = '|';
+ final char LINE_SEP = '-';
+
+ int terminalWidth = terminal.getWidth();
+ // Two spaces * 2 plus one SEPERATOR
+ if (terminalWidth < 2 + 2 + 1 + HEADER_FIELD.length() + HEADER_TYPE.length()) {
+ return Collections.singletonList("Not enough room.");
+ }
+
+ // Find the best seperator position for least rows
+ int seperatorPos = HEADER_FIELD.length() + 2;
+ int minRowNeeded = Integer.MAX_VALUE;
+ int longestLineCharNum = 0;
+ int rowCount = schema.getFieldCount();
+ for (int j = seperatorPos; j < terminalWidth - HEADER_TYPE.length() - 2; ++j) {
+ boolean fieldWrapped = false;
+ int rowNeeded = 0;
+ for (int i = 0; i < rowCount; ++i) {
+ int fieldLen = schema.getFieldName(i).length();
+ int typeLen = schema.getFieldTypeName(i).length();
+ int fieldRowNeeded = CliUtil.ceilingDiv(fieldLen, j - 2);
+ int typeRowNeeded = CliUtil.ceilingDiv(typeLen, terminalWidth - 1 - j - 2);
+
+ rowNeeded += Math.max(fieldRowNeeded, typeRowNeeded);
+ fieldWrapped |= fieldRowNeeded > 1;
+ if (typeRowNeeded > 1) {
+ longestLineCharNum = terminalWidth;
+ } else {
+ longestLineCharNum = Math.max(longestLineCharNum, j + typeLen + 2 + 1);
+ }
+ }
+ if (rowNeeded < minRowNeeded) {
+ minRowNeeded = rowNeeded;
+ seperatorPos = j;
+ }
+ if (!fieldWrapped)
+ break;
+ }
+
+ List<String> lines = new ArrayList<>(minRowNeeded + 4);
+
+ // Header
+ StringBuilder line = new StringBuilder(terminalWidth);
+ line.append(CliConstants.SPACE);
+ line.append(HEADER_FIELD);
+ CliUtil.appendTo(line, seperatorPos - 1, CliConstants.SPACE);
+ line.append(SEPERATOR);
+ line.append(CliConstants.SPACE);
+ line.append(HEADER_TYPE);
+ lines.add(line.toString());
+ line = new StringBuilder(terminalWidth);
+ CliUtil.appendTo(line, longestLineCharNum - 1, LINE_SEP);
+ lines.add(line.toString());
+
+ // Body
+ AttributedStyle oddLineStyle = AttributedStyle.DEFAULT.BOLD.foreground(AttributedStyle.BLUE);
+ AttributedStyle evenLineStyle = AttributedStyle.DEFAULT.BOLD.foreground(AttributedStyle.CYAN);
+
+ final int fieldColSize = seperatorPos - 2;
+ final int typeColSize = terminalWidth - seperatorPos - 1 - 2;
+ for (int i = 0; i < rowCount; ++i) {
+ String field = schema.getFieldName(i);
+ String type = schema.getFieldTypeName(i);
+ int fieldLen = field.length();
+ int typeLen = type.length();
+ int fieldStartIdx = 0, typeStartIdx = 0;
+ while (fieldStartIdx < fieldLen || typeStartIdx < typeLen) {
+ line = new StringBuilder(terminalWidth);
+ line.append(CliConstants.SPACE);
+ int numToWrite = Math.min(fieldColSize, fieldLen - fieldStartIdx);
+ if (numToWrite > 0) {
+ line.append(field, fieldStartIdx, fieldStartIdx + numToWrite);
+ fieldStartIdx += numToWrite;
+ }
+ CliUtil.appendTo(line, seperatorPos - 1, CliConstants.SPACE);
+ line.append(SEPERATOR);
+ line.append(CliConstants.SPACE);
+
+ numToWrite = Math.min(typeColSize, typeLen - typeStartIdx);
+ if (numToWrite > 0) {
+ line.append(type, typeStartIdx, typeStartIdx + numToWrite);
+ typeStartIdx += numToWrite;
+ }
+
+ if (i % 2 == 0) {
+ AttributedStringBuilder attrBuilder = new AttributedStringBuilder().style(evenLineStyle);
+ attrBuilder.append(line.toString());
+ lines.add(attrBuilder.toAnsi());
+ } else {
+ AttributedStringBuilder attrBuilder = new AttributedStringBuilder().style(oddLineStyle);
+ attrBuilder.append(line.toString());
+ lines.add(attrBuilder.toAnsi());
+ }
+ }
+ }
+
+ // Footer
+ line = new StringBuilder(terminalWidth);
+ CliUtil.appendTo(line, longestLineCharNum - 1, LINE_SEP);
+ lines.add(line.toString());
+ return lines;
+ }
+
+ // Trims: leading spaces; trailing spaces and ";"s
+ private String trimCommand(String command) {
+ if (CliUtil.isNullOrEmpty(command))
+ return command;
+
+ int len = command.length();
+ int st = 0;
+
+ while ((st < len) && (command.charAt(st) <= ' ')) {
+ st++;
+ }
+ while ((st < len) && ((command.charAt(len - 1) <= ' ')
+ || command.charAt(len - 1) == ';')) {
+ len--;
+ }
+ return ((st > 0) || (len < command.length())) ? command.substring(st, len) : command;
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliView.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliView.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliView.java
new file mode 100644
index 0000000..de82100
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/CliView.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.cli;
+
+import org.apache.samza.sql.client.interfaces.QueryResult;
+
+/**
+ * For displaying the streaming result of a SELECT statement.
+ */
+public interface CliView {
+ public void open(CliShell shell, QueryResult queryResult);
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/Main.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/Main.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/Main.java
new file mode 100755
index 0000000..d85cdae
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/Main.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.cli;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.sql.client.impl.SamzaExecutor;
+import org.apache.samza.sql.client.interfaces.ExecutionContext;
+import org.apache.samza.sql.client.interfaces.SqlExecutor;
+import org.apache.samza.sql.client.util.CliException;
+import org.apache.samza.sql.client.util.CliUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Main entry of the program.
+ */
+public class Main {
+ private static final Logger LOG = LoggerFactory.getLogger(Main.class);
+
+ public static void main(String[] args) {
+ // Get configuration file path
+ String configFilePath = null;
+ for(int i = 0; i < args.length; ++i) {
+ switch(args[i]) {
+ case "-conf":
+ if(i + 1 < args.length) {
+ configFilePath = args[i + 1];
+ i++;
+ }
+ break;
+ default:
+ LOG.warn("Unknown parameter {}", args[i]);
+ break;
+ }
+ }
+
+ SqlExecutor executor = null;
+ CliEnvironment environment = new CliEnvironment();
+ Map<String, String> executorConfig = new HashMap<>();
+
+ if(!CliUtil.isNullOrEmpty(configFilePath)) {
+ LOG.info("Configuration file path is: {}", configFilePath);
+ try {
+ FileReader fileReader = new FileReader(configFilePath);
+ BufferedReader bufferedReader = new BufferedReader(fileReader);
+ String line;
+ while ((line = bufferedReader.readLine()) != null) {
+ if (line.startsWith("#") || line.startsWith("[")) {
+ continue;
+ }
+ String[] strs = line.split("=");
+ if (strs.length != 2) {
+ continue;
+ }
+ String key = strs[0].trim().toLowerCase();
+ String value = strs[1].trim();
+ if(key.startsWith(CliConstants.CONFIG_SHELL_PREFIX)) {
+ if(key.equals(CliConstants.CONFIG_EXECUTOR)) {
+ try {
+ Class<?> clazz = Class.forName(value);
+ Constructor<?> ctor = clazz.getConstructor();
+ executor = (SqlExecutor) ctor.newInstance();
+ LOG.info("Sql executor creation succeed. Executor class is: {}", value);
+ } catch (ClassNotFoundException | NoSuchMethodException
+ | IllegalAccessException | InstantiationException | InvocationTargetException e) {
+ throw new CliException(String.format("Failed to create executor %s.", value), e);
+ }
+ continue;
+ }
+
+ // Suppose a shell variable.
+ int result = environment.setEnvironmentVariable(key, value);
+ if(result == -1) { // CliEnvironment doesn't recognize the key.
+ LOG.warn("Unknowing shell environment variable: {}", key);
+ } else if(result == -2) { // Invalid value
+ LOG.warn("Unknowing shell environment value: {}", value);
+ }
+ } else {
+ executorConfig.put(key, value);
+ }
+ }
+ } catch (IOException e) {
+ LOG.error("Error in opening and reading the configuration file {}", e.toString());
+ }
+ }
+ if(executor == null) {
+ executor = new SamzaExecutor();
+ }
+
+ CliShell shell = new CliShell(executor, environment, new ExecutionContext(executorConfig));
+ shell.open();
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/QueryResultLogView.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/QueryResultLogView.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/QueryResultLogView.java
new file mode 100644
index 0000000..b1973bc
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/cli/QueryResultLogView.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.cli;
+
+import org.apache.samza.sql.client.interfaces.ExecutionContext;
+import org.apache.samza.sql.client.interfaces.QueryResult;
+import org.apache.samza.sql.client.interfaces.SqlExecutor;
+import org.jline.keymap.BindingReader;
+import org.jline.keymap.KeyMap;
+import org.jline.terminal.Attributes;
+import org.jline.terminal.Terminal;
+import org.jline.utils.AttributedStringBuilder;
+import org.jline.utils.AttributedStyle;
+import org.jline.utils.InfoCmp;
+
+import java.util.EnumSet;
+import java.util.List;
+
+import static org.jline.keymap.KeyMap.ctrl;
+
+
+/**
+ * A scrolling (logging) view of the query result of a streaming SELECT statement.
+ */
+
+public class QueryResultLogView implements CliView {
+ private static final int DEFAULT_REFRESH_INTERVAL = 100; // all intervals are in ms
+
+ private int refreshInterval = DEFAULT_REFRESH_INTERVAL;
+ private int height;
+ private Terminal terminal;
+ private SqlExecutor executor;
+ private ExecutionContext exeContext;
+ private volatile boolean keepRunning = true;
+ private boolean paused = false;
+
+ // Stupid BindingReader doesn't have a real nonblocking mode
+ // Must create a new thread to get user input
+ private Thread inputThread;
+ private BindingReader keyReader;
+
+ public QueryResultLogView() {
+ }
+
+ // -- implementation of CliView -------------------------------------------
+
+ public void open(CliShell shell, QueryResult queryResult) {
+ terminal = shell.getTerminal();
+ executor = shell.getExecutor();
+ exeContext = shell.getExeContext();
+
+ TerminalStatus prevStatus = setupTerminal();
+ try {
+ keyReader = new BindingReader(terminal.reader());
+ inputThread = new InputThread();
+ inputThread.start();
+ while (keepRunning) {
+ try {
+ display();
+ if (keepRunning)
+ Thread.sleep(refreshInterval);
+ } catch (InterruptedException e) {
+ continue;
+ }
+ }
+
+ try {
+ inputThread.join(1 * 1000);
+ } catch (InterruptedException e) {
+ }
+ } finally {
+ restoreTerminal(prevStatus);
+ }
+ if (inputThread.isAlive()) {
+ terminal.writer().println("Warning: input thread hang. Have to kill!");
+ terminal.writer().flush();
+ inputThread.interrupt();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ private void display() {
+ updateTerminalSize();
+ int rowsInBuffer = executor.getRowCount();
+ if (rowsInBuffer <= 0 || paused) {
+ clearStatusBar();
+ drawStatusBar(rowsInBuffer);
+ return;
+ }
+
+ while (rowsInBuffer > 0) {
+ clearStatusBar();
+ int step = 10;
+ List<String[]> lines = executor.consumeQueryResult(exeContext, 0, step - 1);
+ for (String[] line : lines) {
+ for (int i = 0; i < line.length; ++i) {
+ terminal.writer().write(line[i] == null ? "null" : line[i]);
+ terminal.writer().write(i == line.length - 1 ? "\n" : " ");
+ }
+ }
+ terminal.flush();
+ clearStatusBar();
+ drawStatusBar(rowsInBuffer);
+
+ if (!keepRunning || paused)
+ return;
+
+ rowsInBuffer = executor.getRowCount();
+ }
+ }
+
+ private void clearStatusBar() {
+ terminal.puts(InfoCmp.Capability.save_cursor);
+ terminal.puts(InfoCmp.Capability.cursor_address, height - 1, 0);
+ terminal.puts(InfoCmp.Capability.delete_line, height - 1, 0);
+ terminal.puts(InfoCmp.Capability.restore_cursor);
+ }
+
+ private void drawStatusBar(int rowsInBuffer) {
+ terminal.puts(InfoCmp.Capability.save_cursor);
+ terminal.puts(InfoCmp.Capability.cursor_address, height - 1, 0);
+ AttributedStyle statusBarStyle = AttributedStyle.DEFAULT.background(AttributedStyle.WHITE)
+ .foreground(AttributedStyle.BLACK);
+ AttributedStringBuilder attrBuilder = new AttributedStringBuilder()
+ .style(statusBarStyle.bold().italic())
+ .append("Q")
+ .style(statusBarStyle)
+ .append(": Quit ")
+ .style(statusBarStyle.bold().italic())
+ .append("SPACE")
+ .style(statusBarStyle)
+ .append(": Pause/Resume ")
+ .append(String.valueOf(rowsInBuffer) + " rows in buffer ");
+ if (paused) {
+ attrBuilder.style(statusBarStyle.bold().foreground(AttributedStyle.RED).blink())
+ .append("PAUSED");
+ }
+ String statusBarText = attrBuilder.toAnsi();
+ terminal.writer().print(statusBarText);
+ terminal.flush();
+ terminal.puts(InfoCmp.Capability.restore_cursor);
+ }
+
+ private TerminalStatus setupTerminal() {
+ TerminalStatus prevStatus = new TerminalStatus();
+
+ // Signal handlers
+ prevStatus.handler_INT = terminal.handle(Terminal.Signal.INT, this::handleSignal);
+ prevStatus.handler_QUIT = terminal.handle(Terminal.Signal.QUIT, this::handleSignal);
+ prevStatus.handler_TSTP = terminal.handle(Terminal.Signal.TSTP, this::handleSignal);
+ prevStatus.handler_CONT = terminal.handle(Terminal.Signal.CONT, this::handleSignal);
+ prevStatus.handler_WINCH = terminal.handle(Terminal.Signal.WINCH, this::handleSignal);
+
+ // Attributes
+ prevStatus.attributes = terminal.getAttributes();
+ Attributes newAttributes = new Attributes(prevStatus.attributes);
+ // (003, ETX, Ctrl-C, or also 0177, DEL, rubout) Interrupt char‐
+ // acter (INTR). Send a SIGINT signal. Recognized when ISIG is
+ // set, and then not passed as input.
+ newAttributes.setControlChar(Attributes.ControlChar.VINTR, 0);
+ // (034, FS, Ctrl-\) Quit character (QUIT). Send SIGQUIT signal.
+ // Recognized when ISIG is set, and then not passed as input.
+ // newAttributes.setControlChar(Attributes.ControlChar.VQUIT, 0);
+ newAttributes.setControlChar(Attributes.ControlChar.VMIN, 1);
+ newAttributes.setControlChar(Attributes.ControlChar.VTIME, 0);
+ // Enables signals and SIGTTOU signal to the process group of a background
+ // process which tries to write to our terminal
+ newAttributes.setLocalFlags(
+ EnumSet.of(Attributes.LocalFlag.ISIG, Attributes.LocalFlag.TOSTOP), true);
+ // No canonical mode, no echo, and no implementation-defined input processing
+ newAttributes.setLocalFlags(EnumSet.of(
+ Attributes.LocalFlag.ICANON, Attributes.LocalFlag.ECHO,
+ Attributes.LocalFlag.IEXTEN), false);
+ // Input flags
+ newAttributes.setInputFlags(EnumSet.of(
+ Attributes.InputFlag.ICRNL, Attributes.InputFlag.INLCR, Attributes.InputFlag.IXON), false);
+ terminal.setAttributes(newAttributes);
+
+ // Capabilities
+ // tput smcup; use alternate screen
+ terminal.puts(InfoCmp.Capability.enter_ca_mode);
+ terminal.puts(InfoCmp.Capability.cursor_invisible);
+ terminal.puts(InfoCmp.Capability.cursor_home);
+
+ terminal.flush();
+
+ return prevStatus;
+ }
+
+ private void restoreTerminal(TerminalStatus status) {
+ // Signal handlers
+ terminal.handle(Terminal.Signal.INT, status.handler_INT);
+ terminal.handle(Terminal.Signal.QUIT, status.handler_QUIT);
+ terminal.handle(Terminal.Signal.TSTP, status.handler_TSTP);
+ terminal.handle(Terminal.Signal.CONT, status.handler_CONT);
+ terminal.handle(Terminal.Signal.WINCH, status.handler_WINCH);
+
+ // Attributes
+ terminal.setAttributes(status.attributes);
+
+ // Capability
+ terminal.puts(InfoCmp.Capability.exit_ca_mode);
+ terminal.puts(InfoCmp.Capability.cursor_visible);
+ }
+
+ private void handleSignal(Terminal.Signal signal) {
+ switch (signal) {
+ case INT:
+ case QUIT:
+ keepRunning = false;
+ break;
+ case TSTP:
+ paused = true;
+ break;
+ case CONT:
+ paused = false;
+ break;
+ case WINCH:
+ updateTerminalSize();
+ break;
+ }
+ }
+
+ private void updateTerminalSize() {
+ terminal.flush();
+ height = terminal.getHeight();
+ }
+
+ private KeyMap<Action> bindActionKey() {
+ KeyMap<Action> keyMap = new KeyMap<>();
+ keyMap.bind(Action.QUIT, "Q", "q", ctrl('c'));
+ keyMap.bind(Action.SPACE, " ");
+
+ return keyMap;
+ }
+
+ public enum Action {
+ QUIT,
+ SPACE
+ }
+
+ private static class TerminalStatus {
+ Terminal.SignalHandler handler_INT;
+ Terminal.SignalHandler handler_QUIT;
+ Terminal.SignalHandler handler_TSTP;
+ Terminal.SignalHandler handler_CONT;
+ Terminal.SignalHandler handler_WINCH;
+
+ Attributes attributes;
+ }
+
+ private class InputThread extends Thread {
+ public InputThread() {
+ }
+
+ public void run() {
+ KeyMap<Action> keyMap = bindActionKey();
+
+ Action action = keyReader.readBinding(keyMap, null, true);
+ while (action != null && keepRunning) {
+ switch (action) {
+ case QUIT:
+ keepRunning = false;
+ return;
+ case SPACE:
+ paused = !paused;
+ break;
+ }
+ action = keyReader.readBinding(keyMap, null, true);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/AvroSqlSchemaConverter.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/AvroSqlSchemaConverter.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/AvroSqlSchemaConverter.java
new file mode 100644
index 0000000..42dd285
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/AvroSqlSchemaConverter.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.impl;
+
+import com.google.common.base.Joiner;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.avro.Schema;
+import org.apache.samza.SamzaException;
+import org.apache.samza.sql.client.interfaces.SqlSchema;
+import org.apache.samza.sql.client.interfaces.SqlSchemaBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Schema converter which converts Avro schema to Samza Sql schema
+ */
+public class AvroSqlSchemaConverter {
+ private static final Logger LOG = LoggerFactory.getLogger(AvroSqlSchemaConverter.class);
+
+ public static SqlSchema convertAvroToSamzaSqlSchema(String schema) {
+ Schema avroSchema = Schema.parse(schema);
+ return getSchema(avroSchema.getFields());
+ }
+
+ private static SqlSchema getSchema(List<Schema.Field> fields) {
+ SqlSchemaBuilder schemaBuilder = SqlSchemaBuilder.builder();
+ for (Schema.Field field : fields) {
+ schemaBuilder.addField(field.name(), getColumnTypeName(getFieldType(field.schema())));
+ }
+ return schemaBuilder.toSchema();
+ }
+
+ private static String getColumnTypeName(SamzaSqlFieldType fieldType) {
+ if (fieldType.isPrimitiveField()) {
+ return fieldType.getTypeName().toString();
+ } else if (fieldType.getTypeName() == SamzaSqlFieldType.TypeName.MAP) {
+ return String.format("MAP(%s)", getColumnTypeName(fieldType.getValueType()));
+ } else if (fieldType.getTypeName() == SamzaSqlFieldType.TypeName.ARRAY) {
+ return String.format("ARRAY(%s)", getColumnTypeName(fieldType.getElementType()));
+ } else {
+ SqlSchema schema = fieldType.getRowSchema();
+ List<String> fieldTypes = IntStream.range(0, schema.getFieldCount())
+ .mapToObj(i -> schema.getFieldName(i) + " " + schema.getFieldTypeName(i))
+ .collect(Collectors.toList());
+ String rowSchemaValue = Joiner.on(", ").join(fieldTypes);
+ return String.format("STRUCT(%s)", rowSchemaValue);
+ }
+ }
+
+ private static SamzaSqlFieldType getFieldType(org.apache.avro.Schema schema) {
+ switch (schema.getType()) {
+ case ARRAY:
+ return SamzaSqlFieldType.createArrayFieldType(getFieldType(schema.getElementType()));
+ case BOOLEAN:
+ return SamzaSqlFieldType.createPrimitiveFieldType(SamzaSqlFieldType.TypeName.BOOLEAN);
+ case DOUBLE:
+ return SamzaSqlFieldType.createPrimitiveFieldType(SamzaSqlFieldType.TypeName.DOUBLE);
+ case FLOAT:
+ return SamzaSqlFieldType.createPrimitiveFieldType(SamzaSqlFieldType.TypeName.FLOAT);
+ case ENUM:
+ return SamzaSqlFieldType.createPrimitiveFieldType(SamzaSqlFieldType.TypeName.STRING);
+ case UNION:
+ // NOTE: We only support Union types when they are used for representing Nullable fields in Avro
+ List<org.apache.avro.Schema> types = schema.getTypes();
+ if (types.size() == 2) {
+ if (types.get(0).getType() == org.apache.avro.Schema.Type.NULL) {
+ return getFieldType(types.get(1));
+ } else if ((types.get(1).getType() == org.apache.avro.Schema.Type.NULL)) {
+ return getFieldType(types.get(0));
+ }
+ }
+ case FIXED:
+ return SamzaSqlFieldType.createPrimitiveFieldType(SamzaSqlFieldType.TypeName.STRING);
+ case STRING:
+ return SamzaSqlFieldType.createPrimitiveFieldType(SamzaSqlFieldType.TypeName.STRING);
+ case BYTES:
+ return SamzaSqlFieldType.createPrimitiveFieldType(SamzaSqlFieldType.TypeName.BYTES);
+ case INT:
+ return SamzaSqlFieldType.createPrimitiveFieldType(SamzaSqlFieldType.TypeName.INT32);
+ case LONG:
+ return SamzaSqlFieldType.createPrimitiveFieldType(SamzaSqlFieldType.TypeName.INT64);
+ case RECORD:
+ return SamzaSqlFieldType.createRowFieldType(getSchema(schema.getFields()));
+ case MAP:
+ return SamzaSqlFieldType.createMapFieldType(getFieldType(schema.getValueType()));
+ default:
+ String msg = String.format("Field Type %s is not supported", schema.getType());
+ LOG.error(msg);
+ throw new SamzaException(msg);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/CliLoggingSystemFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/CliLoggingSystemFactory.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/CliLoggingSystemFactory.java
new file mode 100644
index 0000000..49e051a
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/CliLoggingSystemFactory.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.impl;
+
+import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * System factory of Samza Sql Shell which needs to provide Consumer, Producer and Admin
+ */
+public class CliLoggingSystemFactory implements SystemFactory {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CliLoggingSystemFactory.class);
+ private static AtomicInteger messageCounter = new AtomicInteger(0);
+
+ @Override
+ public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry) {
+ return new CliLoggingSystemFactory.LoggingSystemProducer();
+ }
+
+ @Override
+ public SystemAdmin getAdmin(String systemName, Config config) {
+ return new CliLoggingSystemFactory.SimpleSystemAdmin(config);
+ }
+
+ private static class SimpleSystemAdmin implements SystemAdmin {
+
+ public SimpleSystemAdmin(Config config) {
+ }
+
+ @Override
+ public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets) {
+ return offsets.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, null));
+ }
+
+ @Override
+ public Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames) {
+ return streamNames.stream()
+ .collect(Collectors.toMap(Function.identity(), streamName -> new SystemStreamMetadata(streamName,
+ Collections.singletonMap(new Partition(0),
+ new SystemStreamMetadata.SystemStreamPartitionMetadata(null, null, null)))));
+ }
+
+ @Override
+ public Integer offsetComparator(String offset1, String offset2) {
+ if (offset1 == null) {
+ return offset2 == null ? 0 : -1;
+ } else if (offset2 == null) {
+ return 1;
+ }
+ return offset1.compareTo(offset2);
+ }
+ }
+
+ private class LoggingSystemProducer implements SystemProducer {
+
+ @Override
+ public void start() {
+ }
+
+ @Override
+ public void stop() {
+ }
+
+ @Override
+ public void register(String source) {
+ LOG.info("Registering source" + source);
+ }
+
+ @Override
+ public void send(String source, OutgoingMessageEnvelope envelope) {
+ LOG.info(String.format(String.format("Message %d :", messageCounter.incrementAndGet())));
+ String msg = String.format("OutputStream:%s Key:%s Value:%s", envelope.getSystemStream(), envelope.getKey(),
+ new String((byte[]) envelope.getMessage()));
+ LOG.info(msg);
+
+ SamzaExecutor.saveOutputMessage(envelope);
+ }
+
+ @Override
+ public void flush(String source) {
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/e203db2a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/FileSystemAvroRelSchemaProviderFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/FileSystemAvroRelSchemaProviderFactory.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/FileSystemAvroRelSchemaProviderFactory.java
new file mode 100644
index 0000000..8d0b12f
--- /dev/null
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/FileSystemAvroRelSchemaProviderFactory.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.client.impl;
+
+import org.apache.avro.Schema;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.avro.AvroRelSchemaProvider;
+import org.apache.samza.sql.avro.AvroTypeFactoryImpl;
+import org.apache.samza.sql.interfaces.RelSchemaProvider;
+import org.apache.samza.sql.interfaces.RelSchemaProviderFactory;
+import org.apache.samza.system.SystemStream;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Relational schemas provider which reads system schema from a given directory
+ */
+public class FileSystemAvroRelSchemaProviderFactory implements RelSchemaProviderFactory {
+
+ public static final String CFG_SCHEMA_DIR = "schemaDir";
+
+ @Override
+ public RelSchemaProvider create(SystemStream systemStream, Config config) {
+ return new FileSystemAvroRelSchemaProvider(systemStream, config);
+ }
+
+ private class FileSystemAvroRelSchemaProvider implements AvroRelSchemaProvider {
+ private final SystemStream systemStream;
+ private final String schemaDir;
+
+ public FileSystemAvroRelSchemaProvider(SystemStream systemStream, Config config) {
+ this.systemStream = systemStream;
+ this.schemaDir = config.get(CFG_SCHEMA_DIR);
+ }
+
+ @Override
+ public RelDataType getRelationalSchema() {
+ String schemaStr = this.getSchema(this.systemStream);
+ Schema schema = Schema.parse(schemaStr);
+ AvroTypeFactoryImpl avroTypeFactory = new AvroTypeFactoryImpl();
+ return avroTypeFactory.createType(schema);
+ }
+
+ @Override
+ public String getSchema(SystemStream systemStream) {
+ String fileName = String.format("%s.avsc", systemStream.getStream());
+ File file = new File(schemaDir, fileName);
+ try {
+ return Schema.parse(file).toString();
+ } catch (IOException e) {
+ throw new SamzaException(e);
+ }
+ }
+ }
+}