You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/10/21 16:58:27 UTC
[2/4] apex-malhar git commit: APEXMALHAR-1818 SQL Support for
converting given SQL statement to APEX DAG.
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/sql/src/main/java/org/apache/apex/malhar/sql/planner/RelNodeVisitor.java
----------------------------------------------------------------------
diff --git a/sql/src/main/java/org/apache/apex/malhar/sql/planner/RelNodeVisitor.java b/sql/src/main/java/org/apache/apex/malhar/sql/planner/RelNodeVisitor.java
new file mode 100644
index 0000000..68343ce
--- /dev/null
+++ b/sql/src/main/java/org/apache/apex/malhar/sql/planner/RelNodeVisitor.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql.planner;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.apex.malhar.sql.operators.OperatorUtils;
+import org.apache.apex.malhar.sql.schema.TupleSchemaRegistry;
+
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.rel.RelNode;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Operator;
+
+/**
+ * Converts Calcite relational algebra into an Apex sub-DAG.
+ * <p>
+ * The tree is walked depth-first: the inputs of a {@link RelNode} are converted before the node itself, each node
+ * is translated by the {@link ApexRelNode} registered for its class, and the resulting operators are connected by
+ * streams whose TUPLE_CLASS attribute is set on both ends.
+ */
+@InterfaceStability.Evolving
+public class RelNodeVisitor
+{
+  private final DAG dag;
+  private final TupleSchemaRegistry tupleSchemaRegistry;
+  private final JavaTypeFactory typeFactory;
+
+  /**
+   * @param dag DAG to populate with the operators and streams of the translated plan
+   * @param typeFactory type factory handed to every {@link ApexRelNode} conversion
+   */
+  public RelNodeVisitor(DAG dag, JavaTypeFactory typeFactory)
+  {
+    this.dag = dag;
+    this.typeFactory = typeFactory;
+    this.tupleSchemaRegistry = new TupleSchemaRegistry();
+  }
+
+  /**
+   * Traverses the relational algebra tree rooted at {@code relNode} in reverse direction (inputs first) and
+   * populates the underlying DAG with the corresponding operators and streams.
+   * <p>
+   * If the converted node has no output port it is treated as the end of the pipeline: all tuple classes generated
+   * so far are packaged into a jar whose path is appended to {@link Context.DAGContext#LIBRARY_JARS}.
+   *
+   * @param relNode RelNode which needs to be traversed.
+   * @return RelInfo representing information of the current stage, or {@code null} if the {@link ApexRelNode}
+   *         registered for this node returned {@code null}
+   * @throws UnsupportedOperationException if no {@link ApexRelNode} is registered for a visited node's class
+   * @throws Exception if a node conversion or the schema jar generation fails
+   */
+  public final RelInfo traverse(RelNode relNode) throws Exception
+  {
+    List<RelInfo> inputStreams = new ArrayList<>();
+    for (RelNode input : relNode.getInputs()) {
+      inputStreams.add(traverse(input));
+    }
+
+    ApexRelNode.RelContext relContext = new ApexRelNode.RelContext(dag, typeFactory, tupleSchemaRegistry);
+
+    ApexRelNode apexRelNode = ApexRelNode.relNodeMapping.get(relNode.getClass());
+    if (apexRelNode == null) {
+      throw new UnsupportedOperationException("RelNode " + relNode.getRelTypeName() + " is not supported.");
+    }
+    RelInfo currentNodeRelInfo = apexRelNode.visit(relContext, relNode, inputStreams);
+    if (currentNodeRelInfo == null) {
+      // Nothing to connect and no output port to inspect.
+      return null;
+    }
+
+    connectInputs(inputStreams, currentNodeRelInfo);
+
+    if (currentNodeRelInfo.getOutPort() == null) {
+      // End of the pipeline.
+      addSchemaJarToLibraryJars();
+    }
+
+    return currentNodeRelInfo;
+  }
+
+  /**
+   * Adds one stream per input, from the i-th input's output port to the i-th input port of {@code current},
+   * and sets the TUPLE_CLASS attribute on both ends of each stream.
+   */
+  private void connectInputs(List<RelInfo> inputStreams, RelInfo current)
+  {
+    for (int i = 0; i < inputStreams.size(); i++) {
+      RelInfo inputStream = inputStreams.get(i);
+      Operator.OutputPort outputPort = inputStream.getOutPort();
+      Operator.InputPort inputPort = current.getInputPorts().get(i);
+
+      String streamName = OperatorUtils.getUniqueStreamName(inputStream.getRelName(), current.getRelName());
+      Class<?> schema = resolveTupleClass(inputStream, streamName);
+      dag.setOutputPortAttribute(outputPort, Context.PortContext.TUPLE_CLASS, schema);
+      dag.setInputPortAttribute(inputPort, Context.PortContext.TUPLE_CLASS, schema);
+      dag.addStream(streamName, outputPort, inputPort);
+    }
+  }
+
+  /**
+   * Returns the tuple class leaving {@code inputStream}: a class generated from its output row type when one
+   * is set, otherwise the class it carries explicitly.
+   */
+  private Class<?> resolveTupleClass(RelInfo inputStream, String streamName)
+  {
+    if (inputStream.getOutRelDataType() != null) {
+      return TupleSchemaRegistry.getSchemaForRelDataType(tupleSchemaRegistry, streamName,
+          inputStream.getOutRelDataType());
+    }
+    if (inputStream.getClazz() != null) {
+      return inputStream.getClazz();
+    }
+    throw new RuntimeException("Unexpected condition reached.");
+  }
+
+  /** Packages the generated tuple classes into a jar and appends its path to the DAG's LIBRARY_JARS attribute. */
+  private void addSchemaJarToLibraryJars() throws IOException
+  {
+    String schemaJar = tupleSchemaRegistry.generateCommonJar();
+
+    String jars = dag.getAttributes().get(Context.DAGContext.LIBRARY_JARS);
+    dag.setAttribute(Context.DAGContext.LIBRARY_JARS,
+        ((jars != null) && (jars.length() != 0)) ? jars + "," + schemaJar : schemaJar);
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/sql/src/main/java/org/apache/apex/malhar/sql/schema/ApexSQLTable.java
----------------------------------------------------------------------
diff --git a/sql/src/main/java/org/apache/apex/malhar/sql/schema/ApexSQLTable.java b/sql/src/main/java/org/apache/apex/malhar/sql/schema/ApexSQLTable.java
new file mode 100644
index 0000000..6d16f63
--- /dev/null
+++ b/sql/src/main/java/org/apache/apex/malhar/sql/schema/ApexSQLTable.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql.schema;
+
+import java.util.Map;
+
+import org.apache.apex.malhar.sql.table.Endpoint;
+import org.apache.calcite.DataContext;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.schema.ScannableTable;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Statistic;
+import org.apache.calcite.schema.Statistics;
+import org.apache.calcite.schema.StreamableTable;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Represents an Apex source or destination as a Calcite {@link StreamableTable}.
+ * Every table registered with {@link org.apache.apex.malhar.sql.SQLExecEnvironment}
+ * is registered as an {@link ApexSQLTable}.
+ */
+@InterfaceStability.Evolving
+public class ApexSQLTable implements ScannableTable, StreamableTable
+{
+  private final SchemaPlus schema;
+  private final String name;
+  private final Map<String, Object> operands;
+  private final Endpoint endpoint;
+  /** Row type; when {@code null} it is obtained from {@link #endpoint} on first use and then cached. */
+  private RelDataType rowType;
+
+  /**
+   * @param schemaPlus schema this table belongs to
+   * @param name table name
+   * @param operands table operands; may be {@code null}
+   * @param rowType row type, or {@code null} to obtain it from {@code endpoint} when first requested
+   * @param endpoint endpoint that defines how this table is read from or written to
+   */
+  public ApexSQLTable(SchemaPlus schemaPlus, String name, Map<String, Object> operands, RelDataType rowType,
+      Endpoint endpoint)
+  {
+    this.schema = schemaPlus;
+    this.name = name;
+    this.operands = operands;
+    this.rowType = rowType;
+    this.endpoint = endpoint;
+  }
+
+  /**
+   * Creates a table without operands whose row type is obtained from {@code endpoint}.
+   */
+  public ApexSQLTable(SchemaPlus schema, String name, Endpoint endpoint)
+  {
+    this(schema, name, null, null, endpoint);
+  }
+
+  /**
+   * Always returns {@code null}; this table is not meant to be enumerated directly.
+   * NOTE(review): presumably Apex operators, not Calcite, read the data; confirm no caller dereferences this.
+   */
+  @Override
+  public Enumerable<Object[]> scan(DataContext dataContext)
+  {
+    return null;
+  }
+
+  /** Returns a new {@link ApexSQLTable} with the same schema, name, operands, row type and endpoint. */
+  @Override
+  public Table stream()
+  {
+    return new ApexSQLTable(schema, name, operands, rowType, endpoint);
+  }
+
+  /** Returns the row type, asking the endpoint for it (and caching the result) if none was supplied. */
+  @Override
+  public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory)
+  {
+    if (rowType == null) {
+      rowType = endpoint.getRowType(relDataTypeFactory);
+    }
+    return rowType;
+  }
+
+  /** Returns fixed statistics: 100 rows, no unique keys, and a collation on the first field. */
+  @Override
+  public Statistic getStatistic()
+  {
+    return Statistics.of(100d, ImmutableList.<ImmutableBitSet>of(), RelCollations.createSingleton(0));
+  }
+
+  /** Reports this table as a {@link Schema.TableType#STREAM}. */
+  @Override
+  public Schema.TableType getJdbcTableType()
+  {
+    return Schema.TableType.STREAM;
+  }
+
+  public SchemaPlus getSchema()
+  {
+    return schema;
+  }
+
+  public String getName()
+  {
+    return name;
+  }
+
+  public Map<String, Object> getOperands()
+  {
+    return operands;
+  }
+
+  /**
+   * Returns the row type as currently known, without consulting the endpoint; may be {@code null}.
+   */
+  public RelDataType getRowType()
+  {
+    return rowType;
+  }
+
+  public Endpoint getEndpoint()
+  {
+    return endpoint;
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/sql/src/main/java/org/apache/apex/malhar/sql/schema/ApexSQLTableFactory.java
----------------------------------------------------------------------
diff --git a/sql/src/main/java/org/apache/apex/malhar/sql/schema/ApexSQLTableFactory.java b/sql/src/main/java/org/apache/apex/malhar/sql/schema/ApexSQLTableFactory.java
new file mode 100644
index 0000000..c18f854
--- /dev/null
+++ b/sql/src/main/java/org/apache/apex/malhar/sql/schema/ApexSQLTableFactory.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql.schema;
+
+import java.util.Map;
+
+import org.apache.apex.malhar.sql.table.CSVMessageFormat;
+import org.apache.apex.malhar.sql.table.Endpoint;
+import org.apache.apex.malhar.sql.table.FileEndpoint;
+import org.apache.apex.malhar.sql.table.KafkaEndpoint;
+import org.apache.apex.malhar.sql.table.MessageFormat;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.schema.TableFactory;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Calcite {@link TableFactory} that builds an {@link ApexSQLTable} from a table definition in a Calcite schema
+ * model file.
+ * <p>
+ * The operands must name an endpoint type under {@link Endpoint#ENDPOINT} ({@code FILE} or {@code KAFKA},
+ * case-insensitive) with its settings under {@link Endpoint#SYSTEM_OPERANDS}, and a message format under
+ * {@link MessageFormat#MESSAGE_FORMAT} (only {@code CSV}) with its settings under
+ * {@link MessageFormat#MESSAGE_FORMAT_OPERANDS}.
+ */
+@InterfaceStability.Evolving
+public class ApexSQLTableFactory implements TableFactory<Table>
+{
+  /**
+   * @throws IllegalArgumentException if the endpoint type or message format is missing or not supported
+   */
+  @Override
+  public Table create(SchemaPlus schemaPlus, String name, Map<String, Object> operands, RelDataType rowType)
+  {
+    Endpoint endpoint = createEndpoint((String)operands.get(Endpoint.ENDPOINT));
+    endpoint.setEndpointOperands(getOperandMap(operands, Endpoint.SYSTEM_OPERANDS));
+
+    MessageFormat mf = createMessageFormat((String)operands.get(MessageFormat.MESSAGE_FORMAT));
+    mf.setMessageFormatOperands(getOperandMap(operands, MessageFormat.MESSAGE_FORMAT_OPERANDS));
+
+    endpoint.setMessageFormat(mf);
+
+    return new ApexSQLTable(schemaPlus, name, operands, rowType, endpoint);
+  }
+
+  /** Instantiates the endpoint for the given type name (case-insensitive); {@code null} counts as unknown. */
+  private static Endpoint createEndpoint(String endpointSystemType)
+  {
+    if (Endpoint.EndpointType.FILE.name().equalsIgnoreCase(endpointSystemType)) {
+      return new FileEndpoint();
+    }
+    if (Endpoint.EndpointType.KAFKA.name().equalsIgnoreCase(endpointSystemType)) {
+      return new KafkaEndpoint();
+    }
+    throw new IllegalArgumentException("Cannot find endpoint: " + endpointSystemType);
+  }
+
+  /** Instantiates the message format for the given name (case-insensitive); {@code null} counts as unknown. */
+  private static MessageFormat createMessageFormat(String messageFormat)
+  {
+    if (MessageFormat.MessageFormatType.CSV.name().equalsIgnoreCase(messageFormat)) {
+      return new CSVMessageFormat();
+    }
+    throw new IllegalArgumentException("Cannot find message format: " + messageFormat);
+  }
+
+  /** Returns the nested operand map stored under {@code key}, or {@code null} if there is none. */
+  @SuppressWarnings("unchecked")
+  private static Map<String, Object> getOperandMap(Map<String, Object> operands, String key)
+  {
+    // Only the raw Map cast is checked at runtime; the key/value types are assumed.
+    return (Map<String, Object>)operands.get(key);
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/sql/src/main/java/org/apache/apex/malhar/sql/schema/TupleSchemaRegistry.java
----------------------------------------------------------------------
diff --git a/sql/src/main/java/org/apache/apex/malhar/sql/schema/TupleSchemaRegistry.java b/sql/src/main/java/org/apache/apex/malhar/sql/schema/TupleSchemaRegistry.java
new file mode 100644
index 0000000..7924298
--- /dev/null
+++ b/sql/src/main/java/org/apache/apex/malhar/sql/schema/TupleSchemaRegistry.java
@@ -0,0 +1,283 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql.schema;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Time;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.jar.JarOutputStream;
+import java.util.zip.ZipEntry;
+
+import org.codehaus.jettison.json.JSONException;
+
+import org.apache.apex.malhar.lib.utils.ClassLoaderUtils;
+import org.apache.apex.malhar.sql.codegen.BeanClassGenerator;
+import org.apache.apex.malhar.sql.operators.OperatorUtils;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+
+import org.apache.commons.lang.ClassUtils;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Registry of tuple schemas and of the bean classes generated for them.
+ * <p>
+ * The bean of a {@link Schema} is named {@link #FQCN_PACKAGE} followed by the schema name;
+ * {@link #generateCommonJar()} bundles the bytecode of all registered beans into a single jar.
+ */
+@InterfaceStability.Evolving
+public class TupleSchemaRegistry
+{
+  public static final String FQCN_PACKAGE = "org.apache.apex.generated.schema.";
+  private final Map<String, Schema> schemas = new HashMap<>();
+
+  /**
+   * Returns the schema registered under {@code name}, creating and registering an empty one if there is none.
+   *
+   * @param name schema name, also used as the simple name of the generated bean class
+   * @return the existing or newly created schema
+   */
+  public Schema createNewSchema(String name)
+  {
+    Schema schema = schemas.get(name);
+    if (schema == null) {
+      schema = new Schema();
+      schema.name = name;
+      schemas.put(name, schema);
+    }
+    return schema;
+  }
+
+  /**
+   * @return the schema registered under {@code name}, or {@code null} if there is none
+   */
+  public Schema getSchemaDefinition(String name)
+  {
+    return schemas.get(name);
+  }
+
+  /**
+   * Writes the bytecode of every registered bean class into a newly created temporary jar file.
+   * Assumes {@link Schema#generateBean()} has already been called on every registered schema.
+   *
+   * @return absolute path of the created jar
+   * @throws IOException if the jar cannot be created or written
+   */
+  public String generateCommonJar() throws IOException
+  {
+    File file = File.createTempFile("schemaSQL", ".jar");
+
+    // Closes the jar stream, the file stream and the FileSystem (a private instance from newInstance()) even if
+    // writing an entry fails.
+    try (FileSystem fs = FileSystem.newInstance(file.toURI(), new Configuration());
+        FSDataOutputStream out = fs.create(new Path(file.getAbsolutePath()));
+        JarOutputStream jout = new JarOutputStream(out)) {
+      for (Schema schema : schemas.values()) {
+        jout.putNextEntry(new ZipEntry(schema.fqcn.replace(".", "/") + ".class"));
+        jout.write(schema.beanClassBytes);
+        jout.closeEntry();
+      }
+    }
+
+    return file.getAbsolutePath();
+  }
+
+  /**
+   * Registers (or reuses) the schema {@code schemaName}, adds a field for every field of {@code rowType}
+   * and generates the schema's bean class.
+   * NOTE(review): if {@code schemaName} is already registered its fields are appended a second time;
+   * callers are expected to pass unique names.
+   *
+   * @return the generated bean class
+   * @throws UnsupportedOperationException if {@code rowType} is not a struct type
+   * @throws RuntimeException if the bean class cannot be generated
+   */
+  public static Class getSchemaForRelDataType(TupleSchemaRegistry registry, String schemaName, RelDataType rowType)
+  {
+    if (!rowType.isStruct()) {
+      throw new UnsupportedOperationException("Non-struct row type is not implemented.");
+    }
+
+    TupleSchemaRegistry.Schema newSchema = registry.createNewSchema(schemaName);
+    for (RelDataTypeField field : rowType.getFieldList()) {
+      newSchema.addField(OperatorUtils.getValidFieldName(field), convertPrimitiveToSqlType(field.getType()));
+    }
+    try {
+      newSchema.generateBean();
+    } catch (IOException | JSONException e) {
+      throw new RuntimeException("Failed to generate schema", e);
+    }
+    return newSchema.beanClass;
+  }
+
+  /**
+   * Maps a Calcite SQL type to the Java class of the corresponding bean field.
+   * The switch is explicit so that SQL types without a mapping fail loudly instead of being guessed.
+   *
+   * @throws RuntimeException for SQL types that have no mapping
+   */
+  private static Class convertPrimitiveToSqlType(RelDataType type)
+  {
+    switch (type.getSqlTypeName()) {
+      case BOOLEAN:
+        return Boolean.class;
+      // NOTE(review): TINYINT/SMALLINT widen to Integer even though Type has SHORT; confirm against producers.
+      case TINYINT:
+      case SMALLINT:
+      case INTEGER:
+        return Integer.class;
+      case BIGINT:
+        return Long.class;
+      case REAL:
+        return Float.class;
+      case FLOAT:
+      case DOUBLE:
+        return Double.class;
+      // NOTE(review): TIME maps to java.util.Date rather than the java.sql.Time used by Type.TIME.
+      case DATE:
+      case TIME:
+      case TIMESTAMP:
+        return Date.class;
+      case CHAR:
+      case VARCHAR:
+        return String.class;
+      // NOTE(review): Type has no constant for Byte, so getFromJavaType() turns these fields into OBJECT.
+      case BINARY:
+      case VARBINARY:
+        return Byte.class;
+      case ANY:
+      case SYMBOL:
+        return Object.class;
+      default:
+        throw new RuntimeException(String.format("Unsupported type %s", type.getSqlTypeName()));
+    }
+  }
+
+  /**
+   * Field types supported by the generated beans, each paired with its Java class.
+   */
+  public enum Type
+  {
+    BOOLEAN(Boolean.class), SHORT(Short.class), INTEGER(Integer.class), LONG(Long.class),
+    FLOAT(Float.class), DOUBLE(Double.class), STRING(String.class), OBJECT(Object.class),
+    DATE(Date.class), TIME(Time.class);
+
+    private final Class javaType;
+
+    Type(Class javaType)
+    {
+      this.javaType = javaType;
+    }
+
+    /**
+     * Returns the constant whose Java class is {@code type} (primitives are boxed first), or {@link #OBJECT}
+     * if no constant matches.
+     */
+    public static Type getFromJavaType(Class type)
+    {
+      Class wrapperType = ClassUtils.primitiveToWrapper(type);
+      for (Type supportType : Type.values()) {
+        if (supportType.getJavaType() == wrapperType) {
+          return supportType;
+        }
+      }
+
+      return OBJECT;
+    }
+
+    public Class getJavaType()
+    {
+      return javaType;
+    }
+  }
+
+  /**
+   * Description of one tuple schema: its name and ordered fields and, once {@link #generateBean()} has run,
+   * the fully qualified name, bytecode and loaded class of its bean.
+   */
+  public static class Schema
+  {
+    public String name;
+    public String fqcn;
+    public List<SQLFieldInfo> fieldList = new ArrayList<>();
+    public Class beanClass;
+    public byte[] beanClassBytes;
+
+    /**
+     * Appends a field whose {@link Type} is derived from {@code fieldType}.
+     *
+     * @return this schema, for chaining
+     */
+    public Schema addField(String fieldName, Class fieldType)
+    {
+      fieldList.add(new SQLFieldInfo(fieldName, Type.getFromJavaType(fieldType)));
+      return this;
+    }
+
+    /**
+     * Generates the bean class {@link TupleSchemaRegistry#FQCN_PACKAGE} + {@link #name} for the current fields,
+     * keeps its bytecode and loads it.
+     *
+     * @return this schema, for chaining
+     */
+    public Schema generateBean() throws IOException, JSONException
+    {
+      this.fqcn = FQCN_PACKAGE + name;
+
+      this.beanClassBytes = BeanClassGenerator.createAndWriteBeanClass(this.fqcn, fieldList);
+      this.beanClass = ClassLoaderUtils.readBeanClass(fqcn, beanClassBytes);
+
+      return this;
+    }
+  }
+
+  /**
+   * Name and {@link Type} of a single schema field.
+   */
+  public static class SQLFieldInfo
+  {
+    String columnName;
+    Type type;
+
+    public SQLFieldInfo(String columnName, Type type)
+    {
+      this.columnName = columnName;
+      this.type = type;
+    }
+
+    public String getColumnName()
+    {
+      return columnName;
+    }
+
+    public Type getType()
+    {
+      return type;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/sql/src/main/java/org/apache/apex/malhar/sql/table/CSVMessageFormat.java
----------------------------------------------------------------------
diff --git a/sql/src/main/java/org/apache/apex/malhar/sql/table/CSVMessageFormat.java b/sql/src/main/java/org/apache/apex/malhar/sql/table/CSVMessageFormat.java
new file mode 100644
index 0000000..a96df65
--- /dev/null
+++ b/sql/src/main/java/org/apache/apex/malhar/sql/table/CSVMessageFormat.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql.table;
+
+import java.util.Map;
+
+import org.apache.apex.malhar.sql.operators.OperatorUtils;
+import org.apache.apex.malhar.sql.planner.RelInfo;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Operator;
+import com.datatorrent.contrib.formatter.CsvFormatter;
+import com.datatorrent.contrib.parser.CsvParser;
+import com.datatorrent.contrib.parser.DelimitedSchema;
+
+/**
+ * {@link MessageFormat} for delimited (CSV) data.
+ * <p>
+ * The delimited schema definition is taken from the {@value #CSV_SCHEMA} operand. On the input side a
+ * {@link CsvParser} is added to the DAG; on the output side a {@link CsvFormatter}.
+ */
+@InterfaceStability.Evolving
+public class CSVMessageFormat implements MessageFormat
+{
+  public static final String CSV_SCHEMA = "schema";
+  private Map<String, Object> operands;
+
+  /**
+   * Creates a format whose operands must be supplied later via {@link #setMessageFormatOperands(Map)}.
+   */
+  public CSVMessageFormat()
+  {
+  }
+
+  /**
+   * @param schema delimited schema definition, stored as the {@value #CSV_SCHEMA} operand
+   */
+  public CSVMessageFormat(String schema)
+  {
+    this.operands = ImmutableMap.<String, Object>of(CSV_SCHEMA, schema);
+  }
+
+  @Override
+  public MessageFormatType getMessageFormatType()
+  {
+    return MessageFormatType.CSV;
+  }
+
+  @Override
+  public void setMessageFormatOperands(Map<String, Object> operands)
+  {
+    this.operands = operands;
+  }
+
+  /** Adds a {@link CsvParser} configured with the CSV schema and returns the stage describing it. */
+  @Override
+  public RelInfo populateInputDAG(DAG dag, JavaTypeFactory typeFactory)
+  {
+    String operatorName = OperatorUtils.getUniqueOperatorName("CSVParser");
+    CsvParser parser = dag.addOperator(operatorName, CsvParser.class);
+    parser.setSchema(getCsvSchema());
+
+    return new RelInfo("CSVParser", Lists.<Operator.InputPort>newArrayList(parser.in), parser, parser.out,
+        getRowType(typeFactory));
+  }
+
+  /** Adds a {@link CsvFormatter} configured with the CSV schema and returns the stage describing it. */
+  @Override
+  public RelInfo populateOutputDAG(DAG dag, JavaTypeFactory typeFactory)
+  {
+    String operatorName = OperatorUtils.getUniqueOperatorName("CSVFormatter");
+    CsvFormatter formatter = dag.addOperator(operatorName, CsvFormatter.class);
+    formatter.setSchema(getCsvSchema());
+
+    return new RelInfo("CSVFormatter", Lists.<Operator.InputPort>newArrayList(formatter.in), formatter,
+        formatter.out, getRowType(typeFactory));
+  }
+
+  /**
+   * Builds a row type with one field per column of the delimited schema, in schema order.
+   */
+  @Override
+  public RelDataType getRowType(RelDataTypeFactory typeFactory)
+  {
+    DelimitedSchema delimitedSchema = new DelimitedSchema(getCsvSchema());
+    RelDataTypeFactory.FieldInfoBuilder builder = typeFactory.builder();
+    for (DelimitedSchema.Field field : delimitedSchema.getFields()) {
+      builder.add(field.getName(), convertField(typeFactory, field.getType()));
+    }
+    return builder.build();
+  }
+
+  /** Returns the delimited schema definition stored under {@value #CSV_SCHEMA}. */
+  private String getCsvSchema()
+  {
+    return (String)operands.get(CSV_SCHEMA);
+  }
+
+  /**
+   * Maps a delimited-schema field type to a Calcite SQL type; types not listed become {@code ANY}.
+   * NOTE(review): FLOAT becomes SQL FLOAT and SHORT becomes SMALLINT, which TupleSchemaRegistry turns into
+   * Double and Integer bean fields; confirm this matches the values CsvParser produces.
+   */
+  private RelDataType convertField(RelDataTypeFactory typeFactory, DelimitedSchema.FieldType type)
+  {
+    SqlTypeName sqlTypeName;
+    switch (type) {
+      case BOOLEAN:
+        sqlTypeName = SqlTypeName.BOOLEAN;
+        break;
+      case DOUBLE:
+        sqlTypeName = SqlTypeName.DOUBLE;
+        break;
+      case INTEGER:
+        sqlTypeName = SqlTypeName.INTEGER;
+        break;
+      case FLOAT:
+        sqlTypeName = SqlTypeName.FLOAT;
+        break;
+      case LONG:
+        sqlTypeName = SqlTypeName.BIGINT;
+        break;
+      case SHORT:
+        sqlTypeName = SqlTypeName.SMALLINT;
+        break;
+      case CHARACTER:
+        sqlTypeName = SqlTypeName.CHAR;
+        break;
+      case STRING:
+        sqlTypeName = SqlTypeName.VARCHAR;
+        break;
+      // Dates are exposed to SQL as TIMESTAMP.
+      case DATE:
+        sqlTypeName = SqlTypeName.TIMESTAMP;
+        break;
+      default:
+        sqlTypeName = SqlTypeName.ANY;
+        break;
+    }
+
+    return typeFactory.createSqlType(sqlTypeName);
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/sql/src/main/java/org/apache/apex/malhar/sql/table/Endpoint.java
----------------------------------------------------------------------
diff --git a/sql/src/main/java/org/apache/apex/malhar/sql/table/Endpoint.java b/sql/src/main/java/org/apache/apex/malhar/sql/table/Endpoint.java
new file mode 100644
index 0000000..41a26de
--- /dev/null
+++ b/sql/src/main/java/org/apache/apex/malhar/sql/table/Endpoint.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql.table;
+
+import java.util.Map;
+
+import org.apache.apex.malhar.sql.planner.RelInfo;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.api.DAG;
+
+/**
+ * Abstraction of a table backed by an external system, defining how that table is read from and written to.
+ * The interface can be implemented for any kind of data source, e.g. Kafka, files, JDBC.
+ * An implementation defines how the table is represented on both the input and the output side of a pipeline.
+ */
+@InterfaceStability.Evolving
+public interface Endpoint
+{
+ String ENDPOINT = "endpoint";
+ String SYSTEM_OPERANDS = "endpointOperands";
+
+ /**
+ * Returns the type of system this endpoint targets.
+ * @return the {@link EndpointType} of this endpoint
+ */
+ EndpointType getTargetType();
+
+ /**
+ * Sets the endpoint operands. Used when table definitions are provided in Calcite schema (model) format; the
+ * map is the value of the "endpointOperands" key in the Calcite schema definition input file.
+ *
+ * @param operands Map of endpoint operands.
+ */
+ void setEndpointOperands(Map<String, Object> operands);
+
+ /**
+ * Sets the message format, which defines how the data is interpreted on both the input and the output side.
+ *
+ * @param messageFormat Object of type MessageFormat
+ */
+ void setMessageFormat(MessageFormat messageFormat);
+
+ /**
+ * Populates the Apex DAG with the operators needed when this table is on the input side of the pipeline.
+ *
+ * @param dag {@link DAG} object to be populated
+ * @param typeFactory Java Type Factory
+ *
+ * @return {@link RelInfo} describing the output of this input phase
+ */
+ RelInfo populateInputDAG(DAG dag, JavaTypeFactory typeFactory);
+
+ /**
+ * Populates the Apex DAG with the operators needed when this table is on the output side of the pipeline.
+ *
+ * @param dag {@link DAG} object to be populated
+ * @param typeFactory Java Type Factory
+ * @return {@link RelInfo} describing the expected input of this output phase
+ */
+ RelInfo populateOutputDAG(DAG dag, JavaTypeFactory typeFactory);
+
+ /**
+ * Returns the row type: the input data type of the output phase, or the output data type of the input phase.
+ *
+ * @param typeFactory Java Type Factory for data type conversions.
+ *
+ * @return {@link RelDataType} representing data type format.
+ */
+ RelDataType getRowType(RelDataTypeFactory typeFactory);
+
+ /**
+ * Kinds of endpoints.
+ */
+ enum EndpointType
+ {
+ FILE,
+ KAFKA,
+ PORT
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/sql/src/main/java/org/apache/apex/malhar/sql/table/FileEndpoint.java
----------------------------------------------------------------------
diff --git a/sql/src/main/java/org/apache/apex/malhar/sql/table/FileEndpoint.java b/sql/src/main/java/org/apache/apex/malhar/sql/table/FileEndpoint.java
new file mode 100644
index 0000000..cac32a4
--- /dev/null
+++ b/sql/src/main/java/org/apache/apex/malhar/sql/table/FileEndpoint.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql.table;
+
+import java.util.Map;
+
+import org.apache.apex.malhar.lib.fs.GenericFileOutputOperator;
+import org.apache.apex.malhar.sql.operators.LineReader;
+import org.apache.apex.malhar.sql.operators.OperatorUtils;
+import org.apache.apex.malhar.sql.planner.RelInfo;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Operator;
+
+/**
+ * This is an implementation of {@link Endpoint} which defined how data should be read/written to file system.
+ */
+@InterfaceStability.Evolving
+public class FileEndpoint implements Endpoint
+{
+ public static final String FILE_INPUT_DIRECTORY = "directory";
+ public static final String FILE_OUT_PATH = "outputFilePath";
+ public static final String FILE_OUT_NAME = "outputFileName";
+
+ private MessageFormat messageFormat;
+
+ private Map<String, Object> operands;
+
+ public FileEndpoint()
+ {
+ }
+
+ public FileEndpoint(String directory, MessageFormat messageFormat)
+ {
+ this.messageFormat = messageFormat;
+ this.operands = ImmutableMap.<String, Object>of(FILE_INPUT_DIRECTORY, directory);
+ }
+
+ public FileEndpoint(String directory, String fileName, MessageFormat messageFormat)
+ {
+ this.messageFormat = messageFormat;
+ this.operands = ImmutableMap.<String, Object>of(FILE_OUT_PATH, directory, FILE_OUT_NAME, fileName);
+ }
+
+ @Override
+ public EndpointType getTargetType()
+ {
+ return EndpointType.FILE;
+ }
+
+ @Override
+ public void setEndpointOperands(Map<String, Object> operands)
+ {
+ this.operands = operands;
+ }
+
+ @Override
+ public void setMessageFormat(MessageFormat messageFormat)
+ {
+ this.messageFormat = messageFormat;
+ }
+
+ @Override
+ public RelInfo populateInputDAG(DAG dag, JavaTypeFactory typeFactory)
+ {
+ LineReader fileInput = dag.addOperator(OperatorUtils.getUniqueOperatorName("FileInput"), LineReader.class);
+ fileInput.setDirectory((String)operands.get(FILE_INPUT_DIRECTORY));
+
+ RelInfo spec = messageFormat.populateInputDAG(dag, typeFactory);
+ dag.addStream(OperatorUtils.getUniqueStreamName("File", "Parser"), fileInput.output, spec.getInputPorts().get(0));
+ return new RelInfo("Input", Lists.<Operator.InputPort>newArrayList(), spec.getOperator(), spec.getOutPort(),
+ messageFormat.getRowType(typeFactory));
+ }
+
+ @Override
+ public RelInfo populateOutputDAG(DAG dag, JavaTypeFactory typeFactory)
+ {
+ RelInfo spec = messageFormat.populateOutputDAG(dag, typeFactory);
+
+ GenericFileOutputOperator.StringFileOutputOperator fileOutput =
+ dag.addOperator(OperatorUtils.getUniqueOperatorName("FileOutput"),
+ GenericFileOutputOperator.StringFileOutputOperator.class);
+ fileOutput.setFilePath((String)operands.get(FILE_OUT_PATH));
+ fileOutput.setOutputFileName((String)operands.get(FILE_OUT_NAME));
+
+ dag.addStream(OperatorUtils.getUniqueStreamName("Formatter", "File"), spec.getOutPort(), fileOutput.input);
+
+ return new RelInfo("Output", spec.getInputPorts(), spec.getOperator(), null, messageFormat.getRowType(typeFactory));
+ }
+
+ @Override
+ public RelDataType getRowType(RelDataTypeFactory typeFactory)
+ {
+ return messageFormat.getRowType(typeFactory);
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/sql/src/main/java/org/apache/apex/malhar/sql/table/KafkaEndpoint.java
----------------------------------------------------------------------
diff --git a/sql/src/main/java/org/apache/apex/malhar/sql/table/KafkaEndpoint.java b/sql/src/main/java/org/apache/apex/malhar/sql/table/KafkaEndpoint.java
new file mode 100644
index 0000000..56419c3
--- /dev/null
+++ b/sql/src/main/java/org/apache/apex/malhar/sql/table/KafkaEndpoint.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql.table;
+
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator;
+import org.apache.apex.malhar.kafka.KafkaSinglePortOutputOperator;
+import org.apache.apex.malhar.sql.operators.OperatorUtils;
+import org.apache.apex.malhar.sql.planner.RelInfo;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Operator;
+
+/**
+ * This is an implementation of {@link Endpoint} which defined how data should be read/written from kafka messaging system
+ */
+@InterfaceStability.Evolving
+public class KafkaEndpoint implements Endpoint
+{
+ public static final String KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
+ public static final String VALUE_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
+ public static final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
+ public static final String VALUE_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
+
+
+ public static final String KAFKA_SERVERS = "servers";
+ public static final String KAFKA_TOPICS = "topics";
+
+ private MessageFormat messageFormat;
+
+ private Map<String, Object> operands;
+
+ public KafkaEndpoint()
+ {
+ }
+
+ public KafkaEndpoint(String kafkaServers, String topics, MessageFormat messageFormat)
+ {
+ this.messageFormat = messageFormat;
+ this.operands = ImmutableMap.<String, Object>of(KAFKA_SERVERS, kafkaServers, KAFKA_TOPICS, topics);
+ }
+
+ @Override
+ public EndpointType getTargetType()
+ {
+ return EndpointType.KAFKA;
+ }
+
+ @Override
+ public void setEndpointOperands(Map<String, Object> operands)
+ {
+ this.operands = operands;
+ }
+
+ @Override
+ public void setMessageFormat(MessageFormat messageFormat)
+ {
+ this.messageFormat = messageFormat;
+ }
+
+ @Override
+ public RelInfo populateInputDAG(DAG dag, JavaTypeFactory typeFactory)
+ {
+ KafkaSinglePortInputOperator kafkaInput = dag.addOperator(OperatorUtils.getUniqueOperatorName("KafkaInput"),
+ KafkaSinglePortInputOperator.class);
+ kafkaInput.setTopics((String)operands.get(KAFKA_TOPICS));
+ kafkaInput.setInitialOffset("EARLIEST");
+
+ Properties props = new Properties();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, operands.get(KAFKA_SERVERS));
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER);
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, VALUE_DESERIALIZER);
+ kafkaInput.setConsumerProps(props);
+
+ kafkaInput.setClusters((String)operands.get(KAFKA_SERVERS));
+
+ RelInfo spec = messageFormat.populateInputDAG(dag, typeFactory);
+ dag.addStream(OperatorUtils.getUniqueStreamName("Kafka", "Parser"), kafkaInput.outputPort,
+ spec.getInputPorts().get(0));
+ return new RelInfo("Input", Lists.<Operator.InputPort>newArrayList(), spec.getOperator(), spec.getOutPort(),
+ messageFormat.getRowType(typeFactory));
+ }
+
+ @Override
+ public RelInfo populateOutputDAG(DAG dag, JavaTypeFactory typeFactory)
+ {
+ RelInfo spec = messageFormat.populateOutputDAG(dag, typeFactory);
+
+ KafkaSinglePortOutputOperator kafkaOutput = dag.addOperator(OperatorUtils.getUniqueOperatorName("KafkaOutput"),
+ KafkaSinglePortOutputOperator.class);
+ kafkaOutput.setTopic((String)operands.get(KAFKA_TOPICS));
+
+ Properties props = new Properties();
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VALUE_SERIALIZER);
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER);
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, operands.get(KAFKA_SERVERS));
+ kafkaOutput.setProperties(props);
+
+ dag.addStream(OperatorUtils.getUniqueStreamName("Formatter", "Kafka"), spec.getOutPort(), kafkaOutput.inputPort);
+
+ return new RelInfo("Output", spec.getInputPorts(), spec.getOperator(), null, messageFormat.getRowType(typeFactory));
+ }
+
+ @Override
+ public RelDataType getRowType(RelDataTypeFactory typeFactory)
+ {
+ return messageFormat.getRowType(typeFactory);
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/sql/src/main/java/org/apache/apex/malhar/sql/table/MessageFormat.java
----------------------------------------------------------------------
diff --git a/sql/src/main/java/org/apache/apex/malhar/sql/table/MessageFormat.java b/sql/src/main/java/org/apache/apex/malhar/sql/table/MessageFormat.java
new file mode 100644
index 0000000..80fef93
--- /dev/null
+++ b/sql/src/main/java/org/apache/apex/malhar/sql/table/MessageFormat.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql.table;
+
+import java.util.Map;
+
+import org.apache.apex.malhar.sql.planner.RelInfo;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.api.DAG;
+
+/**
+ * This interface defines how message should be parsed from input or formatted for output.
+ * The implementation of this interface should define both parsing and formatting representation for data.
+ */
+@InterfaceStability.Evolving
+public interface MessageFormat
+{
+ String MESSAGE_FORMAT = "messageFormat";
+ String MESSAGE_FORMAT_OPERANDS = "messageFormatOperands";
+
+ /**
+ * Gives type of {@link MessageFormat}
+ * @return Returns type of {@link MessageFormat}
+ */
+ MessageFormatType getMessageFormatType();
+
+ /**
+ * Set messageFormat operands. This method is used when the table definitions are provided using calcite schema format.
+ * This is the map which is present against key "endpointOperands" in calcite schema definition input file.
+ * @param operands
+ */
+ void setMessageFormatOperands(Map<String, Object> operands);
+
+ /**
+ * Implementation of this method should populate the DAG for parsing logic for the data received from {@link Endpoint}
+ *
+ * @param dag {@link DAG} object to be populated
+ * @param typeFactory Java Type Factory
+ * @return Returns {@link RelInfo} defining output data type definition after parsing of data.
+ */
+ RelInfo populateInputDAG(DAG dag, JavaTypeFactory typeFactory);
+
+ /**
+ * Implementation of this method should populate the DAG for formatting logic of data to be written to {@link Endpoint}
+ *
+ * @param dag {@link DAG} object to be populated
+ * @param typeFactory Java Type Factory
+ * @return Returns {@link RelInfo} defining expected input for formatting ot data.
+ */
+ RelInfo populateOutputDAG(DAG dag, JavaTypeFactory typeFactory);
+
+ /**
+ * This method returns what should be the input data type to output phase OR output data type of input phase.
+ *
+ * @param typeFactory Java Type Factory for data type conversions.
+ *
+ * @return {@link RelDataType} representing data type format.
+ */
+ RelDataType getRowType(RelDataTypeFactory typeFactory);
+
+ /**
+ * Message Format types
+ */
+ enum MessageFormatType
+ {
+ CSV
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/sql/src/main/java/org/apache/apex/malhar/sql/table/StreamEndpoint.java
----------------------------------------------------------------------
diff --git a/sql/src/main/java/org/apache/apex/malhar/sql/table/StreamEndpoint.java b/sql/src/main/java/org/apache/apex/malhar/sql/table/StreamEndpoint.java
new file mode 100644
index 0000000..5462e42
--- /dev/null
+++ b/sql/src/main/java/org/apache/apex/malhar/sql/table/StreamEndpoint.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql.table;
+
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Date;
import java.util.Map;

import org.apache.apex.malhar.sql.planner.RelInfo;

import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.hadoop.classification.InterfaceStability;

import com.google.common.collect.Lists;

import com.datatorrent.api.DAG;
import com.datatorrent.api.Operator;
+
+/**
+ * This is an implementation of {@link Endpoint} which defined how data should be read/written to a Apex streaming port.
+ */
+@InterfaceStability.Evolving
+public class StreamEndpoint implements Endpoint
+{
+ private Operator.InputPort inputPort;
+ private Operator.OutputPort outputPort;
+ private Class pojoClass;
+ private Map<String, Class> fieldMapping;
+
+ public StreamEndpoint(Operator.InputPort port, Class pojoClass)
+ {
+ this.inputPort = port;
+ this.pojoClass = pojoClass;
+ }
+
+ public StreamEndpoint(Operator.InputPort port, Map<String, Class> fieldMapping)
+ {
+ this.inputPort = port;
+ this.fieldMapping = fieldMapping;
+ }
+
+ public StreamEndpoint(Operator.OutputPort outputPort, Class pojoClass)
+ {
+ this.outputPort = outputPort;
+ this.pojoClass = pojoClass;
+ }
+
+ public StreamEndpoint(Operator.OutputPort port, Map<String, Class> fieldMapping)
+ {
+ this.outputPort = port;
+ this.fieldMapping = fieldMapping;
+ }
+
+ @Override
+ public EndpointType getTargetType()
+ {
+ return EndpointType.PORT;
+ }
+
+ @Override
+ public void setEndpointOperands(Map<String, Object> operands)
+ {
+ }
+
+ @Override
+ public void setMessageFormat(MessageFormat messageFormat)
+ {
+ }
+
+ @Override
+ public RelInfo populateInputDAG(DAG dag, JavaTypeFactory typeFactory)
+ {
+ return new RelInfo("StreamInput", Lists.<Operator.InputPort>newArrayList(), null, outputPort, getRowType(typeFactory));
+ }
+
+ @Override
+ public RelInfo populateOutputDAG(DAG dag, JavaTypeFactory typeFactory)
+ {
+ return new RelInfo("StreamOutput", Lists.newArrayList(inputPort), null, null, getRowType(typeFactory));
+ }
+
+ @Override
+ public RelDataType getRowType(RelDataTypeFactory typeFactory)
+ {
+ RelDataTypeFactory.FieldInfoBuilder builder = typeFactory.builder();
+ if (fieldMapping != null) {
+ for (Map.Entry<String, Class> entry : fieldMapping.entrySet()) {
+ builder.add(entry.getKey(), convertField(typeFactory, entry.getValue()));
+ }
+ } else if (pojoClass != null) {
+ for (Field field : pojoClass.getDeclaredFields()) {
+ builder.add(field.getName(), convertField(typeFactory, field.getType()));
+ }
+ } else {
+ throw new RuntimeException("Either fieldMapping or pojoClass needs to be set.");
+ }
+
+ return builder.build();
+ }
+
+ private RelDataType convertField(RelDataTypeFactory typeFactory, Class<?> type)
+ {
+ RelDataType relDataType;
+
+ if ((type == Boolean.class) || (type == boolean.class)) {
+ relDataType = typeFactory.createSqlType(SqlTypeName.BOOLEAN);
+ } else if ((type == Double.class) || (type == double.class)) {
+ relDataType = typeFactory.createSqlType(SqlTypeName.DOUBLE);
+ } else if ((type == Integer.class) || (type == int.class)) {
+ relDataType = typeFactory.createSqlType(SqlTypeName.INTEGER);
+ } else if ((type == Float.class) || (type == float.class)) {
+ relDataType = typeFactory.createSqlType(SqlTypeName.FLOAT);
+ } else if ((type == Long.class) || (type == long.class)) {
+ relDataType = typeFactory.createSqlType(SqlTypeName.BIGINT);
+ } else if ((type == Short.class) || (type == short.class)) {
+ relDataType = typeFactory.createSqlType(SqlTypeName.SMALLINT);
+ } else if ((type == Character.class) || (type == char.class) || (type == Byte.class) || (type == byte.class)) {
+ relDataType = typeFactory.createSqlType(SqlTypeName.CHAR);
+ } else if (type == String.class) {
+ relDataType = typeFactory.createSqlType(SqlTypeName.VARCHAR);
+ } else if (type == Date.class) {
+ relDataType = typeFactory.createSqlType(SqlTypeName.TIMESTAMP);
+ } else {
+ relDataType = typeFactory.createSqlType(SqlTypeName.ANY);
+ }
+ return relDataType;
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/sql/src/test/java/org/apache/apex/malhar/sql/FileEndpointTest.java
----------------------------------------------------------------------
diff --git a/sql/src/test/java/org/apache/apex/malhar/sql/FileEndpointTest.java b/sql/src/test/java/org/apache/apex/malhar/sql/FileEndpointTest.java
new file mode 100644
index 0000000..900fd10
--- /dev/null
+++ b/sql/src/test/java/org/apache/apex/malhar/sql/FileEndpointTest.java
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.TimeZone;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import org.apache.apex.malhar.sql.table.CSVMessageFormat;
+import org.apache.apex.malhar.sql.table.FileEndpoint;
+
+import org.apache.commons.io.FileUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Predicates;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+
+public class FileEndpointTest
+{
+ private TimeZone defaultTZ;
+ private static String outputFolder = "target/output/";
+
+ @Rule
+ public TestName testName = new TestName();
+
+ public static String apex_concat_str(String s1, String s2)
+ {
+ return s1 + s2;
+ }
+
+ @Before
+ public void setUp() throws Exception
+ {
+ defaultTZ = TimeZone.getDefault();
+ TimeZone.setDefault(TimeZone.getTimeZone("GMT"));
+
+ outputFolder += testName.getMethodName() + "/";
+ }
+
+ @After
+ public void tearDown() throws Exception
+ {
+ TimeZone.setDefault(defaultTZ);
+ }
+
+ @Test
+ public void testApplication() throws Exception
+ {
+ File modelFile = new File("src/test/resources/model/model_file_csv.json");
+ String model = FileUtils.readFileToString(modelFile);
+
+ PrintStream originalSysout = System.out;
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ System.setOut(new PrintStream(baos));
+
+ try {
+ LocalMode lma = LocalMode.newInstance();
+ Configuration conf = new Configuration(false);
+ lma.prepareDAG(new Application(model), conf);
+ LocalMode.Controller lc = lma.getController();
+ lc.runAsync();
+
+ waitTillStdoutIsPopulated(baos, 30000);
+
+ lc.shutdown();
+ } catch (ConstraintViolationException e) {
+ Assert.fail("constraint violations: " + e.getConstraintViolations());
+ } catch (Exception e) {
+ Assert.fail("Exception: " + e);
+ }
+
+ System.setOut(originalSysout);
+
+ String[] sout = baos.toString().split(System.lineSeparator());
+ Collection<String> filter = Collections2.filter(Arrays.asList(sout), Predicates.containsPattern("Delta Record:"));
+
+ String[] actualLines = filter.toArray(new String[filter.size()]);
+ Assert.assertEquals(6, actualLines.length);
+ Assert.assertTrue(actualLines[0].contains("RowTime=Mon Feb 15 10:15:00 GMT 2016, Product=paint1"));
+ Assert.assertTrue(actualLines[1].contains("RowTime=Mon Feb 15 10:16:00 GMT 2016, Product=paint2"));
+ Assert.assertTrue(actualLines[2].contains("RowTime=Mon Feb 15 10:17:00 GMT 2016, Product=paint3"));
+ Assert.assertTrue(actualLines[3].contains("RowTime=Mon Feb 15 10:18:00 GMT 2016, Product=paint4"));
+ Assert.assertTrue(actualLines[4].contains("RowTime=Mon Feb 15 10:19:00 GMT 2016, Product=paint5"));
+ Assert.assertTrue(actualLines[5].contains("RowTime=Mon Feb 15 10:10:00 GMT 2016, Product=abcde6"));
+ }
+
+ private boolean waitTillStdoutIsPopulated(ByteArrayOutputStream baos, int timeout) throws InterruptedException,
+ IOException
+ {
+ long now = System.currentTimeMillis();
+ Collection<String> filter = Lists.newArrayList();
+ while (System.currentTimeMillis() - now < timeout) {
+ baos.flush();
+ String[] sout = baos.toString().split(System.lineSeparator());
+ filter = Collections2.filter(Arrays.asList(sout), Predicates.containsPattern("Delta Record:"));
+ if (filter.size() != 0) {
+ break;
+ }
+
+ Thread.sleep(500);
+ }
+
+ return (filter.size() != 0);
+ }
+
+ @Test
+ public void testApplicationSelectInsertWithAPI() throws Exception
+ {
+ try {
+ LocalMode lma = LocalMode.newInstance();
+ Configuration conf = new Configuration(false);
+ lma.prepareDAG(new ApplicationSelectInsertWithAPI(), conf);
+
+ LocalMode.Controller lc = lma.getController();
+ lc.runAsync();
+ /**
+ * Wait time is 40 sec to ensure that checkpoint happens. AbstractFileOutputOperators flushes the stream
+ * in beforeCheckpoint call.
+ */
+ Assert.assertTrue(waitTillFileIsPopulated(outputFolder, 40000));
+ lc.shutdown();
+ } catch (Exception e) {
+ Assert.fail("constraint violations: " + e);
+ }
+
+ File file = new File(outputFolder);
+ File file1 = new File(outputFolder + file.list()[0]);
+ List<String> strings = FileUtils.readLines(file1);
+
+ String[] actualLines = strings.toArray(new String[strings.size()]);
+
+ String[] expectedLines = new String[]{"15/02/2016 10:18:00 +0000,15/02/2016 00:00:00 +0000,OILPAINT4", "",
+ "15/02/2016 10:19:00 +0000,15/02/2016 00:00:00 +0000,OILPAINT5", ""};
+ Assert.assertTrue(Arrays.deepEquals(actualLines, expectedLines));
+ }
+
+ private boolean waitTillFileIsPopulated(String outputFolder, int timeout) throws IOException, InterruptedException
+ {
+ boolean result;
+ long now = System.currentTimeMillis();
+ Path outDir = new Path("file://" + new File(outputFolder).getAbsolutePath());
+ try (FileSystem fs = FileSystem.newInstance(outDir.toUri(), new Configuration())) {
+ List<String> strings = Lists.newArrayList();
+ while (System.currentTimeMillis() - now < timeout) {
+ if (fs.exists(outDir)) {
+ File file = new File(outputFolder);
+ if (file.list().length > 0) {
+ File file1 = new File(outputFolder + file.list()[0]);
+ strings = FileUtils.readLines(file1);
+ if (strings.size() != 0) {
+ break;
+ }
+ }
+ }
+
+ Thread.sleep(500);
+ }
+
+ result = fs.exists(outDir) && (strings.size() != 0);
+ }
+
+ return result;
+ }
+
+
+ public static class Application implements StreamingApplication
+ {
+ String model;
+
+ public Application(String model)
+ {
+ this.model = model;
+ }
+
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ SQLExecEnvironment.getEnvironment()
+ .withModel(model)
+ .executeSQL(dag, "SELECT STREAM ROWTIME, PRODUCT FROM ORDERS");
+ }
+ }
+
+ public static class ApplicationSelectInsertWithAPI implements StreamingApplication
+ {
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ String schemaIn = "{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[" +
+ "{\"name\":\"RowTime\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy HH:mm:ss Z\"}}," +
+ "{\"name\":\"id\",\"type\":\"Integer\"}," +
+ "{\"name\":\"Product\",\"type\":\"String\"}," +
+ "{\"name\":\"units\",\"type\":\"Integer\"}]}";
+ String schemaOut = "{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[" +
+ "{\"name\":\"RowTime1\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy HH:mm:ss Z\"}}," +
+ "{\"name\":\"RowTime2\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy HH:mm:ss Z\"}}," +
+ "{\"name\":\"Product\",\"type\":\"String\"}]}";
+
+ SQLExecEnvironment.getEnvironment()
+ .registerTable("ORDERS", new FileEndpoint("src/test/resources/input.csv",
+ new CSVMessageFormat(schemaIn)))
+ .registerTable("SALES", new FileEndpoint(outputFolder, "out.tmp", new CSVMessageFormat(schemaOut)))
+ .registerFunction("APEXCONCAT", FileEndpointTest.class, "apex_concat_str")
+ .executeSQL(dag, "INSERT INTO SALES " + "SELECT STREAM ROWTIME, " + "FLOOR(ROWTIME TO DAY), " +
+ "APEXCONCAT('OILPAINT', SUBSTRING(PRODUCT, 6, 7)) " + "FROM ORDERS WHERE ID > 3 " + "AND " +
+ "PRODUCT LIKE 'paint%'");
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/sql/src/test/java/org/apache/apex/malhar/sql/InputPOJO.java
----------------------------------------------------------------------
diff --git a/sql/src/test/java/org/apache/apex/malhar/sql/InputPOJO.java b/sql/src/test/java/org/apache/apex/malhar/sql/InputPOJO.java
new file mode 100644
index 0000000..7162e31
--- /dev/null
+++ b/sql/src/test/java/org/apache/apex/malhar/sql/InputPOJO.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql;
+
+import java.util.Date;
+
/**
 * Plain Java object describing one input order row, used by the SQL tests.
 *
 * <p>The field names, including their capitalization, and their declaration order are deliberate. Port endpoints
 * derive columns reflectively from declared field names, so the names presumably must match the SQL column names
 * (ROWTIME, ID, PRODUCT, UNITS) used by the tests. Do not rename or reorder the fields.
 */
public class InputPOJO
{
  private Date RowTime;
  private int id;
  private String Product;
  private int units;

  // ---- accessors ----

  public Date getRowTime()
  {
    return this.RowTime;
  }

  public int getId()
  {
    return this.id;
  }

  public String getProduct()
  {
    return this.Product;
  }

  public int getUnits()
  {
    return this.units;
  }

  // ---- mutators ----

  public void setRowTime(Date rowTime)
  {
    this.RowTime = rowTime;
  }

  public void setId(int id)
  {
    this.id = id;
  }

  public void setProduct(String product)
  {
    this.Product = product;
  }

  public void setUnits(int units)
  {
    this.units = units;
  }
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/sql/src/test/java/org/apache/apex/malhar/sql/KafkaEndpointTest.java
----------------------------------------------------------------------
diff --git a/sql/src/test/java/org/apache/apex/malhar/sql/KafkaEndpointTest.java b/sql/src/test/java/org/apache/apex/malhar/sql/KafkaEndpointTest.java
new file mode 100644
index 0000000..14eff70
--- /dev/null
+++ b/sql/src/test/java/org/apache/apex/malhar/sql/KafkaEndpointTest.java
@@ -0,0 +1,362 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.TimeZone;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.apex.malhar.kafka.EmbeddedKafka;
+import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator;
+import org.apache.apex.malhar.kafka.KafkaSinglePortOutputOperator;
+import org.apache.apex.malhar.sql.table.CSVMessageFormat;
+import org.apache.apex.malhar.sql.table.KafkaEndpoint;
+import org.apache.apex.malhar.sql.table.StreamEndpoint;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.contrib.formatter.CsvFormatter;
+import com.datatorrent.contrib.parser.CsvParser;
+
+/**
+ * End-to-end tests for the SQL {@link KafkaEndpoint} and {@link StreamEndpoint} tables.
+ *
+ * <p>Each test launches an application in {@link LocalMode} against an {@link EmbeddedKafka} broker, publishes
+ * CSV records to the input topic(s), and compares the records written to the result topic with the expected
+ * CSV lines.</p>
+ */
+public class KafkaEndpointTest
+{
+  private static final String ORDERS_TOPIC = "dataTopic0";
+  private static final String CATEGORY_TOPIC = "dataTopic1";
+  private static final String RESULT_TOPIC = "resultTopic";
+
+  /** Timeout, in milliseconds, passed to {@link EmbeddedKafka#consume} when reading the result topic. */
+  private static final int CONSUME_TIMEOUT_MS = 30000;
+
+  /** CSV schema of the ORDERS table: RowTime, id, Product, units. */
+  private static final String ORDERS_SCHEMA = "{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[" +
+      "{\"name\":\"RowTime\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy HH:mm:ss Z\"}}," +
+      "{\"name\":\"id\",\"type\":\"Integer\"}," +
+      "{\"name\":\"Product\",\"type\":\"String\"}," +
+      "{\"name\":\"units\",\"type\":\"Integer\"}]}";
+
+  /** CSV schema of the CATEGORY table: id, Category. */
+  private static final String CATEGORY_SCHEMA = "{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[" +
+      "{\"name\":\"id\",\"type\":\"Integer\"}," +
+      "{\"name\":\"Category\",\"type\":\"String\"}]}";
+
+  /** CSV schema of the SALES table written by the projection query: RowTime1, RowTime2, Product. */
+  private static final String SALES_SCHEMA = "{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[" +
+      "{\"name\":\"RowTime1\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy HH:mm:ss Z\"}}," +
+      "{\"name\":\"RowTime2\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy HH:mm:ss Z\"}}," +
+      "{\"name\":\"Product\",\"type\":\"String\"}]}";
+
+  /** CSV schema of the SALES table written by the join queries: RowTime1, RowTime2, Product, Category. */
+  private static final String SALES_JOIN_SCHEMA = "{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[" +
+      "{\"name\":\"RowTime1\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy HH:mm:ss Z\"}}," +
+      "{\"name\":\"RowTime2\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy HH:mm:ss Z\"}}," +
+      "{\"name\":\"Product\",\"type\":\"String\"}," +
+      "{\"name\":\"Category\",\"type\":\"String\"}]}";
+
+  /** Single-table query shared by {@link KafkaApplication} and {@link KafkaPortApplication}. */
+  private static final String PROJECTION_SQL = "INSERT INTO SALES " +
+      "SELECT STREAM ROWTIME, " +
+      "FLOOR(ROWTIME TO DAY), " +
+      "APEXCONCAT('OILPAINT', SUBSTRING(PRODUCT, 6, 7)) " +
+      "FROM ORDERS WHERE ID > 3 " +
+      "AND PRODUCT LIKE 'paint%'";
+
+  /** Records published to {@link #ORDERS_TOPIC}; they match {@link #ORDERS_SCHEMA}. */
+  private static final List<String> ORDER_MESSAGES = Arrays.asList(
+      "15/02/2016 10:15:00 +0000,1,paint1,11",
+      "15/02/2016 10:16:00 +0000,2,paint2,12",
+      "15/02/2016 10:17:00 +0000,3,paint3,13",
+      "15/02/2016 10:18:00 +0000,4,paint4,14",
+      "15/02/2016 10:19:00 +0000,5,paint5,15",
+      "15/02/2016 10:10:00 +0000,6,abcde6,16");
+
+  /** Records published to {@link #CATEGORY_TOPIC}; they match {@link #CATEGORY_SCHEMA}. */
+  private static final List<String> CATEGORY_MESSAGES = Arrays.asList(
+      "1,ABC", "2,DEF", "3,GHI", "4,JKL", "5,MNO", "6,PQR");
+
+  // TODO: Workaround to add \r\n char to test results because of bug in CsvFormatter which adds new line char.
+  /** Expected content of {@link #RESULT_TOPIC} for the projection query. */
+  private static final List<String> EXPECTED_PROJECTION = Arrays.asList(
+      "15/02/2016 10:18:00 +0000,15/02/2016 00:00:00 +0000,OILPAINT4\r\n",
+      "15/02/2016 10:19:00 +0000,15/02/2016 00:00:00 +0000,OILPAINT5\r\n");
+
+  /** Expected content of {@link #RESULT_TOPIC} for the join queries. */
+  private static final List<String> EXPECTED_JOIN = Arrays.asList(
+      "15/02/2016 10:18:00 +0000,15/02/2016 00:00:00 +0000,OILPAINT4,JKL\r\n",
+      "15/02/2016 10:19:00 +0000,15/02/2016 00:00:00 +0000,OILPAINT5,MNO\r\n");
+
+  private EmbeddedKafka kafka;
+
+  /** JVM default time zone saved by {@link #setup()} and restored by {@link #tearDown()}. */
+  private TimeZone defaultTZ;
+
+  /**
+   * Switches the JVM default time zone to GMT, so formatted dates line up with the +0000 offsets in the
+   * expected output. Then starts an embedded Kafka broker and creates the input and result topics.
+   */
+  @Before
+  public void setup() throws IOException
+  {
+    defaultTZ = TimeZone.getDefault();
+    TimeZone.setDefault(TimeZone.getTimeZone("GMT"));
+
+    kafka = new EmbeddedKafka();
+    kafka.start();
+    kafka.createTopic(ORDERS_TOPIC);
+    kafka.createTopic(CATEGORY_TOPIC);
+    kafka.createTopic(RESULT_TOPIC);
+  }
+
+  /** Stops the broker and restores the default time zone, even if stopping the broker fails. */
+  @After
+  public void tearDown() throws IOException
+  {
+    try {
+      kafka.stop();
+    } finally {
+      TimeZone.setDefault(defaultTZ);
+    }
+  }
+
+  @Test
+  public void testApplicationSelectInsertWithAPI() throws Exception
+  {
+    runProjectionTest(new KafkaApplication(kafka.getBroker(), ORDERS_TOPIC, RESULT_TOPIC));
+  }
+
+  @Test
+  public void testApplicationWithPortEndpoint() throws Exception
+  {
+    runProjectionTest(new KafkaPortApplication(kafka.getBroker(), ORDERS_TOPIC, RESULT_TOPIC));
+  }
+
+  @Test
+  public void testApplicationJoin() throws Exception
+  {
+    String sql = "INSERT INTO SALES " +
+        "SELECT STREAM A.ROWTIME, FLOOR(A.ROWTIME TO DAY), " +
+        "APEXCONCAT('OILPAINT', SUBSTRING(A.PRODUCT, 6, 7)), B.CATEGORY " +
+        "FROM ORDERS AS A " +
+        "JOIN CATEGORY AS B ON A.id = B.id " +
+        "WHERE A.id > 3 AND A.PRODUCT LIKE 'paint%'";
+    runJoinTest(sql);
+  }
+
+  @Test
+  public void testApplicationJoinFilter() throws Exception
+  {
+    // The "A.id > 3" filter lives in the JOIN condition here instead of the WHERE clause.
+    // The trailing space before WHERE matters: without it the statement reads "A.id > 3WHERE ...".
+    String sql = "INSERT INTO SALES SELECT STREAM A.ROWTIME, FLOOR(A.ROWTIME TO DAY), " +
+        "APEXCONCAT('OILPAINT', SUBSTRING(A.PRODUCT, 6, 7)), B.CATEGORY " +
+        "FROM ORDERS AS A JOIN CATEGORY AS B ON A.id = B.id AND A.id > 3 " +
+        "WHERE A.PRODUCT LIKE 'paint%'";
+    runJoinTest(sql);
+  }
+
+  /**
+   * Runs {@code app}, feeds it {@link #ORDER_MESSAGES} and expects {@link #EXPECTED_PROJECTION} on the result
+   * topic. The application is always shut down, even when the assertion fails.
+   */
+  private void runProjectionTest(StreamingApplication app) throws Exception
+  {
+    LocalMode.Controller lc = launch(app);
+    try {
+      kafka.publish(ORDERS_TOPIC, ORDER_MESSAGES);
+      assertResultTopic(EXPECTED_PROJECTION);
+    } finally {
+      lc.shutdown();
+    }
+  }
+
+  /**
+   * Runs a {@link KafkaJoinApplication} for {@code sql}, feeds it both input topics and expects
+   * {@link #EXPECTED_JOIN} on the result topic. The application is always shut down.
+   */
+  private void runJoinTest(String sql) throws Exception
+  {
+    LocalMode.Controller lc = launch(new KafkaJoinApplication(kafka.getBroker(), ORDERS_TOPIC, CATEGORY_TOPIC,
+        RESULT_TOPIC, sql));
+    try {
+      kafka.publish(ORDERS_TOPIC, ORDER_MESSAGES);
+      kafka.publish(CATEGORY_TOPIC, CATEGORY_MESSAGES);
+      assertResultTopic(EXPECTED_JOIN);
+    } finally {
+      lc.shutdown();
+    }
+  }
+
+  /**
+   * Prepares {@code app} in a fresh {@link LocalMode} instance with an empty configuration and starts it
+   * asynchronously.
+   *
+   * @return the controller of the running application; the caller is responsible for shutting it down
+   */
+  private static LocalMode.Controller launch(StreamingApplication app) throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    lma.prepareDAG(app, new Configuration(false));
+    LocalMode.Controller lc = lma.getController();
+    lc.runAsync();
+    return lc;
+  }
+
+  /** Reads {@link #RESULT_TOPIC} and asserts that it holds exactly {@code expected}, in the same order. */
+  private void assertResultTopic(List<String> expected) throws Exception
+  {
+    List<String> consumed = kafka.consume(RESULT_TOPIC, CONSUME_TIMEOUT_MS);
+    Assert.assertEquals("Unexpected records on " + RESULT_TOPIC, expected, consumed);
+  }
+
+  /** Runs {@link #PROJECTION_SQL} with both ORDERS and SALES registered as {@link KafkaEndpoint} tables. */
+  public static class KafkaApplication implements StreamingApplication
+  {
+    private final String broker;
+    private final String sourceTopic;
+    private final String destTopic;
+
+    public KafkaApplication(String broker, String sourceTopic, String destTopic)
+    {
+      this.broker = broker;
+      this.sourceTopic = sourceTopic;
+      this.destTopic = destTopic;
+    }
+
+    @Override
+    public void populateDAG(DAG dag, Configuration conf)
+    {
+      SQLExecEnvironment.getEnvironment()
+          .registerTable("ORDERS", new KafkaEndpoint(broker, sourceTopic, new CSVMessageFormat(ORDERS_SCHEMA)))
+          .registerTable("SALES", new KafkaEndpoint(broker, destTopic, new CSVMessageFormat(SALES_SCHEMA)))
+          .registerFunction("APEXCONCAT", FileEndpointTest.class, "apex_concat_str")
+          .executeSQL(dag, PROJECTION_SQL);
+    }
+  }
+
+  /**
+   * Runs a caller-supplied join statement over the ORDERS and CATEGORY Kafka tables and writes to the SALES
+   * Kafka table.
+   */
+  public static class KafkaJoinApplication implements StreamingApplication
+  {
+    private final String broker;
+    private final String sourceTopic0;
+    private final String sourceTopic1;
+    private final String destTopic;
+    private final String sql;
+
+    public KafkaJoinApplication(String broker, String sourceTopic0, String sourceTopic1, String destTopic, String sql)
+    {
+      this.broker = broker;
+      this.sourceTopic0 = sourceTopic0;
+      this.sourceTopic1 = sourceTopic1;
+      this.destTopic = destTopic;
+      this.sql = sql;
+    }
+
+    @Override
+    public void populateDAG(DAG dag, Configuration conf)
+    {
+      SQLExecEnvironment.getEnvironment()
+          .registerTable("ORDERS", new KafkaEndpoint(broker, sourceTopic0, new CSVMessageFormat(ORDERS_SCHEMA)))
+          .registerTable("CATEGORY", new KafkaEndpoint(broker, sourceTopic1, new CSVMessageFormat(CATEGORY_SCHEMA)))
+          .registerTable("SALES", new KafkaEndpoint(broker, destTopic, new CSVMessageFormat(SALES_JOIN_SCHEMA)))
+          .registerFunction("APEXCONCAT", FileEndpointTest.class, "apex_concat_str")
+          .executeSQL(dag, sql);
+    }
+  }
+
+  /**
+   * Adds the Kafka input, CSV parser, CSV formatter and Kafka output operators by hand. It then runs
+   * {@link #PROJECTION_SQL} with ORDERS bound to the parser's output port and SALES bound to the formatter's
+   * input port via {@link StreamEndpoint}s.
+   */
+  public static class KafkaPortApplication implements StreamingApplication
+  {
+    private final String broker;
+    private final String sourceTopic;
+    private final String destTopic;
+
+    public KafkaPortApplication(String broker, String sourceTopic, String destTopic)
+    {
+      this.broker = broker;
+      this.sourceTopic = sourceTopic;
+      this.destTopic = destTopic;
+    }
+
+    @Override
+    public void populateDAG(DAG dag, Configuration conf)
+    {
+      KafkaSinglePortInputOperator kafkaInput = dag.addOperator("KafkaInput", KafkaSinglePortInputOperator.class);
+      kafkaInput.setTopics(sourceTopic);
+      kafkaInput.setInitialOffset("EARLIEST");
+      Properties consumerProps = new Properties();
+      consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
+      consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaEndpoint.KEY_DESERIALIZER);
+      consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaEndpoint.VALUE_DESERIALIZER);
+      kafkaInput.setConsumerProps(consumerProps);
+      kafkaInput.setClusters(broker);
+
+      CsvParser csvParser = dag.addOperator("CSVParser", CsvParser.class);
+      csvParser.setSchema(ORDERS_SCHEMA);
+
+      dag.addStream("KafkaToCSV", kafkaInput.outputPort, csvParser.in);
+
+      CsvFormatter formatter = dag.addOperator("CSVFormatter", CsvFormatter.class);
+      formatter.setSchema(SALES_SCHEMA);
+
+      KafkaSinglePortOutputOperator kafkaOutput =
+          dag.addOperator("KafkaOutput", KafkaSinglePortOutputOperator.class);
+      kafkaOutput.setTopic(destTopic);
+
+      Properties producerProps = new Properties();
+      producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaEndpoint.VALUE_SERIALIZER);
+      producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaEndpoint.KEY_SERIALIZER);
+      producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
+      kafkaOutput.setProperties(producerProps);
+
+      dag.addStream("CSVToKafka", formatter.out, kafkaOutput.inputPort);
+
+      // ORDERS reads from the parser's output port and SALES writes into the formatter's input port,
+      // with the test POJOs as the tuple classes.
+      SQLExecEnvironment.getEnvironment()
+          .registerTable("ORDERS", new StreamEndpoint(csvParser.out, InputPOJO.class))
+          .registerTable("SALES", new StreamEndpoint(formatter.in, OutputPOJO.class))
+          .registerFunction("APEXCONCAT", FileEndpointTest.class, "apex_concat_str")
+          .executeSQL(dag, PROJECTION_SQL);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/sql/src/test/java/org/apache/apex/malhar/sql/OutputPOJO.java
----------------------------------------------------------------------
diff --git a/sql/src/test/java/org/apache/apex/malhar/sql/OutputPOJO.java b/sql/src/test/java/org/apache/apex/malhar/sql/OutputPOJO.java
new file mode 100644
index 0000000..fdf78d7
--- /dev/null
+++ b/sql/src/test/java/org/apache/apex/malhar/sql/OutputPOJO.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql;
+
+
+import java.util.Date;
+
+/**
+ * Mutable test bean holding one SALES record produced by the projection query: {@code RowTime1},
+ * {@code RowTime2} and {@code Product}. It is registered as the tuple class of the SALES
+ * {@code StreamEndpoint} in the SQL tests.
+ *
+ * <p>NOTE(review): the field names and their declaration order may be read reflectively and matched to the
+ * SELECT list, so they deliberately keep their original spelling and order — confirm before changing them.</p>
+ */
+public class OutputPOJO
+{
+  private Date RowTime1;
+  private Date RowTime2;
+  private String Product;
+
+  /** @return the {@code RowTime1} value, or {@code null} if it was never set */
+  public Date getRowTime1()
+  {
+    return this.RowTime1;
+  }
+
+  /** @return the {@code RowTime2} value, or {@code null} if it was never set */
+  public Date getRowTime2()
+  {
+    return this.RowTime2;
+  }
+
+  /** @return the {@code Product} value, or {@code null} if it was never set */
+  public String getProduct()
+  {
+    return this.Product;
+  }
+
+  /** @param rowTime1 new {@code RowTime1} value; the reference is stored as-is, not copied */
+  public void setRowTime1(Date rowTime1)
+  {
+    this.RowTime1 = rowTime1;
+  }
+
+  /** @param rowTime2 new {@code RowTime2} value; the reference is stored as-is, not copied */
+  public void setRowTime2(Date rowTime2)
+  {
+    this.RowTime2 = rowTime2;
+  }
+
+  /** @param product new {@code Product} value */
+  public void setProduct(String product)
+  {
+    this.Product = product;
+  }
+}