You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/03/07 06:58:24 UTC
[19/30] apex-malhar git commit: Renamed demos to examples. Packages
and artifactid names are changed as suggested.
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/pom.xml
----------------------------------------------------------------------
diff --git a/demos/pom.xml b/demos/pom.xml
deleted file mode 100644
index 99cfcd2..0000000
--- a/demos/pom.xml
+++ /dev/null
@@ -1,231 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.apex</groupId>
- <artifactId>malhar</artifactId>
- <version>3.7.0-SNAPSHOT</version>
- </parent>
-
- <artifactId>malhar-demos</artifactId>
- <packaging>pom</packaging>
- <name>Apache Apex Malhar Demos</name>
-
- <properties>
- <apex.apppackage.groupid>${project.groupId}</apex.apppackage.groupid>
- <apex.apppackage.classpath>lib/*.jar</apex.apppackage.classpath>
- <semver.plugin.skip>true</semver.plugin.skip>
- <maven.deploy.skip>true</maven.deploy.skip>
- </properties>
-
- <profiles>
- <profile>
- <id>demo-plugin-activation</id>
- <activation>
- <file>
- <exists>${basedir}/src/main</exists>
- </file>
- </activation>
- <build>
- <plugins>
- <plugin>
- <artifactId>maven-eclipse-plugin</artifactId>
- <version>2.9</version>
- <configuration>
- <downloadSources>true</downloadSources>
- </configuration>
- </plugin>
- <plugin>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>2.3.2</version>
- <configuration>
- <encoding>UTF-8</encoding>
- <source>1.7</source>
- <target>1.7</target>
- <debug>true</debug>
- <optimize>false</optimize>
- <showDeprecation>true</showDeprecation>
- <showWarnings>true</showWarnings>
- </configuration>
- </plugin>
- <plugin>
- <artifactId>maven-dependency-plugin</artifactId>
- <version>2.8</version>
- <executions>
- <execution>
- <id>copy-dependencies</id>
- <phase>prepare-package</phase>
- <goals>
- <goal>copy-dependencies</goal>
- </goals>
- <configuration>
- <outputDirectory>target/deps</outputDirectory>
- <includeScope>runtime</includeScope>
- </configuration>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <artifactId>maven-assembly-plugin</artifactId>
- <executions>
- <execution>
- <id>app-package-assembly</id>
- <phase>package</phase>
- <goals>
- <goal>single</goal>
- </goals>
- <configuration>
- <finalName>${project.artifactId}-${project.version}-apexapp</finalName>
- <appendAssemblyId>false</appendAssemblyId>
- <descriptors>
- <descriptor>src/assemble/appPackage.xml</descriptor>
- </descriptors>
- <archiverConfig>
- <defaultDirectoryMode>0755</defaultDirectoryMode>
- </archiverConfig>
- <archive>
- <manifestEntries>
- <Class-Path>${apex.apppackage.classpath}</Class-Path>
- <DT-Engine-Version>${apex.core.version}</DT-Engine-Version>
- <DT-App-Package-Group-Id>${apex.apppackage.groupid}</DT-App-Package-Group-Id>
- <DT-App-Package-Name>${project.artifactId}</DT-App-Package-Name>
- <DT-App-Package-Version>${project.version}</DT-App-Package-Version>
- <DT-App-Package-Display-Name>${project.name}</DT-App-Package-Display-Name>
- <DT-App-Package-Description>${project.description}</DT-App-Package-Description>
- </manifestEntries>
- </archive>
- </configuration>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <artifactId>maven-antrun-plugin</artifactId>
- <version>1.7</version>
- <executions>
- <execution>
- <phase>package</phase>
- <configuration>
- <target>
- <move file="${project.build.directory}/${project.artifactId}-${project.version}-apexapp.jar"
- tofile="${project.build.directory}/${project.artifactId}-${project.version}.apa" />
- </target>
- </configuration>
- <goals>
- <goal>run</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>build-helper-maven-plugin</artifactId>
- <version>1.9.1</version>
- <executions>
- <execution>
- <id>attach-artifacts</id>
- <phase>package</phase>
- <goals>
- <goal>attach-artifact</goal>
- </goals>
- <configuration>
- <artifacts>
- <artifact>
- <file>target/${project.artifactId}-${project.version}.apa</file>
- <type>apa</type>
- </artifact>
- </artifacts>
- <skipAttach>false</skipAttach>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
- </profile>
- <profile>
- <id>all-modules</id>
- <modules>
- <module>distributedistinct</module>
- <module>highlevelapi</module>
- <module>sql</module>
- </modules>
- </profile>
- </profiles>
-
- <modules>
- <module>machinedata</module>
- <module>pi</module>
- <module>twitter</module>
- <module>yahoofinance</module>
- <module>frauddetect</module>
- <module>mobile</module>
- <module>wordcount</module>
- <module>mrmonitor</module>
- <module>mroperator</module>
- <module>uniquecount</module>
- <module>r</module>
- <module>echoserver</module>
- <module>iteration</module>
- </modules>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.apex</groupId>
- <artifactId>apex-common</artifactId>
- <version>${apex.core.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>4.10</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.apex</groupId>
- <artifactId>malhar-library</artifactId>
- <version>${project.version}</version>
- <exclusions>
- <exclusion>
- <groupId>*</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.apex</groupId>
- <artifactId>malhar-library</artifactId>
- <version>${project.version}</version>
- <classifier>tests</classifier>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>*</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- </dependencies>
-
-</project>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/r/pom.xml
----------------------------------------------------------------------
diff --git a/demos/r/pom.xml b/demos/r/pom.xml
deleted file mode 100644
index d8b73f8..0000000
--- a/demos/r/pom.xml
+++ /dev/null
@@ -1,83 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <artifactId>r-demo</artifactId>
- <packaging>jar</packaging>
-
- <name>Apache Apex Malhar R Demo</name>
- <description>Apex demo applications for using R.</description>
-
- <parent>
- <groupId>org.apache.apex</groupId>
- <artifactId>malhar-demos</artifactId>
- <version>3.7.0-SNAPSHOT</version>
- </parent>
-
- <properties>
- <skipTests>true</skipTests>
- </properties>
-
- <repositories>
- <repository>
- <id>datatorrent-3rd-party</id>
- <name>Embedded repository for dependencies not available online</name>
- <url>https://www.datatorrent.com/maven/content/repositories/thirdparty</url>
- <snapshots>
- <updatePolicy>daily</updatePolicy>
- </snapshots>
- <releases>
- <updatePolicy>daily</updatePolicy>
- </releases>
- </repository>
- </repositories>
-
- <dependencies>
- <dependency>
- <groupId>org.rosuda</groupId>
- <artifactId>jri</artifactId>
- <version>1.0</version>
- </dependency>
- <dependency>
- <groupId>org.rosuda</groupId>
- <artifactId>rengine</artifactId>
- <version>1.0</version>
- </dependency>
- <dependency>
- <groupId>org.rosuda</groupId>
- <artifactId>jriengine</artifactId>
- <version>1.0</version>
- </dependency>
- <dependency>
- <groupId>org.apache.apex</groupId>
- <artifactId>malhar-contrib</artifactId>
- <version>${project.version}</version>
- <exclusions>
- <exclusion>
- <groupId>*</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- </dependencies>
-</project>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/r/src/assemble/appPackage.xml
----------------------------------------------------------------------
diff --git a/demos/r/src/assemble/appPackage.xml b/demos/r/src/assemble/appPackage.xml
deleted file mode 100644
index 4138cf2..0000000
--- a/demos/r/src/assemble/appPackage.xml
+++ /dev/null
@@ -1,59 +0,0 @@
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
- <id>appPackage</id>
- <formats>
- <format>jar</format>
- </formats>
- <includeBaseDirectory>false</includeBaseDirectory>
- <fileSets>
- <fileSet>
- <directory>${basedir}/target/</directory>
- <outputDirectory>/app</outputDirectory>
- <includes>
- <include>${project.artifactId}-${project.version}.jar</include>
- </includes>
- </fileSet>
- <fileSet>
- <directory>${basedir}/target/deps</directory>
- <outputDirectory>/lib</outputDirectory>
- </fileSet>
- <fileSet>
- <directory>${basedir}/src/site/conf</directory>
- <outputDirectory>/conf</outputDirectory>
- <includes>
- <include>*.xml</include>
- </includes>
- </fileSet>
- <fileSet>
- <directory>${basedir}/src/main/resources/META-INF</directory>
- <outputDirectory>/META-INF</outputDirectory>
- </fileSet>
- <fileSet>
- <directory>${basedir}/src/main/resources/app</directory>
- <outputDirectory>/app</outputDirectory>
- </fileSet>
- </fileSets>
-
-</assembly>
-
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/FaithfulKey.java
----------------------------------------------------------------------
diff --git a/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/FaithfulKey.java b/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/FaithfulKey.java
deleted file mode 100755
index b2bfd46..0000000
--- a/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/FaithfulKey.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.r.oldfaithful;
-
-/**
- * @since 2.1.0
- */
-public class FaithfulKey
-{
-
- private static final long serialVersionUID = 201403251620L;
-
- private double eruptionDuration;
- private int waitingTime;
-
- public FaithfulKey()
- {
- }
-
- public double getEruptionDuration()
- {
- return eruptionDuration;
- }
-
- public void setEruptionDuration(double eruptionDuration)
- {
- this.eruptionDuration = eruptionDuration;
- }
-
- public int getWaitingTime()
- {
- return waitingTime;
- }
-
- public void setWaitingTime(int waitingTime)
- {
- this.waitingTime = waitingTime;
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/FaithfulRScript.java
----------------------------------------------------------------------
diff --git a/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/FaithfulRScript.java b/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/FaithfulRScript.java
deleted file mode 100755
index cf49848..0000000
--- a/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/FaithfulRScript.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.r.oldfaithful;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.api.Context;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.annotation.InputPortFieldAnnotation;
-import com.datatorrent.contrib.r.RScript;
-
-/**
- * @since 2.1.0
- */
-public class FaithfulRScript extends RScript
-{
-
- private transient List<FaithfulKey> readingsList = new ArrayList<FaithfulKey>();
- private int elapsedTime;
- private static final Logger LOG = LoggerFactory.getLogger(FaithfulRScript.class);
-
- public FaithfulRScript()
- {
- super();
- }
-
- public FaithfulRScript(String rScriptFilePath, String rFunction, String returnVariable)
- {
- super(rScriptFilePath, rFunction, returnVariable);
- }
-
- @InputPortFieldAnnotation(optional = true)
- public final transient DefaultInputPort<FaithfulKey> faithfulInput = new DefaultInputPort<FaithfulKey>()
- {
- @Override
- public void process(FaithfulKey tuple)
- {
- // Buffer the reading; endWindow() packs the buffered readings into the
- // ("String", values) map that is passed to the RScript operator's process()
- readingsList.add(tuple);
-
- }
-
- };
-
- @InputPortFieldAnnotation(optional = true)
- public final transient DefaultInputPort<Integer> inputElapsedTime = new DefaultInputPort<Integer>()
- {
- @Override
- public void process(Integer eT)
- {
- elapsedTime = eT;
- }
- };
-
- @Override
- public void setup(Context.OperatorContext context)
- {
- super.setup(context);
- }
-
- @Override
- public void endWindow()
- {
- if (readingsList.size() == 0) {
- return;
- }
- LOG.info("Input data size: readingsList - " + readingsList.size());
-
- double[] eruptionDuration = new double[readingsList.size()];
- int[] waitingTime = new int[readingsList.size()];
-
- for (int i = 0; i < readingsList.size(); i++) {
- eruptionDuration[i] = readingsList.get(i).getEruptionDuration();
- waitingTime[i] = readingsList.get(i).getWaitingTime();
- }
- LOG.info("Input data size: eruptionDuration - " + eruptionDuration.length);
- LOG.info("Input data size: waitingTime - " + waitingTime.length);
-
- HashMap<String, Object> map = new HashMap<String, Object>();
-
- map.put("ELAPSEDTIME", elapsedTime);
- map.put("ERUPTIONS", eruptionDuration);
- map.put("WAITING", waitingTime);
-
- super.process(map);
- readingsList.clear();
- map.clear();
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/InputGenerator.java
----------------------------------------------------------------------
diff --git a/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/InputGenerator.java b/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/InputGenerator.java
deleted file mode 100755
index c45cd50..0000000
--- a/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/InputGenerator.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.r.oldfaithful;
-
-import java.util.Random;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.api.Context;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.InputOperator;
-
-/**
- * The InputGenerator operator is used to generate input for the 'Old Faithful Geyser' application.
- * This application accepts readings for the waiting time and the subsequent eruption duration
- * of the 'Old Faithful' and based on this data, tries to predict the eruption duration of the next
- * eruption given the elapsed time since the last eruption.
- * The training data is generated for an application window and consists of multiple
- * waiting times and eruption duration values.
- * For every application window, it generates only one 'elapsed time' input for which the
- * prediction would be made.
- *
- * @since 2.1.0
- */
-
-public class InputGenerator implements InputOperator
-{
-
- @SuppressWarnings("unused")
- private static final Logger LOG = LoggerFactory.getLogger(InputGenerator.class);
- private int blastCount = 1000;
- private Random random = new Random();
- private static int emitCount = 0;
-
- public final transient DefaultOutputPort<FaithfulKey> outputPort = new DefaultOutputPort<FaithfulKey>();
-
- public final transient DefaultOutputPort<Integer> elapsedTime = new DefaultOutputPort<Integer>();
-
- public void setBlastCount(int blastCount)
- {
- this.blastCount = blastCount;
- }
-
- @Override
- public void beginWindow(long windowId)
- {
- }
-
- @Override
- public void endWindow()
- {
- }
-
- @Override
- public void setup(Context.OperatorContext context)
- {
- }
-
- @Override
- public void teardown()
- {
- }
-
- private int nextRandomId(int min, int max)
- {
- int id;
- do {
- id = (int)Math.abs(Math.round(random.nextGaussian() * max));
- }
- while (id >= max);
-
- if (id < min) {
- id = min;
- }
- try {
- // Slowdown input generation
- if (emitCount++ % 97 == 0) {
- Thread.sleep(1);
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- return id;
- }
-
- @Override
- public void emitTuples()
- {
- boolean elapsedTimeSent = false;
-
- try {
- for (int i = 0; i < blastCount; ++i) {
- int waitingTime = nextRandomId(3600, 36000);
-
- double eruptionDuration = -2.15 + 0.05 * waitingTime;
- emitTuple(eruptionDuration, waitingTime);
-
- if (!elapsedTimeSent) {
- int eT = 0;
-
- if (i % 100 == 0) {
- eT = 54 + waitingTime;
-
- emitElapsedTime(eT);
- elapsedTimeSent = true;
- }
- }
- }
- } catch (Exception ex) {
- throw new RuntimeException(ex);
- }
- }
-
- private void emitTuple(double eruptionDuration, int waitingTime)
- {
- FaithfulKey faithfulkey = new FaithfulKey();
-
- faithfulkey.setEruptionDuration(eruptionDuration);
- faithfulkey.setWaitingTime(waitingTime);
-
- this.outputPort.emit(faithfulkey);
- }
-
- private void emitElapsedTime(int eT)
- {
- this.elapsedTime.emit(eT);
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/OldFaithfulApplication.java
----------------------------------------------------------------------
diff --git a/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/OldFaithfulApplication.java b/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/OldFaithfulApplication.java
deleted file mode 100755
index 0483767..0000000
--- a/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/OldFaithfulApplication.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.r.oldfaithful;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.api.annotation.ApplicationAnnotation;
-import com.datatorrent.lib.io.ConsoleOutputOperator;
-
-/**
- * The application attempts to simulate the 'Old Faithful Geyser' eruption.
- * This application accepts readings for the waiting time and the subsequent eruption duration
- * of the 'Old Faithful' and based on this data, tries to predict the eruption duration of the next
- * eruption given the elapsed time since the last eruption.
- * The training data is generated for an application window and consists of multiple
- * waiting times and eruption duration values.
- * For every application window, it generates only one 'elapsed time' input for which the
- * prediction would be made.
- * The model in R is in the file eruptionModel.R, located in the
- * demos/r/src/main/resources/com/datatorrent/demos/r/oldfaithful/ directory
- *
- * @since 2.1.0
- */
-
-@ApplicationAnnotation(name = "OldFaithfulApplication")
-public class OldFaithfulApplication implements StreamingApplication
-{
- private final DAG.Locality locality = null;
-
- /**
- * Create the DAG
- */
- @Override
- public void populateDAG(DAG dag, Configuration conf)
- {
-
- InputGenerator randomInputGenerator = dag.addOperator("rand", new InputGenerator());
- FaithfulRScript rScriptOp = dag.addOperator("rScriptOp", new FaithfulRScript("com/datatorrent/demos/r/oldfaithful/eruptionModel.R", "eruptionModel", "retVal"));
- ConsoleOutputOperator consoles = dag.addOperator("consoles", new ConsoleOutputOperator());
-
- Map<String, FaithfulRScript.REXP_TYPE> argTypeMap = new HashMap<String, FaithfulRScript.REXP_TYPE>();
-
- argTypeMap.put("ELAPSEDTIME", FaithfulRScript.REXP_TYPE.REXP_INT);
- argTypeMap.put("ERUPTIONS", FaithfulRScript.REXP_TYPE.REXP_ARRAY_DOUBLE);
- argTypeMap.put("WAITING", FaithfulRScript.REXP_TYPE.REXP_ARRAY_INT);
-
- rScriptOp.setArgTypeMap(argTypeMap);
-
- dag.addStream("ingen_faithfulRscript", randomInputGenerator.outputPort, rScriptOp.faithfulInput).setLocality(locality);
- dag.addStream("ingen_faithfulRscript_eT", randomInputGenerator.elapsedTime, rScriptOp.inputElapsedTime).setLocality(locality);
- dag.addStream("faithfulRscript_console_s", rScriptOp.strOutput, consoles.input).setLocality(locality);
-
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/r/src/main/resources/META-INF/properties.xml
----------------------------------------------------------------------
diff --git a/demos/r/src/main/resources/META-INF/properties.xml b/demos/r/src/main/resources/META-INF/properties.xml
deleted file mode 100755
index ec8b070..0000000
--- a/demos/r/src/main/resources/META-INF/properties.xml
+++ /dev/null
@@ -1,41 +0,0 @@
-<?xml version="1.0" encoding="UTF-8" standalone="no"?>
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-<configuration>
-<!--properties for R demo -->
- <property>
- <name>dt.application.OldFaithfulApplication.class</name>
- <value>com.datatorrent.demos.r.oldfaithful.OldFaithfulApplication</value>
- <description>An alias for OldFaithful application</description>
- </property>
-
- <property>
- <name>dt.application.OldFaithfulApplication.operator.*.attr.MEMORY_MB</name>
- <value>1024</value>
- </property>
-
-<!-- Need this information for loading native libraries -->
- <property>
- <name>dt.attr.CONTAINER_JVM_OPTIONS</name>
- <value>-Djava.library.path=/usr/local/lib/R/site-library/rJava/jri/</value>
- </property>
-
-</configuration>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/r/src/main/resources/com/datatorrent/demos/r/oldfaithful/eruptionModel.R
----------------------------------------------------------------------
diff --git a/demos/r/src/main/resources/com/datatorrent/demos/r/oldfaithful/eruptionModel.R b/demos/r/src/main/resources/com/datatorrent/demos/r/oldfaithful/eruptionModel.R
deleted file mode 100755
index e46fa8d..0000000
--- a/demos/r/src/main/resources/com/datatorrent/demos/r/oldfaithful/eruptionModel.R
+++ /dev/null
@@ -1,60 +0,0 @@
-#!/usr/bin/Rscript
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-
-# This script applies a simple linear regression model to the data set 'faithful',
-# and estimates the next eruption duration given the waiting time since the last eruption.
-#
-
- eruptionModel <- function() {
-
- datavar = data.frame(ERUPTIONS, WAITING)
-
- #attach data variable
- attach(datavar)
-
- #create a linear model using lm(FORMULA, DATAVAR)
- #predict the eruption duration (ERUPTIONS) using the waiting time since the last eruption (WAITING)
- eruption.lm <- lm(ERUPTIONS ~ WAITING, datavar)
-
- #display linear model
- eruption.lm
-
- # Get the values of the intercept and the WAITING coefficient so as to be able to predict the eruption duration
- interc<-eruption.lm$coeff[["(Intercept)"]]
- eruptionDuration<-eruption.lm$coeff[["WAITING"]]
-
- # Calculate the next eruption duration for the elapsed time being asked for (ELAPSEDTIME), using the model that has been fitted above.
- nextEruptionDuration<-(interc+(eruptionDuration * ELAPSEDTIME))
-
-retVal<-paste("nextEruptionDuration ", nextEruptionDuration, sep=": ")
-#retVal<-c("interc : ",interc, ", eruptionDuration : ", eruptionDuration,", nextEruptionDuration : ", nextEruptionDuration)
-
-sort( sapply(mget(ls()),object.size) )
-
-detach(datavar);
-
-# Clear all the data from R workspace
-rm(datavar);
-rm(ERUPTIONS);
-rm(WAITING);
-
-return(retVal)
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/r/src/test/java/com/datatorrent/demos/r/oldfaithful/OldFaithfulApplicationTest.java
----------------------------------------------------------------------
diff --git a/demos/r/src/test/java/com/datatorrent/demos/r/oldfaithful/OldFaithfulApplicationTest.java b/demos/r/src/test/java/com/datatorrent/demos/r/oldfaithful/OldFaithfulApplicationTest.java
deleted file mode 100755
index 0bb1901..0000000
--- a/demos/r/src/test/java/com/datatorrent/demos/r/oldfaithful/OldFaithfulApplicationTest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.r.oldfaithful;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hadoop.conf.Configuration;
-
-import com.datatorrent.api.LocalMode;
-
-public class OldFaithfulApplicationTest
-{
-
- private static final Logger LOG = LoggerFactory.getLogger(OldFaithfulApplicationTest.class);
-
- @Test
- public void testSomeMethod() throws Exception
- {
- LocalMode lma = LocalMode.newInstance();
- OldFaithfulApplication app = new OldFaithfulApplication();
- app.populateDAG(lma.getDAG(), new Configuration(false));
-
- try {
- LocalMode.Controller lc = lma.getController();
- lc.setHeartbeatMonitoringEnabled(false);
- lc.run(5000);
- } catch (Exception e) {
- LOG.error("Exception: ", e);
- Assert.fail("Unexpected exception.");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/r/src/test/resources/dt-site-oldfaithful.xml
----------------------------------------------------------------------
diff --git a/demos/r/src/test/resources/dt-site-oldfaithful.xml b/demos/r/src/test/resources/dt-site-oldfaithful.xml
deleted file mode 100755
index ec8b070..0000000
--- a/demos/r/src/test/resources/dt-site-oldfaithful.xml
+++ /dev/null
@@ -1,41 +0,0 @@
-<?xml version="1.0" encoding="UTF-8" standalone="no"?>
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-<configuration>
-<!-- Properties for the R demo -->
- <property>
- <name>dt.application.OldFaithfulApplication.class</name>
- <value>com.datatorrent.demos.r.oldfaithful.OldFaithfulApplication</value>
- <description>An alias for OldFaithful application</description>
- </property>
-
- <property>
- <name>dt.application.OldFaithfulApplication.operator.*.attr.MEMORY_MB</name>
- <value>1024</value>
- </property>
-
-<!-- Needed so that the JVM can locate the native libraries (rJava/JRI) -->
- <property>
- <name>dt.attr.CONTAINER_JVM_OPTIONS</name>
- <value>-Djava.library.path=/usr/local/lib/R/site-library/rJava/jri/</value>
- </property>
-
-</configuration>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/r/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/demos/r/src/test/resources/log4j.properties b/demos/r/src/test/resources/log4j.properties
deleted file mode 100755
index cf0d19e..0000000
--- a/demos/r/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,43 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-log4j.rootLogger=DEBUG,CONSOLE
-
-log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
-log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
-log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
-log4j.appender.CONSOLE.threshold=${test.log.console.threshold}
-test.log.console.threshold=DEBUG
-
-log4j.appender.RFA=org.apache.log4j.RollingFileAppender
-log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
-log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
-log4j.appender.RFA.File=/tmp/app.log
-
-# to enable, add SYSLOG to rootLogger
-log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
-log4j.appender.SYSLOG.syslogHost=127.0.0.1
-log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
-log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
-log4j.appender.SYSLOG.Facility=LOCAL1
-
-log4j.logger.org=info
-#log4j.logger.org.apache.commons.beanutils=warn
-log4j.logger.com.datatorrent=debug
-log4j.logger.org.apache.apex=debug
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/sql/pom.xml
----------------------------------------------------------------------
diff --git a/demos/sql/pom.xml b/demos/sql/pom.xml
deleted file mode 100644
index 69ffa73..0000000
--- a/demos/sql/pom.xml
+++ /dev/null
@@ -1,102 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <artifactId>sql-demo</artifactId>
- <packaging>jar</packaging>
-
- <name>Apache Apex Malhar SQL API Demo</name>
- <description>Apex demo applications that use SQL APIs to construct a DAG</description>
-
- <parent>
- <groupId>org.apache.apex</groupId>
- <artifactId>malhar-demos</artifactId>
- <version>3.7.0-SNAPSHOT</version>
- </parent>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>build-helper-maven-plugin</artifactId>
- <version>1.9.1</version>
- <executions>
- <execution>
- <id>attach-artifacts</id>
- <phase>package</phase>
- <goals>
- <goal>attach-artifact</goal>
- </goals>
- <configuration>
- <artifacts>
- <artifact>
- <file>target/${project.artifactId}-${project.version}.apa</file>
- <type>apa</type>
- </artifact>
- </artifacts>
- <skipAttach>false</skipAttach>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
- </plugins>
- </build>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.apex</groupId>
- <artifactId>apex-engine</artifactId>
- <version>${apex.core.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.apex</groupId>
- <artifactId>malhar-sql</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <!-- For KafkaTest -->
- <dependency>
- <groupId>org.apache.apex</groupId>
- <artifactId>malhar-kafka</artifactId>
- <version>${project.parent.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>*</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.11</artifactId>
- <version>0.9.0.0</version>
- <classifier>test</classifier>
- <scope>test</scope>
- </dependency>
-
- </dependencies>
-</project>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/sql/src/assemble/appPackage.xml
----------------------------------------------------------------------
diff --git a/demos/sql/src/assemble/appPackage.xml b/demos/sql/src/assemble/appPackage.xml
deleted file mode 100644
index 4138cf2..0000000
--- a/demos/sql/src/assemble/appPackage.xml
+++ /dev/null
@@ -1,59 +0,0 @@
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
- <id>appPackage</id>
- <formats>
- <format>jar</format>
- </formats>
- <includeBaseDirectory>false</includeBaseDirectory>
- <fileSets>
- <fileSet>
- <directory>${basedir}/target/</directory>
- <outputDirectory>/app</outputDirectory>
- <includes>
- <include>${project.artifactId}-${project.version}.jar</include>
- </includes>
- </fileSet>
- <fileSet>
- <directory>${basedir}/target/deps</directory>
- <outputDirectory>/lib</outputDirectory>
- </fileSet>
- <fileSet>
- <directory>${basedir}/src/site/conf</directory>
- <outputDirectory>/conf</outputDirectory>
- <includes>
- <include>*.xml</include>
- </includes>
- </fileSet>
- <fileSet>
- <directory>${basedir}/src/main/resources/META-INF</directory>
- <outputDirectory>/META-INF</outputDirectory>
- </fileSet>
- <fileSet>
- <directory>${basedir}/src/main/resources/app</directory>
- <outputDirectory>/app</outputDirectory>
- </fileSet>
- </fileSets>
-
-</assembly>
-
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/FusionStyleSQLApplication.java
----------------------------------------------------------------------
diff --git a/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/FusionStyleSQLApplication.java b/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/FusionStyleSQLApplication.java
deleted file mode 100644
index 80b997d..0000000
--- a/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/FusionStyleSQLApplication.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.sql.sample;
-
-import java.util.Date;
-import java.util.Map;
-
-import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator;
-import org.apache.apex.malhar.sql.SQLExecEnvironment;
-import org.apache.apex.malhar.sql.table.CSVMessageFormat;
-import org.apache.apex.malhar.sql.table.FileEndpoint;
-import org.apache.apex.malhar.sql.table.StreamEndpoint;
-import org.apache.hadoop.conf.Configuration;
-
-import com.google.common.collect.ImmutableMap;
-
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.api.annotation.ApplicationAnnotation;
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.contrib.parser.CsvParser;
-
-
-/**
- * Demo that parses Kafka messages with a CSV parser and registers the parsed stream as a SQL input table.
- * @since 3.6.0
- */
-@ApplicationAnnotation(name = "FusionStyleSQLApplication")
-public class FusionStyleSQLApplication implements StreamingApplication
-{
-  /** Adds the Kafka and CSV parser operators, registers both SQL tables and executes the "sql" property. */
-  @Override
-  public void populateDAG(DAG dag, Configuration conf)
-  {
-    SQLExecEnvironment env = SQLExecEnvironment.getEnvironment();
-    env.registerFunction("APEXCONCAT", PureStyleSQLApplication.class, "apex_concat_str");
-    Map<String, Class> fieldMapping = ImmutableMap.<String, Class>of(
-        "RowTime", Date.class, "id", Integer.class,
-        "Product", String.class, "units", Integer.class);
-
-    // Add Kafka Input
-    KafkaSinglePortInputOperator kafkaInput = dag.addOperator("KafkaInput", KafkaSinglePortInputOperator.class);
-    kafkaInput.setInitialOffset("EARLIEST");
-
-    // Add CSVParser
-    CsvParser csvParser = dag.addOperator("CSVParser", CsvParser.class);
-    dag.addStream("KafkaToCSV", kafkaInput.outputPort, csvParser.in);
-
-    // Register the CSV parser output as the input table of the SQL statement
-    env.registerTable(conf.get("sqlSchemaInputName"), new StreamEndpoint(csvParser.out, fieldMapping));
-
-    // Register a FileEndpoint as the output table of the SQL statement
-    env.registerTable(conf.get("sqlSchemaOutputName"), new FileEndpoint(conf.get("folderPath"),
-        conf.get("fileName"), new CSVMessageFormat(conf.get("sqlSchemaOutputDef"))));
-
-    // Execute the configured SQL statement against the DAG
-    env.executeSQL(dag, conf.get("sql"));
-  }
-
-  /** Operator that forwards every tuple received on {@code input} unchanged to {@code output}. */
-  public static class PassThroughOperator extends BaseOperator
-  {
-    public final transient DefaultOutputPort output = new DefaultOutputPort();
-    public final transient DefaultInputPort input = new DefaultInputPort()
-    {
-      @Override
-      public void process(Object o)
-      {
-        output.emit(o);
-      }
-    };
-  }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/PureStyleSQLApplication.java
----------------------------------------------------------------------
diff --git a/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/PureStyleSQLApplication.java b/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/PureStyleSQLApplication.java
deleted file mode 100644
index 79295f9..0000000
--- a/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/PureStyleSQLApplication.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.sql.sample;
-
-import org.apache.apex.malhar.sql.SQLExecEnvironment;
-import org.apache.apex.malhar.sql.table.CSVMessageFormat;
-import org.apache.apex.malhar.sql.table.FileEndpoint;
-import org.apache.apex.malhar.sql.table.KafkaEndpoint;
-import org.apache.hadoop.conf.Configuration;
-
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.api.annotation.ApplicationAnnotation;
-
-/**
- * Demo that registers a Kafka-backed input table and a file-backed output table, then executes the configured SQL.
- * @since 3.6.0
- */
-@ApplicationAnnotation(name = "PureStyleSQLApplication")
-public class PureStyleSQLApplication implements StreamingApplication
-{
-  /** Reads table definitions and the statement from {@code conf}, registers APEXCONCAT and executes the SQL. */
-  @Override
-  public void populateDAG(DAG dag, Configuration conf)
-  {
-    // Input side: table name, CSV schema, Kafka broker and topic
-    String inputTable = conf.get("schemaInName");
-    String inputSchema = conf.get("schemaInDef");
-    String kafkaBroker = conf.get("broker");
-    String kafkaTopic = conf.get("topic");
-
-    // Output side: table name, CSV schema, target folder and file name
-    String outputTable = conf.get("schemaOutName");
-    String outputSchema = conf.get("schemaOutDef");
-    String outputDir = conf.get("outputFolder");
-    String outputFile = conf.get("destFileName");
-
-    String statement = conf.get("sql");
-
-    SQLExecEnvironment env = SQLExecEnvironment.getEnvironment();
-    env.registerTable(inputTable, new KafkaEndpoint(kafkaBroker, kafkaTopic, new CSVMessageFormat(inputSchema)))
-        .registerTable(outputTable, new FileEndpoint(outputDir, outputFile, new CSVMessageFormat(outputSchema)))
-        .registerFunction("APEXCONCAT", this.getClass(), "apex_concat_str")
-        .executeSQL(dag, statement);
-  }
-
-  /** Returns {@code s1} followed by {@code s2}; registered in populateDAG under the SQL name APEXCONCAT. */
-  public static String apex_concat_str(String s1, String s2)
-  {
-    return s1 + s2;
-  }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithAPI.java
----------------------------------------------------------------------
diff --git a/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithAPI.java b/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithAPI.java
deleted file mode 100644
index da4f563..0000000
--- a/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithAPI.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.sql.sample;
-
-import org.apache.apex.malhar.sql.SQLExecEnvironment;
-import org.apache.apex.malhar.sql.table.CSVMessageFormat;
-import org.apache.apex.malhar.sql.table.FileEndpoint;
-import org.apache.hadoop.conf.Configuration;
-
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.api.annotation.ApplicationAnnotation;
-
-/**
- * Demo that registers a CSV file as the input table and executes the configured SQL statement.
- * @since 3.6.0
- */
-@ApplicationAnnotation(name = "SQLApplicationWithAPI")
-public class SQLApplicationWithAPI implements StreamingApplication
-{
-  /** Reads the table name, CSV schema, source file and statement from {@code conf} and executes the SQL. */
-  @Override
-  public void populateDAG(DAG dag, Configuration conf)
-  {
-    String tableName = conf.get("csvSchemaInName");
-    String csvSchema = conf.get("csvSchemaIn");
-    String inputPath = conf.get("sourceFile");
-    SQLExecEnvironment env = SQLExecEnvironment.getEnvironment();
-    FileEndpoint source = new FileEndpoint(inputPath, new CSVMessageFormat(csvSchema));
-    env.registerTable(tableName, source).executeSQL(dag, conf.get("sql"));
-  }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithModelFile.java
----------------------------------------------------------------------
diff --git a/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithModelFile.java b/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithModelFile.java
deleted file mode 100644
index 4c90a82..0000000
--- a/demos/sql/src/main/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithModelFile.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.sql.sample;
-
-import java.io.File;
-import java.io.IOException;
-
-import org.apache.apex.malhar.sql.SQLExecEnvironment;
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.api.annotation.ApplicationAnnotation;
-
-/**
- * Demo that loads a model from the file named by the "modelFile" property and executes the configured SQL.
- * @since 3.6.0
- */
-@ApplicationAnnotation(name = "SQLApplicationWithModelFile")
-public class SQLApplicationWithModelFile implements StreamingApplication
-{
-  /** Reads the model file into a string (an IOException is rethrown as RuntimeException) and executes the SQL. */
-  @Override
-  public void populateDAG(DAG dag, Configuration conf)
-  {
-    final File modelPath = new File(conf.get("modelFile"));
-    final String modelText;
-    try {
-      modelText = FileUtils.readFileToString(modelPath);
-    } catch (IOException ioe) {
-      throw new RuntimeException(ioe);
-    }
-
-    SQLExecEnvironment.getEnvironment().withModel(modelText).executeSQL(dag, conf.get("sql"));
-  }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/sql/src/main/resources/META-INF/properties-FusionStyleSQLApplication.xml
----------------------------------------------------------------------
diff --git a/demos/sql/src/main/resources/META-INF/properties-FusionStyleSQLApplication.xml b/demos/sql/src/main/resources/META-INF/properties-FusionStyleSQLApplication.xml
deleted file mode 100644
index 77852e7..0000000
--- a/demos/sql/src/main/resources/META-INF/properties-FusionStyleSQLApplication.xml
+++ /dev/null
@@ -1,65 +0,0 @@
-<?xml version="1.0"?>
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-<configuration>
- <!-- Kafka Operator Properties -->
- <property>
- <name>dt.operator.KafkaInput.prop.topics</name>
- <value>dataTopic</value>
- </property>
- <property>
- <name>dt.operator.KafkaInput.prop.clusters</name>
- <value>localhost:9092</value> <!-- broker (NOT zookeeper) address -->
- </property>
-
- <!-- CSV Parser Properties -->
- <property>
- <name>dt.operator.CSVParser.prop.schema</name>
- <value>{"separator":",","quoteChar":"\"","fields":[{"name":"RowTime","type":"Date","constraints":{"format":"dd/MM/yyyy hh:mm:ss Z"}},{"name":"id","type":"Integer"},{"name":"Product","type":"String"},{"name":"units","type":"Integer"}]}</value>
- </property>
-
- <!-- SQL Properties -->
- <property>
- <name>sqlSchemaInputName</name>
- <value>FROMCSV</value>
- </property>
- <property>
- <name>sqlSchemaOutputName</name>
- <value>TOFILE</value>
- </property>
- <property>
- <name>folderPath</name>
- <value>/tmp/output</value>
- </property>
- <property>
- <name>fileName</name>
- <value>output.txt</value>
- </property>
- <property>
- <name>sqlSchemaOutputDef</name>
- <value>{"separator":",","quoteChar":"\"","fields":[{"name":"RowTime1","type":"Date","constraints":{"format":"dd/MM/yyyy hh:mm:ss Z"}},{"name":"RowTime2","type":"Date","constraints":{"format":"dd/MM/yyyy hh:mm:ss Z"}},{"name":"Product","type":"String"}]}</value>
- </property>
- <property>
- <name>sql</name>
- <value>INSERT INTO TOFILE SELECT STREAM ROWTIME, FLOOR(ROWTIME TO DAY), APEXCONCAT('OILPAINT', SUBSTRING(PRODUCT, 6, 7)) FROM FROMCSV WHERE ID > 3 AND PRODUCT LIKE 'paint%'</value>
- </property>
-</configuration>
-
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/sql/src/main/resources/META-INF/properties-PureStyleSQLApplication.xml
----------------------------------------------------------------------
diff --git a/demos/sql/src/main/resources/META-INF/properties-PureStyleSQLApplication.xml b/demos/sql/src/main/resources/META-INF/properties-PureStyleSQLApplication.xml
deleted file mode 100644
index 0d25aa6..0000000
--- a/demos/sql/src/main/resources/META-INF/properties-PureStyleSQLApplication.xml
+++ /dev/null
@@ -1,65 +0,0 @@
-<?xml version="1.0"?>
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-<configuration>
- <!-- Input Definition -->
- <property>
- <name>schemaInName</name>
- <value>ORDERS</value>
- </property>
- <property>
- <name>schemaInDef</name>
- <value>{"separator":",","quoteChar":"\"","fields":[{"name":"RowTime","type":"Date","constraints":{"format":"dd/MM/yyyy hh:mm:ss Z"}},{"name":"id","type":"Integer"},{"name":"Product","type":"String"},{"name":"units","type":"Integer"}]}</value>
- </property>
- <property>
- <name>broker</name>
- <value>localhost:9090</value>
- </property>
- <property>
- <name>topic</name>
- <value>inputTopic</value>
- </property>
-
- <!-- Output Definition -->
- <property>
- <name>schemaOutName</name>
- <value>SALES</value>
- </property>
- <property>
- <name>schemaOutDef</name>
- <value>{"separator":",","quoteChar":"\"","fields":[{"name":"RowTime1","type":"Date","constraints":{"format":"dd/MM/yyyy hh:mm:ss Z"}},{"name":"RowTime2","type":"Date","constraints":{"format":"dd/MM/yyyy hh:mm:ss Z"}},{"name":"Product","type":"String"}]}</value>
- </property>
- <property>
- <name>outputFolder</name>
- <value>/tmp/output</value>
- </property>
- <property>
- <name>destFileName</name>
- <value>out.file</value>
- </property>
-
- <!-- Execution SQL -->
- <property>
- <name>sql</name>
- <value>INSERT INTO SALES SELECT STREAM ROWTIME, FLOOR(ROWTIME TO DAY), APEXCONCAT('OILPAINT', SUBSTRING(PRODUCT, 6, 7)) FROM ORDERS WHERE ID > 3 AND PRODUCT LIKE 'paint%'</value>
- </property>
-</configuration>
-
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/sql/src/main/resources/META-INF/properties-SQLApplicationWithAPI.xml
----------------------------------------------------------------------
diff --git a/demos/sql/src/main/resources/META-INF/properties-SQLApplicationWithAPI.xml b/demos/sql/src/main/resources/META-INF/properties-SQLApplicationWithAPI.xml
deleted file mode 100644
index 9ac49d4..0000000
--- a/demos/sql/src/main/resources/META-INF/properties-SQLApplicationWithAPI.xml
+++ /dev/null
@@ -1,43 +0,0 @@
-<?xml version="1.0"?>
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-<configuration>
- <!-- Input Definition -->
- <property>
- <name>csvSchemaInName</name>
- <value>ORDERS</value>
- </property>
- <property>
- <name>csvSchemaIn</name>
- <value>{"separator":",","quoteChar":"\"","fields":[{"name":"RowTime","type":"Date","constraints":{"format":"dd/MM/yyyy hh:mm:ss Z"}},{"name":"id","type":"Integer"},{"name":"Product","type":"String"},{"name":"units","type":"Integer"}]}</value>
- </property>
- <property>
- <name>sourceFile</name>
- <value>src/test/resources/input.csv</value>
- </property>
-
- <!-- Execution SQL -->
- <property>
- <name>sql</name>
- <value>SELECT STREAM ROWTIME, PRODUCT FROM ORDERS</value>
- </property>
-</configuration>
-
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/sql/src/main/resources/META-INF/properties-SQLApplicationWithModelFile.xml
----------------------------------------------------------------------
diff --git a/demos/sql/src/main/resources/META-INF/properties-SQLApplicationWithModelFile.xml b/demos/sql/src/main/resources/META-INF/properties-SQLApplicationWithModelFile.xml
deleted file mode 100644
index ab026c2..0000000
--- a/demos/sql/src/main/resources/META-INF/properties-SQLApplicationWithModelFile.xml
+++ /dev/null
@@ -1,32 +0,0 @@
-<?xml version="1.0"?>
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-<configuration>
- <property>
- <name>modelFile</name>
- <value>src/main/resources/model/model_file_csv.json</value>
- </property>
- <property>
- <name>sql</name>
- <value>SELECT STREAM ROWTIME, PRODUCT FROM ORDERS</value>
- </property>
-</configuration>
-
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/sql/src/main/resources/META-INF/properties.xml
----------------------------------------------------------------------
diff --git a/demos/sql/src/main/resources/META-INF/properties.xml b/demos/sql/src/main/resources/META-INF/properties.xml
deleted file mode 100644
index 6080bf6..0000000
--- a/demos/sql/src/main/resources/META-INF/properties.xml
+++ /dev/null
@@ -1,41 +0,0 @@
-<?xml version="1.0"?>
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-<configuration>
- <!-- Memory settings for all demos -->
- <property>
- <name>dt.attr.MASTER_MEMORY_MB</name>
- <value>512</value>
- </property>
- <property>
- <name>dt.application.*.operator.*.attr.MEMORY_MB</name>
- <value>256</value>
- </property>
- <property>
- <name>dt.application.*.operator.*.attr.JVM_OPTIONS</name>
- <value>-Xmx128M</value>
- </property>
- <property>
- <name>dt.application.*.operator.*.port.*.attr.BUFFER_MEMORY_MB</name>
- <value>128</value>
- </property>
-</configuration>
-
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/sql/src/main/resources/model/model_file_csv.json
----------------------------------------------------------------------
diff --git a/demos/sql/src/main/resources/model/model_file_csv.json b/demos/sql/src/main/resources/model/model_file_csv.json
deleted file mode 100644
index beba18d..0000000
--- a/demos/sql/src/main/resources/model/model_file_csv.json
+++ /dev/null
@@ -1,27 +0,0 @@
-{
- "version": "1.0",
- "defaultSchema": "APEX",
- "schemas": [{
- "name": "APEX",
- "tables": [
- {
- "name": "ORDERS",
- "type": "custom",
- "factory": "org.apache.apex.malhar.sql.schema.ApexSQLTableFactory",
- "stream": {
- "stream": true
- },
- "operand": {
- "endpoint": "file",
- "messageFormat": "csv",
- "endpointOperands": {
- "directory": "src/test/resources/input.csv"
- },
- "messageFormatOperands": {
- "schema": "{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[{\"name\":\"RowTime\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy hh:mm:ss\"}},{\"name\":\"id\",\"type\":\"Integer\"},{\"name\":\"Product\",\"type\":\"String\"},{\"name\":\"units\",\"type\":\"Integer\"}]}"
- }
- }
- }
- ]
- }]
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/FusionStyleSQLApplicationTest.java
----------------------------------------------------------------------
diff --git a/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/FusionStyleSQLApplicationTest.java b/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/FusionStyleSQLApplicationTest.java
deleted file mode 100644
index 7208701..0000000
--- a/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/FusionStyleSQLApplicationTest.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.sql.sample;
-
-import java.io.File;
-import java.util.Arrays;
-import java.util.List;
-import java.util.TimeZone;
-
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-
-import org.apache.apex.malhar.kafka.EmbeddedKafka;
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-
-import com.datatorrent.api.LocalMode;
-
-/**
- * Local-mode test for {@link FusionStyleSQLApplication}: starts an embedded Kafka broker,
- * publishes six CSV records to the input topic and verifies the lines written to the
- * output file.
- */
-public class FusionStyleSQLApplicationTest
-{
-  private static final String OUTPUT_BASE_FOLDER = "target/output/";
-
-  private final String testTopicData = "dataTopic";
-  private final String testTopicResult = "resultTopic";
-
-  private TimeZone defaultTZ;
-  private EmbeddedKafka kafka;
-
-  // Recomputed for every test method in setUp(); deliberately not static so the
-  // method name is not appended again each time setUp() runs.
-  private String outputFolder;
-
-  @Rule
-  public TestName testName = new TestName();
-
-  /**
-   * Switches the JVM default time zone to GMT, starts the embedded Kafka broker with the
-   * data and result topics, and derives the output folder from the test method name.
-   */
-  @Before
-  public void setUp() throws Exception
-  {
-    defaultTZ = TimeZone.getDefault();
-    TimeZone.setDefault(TimeZone.getTimeZone("GMT"));
-
-    kafka = new EmbeddedKafka();
-    kafka.start();
-    kafka.createTopic(testTopicData);
-    kafka.createTopic(testTopicResult);
-
-    outputFolder = OUTPUT_BASE_FOLDER + testName.getMethodName() + "/";
-  }
-
-  /**
-   * Stops the embedded broker and always restores the original default time zone,
-   * even when stopping the broker fails.
-   */
-  @After
-  public void tearDown() throws Exception
-  {
-    try {
-      // kafka is still null when setUp() failed before the broker was created.
-      if (kafka != null) {
-        kafka.stop();
-      }
-    } finally {
-      TimeZone.setDefault(defaultTZ);
-    }
-  }
-
-  /**
-   * Runs the application asynchronously in local mode, feeds it six records through Kafka
-   * and compares the first file found in the output folder with the expected lines.
-   *
-   * @throws Exception on any failure; exceptions are propagated so that JUnit reports
-   *         them instead of the test passing silently
-   */
-  @Test
-  public void test() throws Exception
-  {
-    LocalMode lma = LocalMode.newInstance();
-    Configuration conf = new Configuration(false);
-    conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
-    conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties-FusionStyleSQLApplication.xml"));
-
-    conf.set("dt.operator.KafkaInput.prop.topics", testTopicData);
-    conf.set("dt.operator.KafkaInput.prop.clusters", kafka.getBroker());
-    conf.set("folderPath", outputFolder);
-    conf.set("fileName", "out.tmp");
-
-    lma.prepareDAG(new FusionStyleSQLApplication(), conf);
-    LocalMode.Controller lc = lma.getController();
-
-    lc.runAsync();
-    try {
-      kafka.publish(testTopicData, Arrays.asList(
-          "15/02/2016 10:15:00 +0000,1,paint1,11",
-          "15/02/2016 10:16:00 +0000,2,paint2,12",
-          "15/02/2016 10:17:00 +0000,3,paint3,13",
-          "15/02/2016 10:18:00 +0000,4,paint4,14",
-          "15/02/2016 10:19:00 +0000,5,paint5,15",
-          "15/02/2016 10:10:00 +0000,6,abcde6,16"));
-
-      Assert.assertTrue(PureStyleSQLApplicationTest.waitTillFileIsPopulated(outputFolder, 40000));
-    } finally {
-      // Shut the local cluster down even when publishing or the wait assertion fails.
-      lc.shutdown();
-    }
-
-    File file = new File(outputFolder);
-    File file1 = new File(outputFolder + file.list()[0]);
-    List<String> strings = FileUtils.readLines(file1);
-
-    String[] actualLines = strings.toArray(new String[strings.size()]);
-    // Only the paint4 and paint5 records are expected, each followed by an empty line.
-    String[] expectedLines = new String[] {
-      "15/02/2016 10:18:00 +0000,15/02/2016 12:00:00 +0000,OILPAINT4",
-      "",
-      "15/02/2016 10:19:00 +0000,15/02/2016 12:00:00 +0000,OILPAINT5",
-      ""};
-    Assert.assertArrayEquals(expectedLines, actualLines);
-  }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/PureStyleSQLApplicationTest.java
----------------------------------------------------------------------
diff --git a/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/PureStyleSQLApplicationTest.java b/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/PureStyleSQLApplicationTest.java
deleted file mode 100644
index f298059..0000000
--- a/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/PureStyleSQLApplicationTest.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.sql.sample;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.TimeZone;
-
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-
-import org.apache.apex.malhar.kafka.EmbeddedKafka;
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import com.google.common.collect.Lists;
-
-import com.datatorrent.api.LocalMode;
-
-
-/**
- * Local-mode test for {@link PureStyleSQLApplication}: starts an embedded Kafka broker,
- * publishes six CSV records to the input topic and verifies the lines written to the
- * output file. Also hosts {@link #waitTillFileIsPopulated(String, int)}, which other
- * tests in this package call.
- */
-public class PureStyleSQLApplicationTest
-{
-  private static final String OUTPUT_BASE_FOLDER = "target/output/";
-
-  private final String testTopicData = "dataTopic";
-  private final String testTopicResult = "resultTopic";
-
-  private TimeZone defaultTZ;
-  private EmbeddedKafka kafka;
-
-  // Recomputed for every test method in setUp(); deliberately not static so the
-  // method name is not appended again each time setUp() runs.
-  private String outputFolder;
-
-  @Rule
-  public TestName testName = new TestName();
-
-  /**
-   * Switches the JVM default time zone to GMT, starts the embedded Kafka broker with the
-   * data and result topics, and derives the output folder from the test method name.
-   */
-  @Before
-  public void setUp() throws Exception
-  {
-    defaultTZ = TimeZone.getDefault();
-    TimeZone.setDefault(TimeZone.getTimeZone("GMT"));
-
-    kafka = new EmbeddedKafka();
-    kafka.start();
-    kafka.createTopic(testTopicData);
-    kafka.createTopic(testTopicResult);
-
-    outputFolder = OUTPUT_BASE_FOLDER + testName.getMethodName() + "/";
-  }
-
-  /**
-   * Stops the embedded broker and always restores the original default time zone,
-   * even when stopping the broker fails.
-   */
-  @After
-  public void tearDown() throws Exception
-  {
-    try {
-      // kafka is still null when setUp() failed before the broker was created.
-      if (kafka != null) {
-        kafka.stop();
-      }
-    } finally {
-      TimeZone.setDefault(defaultTZ);
-    }
-  }
-
-  /**
-   * Runs the application asynchronously in local mode, feeds it six records through Kafka
-   * and compares the first file found in the output folder with the expected lines.
-   */
-  @Test
-  public void test() throws Exception
-  {
-    LocalMode lma = LocalMode.newInstance();
-    Configuration conf = new Configuration(false);
-    conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
-    conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties-PureStyleSQLApplication.xml"));
-
-    conf.set("broker", kafka.getBroker());
-    conf.set("topic", testTopicData);
-    conf.set("outputFolder", outputFolder);
-    conf.set("destFileName", "out.tmp");
-
-    lma.prepareDAG(new PureStyleSQLApplication(), conf);
-    LocalMode.Controller lc = lma.getController();
-
-    lc.runAsync();
-    try {
-      kafka.publish(testTopicData, Arrays.asList(
-          "15/02/2016 10:15:00 +0000,1,paint1,11",
-          "15/02/2016 10:16:00 +0000,2,paint2,12",
-          "15/02/2016 10:17:00 +0000,3,paint3,13",
-          "15/02/2016 10:18:00 +0000,4,paint4,14",
-          "15/02/2016 10:19:00 +0000,5,paint5,15",
-          "15/02/2016 10:10:00 +0000,6,abcde6,16"));
-
-      Assert.assertTrue(waitTillFileIsPopulated(outputFolder, 40000));
-    } finally {
-      // Shut the local cluster down even when publishing or the wait assertion fails.
-      lc.shutdown();
-    }
-
-    File file = new File(outputFolder);
-    File file1 = new File(outputFolder + file.list()[0]);
-    List<String> strings = FileUtils.readLines(file1);
-
-    String[] actualLines = strings.toArray(new String[strings.size()]);
-    // Only the paint4 and paint5 records are expected, each followed by an empty line.
-    String[] expectedLines = new String[]{
-      "15/02/2016 10:18:00 +0000,15/02/2016 12:00:00 +0000,OILPAINT4",
-      "",
-      "15/02/2016 10:19:00 +0000,15/02/2016 12:00:00 +0000,OILPAINT5",
-      ""};
-
-    Assert.assertArrayEquals(expectedLines, actualLines);
-  }
-
-  /**
-   * Polls {@code outputFolder} every 500 ms until the first entry returned by
-   * {@link File#list()} is a file containing at least one line, or until
-   * {@code timeout} milliseconds have elapsed.
-   *
-   * @param outputFolder folder to watch; must end with a path separator because the
-   *        file name is appended to it directly
-   * @param timeout maximum time to wait, in milliseconds
-   * @return {@code true} if the folder exists and a non-empty file was read from it
-   * @throws IOException if the file system cannot be opened or the file cannot be read
-   * @throws InterruptedException if the thread is interrupted while sleeping
-   */
-  public static boolean waitTillFileIsPopulated(String outputFolder, int timeout) throws IOException, InterruptedException
-  {
-    boolean result;
-    long start = System.currentTimeMillis();
-    File folder = new File(outputFolder);
-    Path outDir = new Path("file://" + folder.getAbsolutePath());
-    try (FileSystem fs = FileSystem.newInstance(outDir.toUri(), new Configuration())) {
-      List<String> strings = Lists.newArrayList();
-      while (System.currentTimeMillis() - start < timeout) {
-        if (fs.exists(outDir)) {
-          // list() returns null when the path is not a readable directory; take a single
-          // snapshot so the emptiness check and the lookup see the same listing.
-          String[] children = folder.list();
-          if (children != null && children.length > 0) {
-            strings = FileUtils.readLines(new File(outputFolder + children[0]));
-            if (!strings.isEmpty()) {
-              break;
-            }
-          }
-        }
-
-        Thread.sleep(500);
-      }
-
-      result = fs.exists(outDir) && !strings.isEmpty();
-    }
-
-    return result;
-  }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithAPITest.java
----------------------------------------------------------------------
diff --git a/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithAPITest.java b/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithAPITest.java
deleted file mode 100644
index 6b1a404..0000000
--- a/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithAPITest.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.sql.sample;
-
-import java.io.ByteArrayOutputStream;
-import java.io.PrintStream;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.TimeZone;
-
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import org.apache.hadoop.conf.Configuration;
-
-import com.google.common.base.Predicates;
-import com.google.common.collect.Collections2;
-
-import com.datatorrent.api.LocalMode;
-
-
-/**
- * Local-mode test for {@link SQLApplicationWithAPI}: captures standard output while the
- * application runs and checks the captured "Delta Record:" lines.
- */
-public class SQLApplicationWithAPITest
-{
-  private TimeZone defaultTZ;
-
-  /** Switches the JVM default time zone to GMT; the expected strings below contain "GMT". */
-  @Before
-  public void setUp() throws Exception
-  {
-    defaultTZ = TimeZone.getDefault();
-    TimeZone.setDefault(TimeZone.getTimeZone("GMT"));
-  }
-
-  /** Restores the default time zone saved in {@link #setUp()}. */
-  @After
-  public void tearDown() throws Exception
-  {
-    TimeZone.setDefault(defaultTZ);
-  }
-
-  /**
-   * Runs the application asynchronously with {@code System.out} redirected to a buffer,
-   * then asserts that the first six captured "Delta Record:" lines contain the expected
-   * RowTime/Product pairs in order.
-   */
-  @Test
-  public void test() throws Exception
-  {
-    LocalMode lma = LocalMode.newInstance();
-    Configuration conf = new Configuration(false);
-    conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
-    conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties-SQLApplicationWithAPI.xml"));
-
-    lma.prepareDAG(new SQLApplicationWithAPI(), conf);
-    LocalMode.Controller lc = lma.getController();
-
-    PrintStream originalSysout = System.out;
-    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    System.setOut(new PrintStream(baos));
-    try {
-      lc.runAsync();
-      try {
-        SQLApplicationWithModelFileTest.waitTillStdoutIsPopulated(baos, 30000);
-      } finally {
-        lc.shutdown();
-      }
-    } finally {
-      // Always undo the redirection so later tests and the build log keep their stdout.
-      System.setOut(originalSysout);
-    }
-
-    String[] sout = baos.toString().split(System.lineSeparator());
-    Collection<String> filter = Collections2.filter(Arrays.asList(sout), Predicates.containsPattern("Delta Record:"));
-    String[] actualLines = filter.toArray(new String[filter.size()]);
-
-    String[] expectedFragments = {
-      "RowTime=Mon Feb 15 10:15:00 GMT 2016, Product=paint1",
-      "RowTime=Mon Feb 15 10:16:00 GMT 2016, Product=paint2",
-      "RowTime=Mon Feb 15 10:17:00 GMT 2016, Product=paint3",
-      "RowTime=Mon Feb 15 10:18:00 GMT 2016, Product=paint4",
-      "RowTime=Mon Feb 15 10:19:00 GMT 2016, Product=paint5",
-      "RowTime=Mon Feb 15 10:10:00 GMT 2016, Product=abcde6"};
-
-    // Fail with a readable message instead of an ArrayIndexOutOfBoundsException
-    // when fewer records than expected were captured.
-    Assert.assertTrue("Expected at least " + expectedFragments.length + " \"Delta Record:\" lines but found "
-        + actualLines.length, actualLines.length >= expectedFragments.length);
-    for (int i = 0; i < expectedFragments.length; i++) {
-      Assert.assertTrue("Unexpected record at index " + i + ": " + actualLines[i],
-          actualLines[i].contains(expectedFragments[i]));
-    }
-  }
-}