You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/11/15 13:08:44 UTC

apex-malhar git commit: APEXMALHAR-2304 SQL Support Examples

Repository: apex-malhar
Updated Branches:
  refs/heads/master a25b6140c -> 9db044741


APEXMALHAR-2304 SQL Support Examples


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/9db04474
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/9db04474
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/9db04474

Branch: refs/heads/master
Commit: 9db044741a32d466c2df4d1582ca707fcce47402
Parents: a25b614
Author: Chinmay Kolhatkar <ch...@datatorrent.com>
Authored: Mon Oct 24 11:41:04 2016 +0530
Committer: Chinmay Kolhatkar <ch...@datatorrent.com>
Committed: Tue Nov 15 16:15:30 2016 +0530

----------------------------------------------------------------------
 demos/pom.xml                                   |   1 +
 demos/sql/pom.xml                               | 102 ++++++++++++
 demos/sql/src/assemble/appPackage.xml           |  59 +++++++
 .../sql/sample/FusionStyleSQLApplication.java   |  88 +++++++++++
 .../sql/sample/PureStyleSQLApplication.java     |  65 ++++++++
 .../sql/sample/SQLApplicationWithAPI.java       |  45 ++++++
 .../sql/sample/SQLApplicationWithModelFile.java |  50 ++++++
 .../properties-FusionStyleSQLApplication.xml    |  65 ++++++++
 .../properties-PureStyleSQLApplication.xml      |  65 ++++++++
 .../properties-SQLApplicationWithAPI.xml        |  43 +++++
 .../properties-SQLApplicationWithModelFile.xml  |  32 ++++
 .../src/main/resources/META-INF/properties.xml  |  41 +++++
 .../main/resources/model/model_file_csv.json    |  27 ++++
 .../sample/FusionStyleSQLApplicationTest.java   | 121 +++++++++++++++
 .../sql/sample/PureStyleSQLApplicationTest.java | 155 +++++++++++++++++++
 .../sql/sample/SQLApplicationWithAPITest.java   |  92 +++++++++++
 .../sample/SQLApplicationWithModelFileTest.java | 113 ++++++++++++++
 demos/sql/src/test/resources/input.csv          |   6 +
 demos/sql/src/test/resources/log4j.properties   |  50 ++++++
 sql/pom.xml                                     |   1 -
 20 files changed, 1220 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9db04474/demos/pom.xml
----------------------------------------------------------------------
diff --git a/demos/pom.xml b/demos/pom.xml
index 185bb54..c4fda7a 100644
--- a/demos/pom.xml
+++ b/demos/pom.xml
@@ -168,6 +168,7 @@
       <modules>
         <module>distributedistinct</module>
         <module>highlevelapi</module>
+        <module>sql</module>
       </modules>
     </profile>
   </profiles>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9db04474/demos/sql/pom.xml
----------------------------------------------------------------------
diff --git a/demos/sql/pom.xml b/demos/sql/pom.xml
new file mode 100644
index 0000000..fd00f58
--- /dev/null
+++ b/demos/sql/pom.xml
@@ -0,0 +1,102 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>sql-demo</artifactId>
+  <packaging>jar</packaging>
+
+  <name>Apache Apex Malhar SQL API Demo</name>
+  <description>Apex demo applications that use SQL APIs to construct a DAG</description>
+
+  <parent>
+    <groupId>org.apache.apex</groupId>
+    <artifactId>malhar-demos</artifactId>
+    <version>3.6.0-SNAPSHOT</version>
+  </parent>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <version>1.9.1</version>
+        <executions>
+          <execution>
+            <id>attach-artifacts</id>
+            <phase>package</phase>
+            <goals>
+              <goal>attach-artifact</goal>
+            </goals>
+            <configuration>
+              <artifacts>
+                <artifact>
+                  <file>target/${project.artifactId}-${project.version}.apa</file>
+                  <type>apa</type>
+                </artifact>
+              </artifacts>
+              <skipAttach>false</skipAttach>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.apex</groupId>
+      <artifactId>apex-engine</artifactId>
+      <version>${apex.core.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.apex</groupId>
+      <artifactId>malhar-sql</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <!-- For KafkaTest -->
+    <dependency>
+      <groupId>org.apache.apex</groupId>
+      <artifactId>malhar-kafka</artifactId>
+      <version>${project.parent.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_2.11</artifactId>
+      <version>0.9.0.0</version>
+      <classifier>test</classifier>
+      <scope>test</scope>
+    </dependency>
+
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9db04474/demos/sql/src/assemble/appPackage.xml
----------------------------------------------------------------------
diff --git a/demos/sql/src/assemble/appPackage.xml b/demos/sql/src/assemble/appPackage.xml
new file mode 100644
index 0000000..4138cf2
--- /dev/null
+++ b/demos/sql/src/assemble/appPackage.xml
@@ -0,0 +1,59 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+  <id>appPackage</id>
+  <formats>
+    <format>jar</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <fileSets>
+    <fileSet>
+      <directory>${basedir}/target/</directory>
+      <outputDirectory>/app</outputDirectory>
+      <includes>
+        <include>${project.artifactId}-${project.version}.jar</include>
+      </includes>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/target/deps</directory>
+      <outputDirectory>/lib</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/site/conf</directory>
+      <outputDirectory>/conf</outputDirectory>
+      <includes>
+        <include>*.xml</include>
+      </includes>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/main/resources/META-INF</directory>
+      <outputDirectory>/META-INF</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/main/resources/app</directory>
+      <outputDirectory>/app</outputDirectory>
+    </fileSet>
+  </fileSets>
+
+</assembly>
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9db04474/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/FusionStyleSQLApplication.java
----------------------------------------------------------------------
diff --git a/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/FusionStyleSQLApplication.java b/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/FusionStyleSQLApplication.java
new file mode 100644
index 0000000..94b02db
--- /dev/null
+++ b/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/FusionStyleSQLApplication.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql.sample;
+
+import java.util.Date;
+import java.util.Map;
+
+import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator;
+import org.apache.apex.malhar.sql.SQLExecEnvironment;
+import org.apache.apex.malhar.sql.table.CSVMessageFormat;
+import org.apache.apex.malhar.sql.table.FileEndpoint;
+import org.apache.apex.malhar.sql.table.StreamEndpoint;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.ImmutableMap;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.contrib.parser.CsvParser;
+
+
+/** Demo: parses CSV read from Kafka with explicit operators and feeds the parsed stream to SQL. */
+@ApplicationAnnotation(name = "FusionStyleSQLApplication")
+public class FusionStyleSQLApplication implements StreamingApplication
+{
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    SQLExecEnvironment env = SQLExecEnvironment.getEnvironment();
+    env.registerFunction("APEXCONCAT", PureStyleSQLApplication.class, "apex_concat_str");
+
+    Map<String, Class> fieldMapping = ImmutableMap.<String, Class>of(
+        "RowTime", Date.class, "id", Integer.class,
+        "Product", String.class, "units", Integer.class);
+
+    // Add Kafka Input
+    KafkaSinglePortInputOperator kafkaInput = dag.addOperator("KafkaInput", KafkaSinglePortInputOperator.class);
+    kafkaInput.setInitialOffset("EARLIEST");
+
+    // Add CSVParser
+    CsvParser csvParser = dag.addOperator("CSVParser", CsvParser.class);
+    dag.addStream("KafkaToCSV", kafkaInput.outputPort, csvParser.in);
+
+    // Register CSV Parser output as the input table of the SQL statement
+    env.registerTable(conf.get("sqlSchemaInputName"), new StreamEndpoint(csvParser.out, fieldMapping));
+
+    // Register FileEndpoint as the output table of the SQL statement
+    env.registerTable(conf.get("sqlSchemaOutputName"), new FileEndpoint(conf.get("folderPath"),
+        conf.get("fileName"), new CSVMessageFormat(conf.get("sqlSchemaOutputDef"))));
+
+    // Add the SQL statement to the DAG
+    env.executeSQL(dag, conf.get("sql"));
+  }
+
+  /** Forwards each tuple received on {@code input} to {@code output} unchanged. */
+  public static class PassThroughOperator extends BaseOperator
+  {
+    public final transient DefaultOutputPort output = new DefaultOutputPort();
+    public final transient DefaultInputPort input = new DefaultInputPort()
+    {
+      @Override
+      public void process(Object o)
+      {
+        output.emit(o); // emit the received tuple itself, not the port object
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9db04474/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/PureStyleSQLApplication.java
----------------------------------------------------------------------
diff --git a/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/PureStyleSQLApplication.java b/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/PureStyleSQLApplication.java
new file mode 100644
index 0000000..9a727a3
--- /dev/null
+++ b/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/PureStyleSQLApplication.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql.sample;
+
+import org.apache.apex.malhar.sql.SQLExecEnvironment;
+import org.apache.apex.malhar.sql.table.CSVMessageFormat;
+import org.apache.apex.malhar.sql.table.FileEndpoint;
+import org.apache.apex.malhar.sql.table.KafkaEndpoint;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+
+/** Demo: Kafka-in, file-out pipeline described entirely by registered tables plus one SQL statement. */
+@ApplicationAnnotation(name = "PureStyleSQLApplication")
+public class PureStyleSQLApplication implements StreamingApplication
+{
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    // Input table: CSV records read from a Kafka topic.
+    final String inputTable = conf.get("schemaInName");
+    final CSVMessageFormat inputFormat = new CSVMessageFormat(conf.get("schemaInDef"));
+    final KafkaEndpoint kafkaSource = new KafkaEndpoint(conf.get("broker"), conf.get("topic"), inputFormat);
+
+    // Output table: CSV records written to a file.
+    final String outputTable = conf.get("schemaOutName");
+    final CSVMessageFormat outputFormat = new CSVMessageFormat(conf.get("schemaOutDef"));
+    final FileEndpoint fileSink = new FileEndpoint(conf.get("outputFolder"), conf.get("destFileName"),
+        outputFormat);
+
+    // Statement to run against the two tables.
+    final String statement = conf.get("sql");
+
+    // Register input table, output table and the UDF, then add the statement to the DAG.
+    SQLExecEnvironment sqlEnv = SQLExecEnvironment.getEnvironment();
+    sqlEnv.registerTable(inputTable, kafkaSource);
+    sqlEnv.registerTable(outputTable, fileSink);
+    sqlEnv.registerFunction("APEXCONCAT", getClass(), "apex_concat_str");
+    sqlEnv.executeSQL(dag, statement);
+  }
+
+  /** Function registered with SQL as APEXCONCAT: returns {@code s1} followed by {@code s2}. */
+  public static String apex_concat_str(String s1, String s2)
+  {
+    return s1 + s2;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9db04474/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithAPI.java
----------------------------------------------------------------------
diff --git a/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithAPI.java b/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithAPI.java
new file mode 100644
index 0000000..604332b
--- /dev/null
+++ b/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithAPI.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql.sample;
+
+import org.apache.apex.malhar.sql.SQLExecEnvironment;
+import org.apache.apex.malhar.sql.table.CSVMessageFormat;
+import org.apache.apex.malhar.sql.table.FileEndpoint;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+
+/** Demo: registers a CSV file as a SQL table through the API and runs the configured statement on it. */
+@ApplicationAnnotation(name = "SQLApplicationWithAPI")
+public class SQLApplicationWithAPI implements StreamingApplication
+{
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    // Input table: a CSV file exposed under the configured table name.
+    final CSVMessageFormat csvFormat = new CSVMessageFormat(conf.get("csvSchemaIn"));
+    final FileEndpoint csvFile = new FileEndpoint(conf.get("sourceFile"), csvFormat);
+
+    SQLExecEnvironment sqlEnv = SQLExecEnvironment.getEnvironment();
+    sqlEnv.registerTable(conf.get("csvSchemaInName"), csvFile);
+    sqlEnv.executeSQL(dag, conf.get("sql"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9db04474/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithModelFile.java
----------------------------------------------------------------------
diff --git a/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithModelFile.java b/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithModelFile.java
new file mode 100644
index 0000000..2d22b18
--- /dev/null
+++ b/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithModelFile.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql.sample;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.apex.malhar.sql.SQLExecEnvironment;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+
+/** Demo: builds the DAG from a SQL statement whose tables are described by a model file. */
+@ApplicationAnnotation(name = "SQLApplicationWithModelFile")
+public class SQLApplicationWithModelFile implements StreamingApplication
+{
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    String modelFile = conf.get("modelFile");
+    String model;
+    try {
+      // Decode explicitly as UTF-8 so the model reads the same regardless of the platform default charset.
+      model = FileUtils.readFileToString(new File(modelFile), "UTF-8");
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to read model file: " + modelFile, e);
+    }
+
+    SQLExecEnvironment.getEnvironment().withModel(model).executeSQL(dag, conf.get("sql"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9db04474/demos/sql/src/main/resources/META-INF/properties-FusionStyleSQLApplication.xml
----------------------------------------------------------------------
diff --git a/demos/sql/src/main/resources/META-INF/properties-FusionStyleSQLApplication.xml b/demos/sql/src/main/resources/META-INF/properties-FusionStyleSQLApplication.xml
new file mode 100644
index 0000000..77852e7
--- /dev/null
+++ b/demos/sql/src/main/resources/META-INF/properties-FusionStyleSQLApplication.xml
@@ -0,0 +1,65 @@
+<?xml version="1.0"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<configuration>
+  <!-- Kafka Operator Properties -->
+  <property>
+    <name>dt.operator.KafkaInput.prop.topics</name>
+    <value>dataTopic</value>
+  </property>
+  <property>
+    <name>dt.operator.KafkaInput.prop.clusters</name>
+    <value>localhost:9092</value>  <!-- broker (NOT zookeeper) address -->
+  </property>
+
+  <!-- CSV Parser Properties -->
+  <property>
+    <name>dt.operator.CSVParser.prop.schema</name>
+    <value>{"separator":",","quoteChar":"\"","fields":[{"name":"RowTime","type":"Date","constraints":{"format":"dd/MM/yyyy hh:mm:ss Z"}},{"name":"id","type":"Integer"},{"name":"Product","type":"String"},{"name":"units","type":"Integer"}]}</value>
+  </property>
+
+  <!-- SQL Properties -->
+  <property>
+    <name>sqlSchemaInputName</name>
+    <value>FROMCSV</value>
+  </property>
+  <property>
+    <name>sqlSchemaOutputName</name>
+    <value>TOFILE</value>
+  </property>
+  <property>
+    <name>folderPath</name>
+    <value>/tmp/output</value>
+  </property>
+  <property>
+    <name>fileName</name>
+    <value>output.txt</value>
+  </property>
+  <property>
+    <name>sqlSchemaOutputDef</name>
+    <value>{"separator":",","quoteChar":"\"","fields":[{"name":"RowTime1","type":"Date","constraints":{"format":"dd/MM/yyyy hh:mm:ss Z"}},{"name":"RowTime2","type":"Date","constraints":{"format":"dd/MM/yyyy hh:mm:ss Z"}},{"name":"Product","type":"String"}]}</value>
+  </property>
+  <property>
+    <name>sql</name>
+    <value>INSERT INTO TOFILE SELECT STREAM ROWTIME, FLOOR(ROWTIME TO DAY), APEXCONCAT('OILPAINT', SUBSTRING(PRODUCT, 6, 7)) FROM FROMCSV WHERE ID > 3 AND PRODUCT LIKE 'paint%'</value>
+  </property>
+</configuration>
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9db04474/demos/sql/src/main/resources/META-INF/properties-PureStyleSQLApplication.xml
----------------------------------------------------------------------
diff --git a/demos/sql/src/main/resources/META-INF/properties-PureStyleSQLApplication.xml b/demos/sql/src/main/resources/META-INF/properties-PureStyleSQLApplication.xml
new file mode 100644
index 0000000..0d25aa6
--- /dev/null
+++ b/demos/sql/src/main/resources/META-INF/properties-PureStyleSQLApplication.xml
@@ -0,0 +1,65 @@
+<?xml version="1.0"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<configuration>
+  <!-- Input Definition -->
+  <property>
+    <name>schemaInName</name>
+    <value>ORDERS</value>
+  </property>
+  <property>
+    <name>schemaInDef</name>
+    <value>{"separator":",","quoteChar":"\"","fields":[{"name":"RowTime","type":"Date","constraints":{"format":"dd/MM/yyyy hh:mm:ss Z"}},{"name":"id","type":"Integer"},{"name":"Product","type":"String"},{"name":"units","type":"Integer"}]}</value>
+  </property>
+  <property>
+    <name>broker</name>
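+    <value>localhost:9090</value>  <!-- NOTE(review): Kafka brokers default to port 9092 (as used in properties-FusionStyleSQLApplication.xml); confirm 9090 is intended -->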
+  </property>
+  <property>
+    <name>topic</name>
+    <value>inputTopic</value>
+  </property>
+
+  <!-- Output Definition -->
+  <property>
+    <name>schemaOutName</name>
+    <value>SALES</value>
+  </property>
+  <property>
+    <name>schemaOutDef</name>
+    <value>{"separator":",","quoteChar":"\"","fields":[{"name":"RowTime1","type":"Date","constraints":{"format":"dd/MM/yyyy hh:mm:ss Z"}},{"name":"RowTime2","type":"Date","constraints":{"format":"dd/MM/yyyy hh:mm:ss Z"}},{"name":"Product","type":"String"}]}</value>
+  </property>
+  <property>
+    <name>outputFolder</name>
+    <value>/tmp/output</value>
+  </property>
+  <property>
+    <name>destFileName</name>
+    <value>out.file</value>
+  </property>
+
+  <!-- Execution SQL -->
+  <property>
+    <name>sql</name>
+    <value>INSERT INTO SALES SELECT STREAM ROWTIME, FLOOR(ROWTIME TO DAY), APEXCONCAT('OILPAINT', SUBSTRING(PRODUCT, 6, 7)) FROM ORDERS WHERE ID > 3 AND PRODUCT LIKE 'paint%'</value>
+  </property>
+</configuration>
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9db04474/demos/sql/src/main/resources/META-INF/properties-SQLApplicationWithAPI.xml
----------------------------------------------------------------------
diff --git a/demos/sql/src/main/resources/META-INF/properties-SQLApplicationWithAPI.xml b/demos/sql/src/main/resources/META-INF/properties-SQLApplicationWithAPI.xml
new file mode 100644
index 0000000..9ac49d4
--- /dev/null
+++ b/demos/sql/src/main/resources/META-INF/properties-SQLApplicationWithAPI.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<configuration>
+  <!-- Input Definition -->
+  <property>
+    <name>csvSchemaInName</name>
+    <value>ORDERS</value>
+  </property>
+  <property>
+    <name>csvSchemaIn</name>
+    <value>{"separator":",","quoteChar":"\"","fields":[{"name":"RowTime","type":"Date","constraints":{"format":"dd/MM/yyyy hh:mm:ss Z"}},{"name":"id","type":"Integer"},{"name":"Product","type":"String"},{"name":"units","type":"Integer"}]}</value>
+  </property>
+  <property>
+    <name>sourceFile</name>
+    <value>src/test/resources/input.csv</value>
+  </property>
+
+  <!-- Execution SQL -->
+  <property>
+    <name>sql</name>
+    <value>SELECT STREAM ROWTIME, PRODUCT FROM ORDERS</value>
+  </property>
+</configuration>
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9db04474/demos/sql/src/main/resources/META-INF/properties-SQLApplicationWithModelFile.xml
----------------------------------------------------------------------
diff --git a/demos/sql/src/main/resources/META-INF/properties-SQLApplicationWithModelFile.xml b/demos/sql/src/main/resources/META-INF/properties-SQLApplicationWithModelFile.xml
new file mode 100644
index 0000000..ab026c2
--- /dev/null
+++ b/demos/sql/src/main/resources/META-INF/properties-SQLApplicationWithModelFile.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<configuration>
+  <property>
+    <name>modelFile</name>
+    <value>src/main/resources/model/model_file_csv.json</value>
+  </property>
+  <property>
+    <name>sql</name>
+    <value>SELECT STREAM ROWTIME, PRODUCT FROM ORDERS</value>
+  </property>
+</configuration>
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9db04474/demos/sql/src/main/resources/META-INF/properties.xml
----------------------------------------------------------------------
diff --git a/demos/sql/src/main/resources/META-INF/properties.xml b/demos/sql/src/main/resources/META-INF/properties.xml
new file mode 100644
index 0000000..6080bf6
--- /dev/null
+++ b/demos/sql/src/main/resources/META-INF/properties.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<configuration>
+  <!-- Memory settings for all demos -->
+  <!-- The "*" segments in the property names below are wildcards, so each value applies to
+       every application, operator and port rather than to a named one. -->
+  <property>
+    <name>dt.attr.MASTER_MEMORY_MB</name>
+    <value>512</value>
+  </property>
+  <property>
+    <name>dt.application.*.operator.*.attr.MEMORY_MB</name>
+    <value>256</value>
+  </property>
+  <!-- JVM heap cap for operators; kept below the 256 MB MEMORY_MB value above. -->
+  <property>
+    <name>dt.application.*.operator.*.attr.JVM_OPTIONS</name>
+    <value>-Xmx128M</value>
+  </property>
+  <property>
+    <name>dt.application.*.operator.*.port.*.attr.BUFFER_MEMORY_MB</name>
+    <value>128</value>
+  </property>
+</configuration>
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9db04474/demos/sql/src/main/resources/model/model_file_csv.json
----------------------------------------------------------------------
diff --git a/demos/sql/src/main/resources/model/model_file_csv.json b/demos/sql/src/main/resources/model/model_file_csv.json
new file mode 100644
index 0000000..beba18d
--- /dev/null
+++ b/demos/sql/src/main/resources/model/model_file_csv.json
@@ -0,0 +1,27 @@
+{
+  "version": "1.0",
+  "defaultSchema": "APEX",
+  "schemas": [{
+    "name": "APEX",
+    "tables": [
+      {
+        "name": "ORDERS",
+        "type": "custom",
+        "factory": "org.apache.apex.malhar.sql.schema.ApexSQLTableFactory",
+        "stream": {
+          "stream": true
+        },
+        "operand": {
+          "endpoint": "file",
+          "messageFormat": "csv",
+          "endpointOperands": {
+            "directory": "src/test/resources/input.csv"
+          },
+          "messageFormatOperands": {
+            "schema": "{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[{\"name\":\"RowTime\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy hh:mm:ss\"}},{\"name\":\"id\",\"type\":\"Integer\"},{\"name\":\"Product\",\"type\":\"String\"},{\"name\":\"units\",\"type\":\"Integer\"}]}"
+          }
+        }
+      }
+    ]
+  }]
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9db04474/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/FusionStyleSQLApplicationTest.java
----------------------------------------------------------------------
diff --git a/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/FusionStyleSQLApplicationTest.java b/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/FusionStyleSQLApplicationTest.java
new file mode 100644
index 0000000..7208701
--- /dev/null
+++ b/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/FusionStyleSQLApplicationTest.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql.sample;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.List;
+import java.util.TimeZone;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import org.apache.apex.malhar.kafka.EmbeddedKafka;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+
+public class FusionStyleSQLApplicationTest
+{
+  private final String testTopicData = "dataTopic";
+  private final String testTopicResult = "resultTopic";
+
+  private TimeZone defaultTZ;
+  private EmbeddedKafka kafka;
+
+  private static String outputFolder = "target/output/";
+
+  @Rule
+  public TestName testName = new TestName();
+
+  /**
+   * Pins the JVM default time zone to GMT, starts an embedded Kafka broker with the data and
+   * result topics, and selects an output folder named after the running test method.
+   */
+  @Before
+  public void setUp() throws Exception
+  {
+    defaultTZ = TimeZone.getDefault();
+    TimeZone.setDefault(TimeZone.getTimeZone("GMT"));
+
+    kafka = new EmbeddedKafka();
+    kafka.start();
+    kafka.createTopic(testTopicData);
+    kafka.createTopic(testTopicResult);
+
+    // Rebuild the path from the fixed base instead of appending to the static field; appending
+    // would yield target/output/a/b/ as soon as a second test method runs in this class.
+    outputFolder = "target/output/" + testName.getMethodName() + "/";
+  }
+
+  /**
+   * Stops the embedded Kafka broker and restores the time zone saved in {@link #setUp()}.
+   */
+  @After
+  public void tearDown() throws Exception
+  {
+    try {
+      // JUnit runs @After even when @Before failed, so the broker may never have been created.
+      if (kafka != null) {
+        kafka.stop();
+      }
+    } finally {
+      // Always restore the default so a failing broker shutdown cannot leak GMT into other tests.
+      TimeZone.setDefault(defaultTZ);
+    }
+  }
+
+  /**
+   * Runs {@link FusionStyleSQLApplication} in local mode against the embedded Kafka broker,
+   * publishes six CSV records and checks the lines written to the output folder.
+   *
+   * Exceptions are allowed to propagate: catching and printing them would let the test pass
+   * even though the application never ran or the output could not be read.
+   */
+  @Test
+  public void test() throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
+    conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties-FusionStyleSQLApplication.xml"));
+
+    conf.set("dt.operator.KafkaInput.prop.topics", testTopicData);
+    conf.set("dt.operator.KafkaInput.prop.clusters", kafka.getBroker());
+    conf.set("folderPath", outputFolder);
+    conf.set("fileName", "out.tmp");
+
+    FusionStyleSQLApplication app = new FusionStyleSQLApplication();
+
+    lma.prepareDAG(app, conf);
+
+    LocalMode.Controller lc = lma.getController();
+
+    lc.runAsync();
+    try {
+      kafka.publish(testTopicData, Arrays.asList("15/02/2016 10:15:00 +0000,1,paint1,11",
+          "15/02/2016 10:16:00 +0000,2,paint2,12",
+          "15/02/2016 10:17:00 +0000,3,paint3,13", "15/02/2016 10:18:00 +0000,4,paint4,14",
+          "15/02/2016 10:19:00 +0000,5,paint5,15", "15/02/2016 10:10:00 +0000,6,abcde6,16"));
+
+      Assert.assertTrue(PureStyleSQLApplicationTest.waitTillFileIsPopulated(outputFolder, 40000));
+    } finally {
+      // Shut the local cluster down even when publishing or the wait assertion fails.
+      lc.shutdown();
+    }
+
+    // The output is read only after shutdown, as before.
+    File file = new File(outputFolder);
+    File file1 = new File(outputFolder + file.list()[0]);
+    List<String> strings = FileUtils.readLines(file1);
+
+    String[] actualLines = strings.toArray(new String[strings.size()]);
+    String[] expectedLines = new String[] {
+        "15/02/2016 10:18:00 +0000,15/02/2016 12:00:00 +0000,OILPAINT4",
+        "",
+        "15/02/2016 10:19:00 +0000,15/02/2016 12:00:00 +0000,OILPAINT5",
+        ""};
+    // Checks both the length and every element.
+    Assert.assertArrayEquals(expectedLines, actualLines);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9db04474/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/PureStyleSQLApplicationTest.java
----------------------------------------------------------------------
diff --git a/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/PureStyleSQLApplicationTest.java b/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/PureStyleSQLApplicationTest.java
new file mode 100644
index 0000000..f298059
--- /dev/null
+++ b/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/PureStyleSQLApplicationTest.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql.sample;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.TimeZone;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import org.apache.apex.malhar.kafka.EmbeddedKafka;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.LocalMode;
+
+
+public class PureStyleSQLApplicationTest
+{
+  private final String testTopicData = "dataTopic";
+  private final String testTopicResult = "resultTopic";
+
+  private TimeZone defaultTZ;
+  private EmbeddedKafka kafka;
+  private static String outputFolder = "target/output/";
+
+  @Rule
+  public TestName testName = new TestName();
+
+  /**
+   * Pins the JVM default time zone to GMT, starts an embedded Kafka broker with the data and
+   * result topics, and selects an output folder named after the running test method.
+   */
+  @Before
+  public void setUp() throws Exception
+  {
+    defaultTZ = TimeZone.getDefault();
+    TimeZone.setDefault(TimeZone.getTimeZone("GMT"));
+
+    kafka = new EmbeddedKafka();
+    kafka.start();
+    kafka.createTopic(testTopicData);
+    kafka.createTopic(testTopicResult);
+
+    // Rebuild the path from the fixed base instead of appending to the static field; appending
+    // would yield target/output/a/b/ as soon as a second test method runs in this class.
+    outputFolder = "target/output/" + testName.getMethodName() + "/";
+  }
+
+  /**
+   * Stops the embedded Kafka broker and restores the time zone saved in {@link #setUp()}.
+   */
+  @After
+  public void tearDown() throws Exception
+  {
+    try {
+      // JUnit runs @After even when @Before failed, so the broker may never have been created.
+      if (kafka != null) {
+        kafka.stop();
+      }
+    } finally {
+      // Always restore the default so a failing broker shutdown cannot leak GMT into other tests.
+      TimeZone.setDefault(defaultTZ);
+    }
+  }
+
+  /**
+   * Runs {@link PureStyleSQLApplication} in local mode against the embedded Kafka broker,
+   * publishes six CSV records and checks the lines written to the output folder.
+   */
+  @Test
+  public void test() throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
+    conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties-PureStyleSQLApplication.xml"));
+
+    conf.set("broker", kafka.getBroker());
+    conf.set("topic", testTopicData);
+    conf.set("outputFolder", outputFolder);
+    conf.set("destFileName", "out.tmp");
+
+    PureStyleSQLApplication app = new PureStyleSQLApplication();
+
+    lma.prepareDAG(app, conf);
+
+    LocalMode.Controller lc = lma.getController();
+
+    lc.runAsync();
+    try {
+      kafka.publish(testTopicData, Arrays.asList(
+          "15/02/2016 10:15:00 +0000,1,paint1,11",
+          "15/02/2016 10:16:00 +0000,2,paint2,12",
+          "15/02/2016 10:17:00 +0000,3,paint3,13",
+          "15/02/2016 10:18:00 +0000,4,paint4,14",
+          "15/02/2016 10:19:00 +0000,5,paint5,15",
+          "15/02/2016 10:10:00 +0000,6,abcde6,16"));
+
+      Assert.assertTrue(waitTillFileIsPopulated(outputFolder, 40000));
+    } finally {
+      // Shut the local cluster down even when publishing or the wait assertion fails.
+      lc.shutdown();
+    }
+
+    // The output is read only after shutdown, as before.
+    File file = new File(outputFolder);
+    File file1 = new File(outputFolder + file.list()[0]);
+    List<String> strings = FileUtils.readLines(file1);
+
+    String[] actualLines = strings.toArray(new String[strings.size()]);
+
+    String[] expectedLines = new String[]{
+        "15/02/2016 10:18:00 +0000,15/02/2016 12:00:00 +0000,OILPAINT4",
+        "",
+        "15/02/2016 10:19:00 +0000,15/02/2016 12:00:00 +0000,OILPAINT5",
+        ""};
+
+    // Checks both the length and every element.
+    Assert.assertArrayEquals(expectedLines, actualLines);
+  }
+
+  /**
+   * Polls {@code outputFolder} every 500 ms until the first file listed in it contains at least
+   * one line, or until {@code timeout} milliseconds have elapsed.
+   *
+   * @param outputFolder directory the application writes its output file into
+   * @param timeout maximum time to wait, in milliseconds
+   * @return true if the folder exists and a non-empty file was read from it before the timeout
+   * @throws IOException if the file system cannot be accessed or the file cannot be read
+   * @throws InterruptedException if the thread is interrupted while sleeping between polls
+   */
+  public static boolean waitTillFileIsPopulated(String outputFolder, int timeout) throws IOException, InterruptedException
+  {
+    boolean result;
+    long now = System.currentTimeMillis();
+    Path outDir = new Path("file://" + new File(outputFolder).getAbsolutePath());
+    try (FileSystem fs = FileSystem.newInstance(outDir.toUri(), new Configuration())) {
+      List<String> strings = Lists.newArrayList();
+      while (System.currentTimeMillis() - now < timeout) {
+        if (fs.exists(outDir)) {
+          // List the directory once: File.list() returns null on an I/O error or if the folder
+          // disappears, and a second listing could differ from the first.
+          String[] children = new File(outputFolder).list();
+          if (children != null && children.length > 0) {
+            // Resolve the child against the folder so a missing trailing separator is harmless.
+            strings = FileUtils.readLines(new File(outputFolder, children[0]));
+            if (!strings.isEmpty()) {
+              break;
+            }
+          }
+        }
+
+        Thread.sleep(500);
+      }
+
+      result = fs.exists(outDir) && !strings.isEmpty();
+    }
+
+    return result;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9db04474/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithAPITest.java
----------------------------------------------------------------------
diff --git a/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithAPITest.java b/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithAPITest.java
new file mode 100644
index 0000000..6b1a404
--- /dev/null
+++ b/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithAPITest.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql.sample;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.TimeZone;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Predicates;
+import com.google.common.collect.Collections2;
+
+import com.datatorrent.api.LocalMode;
+
+
+public class SQLApplicationWithAPITest
+{
+  private TimeZone defaultTZ;
+
+  /**
+   * Saves the current JVM default time zone and switches the default to GMT, so the
+   * "RowTime=... GMT 2016" strings asserted in {@link #test()} are stable.
+   */
+  @Before
+  public void setUp() throws Exception
+  {
+    defaultTZ = TimeZone.getDefault();
+    TimeZone.setDefault(TimeZone.getTimeZone("GMT"));
+  }
+
+  /**
+   * Restores the JVM default time zone that was saved in {@link #setUp()}.
+   */
+  @After
+  public void tearDown() throws Exception
+  {
+    TimeZone.setDefault(defaultTZ);
+  }
+
+  /**
+   * Runs {@link SQLApplicationWithAPI} in local mode with System.out redirected into a buffer
+   * and checks the "Delta Record:" lines the application prints.
+   */
+  @Test
+  public void test() throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
+    conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties-SQLApplicationWithAPI.xml"));
+
+    SQLApplicationWithAPI app = new SQLApplicationWithAPI();
+
+    lma.prepareDAG(app, conf);
+
+    LocalMode.Controller lc = lma.getController();
+
+    PrintStream originalSysout = System.out;
+    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    System.setOut(new PrintStream(baos));
+
+    final boolean populated;
+    try {
+      lc.runAsync();
+      try {
+        populated = SQLApplicationWithModelFileTest.waitTillStdoutIsPopulated(baos, 30000);
+      } finally {
+        lc.shutdown();
+      }
+    } finally {
+      // Restore the real stdout even on failure so later output is not swallowed by the buffer.
+      System.setOut(originalSysout);
+    }
+
+    // Fail with a clear message on timeout instead of an ArrayIndexOutOfBoundsException below.
+    Assert.assertTrue("No \"Delta Record:\" line appeared on stdout before the timeout", populated);
+
+    String[] sout = baos.toString().split(System.lineSeparator());
+    Collection<String> filter = Collections2.filter(Arrays.asList(sout), Predicates.containsPattern("Delta Record:"));
+
+    String[] actualLines = filter.toArray(new String[filter.size()]);
+    Assert.assertTrue(actualLines[0].contains("RowTime=Mon Feb 15 10:15:00 GMT 2016, Product=paint1"));
+    Assert.assertTrue(actualLines[1].contains("RowTime=Mon Feb 15 10:16:00 GMT 2016, Product=paint2"));
+    Assert.assertTrue(actualLines[2].contains("RowTime=Mon Feb 15 10:17:00 GMT 2016, Product=paint3"));
+    Assert.assertTrue(actualLines[3].contains("RowTime=Mon Feb 15 10:18:00 GMT 2016, Product=paint4"));
+    Assert.assertTrue(actualLines[4].contains("RowTime=Mon Feb 15 10:19:00 GMT 2016, Product=paint5"));
+    Assert.assertTrue(actualLines[5].contains("RowTime=Mon Feb 15 10:10:00 GMT 2016, Product=abcde6"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9db04474/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithModelFileTest.java
----------------------------------------------------------------------
diff --git a/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithModelFileTest.java b/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithModelFileTest.java
new file mode 100644
index 0000000..7bbb8ec
--- /dev/null
+++ b/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithModelFileTest.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql.sample;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.TimeZone;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Predicates;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.LocalMode;
+
+public class SQLApplicationWithModelFileTest
+{
+  private TimeZone defaultTZ;
+
+  /**
+   * Saves the current JVM default time zone and switches the default to GMT, so the
+   * "RowTime=... GMT 2016" strings asserted in {@link #test()} are stable.
+   */
+  @Before
+  public void setUp() throws Exception
+  {
+    defaultTZ = TimeZone.getDefault();
+    TimeZone.setDefault(TimeZone.getTimeZone("GMT"));
+  }
+
+  /**
+   * Restores the JVM default time zone that was saved in {@link #setUp()}.
+   */
+  @After
+  public void tearDown() throws Exception
+  {
+    TimeZone.setDefault(defaultTZ);
+  }
+
+  /**
+   * Runs {@link SQLApplicationWithModelFile} in local mode with System.out redirected into a
+   * buffer and checks the "Delta Record:" lines the application prints.
+   */
+  @Test
+  public void test() throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
+    conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties-SQLApplicationWithModelFile.xml"));
+
+    SQLApplicationWithModelFile app = new SQLApplicationWithModelFile();
+
+    lma.prepareDAG(app, conf);
+
+    LocalMode.Controller lc = lma.getController();
+
+    PrintStream originalSysout = System.out;
+    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    System.setOut(new PrintStream(baos));
+
+    final boolean populated;
+    try {
+      lc.runAsync();
+      try {
+        populated = waitTillStdoutIsPopulated(baos, 30000);
+      } finally {
+        lc.shutdown();
+      }
+    } finally {
+      // Restore the real stdout even on failure so later output is not swallowed by the buffer.
+      System.setOut(originalSysout);
+    }
+
+    // Fail with a clear message on timeout instead of an ArrayIndexOutOfBoundsException below.
+    Assert.assertTrue("No \"Delta Record:\" line appeared on stdout before the timeout", populated);
+
+    String[] sout = baos.toString().split(System.lineSeparator());
+    Collection<String> filter = Collections2.filter(Arrays.asList(sout), Predicates.containsPattern("Delta Record:"));
+
+    String[] actualLines = filter.toArray(new String[filter.size()]);
+    Assert.assertTrue(actualLines[0].contains("RowTime=Mon Feb 15 10:15:00 GMT 2016, Product=paint1"));
+    Assert.assertTrue(actualLines[1].contains("RowTime=Mon Feb 15 10:16:00 GMT 2016, Product=paint2"));
+    Assert.assertTrue(actualLines[2].contains("RowTime=Mon Feb 15 10:17:00 GMT 2016, Product=paint3"));
+    Assert.assertTrue(actualLines[3].contains("RowTime=Mon Feb 15 10:18:00 GMT 2016, Product=paint4"));
+    Assert.assertTrue(actualLines[4].contains("RowTime=Mon Feb 15 10:19:00 GMT 2016, Product=paint5"));
+    Assert.assertTrue(actualLines[5].contains("RowTime=Mon Feb 15 10:10:00 GMT 2016, Product=abcde6"));
+  }
+
+  /**
+   * Polls the captured output every 500 ms until at least one line containing "Delta Record:"
+   * shows up, or until {@code timeout} milliseconds have elapsed.
+   *
+   * @param baos buffer that receives the redirected standard output
+   * @param timeout maximum time to wait, in milliseconds
+   * @return true if a matching line was seen before the timeout, false otherwise
+   */
+  public static boolean waitTillStdoutIsPopulated(ByteArrayOutputStream baos, int timeout) throws InterruptedException,
+    IOException
+  {
+    final long startedAt = System.currentTimeMillis();
+    Collection<String> deltaRecords = Lists.newArrayList();
+    while (deltaRecords.isEmpty() && System.currentTimeMillis() - startedAt < timeout) {
+      baos.flush();
+      final String[] capturedLines = baos.toString().split(System.lineSeparator());
+      deltaRecords = Collections2.filter(Arrays.asList(capturedLines), Predicates.containsPattern("Delta Record:"));
+      if (deltaRecords.isEmpty()) {
+        // Nothing matched yet; back off before looking at the buffer again.
+        Thread.sleep(500);
+      }
+    }
+
+    return !deltaRecords.isEmpty();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9db04474/demos/sql/src/test/resources/input.csv
----------------------------------------------------------------------
diff --git a/demos/sql/src/test/resources/input.csv b/demos/sql/src/test/resources/input.csv
new file mode 100644
index 0000000..c4786d1
--- /dev/null
+++ b/demos/sql/src/test/resources/input.csv
@@ -0,0 +1,6 @@
+15/02/2016 10:15:00 +0000,1,paint1,11
+15/02/2016 10:16:00 +0000,2,paint2,12
+15/02/2016 10:17:00 +0000,3,paint3,13
+15/02/2016 10:18:00 +0000,4,paint4,14
+15/02/2016 10:19:00 +0000,5,paint5,15
+15/02/2016 10:10:00 +0000,6,abcde6,16

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9db04474/demos/sql/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/demos/sql/src/test/resources/log4j.properties b/demos/sql/src/test/resources/log4j.properties
new file mode 100644
index 0000000..8ea3cfe
--- /dev/null
+++ b/demos/sql/src/test/resources/log4j.properties
@@ -0,0 +1,50 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# The root logger is attached to the CONSOLE appender only; the RFA and SYSLOG appenders
+# configured further down are not listed here.
+log4j.rootLogger=DEBUG,CONSOLE
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+# Console threshold is hard-coded to WARN.
+log4j.appender.CONSOLE.threshold=WARN
+# NOTE(review): nothing in this file references ${test.log.console.threshold}; presumably the
+# CONSOLE threshold above was meant to use it. Confirm before relying on this property.
+test.log.console.threshold=WARN
+
+log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.RFA.File=/tmp/app.log
+
+# to enable, add SYSLOG to rootLogger
+log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
+log4j.appender.SYSLOG.syslogHost=127.0.0.1
+log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
+log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
+log4j.appender.SYSLOG.Facility=LOCAL1
+
+log4j.logger.org=info
+#log4j.logger.org.apache.commons.beanutils=warn
+log4j.logger.com.datatorrent=INFO
+log4j.logger.org.apache.apex=INFO
+
+log4j.logger.org.apache.calcite=WARN
+log4j.logger.org.apache.kafka=WARN
+log4j.logger.org.I0Itec.zkclient.ZkClient=WARN
+log4j.logger.org.apache.zookeeper=WARN
+log4j.logger.kafka=WARN
+log4j.logger.kafka.consumer=WARN

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9db04474/sql/pom.xml
----------------------------------------------------------------------
diff --git a/sql/pom.xml b/sql/pom.xml
index 24ef44c..c5d96c5 100644
--- a/sql/pom.xml
+++ b/sql/pom.xml
@@ -161,7 +161,6 @@
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_2.11</artifactId>
       <version>0.9.0.0</version>
-      <optional>true</optional>
       <exclusions>
         <exclusion>
           <groupId>org.slf4j</groupId>