You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/11/15 13:08:44 UTC
apex-malhar git commit: APEXMALHAR-2304 SQL Support Examples
Repository: apex-malhar
Updated Branches:
refs/heads/master a25b6140c -> 9db044741
APEXMALHAR-2304 SQL Support Examples
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/9db04474
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/9db04474
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/9db04474
Branch: refs/heads/master
Commit: 9db044741a32d466c2df4d1582ca707fcce47402
Parents: a25b614
Author: Chinmay Kolhatkar <ch...@datatorrent.com>
Authored: Mon Oct 24 11:41:04 2016 +0530
Committer: Chinmay Kolhatkar <ch...@datatorrent.com>
Committed: Tue Nov 15 16:15:30 2016 +0530
----------------------------------------------------------------------
demos/pom.xml | 1 +
demos/sql/pom.xml | 102 ++++++++++++
demos/sql/src/assemble/appPackage.xml | 59 +++++++
.../sql/sample/FusionStyleSQLApplication.java | 88 +++++++++++
.../sql/sample/PureStyleSQLApplication.java | 65 ++++++++
.../sql/sample/SQLApplicationWithAPI.java | 45 ++++++
.../sql/sample/SQLApplicationWithModelFile.java | 50 ++++++
.../properties-FusionStyleSQLApplication.xml | 65 ++++++++
.../properties-PureStyleSQLApplication.xml | 65 ++++++++
.../properties-SQLApplicationWithAPI.xml | 43 +++++
.../properties-SQLApplicationWithModelFile.xml | 32 ++++
.../src/main/resources/META-INF/properties.xml | 41 +++++
.../main/resources/model/model_file_csv.json | 27 ++++
.../sample/FusionStyleSQLApplicationTest.java | 121 +++++++++++++++
.../sql/sample/PureStyleSQLApplicationTest.java | 155 +++++++++++++++++++
.../sql/sample/SQLApplicationWithAPITest.java | 92 +++++++++++
.../sample/SQLApplicationWithModelFileTest.java | 113 ++++++++++++++
demos/sql/src/test/resources/input.csv | 6 +
demos/sql/src/test/resources/log4j.properties | 50 ++++++
sql/pom.xml | 1 -
20 files changed, 1220 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9db04474/demos/pom.xml
----------------------------------------------------------------------
diff --git a/demos/pom.xml b/demos/pom.xml
index 185bb54..c4fda7a 100644
--- a/demos/pom.xml
+++ b/demos/pom.xml
@@ -168,6 +168,7 @@
<modules>
<module>distributedistinct</module>
<module>highlevelapi</module>
+ <module>sql</module>
</modules>
</profile>
</profiles>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9db04474/demos/sql/pom.xml
----------------------------------------------------------------------
diff --git a/demos/sql/pom.xml b/demos/sql/pom.xml
new file mode 100644
index 0000000..fd00f58
--- /dev/null
+++ b/demos/sql/pom.xml
@@ -0,0 +1,102 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>sql-demo</artifactId>
+ <packaging>jar</packaging>
+
+ <name>Apache Apex Malhar SQL API Demo</name>
+ <description>Apex demo applications that use SQL APIs to construct a DAG</description>
+
+ <parent>
+ <groupId>org.apache.apex</groupId>
+ <artifactId>malhar-demos</artifactId>
+ <version>3.6.0-SNAPSHOT</version>
+ </parent>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>1.9.1</version>
+ <executions>
+ <execution>
+ <id>attach-artifacts</id>
+ <phase>package</phase>
+ <goals>
+ <goal>attach-artifact</goal>
+ </goals>
+ <configuration>
+ <artifacts>
+ <artifact>
+ <file>target/${project.artifactId}-${project.version}.apa</file>
+ <type>apa</type>
+ </artifact>
+ </artifacts>
+ <skipAttach>false</skipAttach>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.apex</groupId>
+ <artifactId>apex-engine</artifactId>
+ <version>${apex.core.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.apex</groupId>
+ <artifactId>malhar-sql</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <!-- For KafkaTest -->
+ <dependency>
+ <groupId>org.apache.apex</groupId>
+ <artifactId>malhar-kafka</artifactId>
+ <version>${project.parent.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_2.11</artifactId>
+ <version>0.9.0.0</version>
+ <classifier>test</classifier>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9db04474/demos/sql/src/assemble/appPackage.xml
----------------------------------------------------------------------
diff --git a/demos/sql/src/assemble/appPackage.xml b/demos/sql/src/assemble/appPackage.xml
new file mode 100644
index 0000000..4138cf2
--- /dev/null
+++ b/demos/sql/src/assemble/appPackage.xml
@@ -0,0 +1,59 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+ <id>appPackage</id>
+ <formats>
+ <format>jar</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <fileSets>
+ <fileSet>
+ <directory>${basedir}/target/</directory>
+ <outputDirectory>/app</outputDirectory>
+ <includes>
+ <include>${project.artifactId}-${project.version}.jar</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>${basedir}/target/deps</directory>
+ <outputDirectory>/lib</outputDirectory>
+ </fileSet>
+ <fileSet>
+ <directory>${basedir}/src/site/conf</directory>
+ <outputDirectory>/conf</outputDirectory>
+ <includes>
+ <include>*.xml</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>${basedir}/src/main/resources/META-INF</directory>
+ <outputDirectory>/META-INF</outputDirectory>
+ </fileSet>
+ <fileSet>
+ <directory>${basedir}/src/main/resources/app</directory>
+ <outputDirectory>/app</outputDirectory>
+ </fileSet>
+ </fileSets>
+
+</assembly>
+
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9db04474/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/FusionStyleSQLApplication.java
----------------------------------------------------------------------
diff --git a/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/FusionStyleSQLApplication.java b/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/FusionStyleSQLApplication.java
new file mode 100644
index 0000000..94b02db
--- /dev/null
+++ b/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/FusionStyleSQLApplication.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql.sample;
+
+import java.util.Date;
+import java.util.Map;
+
+import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator;
+import org.apache.apex.malhar.sql.SQLExecEnvironment;
+import org.apache.apex.malhar.sql.table.CSVMessageFormat;
+import org.apache.apex.malhar.sql.table.FileEndpoint;
+import org.apache.apex.malhar.sql.table.StreamEndpoint;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.ImmutableMap;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.contrib.parser.CsvParser;
+
+
+@ApplicationAnnotation(name = "FusionStyleSQLApplication")
+public class FusionStyleSQLApplication implements StreamingApplication
+{
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ SQLExecEnvironment env = SQLExecEnvironment.getEnvironment();
+ env.registerFunction("APEXCONCAT", PureStyleSQLApplication.class, "apex_concat_str");
+
+ Map<String, Class> fieldMapping = ImmutableMap.<String, Class>of(
+ "RowTime", Date.class,
+ "id", Integer.class,
+ "Product", String.class,
+ "units", Integer.class);
+
+ // Add Kafka Input
+ KafkaSinglePortInputOperator kafkaInput = dag.addOperator("KafkaInput", KafkaSinglePortInputOperator.class);
+ kafkaInput.setInitialOffset("EARLIEST");
+
+ // Add CSVParser
+ CsvParser csvParser = dag.addOperator("CSVParser", CsvParser.class);
+ dag.addStream("KafkaToCSV", kafkaInput.outputPort, csvParser.in);
+
+ // Register CSV Parser output as input table for first SQL
+ env.registerTable(conf.get("sqlSchemaInputName"), new StreamEndpoint(csvParser.out, fieldMapping));
+
+ // Register FileEndpoint as output table for second SQL.
+ env.registerTable(conf.get("sqlSchemaOutputName"), new FileEndpoint(conf.get("folderPath"),
+ conf.get("fileName"), new CSVMessageFormat(conf.get("sqlSchemaOutputDef"))));
+
+ // Add second SQL to DAG
+ env.executeSQL(dag, conf.get("sql"));
+ }
+
+ public static class PassThroughOperator extends BaseOperator
+ {
+ public final transient DefaultOutputPort output = new DefaultOutputPort();
+ public final transient DefaultInputPort input = new DefaultInputPort()
+ {
+ @Override
+ public void process(Object o)
+ {
+ output.emit(output);
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9db04474/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/PureStyleSQLApplication.java
----------------------------------------------------------------------
diff --git a/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/PureStyleSQLApplication.java b/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/PureStyleSQLApplication.java
new file mode 100644
index 0000000..9a727a3
--- /dev/null
+++ b/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/PureStyleSQLApplication.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql.sample;
+
+import org.apache.apex.malhar.sql.SQLExecEnvironment;
+import org.apache.apex.malhar.sql.table.CSVMessageFormat;
+import org.apache.apex.malhar.sql.table.FileEndpoint;
+import org.apache.apex.malhar.sql.table.KafkaEndpoint;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+
+@ApplicationAnnotation(name = "PureStyleSQLApplication")
+public class PureStyleSQLApplication implements StreamingApplication
+{
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ // Source definition
+ String schemaInName = conf.get("schemaInName");
+ String schemaInDef = conf.get("schemaInDef");
+ String broker = conf.get("broker");
+ String sourceTopic = conf.get("topic");
+
+ // Destination definition
+ String schemaOutName = conf.get("schemaOutName");
+ String schemaOutDef = conf.get("schemaOutDef");
+ String outputFolder = conf.get("outputFolder");
+ String outFilename = conf.get("destFileName");
+
+ // SQL statement
+ String sql = conf.get("sql");
+
+ SQLExecEnvironment.getEnvironment()
+ .registerTable(schemaInName, new KafkaEndpoint(broker, sourceTopic,
+ new CSVMessageFormat(schemaInDef)))
+ .registerTable(schemaOutName, new FileEndpoint(outputFolder, outFilename,
+ new CSVMessageFormat(schemaOutDef)))
+ .registerFunction("APEXCONCAT", this.getClass(), "apex_concat_str")
+ .executeSQL(dag, sql);
+ }
+
+ public static String apex_concat_str(String s1, String s2)
+ {
+ return s1 + s2;
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9db04474/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithAPI.java
----------------------------------------------------------------------
diff --git a/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithAPI.java b/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithAPI.java
new file mode 100644
index 0000000..604332b
--- /dev/null
+++ b/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithAPI.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql.sample;
+
+import org.apache.apex.malhar.sql.SQLExecEnvironment;
+import org.apache.apex.malhar.sql.table.CSVMessageFormat;
+import org.apache.apex.malhar.sql.table.FileEndpoint;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+
+@ApplicationAnnotation(name = "SQLApplicationWithAPI")
+public class SQLApplicationWithAPI implements StreamingApplication
+{
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ // Source definition
+ String schemaInName = conf.get("csvSchemaInName");
+ String schemaIn = conf.get("csvSchemaIn");
+ String sourceFile = conf.get("sourceFile");
+
+ SQLExecEnvironment.getEnvironment()
+ .registerTable(schemaInName, new FileEndpoint(sourceFile, new CSVMessageFormat(schemaIn)))
+ .executeSQL(dag, conf.get("sql"));
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9db04474/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithModelFile.java
----------------------------------------------------------------------
diff --git a/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithModelFile.java b/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithModelFile.java
new file mode 100644
index 0000000..2d22b18
--- /dev/null
+++ b/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithModelFile.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql.sample;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.apex.malhar.sql.SQLExecEnvironment;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+
+@ApplicationAnnotation(name = "SQLApplicationWithModelFile")
+public class SQLApplicationWithModelFile implements StreamingApplication
+{
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ String modelFile = conf.get("modelFile");
+ String model;
+ try {
+ model = FileUtils.readFileToString(new File(modelFile));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ SQLExecEnvironment.getEnvironment()
+ .withModel(model)
+ .executeSQL(dag, conf.get("sql"));
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9db04474/demos/sql/src/main/resources/META-INF/properties-FusionStyleSQLApplication.xml
----------------------------------------------------------------------
diff --git a/demos/sql/src/main/resources/META-INF/properties-FusionStyleSQLApplication.xml b/demos/sql/src/main/resources/META-INF/properties-FusionStyleSQLApplication.xml
new file mode 100644
index 0000000..77852e7
--- /dev/null
+++ b/demos/sql/src/main/resources/META-INF/properties-FusionStyleSQLApplication.xml
@@ -0,0 +1,65 @@
+<?xml version="1.0"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<configuration>
+ <!-- Kafka Operator Properties -->
+ <property>
+ <name>dt.operator.KafkaInput.prop.topics</name>
+ <value>dataTopic</value>
+ </property>
+ <property>
+ <name>dt.operator.KafkaInput.prop.clusters</name>
+ <value>localhost:9092</value> <!-- broker (NOT zookeeper) address -->
+ </property>
+
+ <!-- CSV Parser Properties -->
+ <property>
+ <name>dt.operator.CSVParser.prop.schema</name>
+ <value>{"separator":",","quoteChar":"\"","fields":[{"name":"RowTime","type":"Date","constraints":{"format":"dd/MM/yyyy hh:mm:ss Z"}},{"name":"id","type":"Integer"},{"name":"Product","type":"String"},{"name":"units","type":"Integer"}]}</value>
+ </property>
+
+ <!-- SQL Properties -->
+ <property>
+ <name>sqlSchemaInputName</name>
+ <value>FROMCSV</value>
+ </property>
+ <property>
+ <name>sqlSchemaOutputName</name>
+ <value>TOFILE</value>
+ </property>
+ <property>
+ <name>folderPath</name>
+ <value>/tmp/output</value>
+ </property>
+ <property>
+ <name>fileName</name>
+ <value>output.txt</value>
+ </property>
+ <property>
+ <name>sqlSchemaOutputDef</name>
+ <value>{"separator":",","quoteChar":"\"","fields":[{"name":"RowTime1","type":"Date","constraints":{"format":"dd/MM/yyyy hh:mm:ss Z"}},{"name":"RowTime2","type":"Date","constraints":{"format":"dd/MM/yyyy hh:mm:ss Z"}},{"name":"Product","type":"String"}]}</value>
+ </property>
+ <property>
+ <name>sql</name>
+ <value>INSERT INTO TOFILE SELECT STREAM ROWTIME, FLOOR(ROWTIME TO DAY), APEXCONCAT('OILPAINT', SUBSTRING(PRODUCT, 6, 7)) FROM FROMCSV WHERE ID > 3 AND PRODUCT LIKE 'paint%'</value>
+ </property>
+</configuration>
+
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9db04474/demos/sql/src/main/resources/META-INF/properties-PureStyleSQLApplication.xml
----------------------------------------------------------------------
diff --git a/demos/sql/src/main/resources/META-INF/properties-PureStyleSQLApplication.xml b/demos/sql/src/main/resources/META-INF/properties-PureStyleSQLApplication.xml
new file mode 100644
index 0000000..0d25aa6
--- /dev/null
+++ b/demos/sql/src/main/resources/META-INF/properties-PureStyleSQLApplication.xml
@@ -0,0 +1,65 @@
+<?xml version="1.0"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<configuration>
+ <!-- Input Definition -->
+ <property>
+ <name>schemaInName</name>
+ <value>ORDERS</value>
+ </property>
+ <property>
+ <name>schemaInDef</name>
+ <value>{"separator":",","quoteChar":"\"","fields":[{"name":"RowTime","type":"Date","constraints":{"format":"dd/MM/yyyy hh:mm:ss Z"}},{"name":"id","type":"Integer"},{"name":"Product","type":"String"},{"name":"units","type":"Integer"}]}</value>
+ </property>
+ <property>
+ <name>broker</name>
+ <value>localhost:9090</value>
+ </property>
+ <property>
+ <name>topic</name>
+ <value>inputTopic</value>
+ </property>
+
+ <!-- Output Definition -->
+ <property>
+ <name>schemaOutName</name>
+ <value>SALES</value>
+ </property>
+ <property>
+ <name>schemaOutDef</name>
+ <value>{"separator":",","quoteChar":"\"","fields":[{"name":"RowTime1","type":"Date","constraints":{"format":"dd/MM/yyyy hh:mm:ss Z"}},{"name":"RowTime2","type":"Date","constraints":{"format":"dd/MM/yyyy hh:mm:ss Z"}},{"name":"Product","type":"String"}]}</value>
+ </property>
+ <property>
+ <name>outputFolder</name>
+ <value>/tmp/output</value>
+ </property>
+ <property>
+ <name>destFileName</name>
+ <value>out.file</value>
+ </property>
+
+ <!-- Execution SQL -->
+ <property>
+ <name>sql</name>
+ <value>INSERT INTO SALES SELECT STREAM ROWTIME, FLOOR(ROWTIME TO DAY), APEXCONCAT('OILPAINT', SUBSTRING(PRODUCT, 6, 7)) FROM ORDERS WHERE ID > 3 AND PRODUCT LIKE 'paint%'</value>
+ </property>
+</configuration>
+
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9db04474/demos/sql/src/main/resources/META-INF/properties-SQLApplicationWithAPI.xml
----------------------------------------------------------------------
diff --git a/demos/sql/src/main/resources/META-INF/properties-SQLApplicationWithAPI.xml b/demos/sql/src/main/resources/META-INF/properties-SQLApplicationWithAPI.xml
new file mode 100644
index 0000000..9ac49d4
--- /dev/null
+++ b/demos/sql/src/main/resources/META-INF/properties-SQLApplicationWithAPI.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<configuration>
+ <!-- Input Definition -->
+ <property>
+ <name>csvSchemaInName</name>
+ <value>ORDERS</value>
+ </property>
+ <property>
+ <name>csvSchemaIn</name>
+ <value>{"separator":",","quoteChar":"\"","fields":[{"name":"RowTime","type":"Date","constraints":{"format":"dd/MM/yyyy hh:mm:ss Z"}},{"name":"id","type":"Integer"},{"name":"Product","type":"String"},{"name":"units","type":"Integer"}]}</value>
+ </property>
+ <property>
+ <name>sourceFile</name>
+ <value>src/test/resources/input.csv</value>
+ </property>
+
+ <!-- Execution SQL -->
+ <property>
+ <name>sql</name>
+ <value>SELECT STREAM ROWTIME, PRODUCT FROM ORDERS</value>
+ </property>
+</configuration>
+
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9db04474/demos/sql/src/main/resources/META-INF/properties-SQLApplicationWithModelFile.xml
----------------------------------------------------------------------
diff --git a/demos/sql/src/main/resources/META-INF/properties-SQLApplicationWithModelFile.xml b/demos/sql/src/main/resources/META-INF/properties-SQLApplicationWithModelFile.xml
new file mode 100644
index 0000000..ab026c2
--- /dev/null
+++ b/demos/sql/src/main/resources/META-INF/properties-SQLApplicationWithModelFile.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<configuration>
+ <property>
+ <name>modelFile</name>
+ <value>src/main/resources/model/model_file_csv.json</value>
+ </property>
+ <property>
+ <name>sql</name>
+ <value>SELECT STREAM ROWTIME, PRODUCT FROM ORDERS</value>
+ </property>
+</configuration>
+
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9db04474/demos/sql/src/main/resources/META-INF/properties.xml
----------------------------------------------------------------------
diff --git a/demos/sql/src/main/resources/META-INF/properties.xml b/demos/sql/src/main/resources/META-INF/properties.xml
new file mode 100644
index 0000000..6080bf6
--- /dev/null
+++ b/demos/sql/src/main/resources/META-INF/properties.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<configuration>
+ <!-- Memory settings for all demos -->
+ <property>
+ <name>dt.attr.MASTER_MEMORY_MB</name>
+ <value>512</value>
+ </property>
+ <property>
+ <name>dt.application.*.operator.*.attr.MEMORY_MB</name>
+ <value>256</value>
+ </property>
+ <property>
+ <name>dt.application.*.operator.*.attr.JVM_OPTIONS</name>
+ <value>-Xmx128M</value>
+ </property>
+ <property>
+ <name>dt.application.*.operator.*.port.*.attr.BUFFER_MEMORY_MB</name>
+ <value>128</value>
+ </property>
+</configuration>
+
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9db04474/demos/sql/src/main/resources/model/model_file_csv.json
----------------------------------------------------------------------
diff --git a/demos/sql/src/main/resources/model/model_file_csv.json b/demos/sql/src/main/resources/model/model_file_csv.json
new file mode 100644
index 0000000..beba18d
--- /dev/null
+++ b/demos/sql/src/main/resources/model/model_file_csv.json
@@ -0,0 +1,27 @@
+{
+ "version": "1.0",
+ "defaultSchema": "APEX",
+ "schemas": [{
+ "name": "APEX",
+ "tables": [
+ {
+ "name": "ORDERS",
+ "type": "custom",
+ "factory": "org.apache.apex.malhar.sql.schema.ApexSQLTableFactory",
+ "stream": {
+ "stream": true
+ },
+ "operand": {
+ "endpoint": "file",
+ "messageFormat": "csv",
+ "endpointOperands": {
+ "directory": "src/test/resources/input.csv"
+ },
+ "messageFormatOperands": {
+ "schema": "{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[{\"name\":\"RowTime\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy hh:mm:ss\"}},{\"name\":\"id\",\"type\":\"Integer\"},{\"name\":\"Product\",\"type\":\"String\"},{\"name\":\"units\",\"type\":\"Integer\"}]}"
+ }
+ }
+ }
+ ]
+ }]
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9db04474/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/FusionStyleSQLApplicationTest.java
----------------------------------------------------------------------
diff --git a/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/FusionStyleSQLApplicationTest.java b/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/FusionStyleSQLApplicationTest.java
new file mode 100644
index 0000000..7208701
--- /dev/null
+++ b/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/FusionStyleSQLApplicationTest.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql.sample;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.List;
+import java.util.TimeZone;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import org.apache.apex.malhar.kafka.EmbeddedKafka;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+
+public class FusionStyleSQLApplicationTest
+{
+ private final String testTopicData = "dataTopic";
+ private final String testTopicResult = "resultTopic";
+
+ private TimeZone defaultTZ;
+ private EmbeddedKafka kafka;
+
+ private static String outputFolder = "target/output/";
+
+ @Rule
+ public TestName testName = new TestName();
+
+ @Before
+ public void setUp() throws Exception
+ {
+ defaultTZ = TimeZone.getDefault();
+ TimeZone.setDefault(TimeZone.getTimeZone("GMT"));
+
+ kafka = new EmbeddedKafka();
+ kafka.start();
+ kafka.createTopic(testTopicData);
+ kafka.createTopic(testTopicResult);
+
+ outputFolder += testName.getMethodName() + "/";
+ }
+
+ @After
+ public void tearDown() throws Exception
+ {
+ kafka.stop();
+
+ TimeZone.setDefault(defaultTZ);
+ }
+
+ @Test
+ public void test() throws Exception
+ {
+ try {
+ LocalMode lma = LocalMode.newInstance();
+ Configuration conf = new Configuration(false);
+ conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
+ conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties-FusionStyleSQLApplication.xml"));
+
+ conf.set("dt.operator.KafkaInput.prop.topics", testTopicData);
+ conf.set("dt.operator.KafkaInput.prop.clusters", kafka.getBroker());
+ conf.set("folderPath", outputFolder);
+ conf.set("fileName", "out.tmp");
+
+ FusionStyleSQLApplication app = new FusionStyleSQLApplication();
+
+ lma.prepareDAG(app, conf);
+
+ LocalMode.Controller lc = lma.getController();
+
+ lc.runAsync();
+ kafka.publish(testTopicData, Arrays.asList("15/02/2016 10:15:00 +0000,1,paint1,11",
+ "15/02/2016 10:16:00 +0000,2,paint2,12",
+ "15/02/2016 10:17:00 +0000,3,paint3,13", "15/02/2016 10:18:00 +0000,4,paint4,14",
+ "15/02/2016 10:19:00 +0000,5,paint5,15", "15/02/2016 10:10:00 +0000,6,abcde6,16"));
+
+ Assert.assertTrue(PureStyleSQLApplicationTest.waitTillFileIsPopulated(outputFolder, 40000));
+ lc.shutdown();
+
+ File file = new File(outputFolder);
+ File file1 = new File(outputFolder + file.list()[0]);
+ List<String> strings = FileUtils.readLines(file1);
+
+ String[] actualLines = strings.toArray(new String[strings.size()]);
+ String[] expectedLines = new String[] {
+ "15/02/2016 10:18:00 +0000,15/02/2016 12:00:00 +0000,OILPAINT4",
+ "",
+ "15/02/2016 10:19:00 +0000,15/02/2016 12:00:00 +0000,OILPAINT5",
+ ""};
+ Assert.assertEquals(expectedLines.length, actualLines.length);
+ for (int i = 0; i < actualLines.length; i++) {
+ Assert.assertEquals(expectedLines[i], actualLines[i]);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9db04474/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/PureStyleSQLApplicationTest.java
----------------------------------------------------------------------
diff --git a/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/PureStyleSQLApplicationTest.java b/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/PureStyleSQLApplicationTest.java
new file mode 100644
index 0000000..f298059
--- /dev/null
+++ b/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/PureStyleSQLApplicationTest.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql.sample;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.TimeZone;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import org.apache.apex.malhar.kafka.EmbeddedKafka;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.LocalMode;
+
+
+public class PureStyleSQLApplicationTest
+{
+ private final String testTopicData = "dataTopic";
+ private final String testTopicResult = "resultTopic";
+
+ private TimeZone defaultTZ;
+ private EmbeddedKafka kafka;
+ private static String outputFolder = "target/output/";
+
+ @Rule
+ public TestName testName = new TestName();
+
+ @Before
+ public void setUp() throws Exception
+ {
+ defaultTZ = TimeZone.getDefault();
+ TimeZone.setDefault(TimeZone.getTimeZone("GMT"));
+
+ kafka = new EmbeddedKafka();
+ kafka.start();
+ kafka.createTopic(testTopicData);
+ kafka.createTopic(testTopicResult);
+
+ outputFolder += testName.getMethodName() + "/";
+ }
+
+ @After
+ public void tearDown() throws Exception
+ {
+ kafka.stop();
+ TimeZone.setDefault(defaultTZ);
+ }
+
+ @Test
+ public void test() throws Exception
+ {
+ LocalMode lma = LocalMode.newInstance();
+ Configuration conf = new Configuration(false);
+ conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
+ conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties-PureStyleSQLApplication.xml"));
+
+ conf.set("broker", kafka.getBroker());
+ conf.set("topic", testTopicData);
+ conf.set("outputFolder", outputFolder);
+ conf.set("destFileName", "out.tmp");
+
+ PureStyleSQLApplication app = new PureStyleSQLApplication();
+
+ lma.prepareDAG(app, conf);
+
+ LocalMode.Controller lc = lma.getController();
+
+ lc.runAsync();
+ kafka.publish(testTopicData, Arrays.asList(
+ "15/02/2016 10:15:00 +0000,1,paint1,11",
+ "15/02/2016 10:16:00 +0000,2,paint2,12",
+ "15/02/2016 10:17:00 +0000,3,paint3,13",
+ "15/02/2016 10:18:00 +0000,4,paint4,14",
+ "15/02/2016 10:19:00 +0000,5,paint5,15",
+ "15/02/2016 10:10:00 +0000,6,abcde6,16"));
+
+ Assert.assertTrue(waitTillFileIsPopulated(outputFolder, 40000));
+ lc.shutdown();
+
+ File file = new File(outputFolder);
+ File file1 = new File(outputFolder + file.list()[0]);
+ List<String> strings = FileUtils.readLines(file1);
+
+ String[] actualLines = strings.toArray(new String[strings.size()]);
+
+ String[] expectedLines = new String[]{
+ "15/02/2016 10:18:00 +0000,15/02/2016 12:00:00 +0000,OILPAINT4",
+ "",
+ "15/02/2016 10:19:00 +0000,15/02/2016 12:00:00 +0000,OILPAINT5",
+ ""};
+
+ Assert.assertEquals(expectedLines.length, actualLines.length);
+ for (int i = 0;i < expectedLines.length; i++) {
+ Assert.assertEquals(expectedLines[i], actualLines[i]);
+ }
+ }
+
+ public static boolean waitTillFileIsPopulated(String outputFolder, int timeout) throws IOException, InterruptedException
+ {
+ boolean result;
+ long now = System.currentTimeMillis();
+ Path outDir = new Path("file://" + new File(outputFolder).getAbsolutePath());
+ try (FileSystem fs = FileSystem.newInstance(outDir.toUri(), new Configuration())) {
+ List<String> strings = Lists.newArrayList();
+ while (System.currentTimeMillis() - now < timeout) {
+ if (fs.exists(outDir)) {
+ File file = new File(outputFolder);
+ if (file.list().length > 0) {
+ File file1 = new File(outputFolder + file.list()[0]);
+ strings = FileUtils.readLines(file1);
+ if (strings.size() != 0) {
+ break;
+ }
+ }
+ }
+
+ Thread.sleep(500);
+ }
+
+ result = fs.exists(outDir) && (strings.size() != 0);
+ }
+
+ return result;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9db04474/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithAPITest.java
----------------------------------------------------------------------
diff --git a/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithAPITest.java b/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithAPITest.java
new file mode 100644
index 0000000..6b1a404
--- /dev/null
+++ b/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithAPITest.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql.sample;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.TimeZone;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Predicates;
+import com.google.common.collect.Collections2;
+
+import com.datatorrent.api.LocalMode;
+
+
+public class SQLApplicationWithAPITest
+{
+ private TimeZone defaultTZ;
+
+ @Before
+ public void setUp() throws Exception
+ {
+ defaultTZ = TimeZone.getDefault();
+ TimeZone.setDefault(TimeZone.getTimeZone("GMT"));
+ }
+
+ @After
+ public void tearDown() throws Exception
+ {
+ TimeZone.setDefault(defaultTZ);
+ }
+
+ @Test
+ public void test() throws Exception
+ {
+ LocalMode lma = LocalMode.newInstance();
+ Configuration conf = new Configuration(false);
+ conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
+ conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties-SQLApplicationWithAPI.xml"));
+
+ SQLApplicationWithAPI app = new SQLApplicationWithAPI();
+
+ lma.prepareDAG(app, conf);
+
+ LocalMode.Controller lc = lma.getController();
+
+ PrintStream originalSysout = System.out;
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ System.setOut(new PrintStream(baos));
+
+ lc.runAsync();
+ SQLApplicationWithModelFileTest.waitTillStdoutIsPopulated(baos, 30000);
+ lc.shutdown();
+
+ System.setOut(originalSysout);
+
+ String[] sout = baos.toString().split(System.lineSeparator());
+ Collection<String> filter = Collections2.filter(Arrays.asList(sout), Predicates.containsPattern("Delta Record:"));
+
+ String[] actualLines = filter.toArray(new String[filter.size()]);
+ Assert.assertTrue(actualLines[0].contains("RowTime=Mon Feb 15 10:15:00 GMT 2016, Product=paint1"));
+ Assert.assertTrue(actualLines[1].contains("RowTime=Mon Feb 15 10:16:00 GMT 2016, Product=paint2"));
+ Assert.assertTrue(actualLines[2].contains("RowTime=Mon Feb 15 10:17:00 GMT 2016, Product=paint3"));
+ Assert.assertTrue(actualLines[3].contains("RowTime=Mon Feb 15 10:18:00 GMT 2016, Product=paint4"));
+ Assert.assertTrue(actualLines[4].contains("RowTime=Mon Feb 15 10:19:00 GMT 2016, Product=paint5"));
+ Assert.assertTrue(actualLines[5].contains("RowTime=Mon Feb 15 10:10:00 GMT 2016, Product=abcde6"));
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9db04474/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithModelFileTest.java
----------------------------------------------------------------------
diff --git a/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithModelFileTest.java b/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithModelFileTest.java
new file mode 100644
index 0000000..7bbb8ec
--- /dev/null
+++ b/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithModelFileTest.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql.sample;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.TimeZone;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Predicates;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.LocalMode;
+
+public class SQLApplicationWithModelFileTest
+{
+ private TimeZone defaultTZ;
+
+ @Before
+ public void setUp() throws Exception
+ {
+ defaultTZ = TimeZone.getDefault();
+ TimeZone.setDefault(TimeZone.getTimeZone("GMT"));
+ }
+
+ @After
+ public void tearDown() throws Exception
+ {
+ TimeZone.setDefault(defaultTZ);
+ }
+
+ @Test
+ public void test() throws Exception
+ {
+ LocalMode lma = LocalMode.newInstance();
+ Configuration conf = new Configuration(false);
+ conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
+ conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties-SQLApplicationWithModelFile.xml"));
+
+ SQLApplicationWithModelFile app = new SQLApplicationWithModelFile();
+
+ lma.prepareDAG(app, conf);
+
+ LocalMode.Controller lc = lma.getController();
+
+ PrintStream originalSysout = System.out;
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ System.setOut(new PrintStream(baos));
+
+ lc.runAsync();
+ waitTillStdoutIsPopulated(baos, 30000);
+ lc.shutdown();
+
+ System.setOut(originalSysout);
+
+ String[] sout = baos.toString().split(System.lineSeparator());
+ Collection<String> filter = Collections2.filter(Arrays.asList(sout), Predicates.containsPattern("Delta Record:"));
+
+ String[] actualLines = filter.toArray(new String[filter.size()]);
+ Assert.assertTrue(actualLines[0].contains("RowTime=Mon Feb 15 10:15:00 GMT 2016, Product=paint1"));
+ Assert.assertTrue(actualLines[1].contains("RowTime=Mon Feb 15 10:16:00 GMT 2016, Product=paint2"));
+ Assert.assertTrue(actualLines[2].contains("RowTime=Mon Feb 15 10:17:00 GMT 2016, Product=paint3"));
+ Assert.assertTrue(actualLines[3].contains("RowTime=Mon Feb 15 10:18:00 GMT 2016, Product=paint4"));
+ Assert.assertTrue(actualLines[4].contains("RowTime=Mon Feb 15 10:19:00 GMT 2016, Product=paint5"));
+ Assert.assertTrue(actualLines[5].contains("RowTime=Mon Feb 15 10:10:00 GMT 2016, Product=abcde6"));
+ }
+
+ public static boolean waitTillStdoutIsPopulated(ByteArrayOutputStream baos, int timeout) throws InterruptedException,
+ IOException
+ {
+ long now = System.currentTimeMillis();
+ Collection<String> filter = Lists.newArrayList();
+ while (System.currentTimeMillis() - now < timeout) {
+ baos.flush();
+ String[] sout = baos.toString().split(System.lineSeparator());
+ filter = Collections2.filter(Arrays.asList(sout), Predicates.containsPattern("Delta Record:"));
+ if (filter.size() != 0) {
+ break;
+ }
+
+ Thread.sleep(500);
+ }
+
+ return (filter.size() != 0);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9db04474/demos/sql/src/test/resources/input.csv
----------------------------------------------------------------------
diff --git a/demos/sql/src/test/resources/input.csv b/demos/sql/src/test/resources/input.csv
new file mode 100644
index 0000000..c4786d1
--- /dev/null
+++ b/demos/sql/src/test/resources/input.csv
@@ -0,0 +1,6 @@
+15/02/2016 10:15:00 +0000,1,paint1,11
+15/02/2016 10:16:00 +0000,2,paint2,12
+15/02/2016 10:17:00 +0000,3,paint3,13
+15/02/2016 10:18:00 +0000,4,paint4,14
+15/02/2016 10:19:00 +0000,5,paint5,15
+15/02/2016 10:10:00 +0000,6,abcde6,16
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9db04474/demos/sql/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/demos/sql/src/test/resources/log4j.properties b/demos/sql/src/test/resources/log4j.properties
new file mode 100644
index 0000000..8ea3cfe
--- /dev/null
+++ b/demos/sql/src/test/resources/log4j.properties
@@ -0,0 +1,50 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+log4j.rootLogger=DEBUG,CONSOLE
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.CONSOLE.threshold=WARN
+test.log.console.threshold=WARN
+
+log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.RFA.File=/tmp/app.log
+
+# to enable, add SYSLOG to rootLogger
+log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
+log4j.appender.SYSLOG.syslogHost=127.0.0.1
+log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
+log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
+log4j.appender.SYSLOG.Facility=LOCAL1
+
+log4j.logger.org=info
+#log4j.logger.org.apache.commons.beanutils=warn
+log4j.logger.com.datatorrent=INFO
+log4j.logger.org.apache.apex=INFO
+
+log4j.logger.org.apache.calcite=WARN
+log4j.logger.org.apache.kafka=WARN
+log4j.logger.org.I0Itec.zkclient.ZkClient=WARN
+log4j.logger.org.apache.zookeeper=WARN
+log4j.logger.kafka=WARN
+log4j.logger.kafka.consumer=WARN
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9db04474/sql/pom.xml
----------------------------------------------------------------------
diff --git a/sql/pom.xml b/sql/pom.xml
index 24ef44c..c5d96c5 100644
--- a/sql/pom.xml
+++ b/sql/pom.xml
@@ -161,7 +161,6 @@
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.9.0.0</version>
- <optional>true</optional>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>