You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/03/27 15:19:13 UTC
[12/19] apex-malhar git commit: SPOI-9079 Added sample application
for transform operator
SPOI-9079 Added sample application for transform operator
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/a492e22f
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/a492e22f
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/a492e22f
Branch: refs/heads/master
Commit: a492e22fe2a0a18fdfde53f19550a13664b97231
Parents: ca12bca
Author: chaitanya <ch...@apache.org>
Authored: Mon Oct 10 17:03:40 2016 +0530
Committer: Lakshmi Prasanna Velineni <la...@datatorrent.com>
Committed: Sun Mar 26 11:43:48 2017 -0700
----------------------------------------------------------------------
examples/transform/README.md | 6 +
.../transform/XmlJavadocCommentsExtractor.xsl | 44 +++
examples/transform/pom.xml | 267 +++++++++++++++++++
examples/transform/src/assemble/appPackage.xml | 43 +++
.../java/com/example/transform/Application.java | 39 +++
.../com/example/transform/CustomerEvent.java | 74 +++++
.../com/example/transform/CustomerInfo.java | 60 +++++
.../transform/DynamicTransformApplication.java | 52 ++++
.../com/example/transform/POJOGenerator.java | 125 +++++++++
.../src/main/resources/META-INF/properties.xml | 19 ++
.../com/example/transform/ApplicationTest.java | 21 ++
.../src/test/resources/log4j.properties | 22 ++
12 files changed, 772 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a492e22f/examples/transform/README.md
----------------------------------------------------------------------
diff --git a/examples/transform/README.md b/examples/transform/README.md
new file mode 100644
index 0000000..af016ff
--- /dev/null
+++ b/examples/transform/README.md
@@ -0,0 +1,6 @@
+Sample application to show how to use the TransformOperator to transform the input POJO using given expressions.
+
+Operators in sample application are as follows:
+1) POJOGenerator which generates and emits the CustomerEvent POJO.
+2) TransformOperator which transforms the input POJO using provided expressions and emits the transformed POJO.
+3) ConsoleOutputOperator which writes transformed POJO to stdout.
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a492e22f/examples/transform/XmlJavadocCommentsExtractor.xsl
----------------------------------------------------------------------
diff --git a/examples/transform/XmlJavadocCommentsExtractor.xsl b/examples/transform/XmlJavadocCommentsExtractor.xsl
new file mode 100644
index 0000000..08075a9
--- /dev/null
+++ b/examples/transform/XmlJavadocCommentsExtractor.xsl
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+
+<!--
+ Document : XmlJavadocCommentsExtractor.xsl
+ Created on : September 16, 2014, 11:30 AM
+ Description:
+ The transformation strips off all information except for comments and tags from xml javadoc generated by xml-doclet.
+-->
+
+<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform" version="1.0">
+ <xsl:output method="xml" standalone="yes"/>
+
+ <!-- copy xml by selecting only the following nodes, attributes and text -->
+ <xsl:template match="node()|text()|@*">
+ <xsl:copy>
+ <xsl:apply-templates select="root|package|class|interface|method|field|type|comment|tag|text()|@name|@qualified|@text"/>
+ </xsl:copy>
+ </xsl:template>
+
+ <!-- Strip off the following paths from the selected xml -->
+ <xsl:template match="//root/package/interface/interface
+ |//root/package/interface/method/@qualified
+ |//root/package/class/interface
+ |//root/package/class/class
+ |//root/package/class/method/@qualified
+ |//root/package/class/field/@qualified" />
+
+ <xsl:strip-space elements="*"/>
+</xsl:stylesheet>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a492e22f/examples/transform/pom.xml
----------------------------------------------------------------------
diff --git a/examples/transform/pom.xml b/examples/transform/pom.xml
new file mode 100644
index 0000000..e8846d3
--- /dev/null
+++ b/examples/transform/pom.xml
@@ -0,0 +1,267 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>com.example</groupId>
+ <version>1.0-SNAPSHOT</version>
+ <artifactId>transform</artifactId>
+ <packaging>jar</packaging>
+
+ <name>Transform Application</name>
+ <description>Sample application for transform operator</description>
+
+ <properties>
+ <!-- change this if you desire to use a different version of Apex Core -->
+ <apex.version>3.5.0</apex.version>
+ <malhar.version>3.6.0</malhar.version>
+ <apex.apppackage.classpath>lib/*.jar</apex.apppackage.classpath>
+ </properties>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-eclipse-plugin</artifactId>
+ <version>2.9</version>
+ <configuration>
+ <downloadSources>true</downloadSources>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.3</version>
+ <configuration>
+ <encoding>UTF-8</encoding>
+ <source>1.7</source>
+ <target>1.7</target>
+ <debug>true</debug>
+ <optimize>false</optimize>
+ <showDeprecation>true</showDeprecation>
+ <showWarnings>true</showWarnings>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <version>2.8</version>
+ <executions>
+ <execution>
+ <id>copy-dependencies</id>
+ <phase>prepare-package</phase>
+ <goals>
+ <goal>copy-dependencies</goal>
+ </goals>
+ <configuration>
+ <outputDirectory>target/deps</outputDirectory>
+ <includeScope>runtime</includeScope>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>app-package-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ <configuration>
+ <finalName>${project.artifactId}-${project.version}-apexapp</finalName>
+ <appendAssemblyId>false</appendAssemblyId>
+ <descriptors>
+ <descriptor>src/assemble/appPackage.xml</descriptor>
+ </descriptors>
+ <archiverConfig>
+ <defaultDirectoryMode>0755</defaultDirectoryMode>
+ </archiverConfig>
+ <archive>
+ <manifestEntries>
+ <Class-Path>${apex.apppackage.classpath}</Class-Path>
+ <DT-Engine-Version>${apex.version}</DT-Engine-Version>
+ <DT-App-Package-Group-Id>${project.groupId}</DT-App-Package-Group-Id>
+ <DT-App-Package-Name>${project.artifactId}</DT-App-Package-Name>
+ <DT-App-Package-Version>${project.version}</DT-App-Package-Version>
+ <DT-App-Package-Display-Name>${project.name}</DT-App-Package-Display-Name>
+ <DT-App-Package-Description>${project.description}</DT-App-Package-Description>
+ </manifestEntries>
+ </archive>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <version>1.7</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <configuration>
+ <target>
+ <move file="${project.build.directory}/${project.artifactId}-${project.version}-apexapp.jar"
+ tofile="${project.build.directory}/${project.artifactId}-${project.version}.apa" />
+ </target>
+ </configuration>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ </execution>
+ <execution>
+ <!-- create resource directory for xml javadoc-->
+ <id>createJavadocDirectory</id>
+ <phase>generate-resources</phase>
+ <configuration>
+ <tasks>
+ <delete dir="${project.build.directory}/generated-resources/xml-javadoc"/>
+ <mkdir dir="${project.build.directory}/generated-resources/xml-javadoc"/>
+ </tasks>
+ </configuration>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>1.9.1</version>
+ <executions>
+ <execution>
+ <id>attach-artifacts</id>
+ <phase>package</phase>
+ <goals>
+ <goal>attach-artifact</goal>
+ </goals>
+ <configuration>
+ <artifacts>
+ <artifact>
+ <file>target/${project.artifactId}-${project.version}.apa</file>
+ <type>apa</type>
+ </artifact>
+ </artifacts>
+ <skipAttach>false</skipAttach>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <!-- generate javdoc -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <executions>
+ <!-- generate xml javadoc -->
+ <execution>
+ <id>xml-doclet</id>
+ <phase>generate-resources</phase>
+ <goals>
+ <goal>javadoc</goal>
+ </goals>
+ <configuration>
+ <doclet>com.github.markusbernhardt.xmldoclet.XmlDoclet</doclet>
+ <additionalparam>-d ${project.build.directory}/generated-resources/xml-javadoc -filename ${project.artifactId}-${project.version}-javadoc.xml</additionalparam>
+ <useStandardDocletOptions>false</useStandardDocletOptions>
+ <docletArtifact>
+ <groupId>com.github.markusbernhardt</groupId>
+ <artifactId>xml-doclet</artifactId>
+ <version>1.0.4</version>
+ </docletArtifact>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <!-- Transform xml javadoc to stripped down version containing only class/interface comments and tags-->
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>xml-maven-plugin</artifactId>
+ <version>1.0</version>
+ <executions>
+ <execution>
+ <id>transform-xmljavadoc</id>
+ <phase>generate-resources</phase>
+ <goals>
+ <goal>transform</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <transformationSets>
+ <transformationSet>
+ <dir>${project.build.directory}/generated-resources/xml-javadoc</dir>
+ <includes>
+ <include>${project.artifactId}-${project.version}-javadoc.xml</include>
+ </includes>
+ <stylesheet>XmlJavadocCommentsExtractor.xsl</stylesheet>
+ <outputDir>${project.build.directory}/generated-resources/xml-javadoc</outputDir>
+ </transformationSet>
+ </transformationSets>
+ </configuration>
+ </plugin>
+ <!-- copy xml javadoc to class jar -->
+ <plugin>
+ <artifactId>maven-resources-plugin</artifactId>
+ <version>2.6</version>
+ <executions>
+ <execution>
+ <id>copy-resources</id>
+ <phase>process-resources</phase>
+ <goals>
+ <goal>copy-resources</goal>
+ </goals>
+ <configuration>
+ <outputDirectory>${basedir}/target/classes</outputDirectory>
+ <resources>
+ <resource>
+ <directory>${project.build.directory}/generated-resources/xml-javadoc</directory>
+ <includes>
+ <include>${project.artifactId}-${project.version}-javadoc.xml</include>
+ </includes>
+ <filtering>true</filtering>
+ </resource>
+ </resources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ </plugins>
+
+ </build>
+
+ <dependencies>
+ <!-- add your dependencies here -->
+ <dependency>
+ <groupId>org.apache.apex</groupId>
+ <artifactId>malhar-library</artifactId>
+ <version>${malhar.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.apex</groupId>
+ <artifactId>apex-common</artifactId>
+ <version>${apex.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.10</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.apex</groupId>
+ <artifactId>apex-engine</artifactId>
+ <version>${apex.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.codehaus.janino</groupId>
+ <artifactId>janino</artifactId>
+ <version>2.7.8</version>
+ </dependency>
+ </dependencies>
+
+</project>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a492e22f/examples/transform/src/assemble/appPackage.xml
----------------------------------------------------------------------
diff --git a/examples/transform/src/assemble/appPackage.xml b/examples/transform/src/assemble/appPackage.xml
new file mode 100644
index 0000000..7ad071c
--- /dev/null
+++ b/examples/transform/src/assemble/appPackage.xml
@@ -0,0 +1,43 @@
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+ <id>appPackage</id>
+ <formats>
+ <format>jar</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <fileSets>
+ <fileSet>
+ <directory>${basedir}/target/</directory>
+ <outputDirectory>/app</outputDirectory>
+ <includes>
+ <include>${project.artifactId}-${project.version}.jar</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>${basedir}/target/deps</directory>
+ <outputDirectory>/lib</outputDirectory>
+ </fileSet>
+ <fileSet>
+ <directory>${basedir}/src/site/conf</directory>
+ <outputDirectory>/conf</outputDirectory>
+ <includes>
+ <include>*.xml</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>${basedir}/src/main/resources/META-INF</directory>
+ <outputDirectory>/META-INF</outputDirectory>
+ </fileSet>
+ <fileSet>
+ <directory>${basedir}/src/main/resources/app</directory>
+ <outputDirectory>/app</outputDirectory>
+ </fileSet>
+ <fileSet>
+ <directory>${basedir}/src/main/resources/resources</directory>
+ <outputDirectory>/resources</outputDirectory>
+ </fileSet>
+ </fileSets>
+
+</assembly>
+
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a492e22f/examples/transform/src/main/java/com/example/transform/Application.java
----------------------------------------------------------------------
diff --git a/examples/transform/src/main/java/com/example/transform/Application.java b/examples/transform/src/main/java/com/example/transform/Application.java
new file mode 100644
index 0000000..d73b47d
--- /dev/null
+++ b/examples/transform/src/main/java/com/example/transform/Application.java
@@ -0,0 +1,39 @@
+package com.example.transform;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.common.partitioner.StatelessPartitioner;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.transform.TransformOperator;
+
+@ApplicationAnnotation(name="TransformExample")
+public class Application implements StreamingApplication
+{
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ POJOGenerator input = dag.addOperator("Input", new POJOGenerator());
+ TransformOperator transform = dag.addOperator("Process", new TransformOperator());
+ // Set expression map
+ Map<String, String> expMap = new HashMap<>();
+ expMap.put("name", "{$.firstName}.concat(\" \").concat({$.lastName})");
+ expMap.put("age", "(new java.util.Date()).getYear() - {$.dateOfBirth}.getYear()");
+ expMap.put("address", "{$.address}.toLowerCase()");
+ transform.setExpressionMap(expMap);
+ ConsoleOutputOperator output = dag.addOperator("Output", new ConsoleOutputOperator());
+
+ dag.addStream("InputToTransform", input.output, transform.input);
+ dag.addStream("TransformToOutput", transform.output, output.input);
+
+ dag.setInputPortAttribute(transform.input, Context.PortContext.TUPLE_CLASS, CustomerEvent.class);
+ dag.setOutputPortAttribute(transform.output, Context.PortContext.TUPLE_CLASS, CustomerInfo.class);
+ dag.setAttribute(transform, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<TransformOperator>(2));
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a492e22f/examples/transform/src/main/java/com/example/transform/CustomerEvent.java
----------------------------------------------------------------------
diff --git a/examples/transform/src/main/java/com/example/transform/CustomerEvent.java b/examples/transform/src/main/java/com/example/transform/CustomerEvent.java
new file mode 100644
index 0000000..14a9c82
--- /dev/null
+++ b/examples/transform/src/main/java/com/example/transform/CustomerEvent.java
@@ -0,0 +1,74 @@
+package com.example.transform;
+
+import java.util.Date;
+
+public class CustomerEvent
+{
+ private int customerId;
+ private String firstName;
+ private String lastName;
+ private Date dateOfBirth;
+ private String address;
+
+ public int getCustomerId()
+ {
+ return customerId;
+ }
+
+ public void setCustomerId(int customerId)
+ {
+ this.customerId = customerId;
+ }
+
+ public String getFirstName()
+ {
+ return firstName;
+ }
+
+ public void setFirstName(String firstName)
+ {
+ this.firstName = firstName;
+ }
+
+ public String getLastName()
+ {
+ return lastName;
+ }
+
+ public void setLastName(String lastName)
+ {
+ this.lastName = lastName;
+ }
+
+ public Date getDateOfBirth()
+ {
+ return dateOfBirth;
+ }
+
+ public void setDateOfBirth(Date dateOfBirth)
+ {
+ this.dateOfBirth = dateOfBirth;
+ }
+
+ public String getAddress()
+ {
+ return address;
+ }
+
+ public void setAddress(String address)
+ {
+ this.address = address;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "CustomerEvent{" +
+ "customerId=" + customerId +
+ ", firstName='" + firstName + '\'' +
+ ", lastName='" + lastName + '\'' +
+ ", dateOfBirth=" + dateOfBirth +
+ ", address='" + address + '\'' +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a492e22f/examples/transform/src/main/java/com/example/transform/CustomerInfo.java
----------------------------------------------------------------------
diff --git a/examples/transform/src/main/java/com/example/transform/CustomerInfo.java b/examples/transform/src/main/java/com/example/transform/CustomerInfo.java
new file mode 100644
index 0000000..28a9ccb
--- /dev/null
+++ b/examples/transform/src/main/java/com/example/transform/CustomerInfo.java
@@ -0,0 +1,60 @@
+package com.example.transform;
+
+public class CustomerInfo
+{
+ private int customerId;
+ private String name;
+ private int age;
+ private String address;
+
+ public int getCustomerId()
+ {
+ return customerId;
+ }
+
+ public void setCustomerId(int customerId)
+ {
+ this.customerId = customerId;
+ }
+
+ public String getName()
+ {
+ return name;
+ }
+
+ public void setName(String name)
+ {
+ this.name = name;
+ }
+
+ public int getAge()
+ {
+ return age;
+ }
+
+ public void setAge(int age)
+ {
+ this.age = age;
+ }
+
+ public String getAddress()
+ {
+ return address;
+ }
+
+ public void setAddress(String address)
+ {
+ this.address = address;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "CustomerInfo{" +
+ "customerId=" + customerId +
+ ", name='" + name + '\'' +
+ ", age=" + age +
+ ", address='" + address + '\'' +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a492e22f/examples/transform/src/main/java/com/example/transform/DynamicTransformApplication.java
----------------------------------------------------------------------
diff --git a/examples/transform/src/main/java/com/example/transform/DynamicTransformApplication.java b/examples/transform/src/main/java/com/example/transform/DynamicTransformApplication.java
new file mode 100644
index 0000000..a769016
--- /dev/null
+++ b/examples/transform/src/main/java/com/example/transform/DynamicTransformApplication.java
@@ -0,0 +1,52 @@
+package com.example.transform;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StatsListener;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.common.partitioner.StatelessPartitioner;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.partitioner.StatelessThroughputBasedPartitioner;
+import com.datatorrent.lib.transform.TransformOperator;
+
+@ApplicationAnnotation(name="DynamicTransformApp")
+public class DynamicTransformApplication implements StreamingApplication
+{
+ private static String COOL_DOWN_MILLIS = "dt.cooldown";
+ private static String MAX_THROUGHPUT = "dt.maxThroughput";
+ private static String MIN_THROUGHPUT = "dt.minThroughput";
+
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ POJOGenerator input = dag.addOperator("Input", new POJOGenerator());
+ TransformOperator transform = dag.addOperator("Process", new TransformOperator());
+ // Set expression map
+ Map<String, String> expMap = new HashMap<>();
+ expMap.put("name", "{$.firstName}.concat(\" \").concat({$.lastName})");
+ expMap.put("age", "(new java.util.Date()).getYear() - {$.dateOfBirth}.getYear()");
+ expMap.put("address", "{$.address}.toLowerCase()");
+ transform.setExpressionMap(expMap);
+ ConsoleOutputOperator output = dag.addOperator("Output", new ConsoleOutputOperator());
+
+ dag.addStream("InputToTransform", input.output, transform.input);
+ dag.addStream("TransformToOutput", transform.output, output.input);
+
+ dag.setInputPortAttribute(transform.input, Context.PortContext.TUPLE_CLASS, CustomerEvent.class);
+ dag.setOutputPortAttribute(transform.output, Context.PortContext.TUPLE_CLASS, CustomerInfo.class);
+
+ StatelessThroughputBasedPartitioner<TransformOperator> partitioner = new StatelessThroughputBasedPartitioner<>();
+ partitioner.setCooldownMillis(conf.getLong(COOL_DOWN_MILLIS, 10000));
+ partitioner.setMaximumEvents(conf.getLong(MAX_THROUGHPUT, 30000));
+ partitioner.setMinimumEvents(conf.getLong(MIN_THROUGHPUT, 10000));
+ dag.setAttribute(transform, Context.OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{partitioner}));
+ dag.setAttribute(transform, Context.OperatorContext.PARTITIONER, partitioner);
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a492e22f/examples/transform/src/main/java/com/example/transform/POJOGenerator.java
----------------------------------------------------------------------
diff --git a/examples/transform/src/main/java/com/example/transform/POJOGenerator.java b/examples/transform/src/main/java/com/example/transform/POJOGenerator.java
new file mode 100644
index 0000000..9db5fd1
--- /dev/null
+++ b/examples/transform/src/main/java/com/example/transform/POJOGenerator.java
@@ -0,0 +1,125 @@
+package com.example.transform;
+
+import java.util.Date;
+import java.util.Random;
+
+import javax.validation.constraints.Min;
+
+import org.apache.commons.lang3.RandomStringUtils;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+
+/**
+ * Generates and emits the CustomerEvent
+ */
+public class POJOGenerator implements InputOperator
+{
+ @Min(1)
+ private int maxCustomerId = 100000;
+ @Min(1)
+ private int maxNameLength = 10;
+ @Min(1)
+ private int maxAddressLength = 15;
+ private long tuplesCounter;
+ // Limit number of emitted tuples per window
+ @Min(1)
+ private long maxTuplesPerWindow = 100;
+ private final Random random = new Random();
+ private final RandomStringUtils rRandom = new RandomStringUtils();
+ public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<>();
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ tuplesCounter = 0;
+ }
+
+ @Override
+ public void endWindow()
+ {
+ }
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ }
+
+ @Override
+ public void teardown()
+ {
+
+ }
+
+ CustomerEvent generateCustomersEvent() throws Exception {
+
+ CustomerEvent customerEvent = new CustomerEvent();
+ customerEvent.setCustomerId(randomId(maxCustomerId));
+ customerEvent.setFirstName(rRandom.randomAlphabetic(randomId(maxNameLength)));
+ customerEvent.setLastName(rRandom.randomAlphabetic(randomId(maxNameLength)));
+ long val = random.nextLong();
+ long diff1 = val % System.currentTimeMillis();
+ customerEvent.setDateOfBirth(new Date(diff1));
+ customerEvent.setAddress(rRandom.randomAlphabetic(randomId(maxAddressLength)));
+ return customerEvent;
+ }
+
+ private int randomId(int max) {
+ if (max < 1) return 1;
+ return 1 + random.nextInt(max);
+ }
+
+ @Override
+ public void emitTuples()
+ {
+ while (tuplesCounter++ < maxTuplesPerWindow) {
+ try {
+ CustomerEvent event = generateCustomersEvent();
+ this.output.emit(event);
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+ }
+
+ public long getMaxTuplesPerWindow()
+ {
+ return maxTuplesPerWindow;
+ }
+
+ public void setMaxTuplesPerWindow(long maxTuplesPerWindow)
+ {
+ this.maxTuplesPerWindow = maxTuplesPerWindow;
+ }
+
+ public int getMaxAddressLength()
+ {
+ return maxAddressLength;
+ }
+
+ public void setMaxAddressLength(int maxAddressLength)
+ {
+ this.maxAddressLength = maxAddressLength;
+ }
+
+ public int getMaxNameLength()
+ {
+ return maxNameLength;
+ }
+
+ public void setMaxNameLength(int maxNameLength)
+ {
+ this.maxNameLength = maxNameLength;
+ }
+
+ public int getMaxCustomerId()
+ {
+ return maxCustomerId;
+ }
+
+ public void setMaxCustomerId(int maxCustomerId)
+ {
+ this.maxCustomerId = maxCustomerId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a492e22f/examples/transform/src/main/resources/META-INF/properties.xml
----------------------------------------------------------------------
diff --git a/examples/transform/src/main/resources/META-INF/properties.xml b/examples/transform/src/main/resources/META-INF/properties.xml
new file mode 100644
index 0000000..f885664
--- /dev/null
+++ b/examples/transform/src/main/resources/META-INF/properties.xml
@@ -0,0 +1,19 @@
+<?xml version="1.0"?>
+<configuration>
+ <property>
+ <name>dt.application.*.operator.transform.attr.MEMORY_MB</name>
+ <value>1024</value>
+ </property>
+ <property>
+ <name>dt.maxThroughput</name>
+ <value>150</value>
+ </property>
+ <property>
+ <name>dt.minThroughput</name>
+ <value>50</value>
+ </property>
+ <property>
+ <name>dt.cooldown</name>
+ <value>1800000</value>
+ </property>
+</configuration>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a492e22f/examples/transform/src/test/java/com/example/transform/ApplicationTest.java
----------------------------------------------------------------------
diff --git a/examples/transform/src/test/java/com/example/transform/ApplicationTest.java b/examples/transform/src/test/java/com/example/transform/ApplicationTest.java
new file mode 100644
index 0000000..2d331d2
--- /dev/null
+++ b/examples/transform/src/test/java/com/example/transform/ApplicationTest.java
@@ -0,0 +1,21 @@
+package com.example.transform;
+
+import org.junit.Test;
+import org.apache.hadoop.conf.Configuration;
+import com.datatorrent.api.LocalMode;
+
+public class ApplicationTest
+{
+ @Test
+ public void testApplication() throws Exception
+ {
+ LocalMode lma = LocalMode.newInstance();
+ Configuration conf = new Configuration(false);
+ conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
+ lma.prepareDAG(new Application(), conf);
+ LocalMode.Controller lc = lma.getController();
+ lc.runAsync();
+ Thread.sleep(10 * 1000);
+ lc.shutdown();
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a492e22f/examples/transform/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/examples/transform/src/test/resources/log4j.properties b/examples/transform/src/test/resources/log4j.properties
new file mode 100644
index 0000000..98544e8
--- /dev/null
+++ b/examples/transform/src/test/resources/log4j.properties
@@ -0,0 +1,22 @@
+log4j.rootLogger=DEBUG,CONSOLE
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+
+log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.RFA.File=/tmp/app.log
+
+# to enable, add SYSLOG to rootLogger
+log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
+log4j.appender.SYSLOG.syslogHost=127.0.0.1
+log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
+log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
+log4j.appender.SYSLOG.Facility=LOCAL1
+
+#log4j.logger.org.apache.commons.beanutils=warn
+log4j.logger.com.datatorrent=debug
+log4j.logger.org.apache.apex=debug
+log4j.logger.org=info