Posted to commits@apex.apache.org by th...@apache.org on 2016/10/21 16:58:27 UTC

[2/4] apex-malhar git commit: APEXMALHAR-1818 SQL Support for converting given SQL statement to APEX DAG.

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/sql/src/main/java/org/apache/apex/malhar/sql/planner/RelNodeVisitor.java
----------------------------------------------------------------------
diff --git a/sql/src/main/java/org/apache/apex/malhar/sql/planner/RelNodeVisitor.java b/sql/src/main/java/org/apache/apex/malhar/sql/planner/RelNodeVisitor.java
new file mode 100644
index 0000000..68343ce
--- /dev/null
+++ b/sql/src/main/java/org/apache/apex/malhar/sql/planner/RelNodeVisitor.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql.planner;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.apex.malhar.sql.operators.OperatorUtils;
+import org.apache.apex.malhar.sql.schema.TupleSchemaRegistry;
+
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.rel.RelNode;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Operator;
+
+/**
+ * This is the main class that converts a relational algebra expression to a sub-DAG.
+ */
+@InterfaceStability.Evolving
+public class RelNodeVisitor
+{
+  private final DAG dag;
+  private final TupleSchemaRegistry tupleSchemaRegistry;
+  private final JavaTypeFactory typeFactory;
+
+  public RelNodeVisitor(DAG dag, JavaTypeFactory typeFactory)
+  {
+    this.dag = dag;
+    this.typeFactory = typeFactory;
+    this.tupleSchemaRegistry = new TupleSchemaRegistry();
+  }
+
+  /**
+   * This is the main method of this relational node visitor. It traverses the relational algebra tree bottom-up
+   * (inputs first) and populates the given underlying DAG object.
+   *
+   * @param relNode RelNode which needs to be traversed.
+   *
+   * @return RelInfo representing information of current stage
+   * @throws Exception
+   */
+  public final RelInfo traverse(RelNode relNode) throws Exception
+  {
+    List<RelInfo> inputStreams = new ArrayList<>();
+    for (RelNode input : relNode.getInputs()) {
+      inputStreams.add(traverse(input));
+    }
+
+    ApexRelNode.RelContext relContext = new ApexRelNode.RelContext(dag, typeFactory, tupleSchemaRegistry);
+
+    RelInfo currentNodeRelInfo;
+    ApexRelNode apexRelNode = ApexRelNode.relNodeMapping.get(relNode.getClass());
+    if (apexRelNode == null) {
+      throw new UnsupportedOperationException("RelNode " + relNode.getRelTypeName() + " is not supported.");
+    }
+    currentNodeRelInfo = apexRelNode.visit(relContext, relNode, inputStreams);
+
+    if (currentNodeRelInfo != null && inputStreams.size() != 0) {
+      for (int i = 0; i < inputStreams.size(); i++) {
+        RelInfo inputStream = inputStreams.get(i);
+        Operator.OutputPort outputPort = inputStream.getOutPort();
+        Operator.InputPort inputPort = currentNodeRelInfo.getInputPorts().get(i);
+
+        String streamName = OperatorUtils.getUniqueStreamName(inputStream.getRelName(),
+            currentNodeRelInfo.getRelName());
+        Class schema;
+        if (inputStream.getOutRelDataType() != null) {
+          schema = TupleSchemaRegistry.getSchemaForRelDataType(tupleSchemaRegistry, streamName,
+              inputStream.getOutRelDataType());
+        } else if (inputStream.getClazz() != null) {
+          schema = inputStream.getClazz();
+        } else {
+          throw new RuntimeException("Unexpected condition reached.");
+        }
+        dag.setOutputPortAttribute(outputPort, Context.PortContext.TUPLE_CLASS, schema);
+        dag.setInputPortAttribute(inputPort, Context.PortContext.TUPLE_CLASS, schema);
+        dag.addStream(streamName, outputPort, inputPort);
+      }
+    }
+
+    if (currentNodeRelInfo.getOutPort() == null) {
+      // End of the pipeline.
+      String schemaJar = tupleSchemaRegistry.generateCommonJar();
+
+      String jars = dag.getAttributes().get(Context.DAGContext.LIBRARY_JARS);
+      dag.setAttribute(Context.DAGContext.LIBRARY_JARS,
+          ((jars != null) && (jars.length() != 0)) ? jars + "," + schemaJar : schemaJar);
+    }
+
+    return currentNodeRelInfo;
+  }
+}

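For reference, a minimal usage sketch of the visitor. It assumes the caller already holds a planned RelNode (SQL parsing, validation and planning happen elsewhere, e.g. in SQLExecEnvironment); the helper class and its arguments below are illustrative and not part of this commit.

  import org.apache.apex.malhar.sql.planner.RelInfo;
  import org.apache.apex.malhar.sql.planner.RelNodeVisitor;

  import org.apache.calcite.adapter.java.JavaTypeFactory;
  import org.apache.calcite.rel.RelNode;

  import com.datatorrent.api.DAG;

  class RelNodeVisitorSketch
  {
    // Converts an already-planned relational tree into operators and streams on the given DAG.
    static RelInfo toSubDag(DAG dag, JavaTypeFactory typeFactory, RelNode plannedRoot) throws Exception
    {
      RelNodeVisitor visitor = new RelNodeVisitor(dag, typeFactory);
      // traverse() visits inputs first, then wires streams and TUPLE_CLASS port attributes.
      return visitor.traverse(plannedRoot);
    }
  }
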
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/sql/src/main/java/org/apache/apex/malhar/sql/schema/ApexSQLTable.java
----------------------------------------------------------------------
diff --git a/sql/src/main/java/org/apache/apex/malhar/sql/schema/ApexSQLTable.java b/sql/src/main/java/org/apache/apex/malhar/sql/schema/ApexSQLTable.java
new file mode 100644
index 0000000..6d16f63
--- /dev/null
+++ b/sql/src/main/java/org/apache/apex/malhar/sql/schema/ApexSQLTable.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql.schema;
+
+import java.util.Map;
+
+import org.apache.apex.malhar.sql.table.Endpoint;
+import org.apache.calcite.DataContext;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.schema.ScannableTable;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Statistic;
+import org.apache.calcite.schema.Statistics;
+import org.apache.calcite.schema.StreamableTable;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * This is a representation of an Apex source/destination as a Calcite {@link StreamableTable}.
+ * Any table that gets registered with {@link org.apache.apex.malhar.sql.SQLExecEnvironment}
+ * gets registered as {@link ApexSQLTable}.
+ */
+@InterfaceStability.Evolving
+public class ApexSQLTable implements ScannableTable, StreamableTable
+{
+  private SchemaPlus schema;
+  private String name;
+  private Map<String, Object> operands;
+  private RelDataType rowType;
+  private Endpoint endpoint;
+
+  public ApexSQLTable(SchemaPlus schemaPlus, String name, Map<String, Object> operands, RelDataType rowType,
+      Endpoint endpoint)
+  {
+    this.schema = schemaPlus;
+    this.name = name;
+    this.operands = operands;
+    this.rowType = rowType;
+    this.endpoint = endpoint;
+  }
+
+  public ApexSQLTable(SchemaPlus schema, String name, Endpoint endpoint)
+  {
+    this(schema, name, null, null, endpoint);
+  }
+
+  @Override
+  public Enumerable<Object[]> scan(DataContext dataContext)
+  {
+    return null;
+  }
+
+  @Override
+  public Table stream()
+  {
+    return new ApexSQLTable(schema, name, operands, rowType, endpoint);
+  }
+
+  @Override
+  public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory)
+  {
+    if (rowType == null) {
+      rowType = endpoint.getRowType(relDataTypeFactory);
+    }
+    return rowType;
+  }
+
+  @Override
+  public Statistic getStatistic()
+  {
+    return Statistics.of(100d, ImmutableList.<ImmutableBitSet>of(), RelCollations.createSingleton(0));
+  }
+
+  @Override
+  public Schema.TableType getJdbcTableType()
+  {
+    return Schema.TableType.STREAM;
+  }
+
+  public SchemaPlus getSchema()
+  {
+    return schema;
+  }
+
+  public String getName()
+  {
+    return name;
+  }
+
+  public Map<String, Object> getOperands()
+  {
+    return operands;
+  }
+
+  public RelDataType getRowType()
+  {
+    return rowType;
+  }
+
+  public Endpoint getEndpoint()
+  {
+    return endpoint;
+  }
+}

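A hedged sketch of how such a table ends up in a Calcite schema. SQLExecEnvironment.registerTable(...) is the intended entry point; this only illustrates what that registration presumably boils down to, and the helper below is illustrative.

  import org.apache.apex.malhar.sql.schema.ApexSQLTable;
  import org.apache.apex.malhar.sql.table.Endpoint;

  import org.apache.calcite.schema.SchemaPlus;

  class ApexSQLTableSketch
  {
    // Wraps an endpoint as a streamable table; row type and operands are resolved lazily from the endpoint.
    static void register(SchemaPlus rootSchema, String tableName, Endpoint endpoint)
    {
      rootSchema.add(tableName, new ApexSQLTable(rootSchema, tableName, endpoint));
    }
  }
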
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/sql/src/main/java/org/apache/apex/malhar/sql/schema/ApexSQLTableFactory.java
----------------------------------------------------------------------
diff --git a/sql/src/main/java/org/apache/apex/malhar/sql/schema/ApexSQLTableFactory.java b/sql/src/main/java/org/apache/apex/malhar/sql/schema/ApexSQLTableFactory.java
new file mode 100644
index 0000000..c18f854
--- /dev/null
+++ b/sql/src/main/java/org/apache/apex/malhar/sql/schema/ApexSQLTableFactory.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql.schema;
+
+import java.util.Map;
+
+import org.apache.apex.malhar.sql.table.CSVMessageFormat;
+import org.apache.apex.malhar.sql.table.Endpoint;
+import org.apache.apex.malhar.sql.table.FileEndpoint;
+import org.apache.apex.malhar.sql.table.KafkaEndpoint;
+import org.apache.apex.malhar.sql.table.MessageFormat;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.schema.TableFactory;
+import org.apache.hadoop.classification.InterfaceStability;
+
+@InterfaceStability.Evolving
+public class ApexSQLTableFactory implements TableFactory<Table>
+{
+  @SuppressWarnings("unchecked")
+  @Override
+  public Table create(SchemaPlus schemaPlus, String name, Map<String, Object> operands, RelDataType rowType)
+  {
+    Endpoint endpoint;
+    String endpointSystemType = (String)operands.get(Endpoint.ENDPOINT);
+
+    if (endpointSystemType.equalsIgnoreCase(Endpoint.EndpointType.FILE.name())) {
+      endpoint = new FileEndpoint();
+    } else if (endpointSystemType.equalsIgnoreCase(Endpoint.EndpointType.KAFKA.name())) {
+      endpoint = new KafkaEndpoint();
+    } else {
+      throw new RuntimeException("Cannot find endpoint");
+    }
+    endpoint.setEndpointOperands((Map<String, Object>)operands.get(Endpoint.SYSTEM_OPERANDS));
+
+    MessageFormat mf;
+    String messageFormat = (String)operands.get(MessageFormat.MESSAGE_FORMAT);
+    if (messageFormat.equalsIgnoreCase(MessageFormat.MessageFormatType.CSV.name())) {
+      mf = new CSVMessageFormat();
+    } else {
+      throw new RuntimeException("Cannot find message format");
+    }
+    mf.setMessageFormatOperands((Map<String, Object>)operands.get(MessageFormat.MESSAGE_FORMAT_OPERANDS));
+
+    endpoint.setMessageFormat(mf);
+
+    return new ApexSQLTable(schemaPlus, name, operands, rowType, endpoint);
+  }
+}

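A hedged sketch of the operands layout this factory expects; in practice the map comes from the "operand" section of a Calcite model/schema JSON file. The directory path and the helper class below are illustrative.

  import java.util.Map;

  import org.apache.apex.malhar.sql.schema.ApexSQLTableFactory;
  import org.apache.calcite.schema.SchemaPlus;
  import org.apache.calcite.schema.Table;

  import com.google.common.collect.ImmutableMap;

  class TableFactorySketch
  {
    static Table createFileCsvTable(SchemaPlus schema, String name, String csvSchema)
    {
      Map<String, Object> operands = ImmutableMap.<String, Object>of(
          "endpoint", "file",                                              // Endpoint.ENDPOINT
          "endpointOperands", ImmutableMap.of("directory", "/tmp/input"),  // Endpoint.SYSTEM_OPERANDS -> FileEndpoint.FILE_INPUT_DIRECTORY
          "messageFormat", "csv",                                          // MessageFormat.MESSAGE_FORMAT
          "messageFormatOperands", ImmutableMap.of("schema", csvSchema));  // MessageFormat.MESSAGE_FORMAT_OPERANDS -> CSVMessageFormat.CSV_SCHEMA
      return new ApexSQLTableFactory().create(schema, name, operands, null);
    }
  }
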
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/sql/src/main/java/org/apache/apex/malhar/sql/schema/TupleSchemaRegistry.java
----------------------------------------------------------------------
diff --git a/sql/src/main/java/org/apache/apex/malhar/sql/schema/TupleSchemaRegistry.java b/sql/src/main/java/org/apache/apex/malhar/sql/schema/TupleSchemaRegistry.java
new file mode 100644
index 0000000..7924298
--- /dev/null
+++ b/sql/src/main/java/org/apache/apex/malhar/sql/schema/TupleSchemaRegistry.java
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql.schema;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Time;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.jar.JarOutputStream;
+import java.util.zip.ZipEntry;
+
+import org.codehaus.jettison.json.JSONException;
+
+import org.apache.apex.malhar.lib.utils.ClassLoaderUtils;
+import org.apache.apex.malhar.sql.codegen.BeanClassGenerator;
+import org.apache.apex.malhar.sql.operators.OperatorUtils;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+
+import org.apache.commons.lang.ClassUtils;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+@InterfaceStability.Evolving
+public class TupleSchemaRegistry
+{
+  public static final String FQCN_PACKAGE = "org.apache.apex.generated.schema.";
+  private Map<String, Schema> schemas = new HashMap<>();
+
+  public Schema createNewSchema(String name)
+  {
+    if (schemas.containsKey(name)) {
+      return schemas.get(name);
+    }
+
+    Schema schema = new Schema();
+    schema.name = name;
+    schemas.put(name, schema);
+
+    return schema;
+  }
+
+  public Schema getSchemaDefinition(String name)
+  {
+    return schemas.get(name);
+  }
+
+  public String generateCommonJar() throws IOException
+  {
+    File file = File.createTempFile("schemaSQL", ".jar");
+
+    FileSystem fs = FileSystem.newInstance(file.toURI(), new Configuration());
+    FSDataOutputStream out = fs.create(new Path(file.getAbsolutePath()));
+    JarOutputStream jout = new JarOutputStream(out);
+
+    for (Schema schema : schemas.values()) {
+      jout.putNextEntry(new ZipEntry(schema.fqcn.replace(".", "/") + ".class"));
+      jout.write(schema.beanClassBytes);
+      jout.closeEntry();
+    }
+
+    jout.close();
+    out.close();
+
+    return file.getAbsolutePath();
+  }
+
+  public static Class getSchemaForRelDataType(TupleSchemaRegistry registry, String schemaName, RelDataType rowType)
+  {
+    if (rowType.isStruct()) {
+      TupleSchemaRegistry.Schema newSchema = registry.createNewSchema(schemaName);
+      for (RelDataTypeField field : rowType.getFieldList()) {
+        RelDataType type = field.getType();
+        newSchema.addField(OperatorUtils.getValidFieldName(field), convertPrimitiveToSqlType(type));
+      }
+      try {
+        newSchema.generateBean();
+      } catch (IOException | JSONException e) {
+        throw new RuntimeException("Failed to generate schema", e);
+      }
+      return newSchema.beanClass;
+    } else {
+      throw new UnsupportedOperationException("Non-struct row type is not implemented.");
+    }
+  }
+
+  private static Class convertPrimitiveToSqlType(RelDataType type)
+  {
+    /* Mapping SQL types to Java classes explicitly here, instead of calling value.value(),
+    lets us catch any type mismatches. */
+    switch (type.getSqlTypeName()) {
+      case BOOLEAN:
+        return Boolean.class;
+      case TINYINT:
+      case SMALLINT:
+      case INTEGER:
+        return Integer.class;
+      case BIGINT:
+        return Long.class;
+      case REAL:
+        return Float.class;
+      case FLOAT:
+      case DOUBLE:
+        return Double.class;
+      case DATE:
+        return Date.class;
+      case TIME:
+        return Date.class;
+      case TIMESTAMP:
+        return Date.class;
+      case CHAR:
+      case VARCHAR:
+        return String.class;
+      case BINARY:
+      case VARBINARY:
+        return Byte.class;
+      case ANY:
+      case SYMBOL:
+        return Object.class;
+      default:
+        throw new RuntimeException(String.format("Unsupported type %s", type.getSqlTypeName()));
+    }
+  }
+
+  public enum Type
+  {
+    BOOLEAN(Boolean.class), SHORT(Short.class), INTEGER(Integer.class), LONG(Long.class),
+    FLOAT(Float.class), DOUBLE(Double.class), STRING(String.class), OBJECT(Object.class),
+    DATE(Date.class), TIME(Time.class);
+
+    private Class javaType;
+
+    Type(Class javaType)
+    {
+      this.javaType = javaType;
+    }
+
+    public static Type getFromJavaType(Class type)
+    {
+      for (Type supportType : Type.values()) {
+        if (supportType.getJavaType() == ClassUtils.primitiveToWrapper(type)) {
+          return supportType;
+        }
+      }
+
+      return OBJECT;
+    }
+
+    public Class getJavaType()
+    {
+      return javaType;
+    }
+  }
+
+  public static class Schema
+  {
+    public String name;
+    public String fqcn;
+    public List<SQLFieldInfo> fieldList = new ArrayList<>();
+    public Class beanClass;
+    public byte[] beanClassBytes;
+
+    public Schema addField(String fieldName, Class fieldType)
+    {
+      fieldList.add(new SQLFieldInfo(fieldName, Type.getFromJavaType(fieldType)));
+      return this;
+    }
+
+    public Schema generateBean() throws IOException, JSONException
+    {
+      // Generate
+      this.fqcn = FQCN_PACKAGE + name;
+
+      // Use Bean Class generator to generate the class
+      this.beanClassBytes = BeanClassGenerator.createAndWriteBeanClass(this.fqcn, fieldList);
+      this.beanClass = ClassLoaderUtils.readBeanClass(fqcn, beanClassBytes);
+
+      return this;
+    }
+  }
+
+  public static class SQLFieldInfo
+  {
+    String columnName;
+    Type type;
+
+    public SQLFieldInfo(String columnName, Type type)
+    {
+      this.columnName = columnName;
+      this.type = type;
+    }
+
+    public String getColumnName()
+    {
+      return columnName;
+    }
+
+    public Type getType()
+    {
+      return type;
+    }
+  }
+
+}

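A hedged sketch of the registry's intended flow: register a tuple schema, generate its POJO bytecode, then bundle every generated class into a jar whose path gets appended to LIBRARY_JARS (as RelNodeVisitor does above). The schema and field names are illustrative.

  import java.io.IOException;

  import org.codehaus.jettison.json.JSONException;

  import org.apache.apex.malhar.sql.schema.TupleSchemaRegistry;

  class TupleSchemaRegistrySketch
  {
    static String buildSchemaJar() throws IOException, JSONException
    {
      TupleSchemaRegistry registry = new TupleSchemaRegistry();
      registry.createNewSchema("OrdersRow")
          .addField("Product", String.class)
          .addField("units", Integer.class)
          .generateBean();                 // writes bytecode via BeanClassGenerator and loads the class
      return registry.generateCommonJar(); // path of a temporary jar holding all generated classes
    }
  }
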
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/sql/src/main/java/org/apache/apex/malhar/sql/table/CSVMessageFormat.java
----------------------------------------------------------------------
diff --git a/sql/src/main/java/org/apache/apex/malhar/sql/table/CSVMessageFormat.java b/sql/src/main/java/org/apache/apex/malhar/sql/table/CSVMessageFormat.java
new file mode 100644
index 0000000..a96df65
--- /dev/null
+++ b/sql/src/main/java/org/apache/apex/malhar/sql/table/CSVMessageFormat.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql.table;
+
+import java.util.Map;
+
+import org.apache.apex.malhar.sql.operators.OperatorUtils;
+import org.apache.apex.malhar.sql.planner.RelInfo;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Operator;
+import com.datatorrent.contrib.formatter.CsvFormatter;
+import com.datatorrent.contrib.parser.CsvParser;
+import com.datatorrent.contrib.parser.DelimitedSchema;
+
+@InterfaceStability.Evolving
+public class CSVMessageFormat implements MessageFormat
+{
+  public static final String CSV_SCHEMA = "schema";
+  private Map<String, Object> operands;
+
+  public CSVMessageFormat()
+  {
+  }
+
+  public CSVMessageFormat(String schema)
+  {
+    this.operands = ImmutableMap.<String, Object>of(CSV_SCHEMA, schema);
+  }
+
+  @Override
+  public MessageFormatType getMessageFormatType()
+  {
+    return MessageFormatType.CSV;
+  }
+
+  @Override
+  public void setMessageFormatOperands(Map<String, Object> operands)
+  {
+    this.operands = operands;
+  }
+
+  @Override
+  public RelInfo populateInputDAG(DAG dag, JavaTypeFactory typeFactory)
+  {
+    CsvParser csvParser = dag.addOperator(OperatorUtils.getUniqueOperatorName("CSVParser"), CsvParser.class);
+    csvParser.setSchema((String)operands.get(CSV_SCHEMA));
+
+    return new RelInfo("CSVParser", Lists.<Operator.InputPort>newArrayList(csvParser.in), csvParser, csvParser.out,
+      getRowType(typeFactory));
+  }
+
+  @Override
+  public RelInfo populateOutputDAG(DAG dag, JavaTypeFactory typeFactory)
+  {
+    CsvFormatter formatter = dag.addOperator(OperatorUtils.getUniqueOperatorName("CSVFormatter"), CsvFormatter.class);
+    formatter.setSchema((String)operands.get(CSV_SCHEMA));
+
+    return new RelInfo("CSVFormatter", Lists.<Operator.InputPort>newArrayList(formatter.in), formatter, formatter.out,
+      getRowType(typeFactory));
+  }
+
+  @Override
+  public RelDataType getRowType(RelDataTypeFactory typeFactory)
+  {
+    String schema = (String)operands.get(CSV_SCHEMA);
+    RelDataTypeFactory.FieldInfoBuilder builder = typeFactory.builder();
+
+    DelimitedSchema delimitedSchema = new DelimitedSchema(schema);
+    for (DelimitedSchema.Field field : delimitedSchema.getFields()) {
+      builder.add(field.getName(), convertField(typeFactory, field.getType()));
+    }
+
+    return builder.build();
+  }
+
+  private RelDataType convertField(RelDataTypeFactory typeFactory, DelimitedSchema.FieldType type)
+  {
+    RelDataType relDataType;
+    switch (type) {
+      case BOOLEAN:
+        relDataType = typeFactory.createSqlType(SqlTypeName.BOOLEAN);
+        break;
+      case DOUBLE:
+        relDataType = typeFactory.createSqlType(SqlTypeName.DOUBLE);
+        break;
+      case INTEGER:
+        relDataType = typeFactory.createSqlType(SqlTypeName.INTEGER);
+        break;
+      case FLOAT:
+        relDataType = typeFactory.createSqlType(SqlTypeName.FLOAT);
+        break;
+      case LONG:
+        relDataType = typeFactory.createSqlType(SqlTypeName.BIGINT);
+        break;
+      case SHORT:
+        relDataType = typeFactory.createSqlType(SqlTypeName.SMALLINT);
+        break;
+      case CHARACTER:
+        relDataType = typeFactory.createSqlType(SqlTypeName.CHAR);
+        break;
+      case STRING:
+        relDataType = typeFactory.createSqlType(SqlTypeName.VARCHAR);
+        break;
+      case DATE:
+        relDataType = typeFactory.createSqlType(SqlTypeName.TIMESTAMP);
+        break;
+      default:
+        relDataType = typeFactory.createSqlType(SqlTypeName.ANY);
+    }
+
+    return relDataType;
+  }
+}

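A hedged sketch of constructing the format programmatically; the schema string is the same DelimitedSchema JSON that CsvParser and CsvFormatter consume (a trimmed variant of the one used in FileEndpointTest below).

  import org.apache.apex.malhar.sql.table.CSVMessageFormat;
  import org.apache.apex.malhar.sql.table.MessageFormat;

  class CsvMessageFormatSketch
  {
    static MessageFormat ordersCsv()
    {
      String schema = "{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":["
          + "{\"name\":\"RowTime\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy HH:mm:ss Z\"}},"
          + "{\"name\":\"Product\",\"type\":\"String\"}]}";
      return new CSVMessageFormat(schema);
    }
  }
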
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/sql/src/main/java/org/apache/apex/malhar/sql/table/Endpoint.java
----------------------------------------------------------------------
diff --git a/sql/src/main/java/org/apache/apex/malhar/sql/table/Endpoint.java b/sql/src/main/java/org/apache/apex/malhar/sql/table/Endpoint.java
new file mode 100644
index 0000000..41a26de
--- /dev/null
+++ b/sql/src/main/java/org/apache/apex/malhar/sql/table/Endpoint.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql.table;
+
+import java.util.Map;
+
+import org.apache.apex.malhar.sql.planner.RelInfo;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.api.DAG;
+
+/**
+ * This interface defines abstract table and how it should be operated with.
+ * The Endpoint interface can be implemented for any type of data source, e.g. Kafka, File, JDBC etc.
+ * An implementation of the Endpoint interface should define how the table is represented on both the input and output sides.
+ */
+@InterfaceStability.Evolving
+public interface Endpoint
+{
+  String ENDPOINT = "endpoint";
+  String SYSTEM_OPERANDS = "endpointOperands";
+
+  /**
+   * Returns the target endpoint type.
+   * @return the target endpoint type
+   */
+  EndpointType getTargetType();
+
+  /**
+   * Sets endpoint operands. This method is used when the table definitions are provided using the Calcite schema format.
+   * This is the map present under the key "endpointOperands" in the Calcite schema definition input file.
+   *
+   * @param operands Map of endpoint operands.
+   */
+  void setEndpointOperands(Map<String, Object> operands);
+
+  /**
+   * Sets the message format, which defines how the data should be interpreted on both the input and output sides.
+   *
+   * @param messageFormat Object of type MessageFormat
+   */
+  void setMessageFormat(MessageFormat messageFormat);
+
+  /**
+   * Implementation of this method should populate the Apex DAG when this table is at the input side of the pipeline.
+   *
+   * @param dag {@link DAG} object to be populated
+   * @param typeFactory Java Type Factory
+   *
+   * @return Returns {@link RelInfo} describing output of this input phase.
+   */
+  RelInfo populateInputDAG(DAG dag, JavaTypeFactory typeFactory);
+
+  /**
+   * Implementation of this method should populate the Apex DAG when this table is at the output side of the pipeline.
+   *
+   * @param dag {@link DAG} object to be populated
+   * @param typeFactory Java Type Factory
+   * @return Returns {@link RelInfo} describing expected input of this output phase.
+   */
+  RelInfo populateOutputDAG(DAG dag, JavaTypeFactory typeFactory);
+
+  /**
+   * This method returns the expected input data type of the output phase, or the output data type of the input phase.
+   *
+   * @param typeFactory Java Type Factory for data type conversions.
+   *
+   * @return {@link RelDataType} representing data type format.
+   */
+  RelDataType getRowType(RelDataTypeFactory typeFactory);
+
+  /**
+   * Type of Endpoints
+   */
+  enum EndpointType
+  {
+    FILE,
+    KAFKA,
+    PORT
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/sql/src/main/java/org/apache/apex/malhar/sql/table/FileEndpoint.java
----------------------------------------------------------------------
diff --git a/sql/src/main/java/org/apache/apex/malhar/sql/table/FileEndpoint.java b/sql/src/main/java/org/apache/apex/malhar/sql/table/FileEndpoint.java
new file mode 100644
index 0000000..cac32a4
--- /dev/null
+++ b/sql/src/main/java/org/apache/apex/malhar/sql/table/FileEndpoint.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql.table;
+
+import java.util.Map;
+
+import org.apache.apex.malhar.lib.fs.GenericFileOutputOperator;
+import org.apache.apex.malhar.sql.operators.LineReader;
+import org.apache.apex.malhar.sql.operators.OperatorUtils;
+import org.apache.apex.malhar.sql.planner.RelInfo;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Operator;
+
+/**
+ * This is an implementation of {@link Endpoint} which defines how data should be read from and written to the file system.
+ */
+@InterfaceStability.Evolving
+public class FileEndpoint implements Endpoint
+{
+  public static final String FILE_INPUT_DIRECTORY = "directory";
+  public static final String FILE_OUT_PATH = "outputFilePath";
+  public static final String FILE_OUT_NAME = "outputFileName";
+
+  private MessageFormat messageFormat;
+
+  private Map<String, Object> operands;
+
+  public FileEndpoint()
+  {
+  }
+
+  public FileEndpoint(String directory, MessageFormat messageFormat)
+  {
+    this.messageFormat = messageFormat;
+    this.operands = ImmutableMap.<String, Object>of(FILE_INPUT_DIRECTORY, directory);
+  }
+
+  public FileEndpoint(String directory, String fileName, MessageFormat messageFormat)
+  {
+    this.messageFormat = messageFormat;
+    this.operands = ImmutableMap.<String, Object>of(FILE_OUT_PATH, directory, FILE_OUT_NAME, fileName);
+  }
+
+  @Override
+  public EndpointType getTargetType()
+  {
+    return EndpointType.FILE;
+  }
+
+  @Override
+  public void setEndpointOperands(Map<String, Object> operands)
+  {
+    this.operands = operands;
+  }
+
+  @Override
+  public void setMessageFormat(MessageFormat messageFormat)
+  {
+    this.messageFormat = messageFormat;
+  }
+
+  @Override
+  public RelInfo populateInputDAG(DAG dag, JavaTypeFactory typeFactory)
+  {
+    LineReader fileInput = dag.addOperator(OperatorUtils.getUniqueOperatorName("FileInput"), LineReader.class);
+    fileInput.setDirectory((String)operands.get(FILE_INPUT_DIRECTORY));
+
+    RelInfo spec = messageFormat.populateInputDAG(dag, typeFactory);
+    dag.addStream(OperatorUtils.getUniqueStreamName("File", "Parser"), fileInput.output, spec.getInputPorts().get(0));
+    return new RelInfo("Input", Lists.<Operator.InputPort>newArrayList(), spec.getOperator(), spec.getOutPort(),
+      messageFormat.getRowType(typeFactory));
+  }
+
+  @Override
+  public RelInfo populateOutputDAG(DAG dag, JavaTypeFactory typeFactory)
+  {
+    RelInfo spec = messageFormat.populateOutputDAG(dag, typeFactory);
+
+    GenericFileOutputOperator.StringFileOutputOperator fileOutput =
+        dag.addOperator(OperatorUtils.getUniqueOperatorName("FileOutput"),
+        GenericFileOutputOperator.StringFileOutputOperator.class);
+    fileOutput.setFilePath((String)operands.get(FILE_OUT_PATH));
+    fileOutput.setOutputFileName((String)operands.get(FILE_OUT_NAME));
+
+    dag.addStream(OperatorUtils.getUniqueStreamName("Formatter", "File"), spec.getOutPort(), fileOutput.input);
+
+    return new RelInfo("Output", spec.getInputPorts(), spec.getOperator(), null, messageFormat.getRowType(typeFactory));
+  }
+
+  @Override
+  public RelDataType getRowType(RelDataTypeFactory typeFactory)
+  {
+    return messageFormat.getRowType(typeFactory);
+  }
+}

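A hedged note on the two constructors: the (directory, messageFormat) form configures the endpoint as an input (scan) table, while the (directory, fileName, messageFormat) form configures it as an output table. The full registration flow is shown in FileEndpointTest further below; the paths and helper below are illustrative.

  import org.apache.apex.malhar.sql.table.CSVMessageFormat;
  import org.apache.apex.malhar.sql.table.FileEndpoint;

  class FileEndpointSketch
  {
    static FileEndpoint input(String csvSchema)
    {
      return new FileEndpoint("/data/orders/in", new CSVMessageFormat(csvSchema));               // reads lines via LineReader
    }

    static FileEndpoint output(String csvSchema)
    {
      return new FileEndpoint("/data/orders/out", "sales.csv", new CSVMessageFormat(csvSchema)); // writes via StringFileOutputOperator
    }
  }
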
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/sql/src/main/java/org/apache/apex/malhar/sql/table/KafkaEndpoint.java
----------------------------------------------------------------------
diff --git a/sql/src/main/java/org/apache/apex/malhar/sql/table/KafkaEndpoint.java b/sql/src/main/java/org/apache/apex/malhar/sql/table/KafkaEndpoint.java
new file mode 100644
index 0000000..56419c3
--- /dev/null
+++ b/sql/src/main/java/org/apache/apex/malhar/sql/table/KafkaEndpoint.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql.table;
+
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator;
+import org.apache.apex.malhar.kafka.KafkaSinglePortOutputOperator;
+import org.apache.apex.malhar.sql.operators.OperatorUtils;
+import org.apache.apex.malhar.sql.planner.RelInfo;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Operator;
+
+/**
+ * This is an implementation of {@link Endpoint} which defines how data should be read from and written to the Kafka messaging system.
+ */
+@InterfaceStability.Evolving
+public class KafkaEndpoint implements Endpoint
+{
+  public static final String KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
+  public static final String VALUE_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
+  public static final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
+  public static final String VALUE_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
+
+
+  public static final String KAFKA_SERVERS = "servers";
+  public static final String KAFKA_TOPICS = "topics";
+
+  private MessageFormat messageFormat;
+
+  private Map<String, Object> operands;
+
+  public KafkaEndpoint()
+  {
+  }
+
+  public KafkaEndpoint(String kafkaServers, String topics, MessageFormat messageFormat)
+  {
+    this.messageFormat = messageFormat;
+    this.operands = ImmutableMap.<String, Object>of(KAFKA_SERVERS, kafkaServers, KAFKA_TOPICS, topics);
+  }
+
+  @Override
+  public EndpointType getTargetType()
+  {
+    return EndpointType.KAFKA;
+  }
+
+  @Override
+  public void setEndpointOperands(Map<String, Object> operands)
+  {
+    this.operands = operands;
+  }
+
+  @Override
+  public void setMessageFormat(MessageFormat messageFormat)
+  {
+    this.messageFormat = messageFormat;
+  }
+
+  @Override
+  public RelInfo populateInputDAG(DAG dag, JavaTypeFactory typeFactory)
+  {
+    KafkaSinglePortInputOperator kafkaInput = dag.addOperator(OperatorUtils.getUniqueOperatorName("KafkaInput"),
+        KafkaSinglePortInputOperator.class);
+    kafkaInput.setTopics((String)operands.get(KAFKA_TOPICS));
+    kafkaInput.setInitialOffset("EARLIEST");
+
+    Properties props = new Properties();
+    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, operands.get(KAFKA_SERVERS));
+    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER);
+    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, VALUE_DESERIALIZER);
+    kafkaInput.setConsumerProps(props);
+
+    kafkaInput.setClusters((String)operands.get(KAFKA_SERVERS));
+
+    RelInfo spec = messageFormat.populateInputDAG(dag, typeFactory);
+    dag.addStream(OperatorUtils.getUniqueStreamName("Kafka", "Parser"), kafkaInput.outputPort,
+        spec.getInputPorts().get(0));
+    return new RelInfo("Input", Lists.<Operator.InputPort>newArrayList(), spec.getOperator(), spec.getOutPort(),
+        messageFormat.getRowType(typeFactory));
+  }
+
+  @Override
+  public RelInfo populateOutputDAG(DAG dag, JavaTypeFactory typeFactory)
+  {
+    RelInfo spec = messageFormat.populateOutputDAG(dag, typeFactory);
+
+    KafkaSinglePortOutputOperator kafkaOutput = dag.addOperator(OperatorUtils.getUniqueOperatorName("KafkaOutput"),
+        KafkaSinglePortOutputOperator.class);
+    kafkaOutput.setTopic((String)operands.get(KAFKA_TOPICS));
+
+    Properties props = new Properties();
+    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VALUE_SERIALIZER);
+    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER);
+    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, operands.get(KAFKA_SERVERS));
+    kafkaOutput.setProperties(props);
+
+    dag.addStream(OperatorUtils.getUniqueStreamName("Formatter", "Kafka"), spec.getOutPort(), kafkaOutput.inputPort);
+
+    return new RelInfo("Output", spec.getInputPorts(), spec.getOperator(), null, messageFormat.getRowType(typeFactory));
+  }
+
+  @Override
+  public RelDataType getRowType(RelDataTypeFactory typeFactory)
+  {
+    return messageFormat.getRowType(typeFactory);
+  }
+}

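A hedged sketch of registering Kafka-backed tables through SQLExecEnvironment (the registerTable/executeSQL API is the one exercised in FileEndpointTest below); broker address, topic names, schema strings and the SQL statement are illustrative placeholders.

  import org.apache.apex.malhar.sql.SQLExecEnvironment;
  import org.apache.apex.malhar.sql.table.CSVMessageFormat;
  import org.apache.apex.malhar.sql.table.KafkaEndpoint;

  import com.datatorrent.api.DAG;

  class KafkaEndpointSketch
  {
    static void populate(DAG dag, String inSchema, String outSchema)
    {
      SQLExecEnvironment.getEnvironment()
          .registerTable("ORDERS", new KafkaEndpoint("localhost:9092", "orders-in", new CSVMessageFormat(inSchema)))
          .registerTable("SALES", new KafkaEndpoint("localhost:9092", "sales-out", new CSVMessageFormat(outSchema)))
          .executeSQL(dag, "INSERT INTO SALES SELECT STREAM ROWTIME, PRODUCT FROM ORDERS");
    }
  }
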
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/sql/src/main/java/org/apache/apex/malhar/sql/table/MessageFormat.java
----------------------------------------------------------------------
diff --git a/sql/src/main/java/org/apache/apex/malhar/sql/table/MessageFormat.java b/sql/src/main/java/org/apache/apex/malhar/sql/table/MessageFormat.java
new file mode 100644
index 0000000..80fef93
--- /dev/null
+++ b/sql/src/main/java/org/apache/apex/malhar/sql/table/MessageFormat.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql.table;
+
+import java.util.Map;
+
+import org.apache.apex.malhar.sql.planner.RelInfo;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.api.DAG;
+
+/**
+ * This interface defines how messages should be parsed from input or formatted for output.
+ * An implementation of this interface should define both the parsing and the formatting representation of the data.
+ */
+@InterfaceStability.Evolving
+public interface MessageFormat
+{
+  String MESSAGE_FORMAT = "messageFormat";
+  String MESSAGE_FORMAT_OPERANDS = "messageFormatOperands";
+
+  /**
+   * Returns the type of this {@link MessageFormat}.
+   * @return the type of this {@link MessageFormat}
+   */
+  MessageFormatType getMessageFormatType();
+
+  /**
+   * Sets message format operands. This method is used when the table definitions are provided using the Calcite schema format.
+   * This is the map present under the key "messageFormatOperands" in the Calcite schema definition input file.
+   * @param operands Map of message format operands.
+   */
+  void setMessageFormatOperands(Map<String, Object> operands);
+
+  /**
+   * Implementation of this method should populate the DAG with the parsing logic for the data received from the {@link Endpoint}.
+   *
+   * @param dag {@link DAG} object to be populated
+   * @param typeFactory Java Type Factory
+   * @return Returns {@link RelInfo} defining output data type definition after parsing of data.
+   */
+  RelInfo populateInputDAG(DAG dag, JavaTypeFactory typeFactory);
+
+  /**
+   * Implementation of this method should populate the DAG with the formatting logic for the data to be written to the {@link Endpoint}.
+   *
+   * @param dag {@link DAG} object to be populated
+   * @param typeFactory Java Type Factory
+   * @return Returns {@link RelInfo} defining the expected input for formatting of data.
+   */
+  RelInfo populateOutputDAG(DAG dag, JavaTypeFactory typeFactory);
+
+  /**
+   * This method returns the expected input data type of the output phase, or the output data type of the input phase.
+   *
+   * @param typeFactory Java Type Factory for data type conversions.
+   *
+   * @return {@link RelDataType} representing data type format.
+   */
+  RelDataType getRowType(RelDataTypeFactory typeFactory);
+
+  /**
+   * Message Format types
+   */
+  enum MessageFormatType
+  {
+    CSV
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/sql/src/main/java/org/apache/apex/malhar/sql/table/StreamEndpoint.java
----------------------------------------------------------------------
diff --git a/sql/src/main/java/org/apache/apex/malhar/sql/table/StreamEndpoint.java b/sql/src/main/java/org/apache/apex/malhar/sql/table/StreamEndpoint.java
new file mode 100644
index 0000000..5462e42
--- /dev/null
+++ b/sql/src/main/java/org/apache/apex/malhar/sql/table/StreamEndpoint.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql.table;
+
+import java.lang.reflect.Field;
+import java.util.Date;
+import java.util.Map;
+
+import org.apache.apex.malhar.sql.planner.RelInfo;
+
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Operator;
+
+/**
+ * This is an implementation of {@link Endpoint} which defines how data should be read from and written to an Apex streaming port.
+ */
+@InterfaceStability.Evolving
+public class StreamEndpoint implements Endpoint
+{
+  private Operator.InputPort inputPort;
+  private Operator.OutputPort outputPort;
+  private Class pojoClass;
+  private Map<String, Class> fieldMapping;
+
+  public StreamEndpoint(Operator.InputPort port, Class pojoClass)
+  {
+    this.inputPort = port;
+    this.pojoClass = pojoClass;
+  }
+
+  public StreamEndpoint(Operator.InputPort port, Map<String, Class> fieldMapping)
+  {
+    this.inputPort = port;
+    this.fieldMapping = fieldMapping;
+  }
+
+  public StreamEndpoint(Operator.OutputPort outputPort, Class pojoClass)
+  {
+    this.outputPort = outputPort;
+    this.pojoClass = pojoClass;
+  }
+
+  public StreamEndpoint(Operator.OutputPort port, Map<String, Class> fieldMapping)
+  {
+    this.outputPort = port;
+    this.fieldMapping = fieldMapping;
+  }
+
+  @Override
+  public EndpointType getTargetType()
+  {
+    return EndpointType.PORT;
+  }
+
+  @Override
+  public void setEndpointOperands(Map<String, Object> operands)
+  {
+  }
+
+  @Override
+  public void setMessageFormat(MessageFormat messageFormat)
+  {
+  }
+
+  @Override
+  public RelInfo populateInputDAG(DAG dag, JavaTypeFactory typeFactory)
+  {
+    return new RelInfo("StreamInput", Lists.<Operator.InputPort>newArrayList(), null, outputPort, getRowType(typeFactory));
+  }
+
+  @Override
+  public RelInfo populateOutputDAG(DAG dag, JavaTypeFactory typeFactory)
+  {
+    return new RelInfo("StreamOutput", Lists.newArrayList(inputPort), null, null, getRowType(typeFactory));
+  }
+
+  @Override
+  public RelDataType getRowType(RelDataTypeFactory typeFactory)
+  {
+    RelDataTypeFactory.FieldInfoBuilder builder = typeFactory.builder();
+    if (fieldMapping != null) {
+      for (Map.Entry<String, Class> entry : fieldMapping.entrySet()) {
+        builder.add(entry.getKey(), convertField(typeFactory, entry.getValue()));
+      }
+    } else if (pojoClass != null) {
+      for (Field field : pojoClass.getDeclaredFields()) {
+        builder.add(field.getName(), convertField(typeFactory, field.getType()));
+      }
+    } else {
+      throw new RuntimeException("Either fieldMapping or pojoClass needs to be set.");
+    }
+
+    return builder.build();
+  }
+
+  private RelDataType convertField(RelDataTypeFactory typeFactory, Class<?> type)
+  {
+    RelDataType relDataType;
+
+    if ((type == Boolean.class) || (type == boolean.class)) {
+      relDataType = typeFactory.createSqlType(SqlTypeName.BOOLEAN);
+    } else if ((type == Double.class) || (type == double.class)) {
+      relDataType = typeFactory.createSqlType(SqlTypeName.DOUBLE);
+    } else if ((type == Integer.class) || (type == int.class)) {
+      relDataType = typeFactory.createSqlType(SqlTypeName.INTEGER);
+    } else if ((type == Float.class) || (type == float.class)) {
+      relDataType = typeFactory.createSqlType(SqlTypeName.FLOAT);
+    } else if ((type == Long.class) || (type == long.class)) {
+      relDataType = typeFactory.createSqlType(SqlTypeName.BIGINT);
+    } else if ((type == Short.class) || (type == short.class)) {
+      relDataType = typeFactory.createSqlType(SqlTypeName.SMALLINT);
+    } else if ((type == Character.class) || (type == char.class) || (type == Byte.class) || (type == byte.class)) {
+      relDataType = typeFactory.createSqlType(SqlTypeName.CHAR);
+    } else if (type == String.class) {
+      relDataType = typeFactory.createSqlType(SqlTypeName.VARCHAR);
+    } else if (type == Date.class) {
+      relDataType = typeFactory.createSqlType(SqlTypeName.TIMESTAMP);
+    } else {
+      relDataType = typeFactory.createSqlType(SqlTypeName.ANY);
+    }
+    return relDataType;
+  }
+}

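A hedged sketch of fusing a SQL statement between ports of an existing DAG: an upstream operator's output port is exposed as table ORDERS and a downstream operator's input port as table SALES, with a field mapping describing the row type. Ports, field names and the statement are illustrative.

  import java.util.Date;
  import java.util.Map;

  import org.apache.apex.malhar.sql.SQLExecEnvironment;
  import org.apache.apex.malhar.sql.table.StreamEndpoint;

  import com.google.common.collect.ImmutableMap;

  import com.datatorrent.api.DAG;
  import com.datatorrent.api.Operator;

  class StreamEndpointSketch
  {
    static void fuse(DAG dag, Operator.OutputPort<?> upstreamOut, Operator.InputPort<?> downstreamIn)
    {
      Map<String, Class> fields = ImmutableMap.<String, Class>of("RowTime", Date.class, "Product", String.class);
      SQLExecEnvironment.getEnvironment()
          .registerTable("ORDERS", new StreamEndpoint(upstreamOut, fields))
          .registerTable("SALES", new StreamEndpoint(downstreamIn, fields))
          .executeSQL(dag, "INSERT INTO SALES SELECT STREAM ROWTIME, PRODUCT FROM ORDERS");
    }
  }
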
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/sql/src/test/java/org/apache/apex/malhar/sql/FileEndpointTest.java
----------------------------------------------------------------------
diff --git a/sql/src/test/java/org/apache/apex/malhar/sql/FileEndpointTest.java b/sql/src/test/java/org/apache/apex/malhar/sql/FileEndpointTest.java
new file mode 100644
index 0000000..900fd10
--- /dev/null
+++ b/sql/src/test/java/org/apache/apex/malhar/sql/FileEndpointTest.java
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.TimeZone;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import org.apache.apex.malhar.sql.table.CSVMessageFormat;
+import org.apache.apex.malhar.sql.table.FileEndpoint;
+
+import org.apache.commons.io.FileUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Predicates;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+
+public class FileEndpointTest
+{
+  private TimeZone defaultTZ;
+  private static String outputFolder = "target/output/";
+
+  @Rule
+  public TestName testName = new TestName();
+
+  public static String apex_concat_str(String s1, String s2)
+  {
+    return s1 + s2;
+  }
+
+  @Before
+  public void setUp() throws Exception
+  {
+    defaultTZ = TimeZone.getDefault();
+    TimeZone.setDefault(TimeZone.getTimeZone("GMT"));
+
+    outputFolder += testName.getMethodName() + "/";
+  }
+
+  @After
+  public void tearDown() throws Exception
+  {
+    TimeZone.setDefault(defaultTZ);
+  }
+
+  @Test
+  public void testApplication() throws Exception
+  {
+    File modelFile = new File("src/test/resources/model/model_file_csv.json");
+    String model = FileUtils.readFileToString(modelFile);
+
+    PrintStream originalSysout = System.out;
+
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    System.setOut(new PrintStream(baos));
+
+    try {
+      LocalMode lma = LocalMode.newInstance();
+      Configuration conf = new Configuration(false);
+      lma.prepareDAG(new Application(model), conf);
+      LocalMode.Controller lc = lma.getController();
+      lc.runAsync();
+
+      waitTillStdoutIsPopulated(baos, 30000);
+
+      lc.shutdown();
+    } catch (ConstraintViolationException e) {
+      Assert.fail("constraint violations: " + e.getConstraintViolations());
+    } catch (Exception e) {
+      Assert.fail("Exception: " + e);
+    }
+
+    System.setOut(originalSysout);
+
+    String[] sout = baos.toString().split(System.lineSeparator());
+    Collection<String> filter = Collections2.filter(Arrays.asList(sout), Predicates.containsPattern("Delta Record:"));
+
+    String[] actualLines = filter.toArray(new String[filter.size()]);
+    Assert.assertEquals(6, actualLines.length);
+    Assert.assertTrue(actualLines[0].contains("RowTime=Mon Feb 15 10:15:00 GMT 2016, Product=paint1"));
+    Assert.assertTrue(actualLines[1].contains("RowTime=Mon Feb 15 10:16:00 GMT 2016, Product=paint2"));
+    Assert.assertTrue(actualLines[2].contains("RowTime=Mon Feb 15 10:17:00 GMT 2016, Product=paint3"));
+    Assert.assertTrue(actualLines[3].contains("RowTime=Mon Feb 15 10:18:00 GMT 2016, Product=paint4"));
+    Assert.assertTrue(actualLines[4].contains("RowTime=Mon Feb 15 10:19:00 GMT 2016, Product=paint5"));
+    Assert.assertTrue(actualLines[5].contains("RowTime=Mon Feb 15 10:10:00 GMT 2016, Product=abcde6"));
+  }
+
+  private boolean waitTillStdoutIsPopulated(ByteArrayOutputStream baos, int timeout) throws InterruptedException,
+    IOException
+  {
+    long now = System.currentTimeMillis();
+    Collection<String> filter = Lists.newArrayList();
+    while (System.currentTimeMillis() - now < timeout) {
+      baos.flush();
+      String[] sout = baos.toString().split(System.lineSeparator());
+      filter = Collections2.filter(Arrays.asList(sout), Predicates.containsPattern("Delta Record:"));
+      if (filter.size() != 0) {
+        break;
+      }
+
+      Thread.sleep(500);
+    }
+
+    return (filter.size() != 0);
+  }
+
+  @Test
+  public void testApplicationSelectInsertWithAPI() throws Exception
+  {
+    try {
+      LocalMode lma = LocalMode.newInstance();
+      Configuration conf = new Configuration(false);
+      lma.prepareDAG(new ApplicationSelectInsertWithAPI(), conf);
+
+      LocalMode.Controller lc = lma.getController();
+      lc.runAsync();
+      /**
+       * Wait time is 40 sec to ensure that a checkpoint happens. AbstractFileOutputOperator flushes the stream
+       * in the beforeCheckpoint call.
+       */
+      Assert.assertTrue(waitTillFileIsPopulated(outputFolder, 40000));
+      lc.shutdown();
+    } catch (Exception e) {
+      Assert.fail("constraint violations: " + e);
+    }
+
+    File file = new File(outputFolder);
+    File file1 = new File(outputFolder + file.list()[0]);
+    List<String> strings = FileUtils.readLines(file1);
+
+    String[] actualLines = strings.toArray(new String[strings.size()]);
+
+    String[] expectedLines = new String[]{"15/02/2016 10:18:00 +0000,15/02/2016 00:00:00 +0000,OILPAINT4", "",
+      "15/02/2016 10:19:00 +0000,15/02/2016 00:00:00 +0000,OILPAINT5", ""};
+    Assert.assertTrue(Arrays.deepEquals(actualLines, expectedLines));
+  }
+
+  private boolean waitTillFileIsPopulated(String outputFolder, int timeout) throws IOException, InterruptedException
+  {
+    boolean result;
+    long now = System.currentTimeMillis();
+    Path outDir = new Path("file://" + new File(outputFolder).getAbsolutePath());
+    try (FileSystem fs = FileSystem.newInstance(outDir.toUri(), new Configuration())) {
+      List<String> strings = Lists.newArrayList();
+      while (System.currentTimeMillis() - now < timeout) {
+        if (fs.exists(outDir)) {
+          File file = new File(outputFolder);
+          if (file.list().length > 0) {
+            File file1 = new File(outputFolder + file.list()[0]);
+            strings = FileUtils.readLines(file1);
+            if (strings.size() != 0) {
+              break;
+            }
+          }
+        }
+
+        Thread.sleep(500);
+      }
+
+      result = fs.exists(outDir) && (strings.size() != 0);
+    }
+
+    return result;
+  }
+
+
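+  /**
+   * Streaming application that uses a Calcite JSON model string to define the ORDERS table and runs a simple
+   * SELECT STREAM query against it.
+   */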
+  public static class Application implements StreamingApplication
+  {
+    String model;
+
+    public Application(String model)
+    {
+      this.model = model;
+    }
+
+    @Override
+    public void populateDAG(DAG dag, Configuration conf)
+    {
+      SQLExecEnvironment.getEnvironment()
+          .withModel(model)
+          .executeSQL(dag, "SELECT STREAM ROWTIME, PRODUCT FROM ORDERS");
+    }
+  }
+
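+  /**
+   * Streaming application that registers file-backed ORDERS and SALES tables through the fluent API and executes
+   * an INSERT ... SELECT with a filter and the APEXCONCAT scalar UDF.
+   */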
+  public static class ApplicationSelectInsertWithAPI implements StreamingApplication
+  {
+    @Override
+    public void populateDAG(DAG dag, Configuration conf)
+    {
+      String schemaIn = "{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[" +
+          "{\"name\":\"RowTime\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy HH:mm:ss Z\"}}," +
+          "{\"name\":\"id\",\"type\":\"Integer\"}," +
+          "{\"name\":\"Product\",\"type\":\"String\"}," +
+          "{\"name\":\"units\",\"type\":\"Integer\"}]}";
+      String schemaOut = "{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[" +
+          "{\"name\":\"RowTime1\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy HH:mm:ss Z\"}}," +
+          "{\"name\":\"RowTime2\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy HH:mm:ss Z\"}}," +
+          "{\"name\":\"Product\",\"type\":\"String\"}]}";
+
+      SQLExecEnvironment.getEnvironment()
+          .registerTable("ORDERS", new FileEndpoint("src/test/resources/input.csv",
+          new CSVMessageFormat(schemaIn)))
+          .registerTable("SALES", new FileEndpoint(outputFolder, "out.tmp", new CSVMessageFormat(schemaOut)))
+          .registerFunction("APEXCONCAT", FileEndpointTest.class, "apex_concat_str")
+          .executeSQL(dag, "INSERT INTO SALES " + "SELECT STREAM ROWTIME, " + "FLOOR(ROWTIME TO DAY), " +
+          "APEXCONCAT('OILPAINT', SUBSTRING(PRODUCT, 6, 7)) " + "FROM ORDERS WHERE ID > 3 " + "AND " +
+          "PRODUCT LIKE 'paint%'");
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/sql/src/test/java/org/apache/apex/malhar/sql/InputPOJO.java
----------------------------------------------------------------------
diff --git a/sql/src/test/java/org/apache/apex/malhar/sql/InputPOJO.java b/sql/src/test/java/org/apache/apex/malhar/sql/InputPOJO.java
new file mode 100644
index 0000000..7162e31
--- /dev/null
+++ b/sql/src/test/java/org/apache/apex/malhar/sql/InputPOJO.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql;
+
+import java.util.Date;
+
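+/**
+ * POJO matching the input CSV schema (RowTime, id, Product, units); used as the tuple type for StreamEndpoint
+ * input tables in the SQL tests.
+ */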
+public class InputPOJO
+{
+  private Date RowTime;
+  private int id;
+  private String Product;
+  private int units;
+
+  public Date getRowTime()
+  {
+    return RowTime;
+  }
+
+  public void setRowTime(Date rowTime)
+  {
+    RowTime = rowTime;
+  }
+
+  public int getId()
+  {
+    return id;
+  }
+
+  public void setId(int id)
+  {
+    this.id = id;
+  }
+
+  public String getProduct()
+  {
+    return Product;
+  }
+
+  public void setProduct(String product)
+  {
+    Product = product;
+  }
+
+  public int getUnits()
+  {
+    return units;
+  }
+
+  public void setUnits(int units)
+  {
+    this.units = units;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/sql/src/test/java/org/apache/apex/malhar/sql/KafkaEndpointTest.java
----------------------------------------------------------------------
diff --git a/sql/src/test/java/org/apache/apex/malhar/sql/KafkaEndpointTest.java b/sql/src/test/java/org/apache/apex/malhar/sql/KafkaEndpointTest.java
new file mode 100644
index 0000000..14eff70
--- /dev/null
+++ b/sql/src/test/java/org/apache/apex/malhar/sql/KafkaEndpointTest.java
@@ -0,0 +1,362 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.TimeZone;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.apex.malhar.kafka.EmbeddedKafka;
+import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator;
+import org.apache.apex.malhar.kafka.KafkaSinglePortOutputOperator;
+import org.apache.apex.malhar.sql.table.CSVMessageFormat;
+import org.apache.apex.malhar.sql.table.KafkaEndpoint;
+import org.apache.apex.malhar.sql.table.StreamEndpoint;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.contrib.formatter.CsvFormatter;
+import com.datatorrent.contrib.parser.CsvParser;
+
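+/**
+ * Tests that exercise SQL execution on top of Kafka endpoints: direct Kafka tables, port endpoints, and joins
+ * across two topics, each validated against an embedded Kafka broker.
+ */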
+public class KafkaEndpointTest
+{
+  private final String testTopicData0 = "dataTopic0";
+  private final String testTopicData1 = "dataTopic1";
+  private final String testTopicResult = "resultTopic";
+
+  private EmbeddedKafka kafka;
+
+  private TimeZone defaultTZ;
+
+  @Before
+  public void setup() throws IOException
+  {
+    defaultTZ = TimeZone.getDefault();
+    TimeZone.setDefault(TimeZone.getTimeZone("GMT"));
+
+    kafka = new EmbeddedKafka();
+    kafka.start();
+    kafka.createTopic(testTopicData0);
+    kafka.createTopic(testTopicData1);
+    kafka.createTopic(testTopicResult);
+  }
+
+  @After
+  public void tearDown() throws IOException
+  {
+    kafka.stop();
+
+    TimeZone.setDefault(defaultTZ);
+  }
+
+  @Test
+  public void testApplicationSelectInsertWithAPI() throws Exception
+  {
+    try {
+      LocalMode lma = LocalMode.newInstance();
+      Configuration conf = new Configuration(false);
+      lma.prepareDAG(new KafkaApplication(kafka.getBroker(), testTopicData0, testTopicResult), conf);
+      LocalMode.Controller lc = lma.getController();
+      lc.runAsync();
+
+      kafka.publish(testTopicData0, Arrays.asList("15/02/2016 10:15:00 +0000,1,paint1,11",
+          "15/02/2016 10:16:00 +0000,2,paint2,12",
+          "15/02/2016 10:17:00 +0000,3,paint3,13", "15/02/2016 10:18:00 +0000,4,paint4,14",
+          "15/02/2016 10:19:00 +0000,5,paint5,15", "15/02/2016 10:10:00 +0000,6,abcde6,16"));
+
+      // TODO: Expected lines carry a trailing \r\n to work around a CsvFormatter bug that appends a newline character.
+      String[] expectedLines = new String[]{"15/02/2016 10:18:00 +0000,15/02/2016 00:00:00 +0000,OILPAINT4\r\n",
+          "15/02/2016 10:19:00 +0000,15/02/2016 00:00:00 +0000,OILPAINT5\r\n"};
+
+      List<String> consume = kafka.consume(testTopicResult, 30000);
+      Assert.assertTrue(Arrays.deepEquals(consume.toArray(new String[consume.size()]), expectedLines));
+
+      lc.shutdown();
+    } catch (Exception e) {
+      Assert.fail("constraint violations: " + e);
+    }
+  }
+
+  @Test
+  public void testApplicationWithPortEndpoint() throws Exception
+  {
+    try {
+      LocalMode lma = LocalMode.newInstance();
+      Configuration conf = new Configuration(false);
+      lma.prepareDAG(new KafkaPortApplication(kafka.getBroker(), testTopicData0, testTopicResult), conf);
+      LocalMode.Controller lc = lma.getController();
+      lc.runAsync();
+
+      kafka.publish(testTopicData0, Arrays.asList("15/02/2016 10:15:00 +0000,1,paint1,11",
+          "15/02/2016 10:16:00 +0000,2,paint2,12",
+          "15/02/2016 10:17:00 +0000,3,paint3,13", "15/02/2016 10:18:00 +0000,4,paint4,14",
+          "15/02/2016 10:19:00 +0000,5,paint5,15", "15/02/2016 10:10:00 +0000,6,abcde6,16"));
+
+      // TODO: Expected lines carry a trailing \r\n to work around a CsvFormatter bug that appends a newline character.
+      String[] expectedLines = new String[]{"15/02/2016 10:18:00 +0000,15/02/2016 00:00:00 +0000,OILPAINT4\r\n",
+          "15/02/2016 10:19:00 +0000,15/02/2016 00:00:00 +0000,OILPAINT5\r\n"};
+
+      List<String> consume = kafka.consume(testTopicResult, 30000);
+
+      Assert.assertTrue(Arrays.deepEquals(consume.toArray(new String[consume.size()]), expectedLines));
+
+      lc.shutdown();
+    } catch (Exception e) {
+      Assert.fail("constraint violations: " + e);
+    }
+  }
+
+  @Test
+  public void testApplicationJoin() throws Exception
+  {
+    String sql = "INSERT INTO SALES " +
+        "SELECT STREAM A.ROWTIME, FLOOR(A.ROWTIME TO DAY), " +
+        "APEXCONCAT('OILPAINT', SUBSTRING(A.PRODUCT, 6, 7)), B.CATEGORY " +
+        "FROM ORDERS AS A " +
+        "JOIN CATEGORY AS B ON A.id = B.id " +
+        "WHERE A.id > 3 AND A.PRODUCT LIKE 'paint%'";
+    try {
+      LocalMode lma = LocalMode.newInstance();
+      Configuration conf = new Configuration(false);
+      lma.prepareDAG(new KafkaJoinApplication(kafka.getBroker(), testTopicData0, testTopicData1,
+          testTopicResult, sql), conf);
+      LocalMode.Controller lc = lma.getController();
+      lc.runAsync();
+
+      kafka.publish(testTopicData0, Arrays.asList("15/02/2016 10:15:00 +0000,1,paint1,11",
+          "15/02/2016 10:16:00 +0000,2,paint2,12",
+          "15/02/2016 10:17:00 +0000,3,paint3,13", "15/02/2016 10:18:00 +0000,4,paint4,14",
+          "15/02/2016 10:19:00 +0000,5,paint5,15", "15/02/2016 10:10:00 +0000,6,abcde6,16"));
+
+      kafka.publish(testTopicData1, Arrays.asList("1,ABC",
+          "2,DEF",
+          "3,GHI", "4,JKL",
+          "5,MNO", "6,PQR"));
+
+      // TODO: Expected lines carry a trailing \r\n to work around a CsvFormatter bug that appends a newline character.
+      String[] expectedLines = new String[]{"15/02/2016 10:18:00 +0000,15/02/2016 00:00:00 +0000,OILPAINT4,JKL\r\n",
+          "15/02/2016 10:19:00 +0000,15/02/2016 00:00:00 +0000,OILPAINT5,MNO\r\n"};
+
+      List<String> consume = kafka.consume(testTopicResult, 30000);
+
+      Assert.assertTrue(Arrays.deepEquals(consume.toArray(new String[consume.size()]), expectedLines));
+
+      lc.shutdown();
+    } catch (Exception e) {
+      Assert.fail("constraint violations: " + e);
+    }
+  }
+
+  @Test
+  public void testApplicationJoinFilter() throws Exception
+  {
+    String sql = "INSERT INTO SALES SELECT STREAM A.ROWTIME, FLOOR(A.ROWTIME TO DAY), " +
+        "APEXCONCAT('OILPAINT', SUBSTRING(A.PRODUCT, 6, 7)), B.CATEGORY " +
+        "FROM ORDERS AS A JOIN CATEGORY AS B ON A.id = B.id AND A.id > 3" +
+        "WHERE A.PRODUCT LIKE 'paint%'";
+
+    try {
+      LocalMode lma = LocalMode.newInstance();
+      Configuration conf = new Configuration(false);
+      lma.prepareDAG(new KafkaJoinApplication(kafka.getBroker(), testTopicData0, testTopicData1,
+          testTopicResult, sql), conf);
+      LocalMode.Controller lc = lma.getController();
+      lc.runAsync();
+
+      kafka.publish(testTopicData0, Arrays.asList("15/02/2016 10:15:00 +0000,1,paint1,11",
+          "15/02/2016 10:16:00 +0000,2,paint2,12",
+          "15/02/2016 10:17:00 +0000,3,paint3,13", "15/02/2016 10:18:00 +0000,4,paint4,14",
+          "15/02/2016 10:19:00 +0000,5,paint5,15", "15/02/2016 10:10:00 +0000,6,abcde6,16"));
+
+      kafka.publish(testTopicData1, Arrays.asList("1,ABC",
+          "2,DEF",
+          "3,GHI", "4,JKL",
+          "5,MNO", "6,PQR"));
+
+      // TODO: Expected lines carry a trailing \r\n to work around a CsvFormatter bug that appends a newline character.
+      String[] expectedLines = new String[]{"15/02/2016 10:18:00 +0000,15/02/2016 00:00:00 +0000,OILPAINT4,JKL\r\n",
+          "15/02/2016 10:19:00 +0000,15/02/2016 00:00:00 +0000,OILPAINT5,MNO\r\n"};
+
+      List<String> consume = kafka.consume(testTopicResult, 30000);
+
+      Assert.assertTrue(Arrays.deepEquals(consume.toArray(new String[consume.size()]), expectedLines));
+
+      lc.shutdown();
+    } catch (Exception e) {
+      Assert.fail("constraint violations: " + e);
+    }
+  }
+
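+  /**
+   * Application that registers Kafka-backed ORDERS and SALES tables and executes an INSERT ... SELECT with a
+   * filter and the APEXCONCAT scalar UDF.
+   */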
+  public static class KafkaApplication implements StreamingApplication
+  {
+    private String broker;
+    private String sourceTopic;
+    private String destTopic;
+
+    public KafkaApplication(String broker, String sourceTopic, String destTopic)
+    {
+      this.broker = broker;
+      this.sourceTopic = sourceTopic;
+      this.destTopic = destTopic;
+    }
+
+    @Override
+    public void populateDAG(DAG dag, Configuration conf)
+    {
+      String schemaIn = "{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[" +
+          "{\"name\":\"RowTime\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy HH:mm:ss Z\"}}," +
+          "{\"name\":\"id\",\"type\":\"Integer\"}," +
+          "{\"name\":\"Product\",\"type\":\"String\"}," +
+          "{\"name\":\"units\",\"type\":\"Integer\"}]}";
+      String schemaOut = "{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[" +
+          "{\"name\":\"RowTime1\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy HH:mm:ss Z\"}}," +
+          "{\"name\":\"RowTime2\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy HH:mm:ss Z\"}}," +
+          "{\"name\":\"Product\",\"type\":\"String\"}]}";
+
+      SQLExecEnvironment.getEnvironment()
+          .registerTable("ORDERS", new KafkaEndpoint(broker, sourceTopic, new CSVMessageFormat(schemaIn)))
+          .registerTable("SALES", new KafkaEndpoint(broker, destTopic, new CSVMessageFormat(schemaOut)))
+          .registerFunction("APEXCONCAT", FileEndpointTest.class, "apex_concat_str")
+          .executeSQL(dag, "INSERT INTO SALES " + "SELECT STREAM ROWTIME, " + "FLOOR(ROWTIME TO DAY), " +
+          "APEXCONCAT('OILPAINT', SUBSTRING(PRODUCT, 6, 7)) " + "FROM ORDERS WHERE ID > 3 " + "AND " +
+          "PRODUCT LIKE 'paint%'");
+    }
+  }
+
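+  /**
+   * Application that registers two Kafka-backed input tables (ORDERS and CATEGORY) plus a SALES output table and
+   * executes the join SQL supplied by the test.
+   */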
+  public static class KafkaJoinApplication implements StreamingApplication
+  {
+    private String broker;
+    private String sourceTopic0;
+    private String sourceTopic1;
+    private String destTopic;
+    private String sql;
+
+    public KafkaJoinApplication(String broker, String sourceTopic0, String sourceTopic1, String destTopic, String sql)
+    {
+      this.broker = broker;
+      this.sourceTopic0 = sourceTopic0;
+      this.sourceTopic1 = sourceTopic1;
+      this.destTopic = destTopic;
+      this.sql = sql;
+    }
+
+    @Override
+    public void populateDAG(DAG dag, Configuration conf)
+    {
+      String schemaIn0 = "{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[" +
+          "{\"name\":\"RowTime\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy HH:mm:ss Z\"}}," +
+          "{\"name\":\"id\",\"type\":\"Integer\"}," +
+          "{\"name\":\"Product\",\"type\":\"String\"}," +
+          "{\"name\":\"units\",\"type\":\"Integer\"}]}";
+      String schemaIn1 = "{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[" +
+          "{\"name\":\"id\",\"type\":\"Integer\"}," +
+          "{\"name\":\"Category\",\"type\":\"String\"}]}";
+      String schemaOut = "{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[" +
+          "{\"name\":\"RowTime1\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy HH:mm:ss Z\"}}," +
+          "{\"name\":\"RowTime2\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy HH:mm:ss Z\"}}," +
+          "{\"name\":\"Product\",\"type\":\"String\"}," +
+          "{\"name\":\"Category\",\"type\":\"String\"}]}";
+
+      SQLExecEnvironment.getEnvironment()
+          .registerTable("ORDERS", new KafkaEndpoint(broker, sourceTopic0, new CSVMessageFormat(schemaIn0)))
+          .registerTable("CATEGORY", new KafkaEndpoint(broker, sourceTopic1, new CSVMessageFormat(schemaIn1)))
+          .registerTable("SALES", new KafkaEndpoint(broker, destTopic, new CSVMessageFormat(schemaOut)))
+          .registerFunction("APEXCONCAT", FileEndpointTest.class, "apex_concat_str")
+          .executeSQL(dag, sql);
+    }
+  }
+
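+  /**
+   * Application that wires the Kafka input, CSV parser, CSV formatter and Kafka output operators explicitly and
+   * lets the SQL sub-DAG connect to them through StreamEndpoint port tables.
+   */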
+  public static class KafkaPortApplication implements StreamingApplication
+  {
+    private String broker;
+    private String sourceTopic;
+    private String destTopic;
+
+    public KafkaPortApplication(String broker, String sourceTopic, String destTopic)
+    {
+      this.broker = broker;
+      this.sourceTopic = sourceTopic;
+      this.destTopic = destTopic;
+    }
+
+    @Override
+    public void populateDAG(DAG dag, Configuration conf)
+    {
+      String schemaIn = "{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[" +
+          "{\"name\":\"RowTime\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy HH:mm:ss Z\"}}," +
+          "{\"name\":\"id\",\"type\":\"Integer\"}," +
+          "{\"name\":\"Product\",\"type\":\"String\"}," +
+          "{\"name\":\"units\",\"type\":\"Integer\"}]}";
+      String schemaOut = "{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[" +
+          "{\"name\":\"RowTime1\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy HH:mm:ss Z\"}}," +
+          "{\"name\":\"RowTime2\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy HH:mm:ss Z\"}}," +
+          "{\"name\":\"Product\",\"type\":\"String\"}]}";
+
+      KafkaSinglePortInputOperator kafkaInput = dag.addOperator("KafkaInput", KafkaSinglePortInputOperator.class);
+      kafkaInput.setTopics(sourceTopic);
+      kafkaInput.setInitialOffset("EARLIEST");
+      Properties props = new Properties();
+      props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
+      props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaEndpoint.KEY_DESERIALIZER);
+      props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaEndpoint.VALUE_DESERIALIZER);
+      kafkaInput.setConsumerProps(props);
+      kafkaInput.setClusters(broker);
+
+      CsvParser csvParser = dag.addOperator("CSVParser", CsvParser.class);
+      csvParser.setSchema(schemaIn);
+
+      dag.addStream("KafkaToCSV", kafkaInput.outputPort, csvParser.in);
+
+      CsvFormatter formatter = dag.addOperator("CSVFormatter", CsvFormatter.class);
+      formatter.setSchema(schemaOut);
+
+      KafkaSinglePortOutputOperator kafkaOutput = dag.addOperator("KafkaOutput", KafkaSinglePortOutputOperator.class);
+      kafkaOutput.setTopic(destTopic);
+
+      props = new Properties();
+      props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaEndpoint.VALUE_SERIALIZER);
+      props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaEndpoint.KEY_SERIALIZER);
+      props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
+      kafkaOutput.setProperties(props);
+
+      dag.addStream("CSVToKafka", formatter.out, kafkaOutput.inputPort);
+
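+      // Register the parser output and formatter input as port-backed tables so the generated SQL sub-DAG is
+      // fused between the explicitly wired Kafka input and output chains.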
+      SQLExecEnvironment.getEnvironment()
+          .registerTable("ORDERS", new StreamEndpoint(csvParser.out, InputPOJO.class))
+          .registerTable("SALES", new StreamEndpoint(formatter.in, OutputPOJO.class))
+          .registerFunction("APEXCONCAT", FileEndpointTest.class, "apex_concat_str")
+          .executeSQL(dag, "INSERT INTO SALES " + "SELECT STREAM ROWTIME, " + "FLOOR(ROWTIME TO DAY), " +
+          "APEXCONCAT('OILPAINT', SUBSTRING(PRODUCT, 6, 7)) " + "FROM ORDERS WHERE ID > 3 " + "AND " +
+          "PRODUCT LIKE 'paint%'");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/sql/src/test/java/org/apache/apex/malhar/sql/OutputPOJO.java
----------------------------------------------------------------------
diff --git a/sql/src/test/java/org/apache/apex/malhar/sql/OutputPOJO.java b/sql/src/test/java/org/apache/apex/malhar/sql/OutputPOJO.java
new file mode 100644
index 0000000..fdf78d7
--- /dev/null
+++ b/sql/src/test/java/org/apache/apex/malhar/sql/OutputPOJO.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql;
+
+
+import java.util.Date;
+
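+/**
+ * POJO matching the output CSV schema (RowTime1, RowTime2, Product); used as the tuple type for StreamEndpoint
+ * output tables in the SQL tests.
+ */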
+public class OutputPOJO
+{
+  private Date RowTime1;
+  private Date RowTime2;
+  private String Product;
+
+  public Date getRowTime1()
+  {
+    return RowTime1;
+  }
+
+  public void setRowTime1(Date rowTime1)
+  {
+    RowTime1 = rowTime1;
+  }
+
+  public Date getRowTime2()
+  {
+    return RowTime2;
+  }
+
+  public void setRowTime2(Date rowTime2)
+  {
+    RowTime2 = rowTime2;
+  }
+
+  public String getProduct()
+  {
+    return Product;
+  }
+
+  public void setProduct(String product)
+  {
+    Product = product;
+  }
+}