You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/03/27 15:19:13 UTC

[12/19] apex-malhar git commit: SPOI-9079 Added sample application for transform operator

SPOI-9079 Added sample application for transform operator


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/a492e22f
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/a492e22f
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/a492e22f

Branch: refs/heads/master
Commit: a492e22fe2a0a18fdfde53f19550a13664b97231
Parents: ca12bca
Author: chaitanya <ch...@apache.org>
Authored: Mon Oct 10 17:03:40 2016 +0530
Committer: Lakshmi Prasanna Velineni <la...@datatorrent.com>
Committed: Sun Mar 26 11:43:48 2017 -0700

----------------------------------------------------------------------
 examples/transform/README.md                    |   6 +
 .../transform/XmlJavadocCommentsExtractor.xsl   |  44 +++
 examples/transform/pom.xml                      | 267 +++++++++++++++++++
 examples/transform/src/assemble/appPackage.xml  |  43 +++
 .../java/com/example/transform/Application.java |  39 +++
 .../com/example/transform/CustomerEvent.java    |  74 +++++
 .../com/example/transform/CustomerInfo.java     |  60 +++++
 .../transform/DynamicTransformApplication.java  |  52 ++++
 .../com/example/transform/POJOGenerator.java    | 125 +++++++++
 .../src/main/resources/META-INF/properties.xml  |  19 ++
 .../com/example/transform/ApplicationTest.java  |  21 ++
 .../src/test/resources/log4j.properties         |  22 ++
 12 files changed, 772 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a492e22f/examples/transform/README.md
----------------------------------------------------------------------
diff --git a/examples/transform/README.md b/examples/transform/README.md
new file mode 100644
index 0000000..af016ff
--- /dev/null
+++ b/examples/transform/README.md
@@ -0,0 +1,6 @@
+Sample application to show how to use the TransformOperator to transform the input POJO using given expressions.
+
+Operators in sample application are as follows:
+1) POJOGenerator which generates and emits the CustomerEvent POJO.
+2) TransformOperator which transforms the input POJO using provided expressions and emits the transformed POJO.
+3) ConsoleOutputOperator which writes transformed POJO to stdout.

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a492e22f/examples/transform/XmlJavadocCommentsExtractor.xsl
----------------------------------------------------------------------
diff --git a/examples/transform/XmlJavadocCommentsExtractor.xsl b/examples/transform/XmlJavadocCommentsExtractor.xsl
new file mode 100644
index 0000000..08075a9
--- /dev/null
+++ b/examples/transform/XmlJavadocCommentsExtractor.xsl
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+            http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+
+<!--
+    Document   : XmlJavadocCommentsExtractor.xsl
+    Created on : September 16, 2014, 11:30 AM
+    Description:
+        The transformation strips off all information except for comments and tags from xml javadoc generated by xml-doclet.
+-->
+
+<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform" version="1.0">
+  <xsl:output method="xml" standalone="yes"/>
+
+  <!-- copy xml by selecting only the following nodes, attributes and text -->
+  <xsl:template match="node()|text()|@*">
+    <xsl:copy>
+      <xsl:apply-templates select="root|package|class|interface|method|field|type|comment|tag|text()|@name|@qualified|@text"/>
+    </xsl:copy>
+  </xsl:template>
+
+  <!-- Strip off the following paths from the selected xml -->
+  <xsl:template match="//root/package/interface/interface
+                      |//root/package/interface/method/@qualified
+                      |//root/package/class/interface
+                      |//root/package/class/class
+                      |//root/package/class/method/@qualified
+                      |//root/package/class/field/@qualified" />
+
+  <xsl:strip-space elements="*"/>
+</xsl:stylesheet>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a492e22f/examples/transform/pom.xml
----------------------------------------------------------------------
diff --git a/examples/transform/pom.xml b/examples/transform/pom.xml
new file mode 100644
index 0000000..e8846d3
--- /dev/null
+++ b/examples/transform/pom.xml
@@ -0,0 +1,267 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <groupId>com.example</groupId>
+  <version>1.0-SNAPSHOT</version>
+  <artifactId>transform</artifactId>
+  <packaging>jar</packaging>
+
+  <name>Transform Application</name>
+  <description>Sample application for transform operator</description>
+
+  <properties>
+    <!-- change this if you desire to use a different version of Apex Core -->
+    <apex.version>3.5.0</apex.version>
+    <malhar.version>3.6.0</malhar.version>
+    <apex.apppackage.classpath>lib/*.jar</apex.apppackage.classpath>
+  </properties>
+
+  <build>
+    <plugins>
+       <plugin>
+         <groupId>org.apache.maven.plugins</groupId>
+         <artifactId>maven-eclipse-plugin</artifactId>
+         <version>2.9</version>
+         <configuration>
+           <downloadSources>true</downloadSources>
+         </configuration>
+       </plugin>
+       <plugin>
+         <artifactId>maven-compiler-plugin</artifactId>
+         <version>3.3</version>
+         <configuration>
+           <encoding>UTF-8</encoding>
+           <source>1.7</source>
+           <target>1.7</target>
+           <debug>true</debug>
+           <optimize>false</optimize>
+           <showDeprecation>true</showDeprecation>
+           <showWarnings>true</showWarnings>
+         </configuration>
+       </plugin>
+       <plugin>
+         <artifactId>maven-dependency-plugin</artifactId>
+         <version>2.8</version>
+         <executions>
+           <execution>
+             <id>copy-dependencies</id>
+             <phase>prepare-package</phase>
+             <goals>
+               <goal>copy-dependencies</goal>
+             </goals>
+             <configuration>
+               <outputDirectory>target/deps</outputDirectory>
+               <includeScope>runtime</includeScope>
+             </configuration>
+           </execution>
+         </executions>
+       </plugin>
+
+       <plugin>
+         <artifactId>maven-assembly-plugin</artifactId>
+         <executions>
+           <execution>
+             <id>app-package-assembly</id>
+             <phase>package</phase>
+             <goals>
+               <goal>single</goal>
+             </goals>
+             <configuration>
+               <finalName>${project.artifactId}-${project.version}-apexapp</finalName>
+               <appendAssemblyId>false</appendAssemblyId>
+               <descriptors>
+                 <descriptor>src/assemble/appPackage.xml</descriptor>
+               </descriptors>
+               <archiverConfig>
+                 <defaultDirectoryMode>0755</defaultDirectoryMode>
+               </archiverConfig>
+               <archive>
+                 <manifestEntries>
+                   <Class-Path>${apex.apppackage.classpath}</Class-Path>
+                   <DT-Engine-Version>${apex.version}</DT-Engine-Version>
+                   <DT-App-Package-Group-Id>${project.groupId}</DT-App-Package-Group-Id>
+                   <DT-App-Package-Name>${project.artifactId}</DT-App-Package-Name>
+                   <DT-App-Package-Version>${project.version}</DT-App-Package-Version>
+                   <DT-App-Package-Display-Name>${project.name}</DT-App-Package-Display-Name>
+                   <DT-App-Package-Description>${project.description}</DT-App-Package-Description>
+                 </manifestEntries>
+               </archive>
+             </configuration>
+           </execution>
+         </executions>
+       </plugin>
+
+       <plugin>
+         <artifactId>maven-antrun-plugin</artifactId>
+         <version>1.7</version>
+         <executions>
+           <execution>
+             <phase>package</phase>
+             <configuration>
+               <target>
+                 <move file="${project.build.directory}/${project.artifactId}-${project.version}-apexapp.jar"
+                       tofile="${project.build.directory}/${project.artifactId}-${project.version}.apa" />
+               </target>
+             </configuration>
+             <goals>
+               <goal>run</goal>
+             </goals>
+           </execution>
+           <execution>
+             <!-- create resource directory for xml javadoc-->
+             <id>createJavadocDirectory</id>
+             <phase>generate-resources</phase>
+             <configuration>
+               <tasks>
+                 <delete dir="${project.build.directory}/generated-resources/xml-javadoc"/>
+                 <mkdir dir="${project.build.directory}/generated-resources/xml-javadoc"/>
+               </tasks>
+             </configuration>
+             <goals>
+               <goal>run</goal>
+             </goals>
+           </execution>
+         </executions>
+       </plugin>
+
+       <plugin>
+         <groupId>org.codehaus.mojo</groupId>
+         <artifactId>build-helper-maven-plugin</artifactId>
+         <version>1.9.1</version>
+         <executions>
+           <execution>
+             <id>attach-artifacts</id>
+             <phase>package</phase>
+             <goals>
+               <goal>attach-artifact</goal>
+             </goals>
+             <configuration>
+               <artifacts>
+                 <artifact>
+                   <file>target/${project.artifactId}-${project.version}.apa</file>
+                   <type>apa</type>
+                 </artifact>
+               </artifacts>
+               <skipAttach>false</skipAttach>
+             </configuration>
+           </execution>
+         </executions>
+       </plugin>
+
+      <!-- generate javdoc -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-javadoc-plugin</artifactId>
+        <executions>
+          <!-- generate xml javadoc -->
+          <execution>
+            <id>xml-doclet</id>
+            <phase>generate-resources</phase>
+            <goals>
+              <goal>javadoc</goal>
+            </goals>
+            <configuration>
+              <doclet>com.github.markusbernhardt.xmldoclet.XmlDoclet</doclet>
+              <additionalparam>-d ${project.build.directory}/generated-resources/xml-javadoc -filename ${project.artifactId}-${project.version}-javadoc.xml</additionalparam>
+              <useStandardDocletOptions>false</useStandardDocletOptions>
+              <docletArtifact>
+                <groupId>com.github.markusbernhardt</groupId>
+                <artifactId>xml-doclet</artifactId>
+                <version>1.0.4</version>
+              </docletArtifact>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <!-- Transform xml javadoc to stripped down version containing only class/interface comments and tags-->
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>xml-maven-plugin</artifactId>
+        <version>1.0</version>
+        <executions>
+          <execution>
+            <id>transform-xmljavadoc</id>
+            <phase>generate-resources</phase>
+            <goals>
+              <goal>transform</goal>
+            </goals>
+          </execution>
+        </executions>
+        <configuration>
+          <transformationSets>
+            <transformationSet>
+              <dir>${project.build.directory}/generated-resources/xml-javadoc</dir>
+              <includes>
+                <include>${project.artifactId}-${project.version}-javadoc.xml</include>
+              </includes>
+              <stylesheet>XmlJavadocCommentsExtractor.xsl</stylesheet>
+              <outputDir>${project.build.directory}/generated-resources/xml-javadoc</outputDir>
+            </transformationSet>
+          </transformationSets>
+        </configuration>
+      </plugin>
+      <!-- copy xml javadoc to class jar -->
+      <plugin>
+        <artifactId>maven-resources-plugin</artifactId>
+        <version>2.6</version>
+        <executions>
+          <execution>
+            <id>copy-resources</id>
+            <phase>process-resources</phase>
+            <goals>
+              <goal>copy-resources</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>${basedir}/target/classes</outputDirectory>
+              <resources>
+                <resource>
+                  <directory>${project.build.directory}/generated-resources/xml-javadoc</directory>
+                  <includes>
+                    <include>${project.artifactId}-${project.version}-javadoc.xml</include>
+                  </includes>
+                  <filtering>true</filtering>
+                </resource>
+              </resources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+    </plugins>
+
+  </build>
+
+  <dependencies>
+    <!-- add your dependencies here -->
+    <dependency>
+      <groupId>org.apache.apex</groupId>
+      <artifactId>malhar-library</artifactId>
+      <version>${malhar.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.apex</groupId>
+      <artifactId>apex-common</artifactId>
+      <version>${apex.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>4.10</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.apex</groupId>
+      <artifactId>apex-engine</artifactId>
+      <version>${apex.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.codehaus.janino</groupId>
+      <artifactId>janino</artifactId>
+      <version>2.7.8</version>
+    </dependency>
+  </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a492e22f/examples/transform/src/assemble/appPackage.xml
----------------------------------------------------------------------
diff --git a/examples/transform/src/assemble/appPackage.xml b/examples/transform/src/assemble/appPackage.xml
new file mode 100644
index 0000000..7ad071c
--- /dev/null
+++ b/examples/transform/src/assemble/appPackage.xml
@@ -0,0 +1,43 @@
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+  <id>appPackage</id>
+  <formats>
+    <format>jar</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <fileSets>
+    <fileSet>
+      <directory>${basedir}/target/</directory>
+      <outputDirectory>/app</outputDirectory>
+      <includes>
+        <include>${project.artifactId}-${project.version}.jar</include>
+      </includes>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/target/deps</directory>
+      <outputDirectory>/lib</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/site/conf</directory>
+      <outputDirectory>/conf</outputDirectory>
+      <includes>
+        <include>*.xml</include>
+      </includes>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/main/resources/META-INF</directory>
+      <outputDirectory>/META-INF</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/main/resources/app</directory>
+      <outputDirectory>/app</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/main/resources/resources</directory>
+      <outputDirectory>/resources</outputDirectory>
+    </fileSet>
+  </fileSets>
+
+</assembly>
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a492e22f/examples/transform/src/main/java/com/example/transform/Application.java
----------------------------------------------------------------------
diff --git a/examples/transform/src/main/java/com/example/transform/Application.java b/examples/transform/src/main/java/com/example/transform/Application.java
new file mode 100644
index 0000000..d73b47d
--- /dev/null
+++ b/examples/transform/src/main/java/com/example/transform/Application.java
@@ -0,0 +1,39 @@
+package com.example.transform;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.common.partitioner.StatelessPartitioner;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.transform.TransformOperator;
+
+@ApplicationAnnotation(name="TransformExample")
+public class Application implements StreamingApplication
+{
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    POJOGenerator input = dag.addOperator("Input", new POJOGenerator());
+    TransformOperator transform = dag.addOperator("Process", new TransformOperator());
+    // Set expression map
+    Map<String, String> expMap = new HashMap<>();
+    expMap.put("name", "{$.firstName}.concat(\" \").concat({$.lastName})");
+    expMap.put("age", "(new java.util.Date()).getYear() - {$.dateOfBirth}.getYear()");
+    expMap.put("address", "{$.address}.toLowerCase()");
+    transform.setExpressionMap(expMap);
+    ConsoleOutputOperator output = dag.addOperator("Output", new ConsoleOutputOperator());
+
+    dag.addStream("InputToTransform", input.output, transform.input);
+    dag.addStream("TransformToOutput", transform.output, output.input);
+
+    dag.setInputPortAttribute(transform.input, Context.PortContext.TUPLE_CLASS, CustomerEvent.class);
+    dag.setOutputPortAttribute(transform.output, Context.PortContext.TUPLE_CLASS, CustomerInfo.class);
+    dag.setAttribute(transform, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<TransformOperator>(2));
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a492e22f/examples/transform/src/main/java/com/example/transform/CustomerEvent.java
----------------------------------------------------------------------
diff --git a/examples/transform/src/main/java/com/example/transform/CustomerEvent.java b/examples/transform/src/main/java/com/example/transform/CustomerEvent.java
new file mode 100644
index 0000000..14a9c82
--- /dev/null
+++ b/examples/transform/src/main/java/com/example/transform/CustomerEvent.java
@@ -0,0 +1,74 @@
+package com.example.transform;
+
+import java.util.Date;
+
+public class CustomerEvent
+{
+  private int customerId;
+  private String firstName;
+  private String lastName;
+  private Date dateOfBirth;
+  private String address;
+
+  public int getCustomerId()
+  {
+    return customerId;
+  }
+
+  public void setCustomerId(int customerId)
+  {
+    this.customerId = customerId;
+  }
+
+  public String getFirstName()
+  {
+    return firstName;
+  }
+
+  public void setFirstName(String firstName)
+  {
+    this.firstName = firstName;
+  }
+
+  public String getLastName()
+  {
+    return lastName;
+  }
+
+  public void setLastName(String lastName)
+  {
+    this.lastName = lastName;
+  }
+
+  public Date getDateOfBirth()
+  {
+    return dateOfBirth;
+  }
+
+  public void setDateOfBirth(Date dateOfBirth)
+  {
+    this.dateOfBirth = dateOfBirth;
+  }
+
+  public String getAddress()
+  {
+    return address;
+  }
+
+  public void setAddress(String address)
+  {
+    this.address = address;
+  }
+
+  @Override
+  public String toString()
+  {
+    return "CustomerEvent{" +
+      "customerId=" + customerId +
+      ", firstName='" + firstName + '\'' +
+      ", lastName='" + lastName + '\'' +
+      ", dateOfBirth=" + dateOfBirth +
+      ", address='" + address + '\'' +
+      '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a492e22f/examples/transform/src/main/java/com/example/transform/CustomerInfo.java
----------------------------------------------------------------------
diff --git a/examples/transform/src/main/java/com/example/transform/CustomerInfo.java b/examples/transform/src/main/java/com/example/transform/CustomerInfo.java
new file mode 100644
index 0000000..28a9ccb
--- /dev/null
+++ b/examples/transform/src/main/java/com/example/transform/CustomerInfo.java
@@ -0,0 +1,60 @@
+package com.example.transform;
+
+public class CustomerInfo
+{
+  private int customerId;
+  private String name;
+  private int age;
+  private String address;
+
+  public int getCustomerId()
+  {
+    return customerId;
+  }
+
+  public void setCustomerId(int customerId)
+  {
+    this.customerId = customerId;
+  }
+
+  public String getName()
+  {
+    return name;
+  }
+
+  public void setName(String name)
+  {
+    this.name = name;
+  }
+
+  public int getAge()
+  {
+    return age;
+  }
+
+  public void setAge(int age)
+  {
+    this.age = age;
+  }
+
+  public String getAddress()
+  {
+    return address;
+  }
+
+  public void setAddress(String address)
+  {
+    this.address = address;
+  }
+
+  @Override
+  public String toString()
+  {
+    return "CustomerInfo{" +
+      "customerId=" + customerId +
+      ", name='" + name + '\'' +
+      ", age=" + age +
+      ", address='" + address + '\'' +
+      '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a492e22f/examples/transform/src/main/java/com/example/transform/DynamicTransformApplication.java
----------------------------------------------------------------------
diff --git a/examples/transform/src/main/java/com/example/transform/DynamicTransformApplication.java b/examples/transform/src/main/java/com/example/transform/DynamicTransformApplication.java
new file mode 100644
index 0000000..a769016
--- /dev/null
+++ b/examples/transform/src/main/java/com/example/transform/DynamicTransformApplication.java
@@ -0,0 +1,52 @@
+package com.example.transform;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StatsListener;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.common.partitioner.StatelessPartitioner;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.partitioner.StatelessThroughputBasedPartitioner;
+import com.datatorrent.lib.transform.TransformOperator;
+
+@ApplicationAnnotation(name="DynamicTransformApp")
+public class DynamicTransformApplication implements StreamingApplication
+{
+  private static String COOL_DOWN_MILLIS = "dt.cooldown";
+  private static String MAX_THROUGHPUT = "dt.maxThroughput";
+  private static String MIN_THROUGHPUT = "dt.minThroughput";
+
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    POJOGenerator input = dag.addOperator("Input", new POJOGenerator());
+    TransformOperator transform = dag.addOperator("Process", new TransformOperator());
+    // Set expression map
+    Map<String, String> expMap = new HashMap<>();
+    expMap.put("name", "{$.firstName}.concat(\" \").concat({$.lastName})");
+    expMap.put("age", "(new java.util.Date()).getYear() - {$.dateOfBirth}.getYear()");
+    expMap.put("address", "{$.address}.toLowerCase()");
+    transform.setExpressionMap(expMap);
+    ConsoleOutputOperator output = dag.addOperator("Output", new ConsoleOutputOperator());
+
+    dag.addStream("InputToTransform", input.output, transform.input);
+    dag.addStream("TransformToOutput", transform.output, output.input);
+
+    dag.setInputPortAttribute(transform.input, Context.PortContext.TUPLE_CLASS, CustomerEvent.class);
+    dag.setOutputPortAttribute(transform.output, Context.PortContext.TUPLE_CLASS, CustomerInfo.class);
+
+    StatelessThroughputBasedPartitioner<TransformOperator> partitioner = new StatelessThroughputBasedPartitioner<>();
+    partitioner.setCooldownMillis(conf.getLong(COOL_DOWN_MILLIS, 10000));
+    partitioner.setMaximumEvents(conf.getLong(MAX_THROUGHPUT, 30000));
+    partitioner.setMinimumEvents(conf.getLong(MIN_THROUGHPUT, 10000));
+    dag.setAttribute(transform, Context.OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{partitioner}));
+    dag.setAttribute(transform, Context.OperatorContext.PARTITIONER, partitioner);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a492e22f/examples/transform/src/main/java/com/example/transform/POJOGenerator.java
----------------------------------------------------------------------
diff --git a/examples/transform/src/main/java/com/example/transform/POJOGenerator.java b/examples/transform/src/main/java/com/example/transform/POJOGenerator.java
new file mode 100644
index 0000000..9db5fd1
--- /dev/null
+++ b/examples/transform/src/main/java/com/example/transform/POJOGenerator.java
@@ -0,0 +1,125 @@
+package com.example.transform;
+
+import java.util.Date;
+import java.util.Random;
+
+import javax.validation.constraints.Min;
+
+import org.apache.commons.lang3.RandomStringUtils;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+
+/**
+ * Generates and emits the CustomerEvent
+ */
+public class POJOGenerator implements InputOperator
+{
+  @Min(1)
+  private int maxCustomerId = 100000;
+  @Min(1)
+  private int maxNameLength = 10;
+  @Min(1)
+  private int maxAddressLength = 15;
+  private long tuplesCounter;
+  // Limit number of emitted tuples per window
+  @Min(1)
+  private long maxTuplesPerWindow = 100;
+  private final Random random = new Random();
+  private final RandomStringUtils rRandom = new RandomStringUtils();
+  public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<>();
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    tuplesCounter = 0;
+  }
+
+  @Override
+  public void endWindow()
+  {
+  }
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+  }
+
+  @Override
+  public void teardown()
+  {
+
+  }
+
+  CustomerEvent generateCustomersEvent() throws Exception {
+
+    CustomerEvent customerEvent = new CustomerEvent();
+    customerEvent.setCustomerId(randomId(maxCustomerId));
+    customerEvent.setFirstName(rRandom.randomAlphabetic(randomId(maxNameLength)));
+    customerEvent.setLastName(rRandom.randomAlphabetic(randomId(maxNameLength)));
+    long val = random.nextLong();
+    long diff1 = val % System.currentTimeMillis();
+    customerEvent.setDateOfBirth(new Date(diff1));
+    customerEvent.setAddress(rRandom.randomAlphabetic(randomId(maxAddressLength)));
+    return customerEvent;
+  }
+
+  private int randomId(int max) {
+    if (max < 1) return 1;
+    return 1 + random.nextInt(max);
+  }
+
+  @Override
+  public void emitTuples()
+  {
+    while (tuplesCounter++ < maxTuplesPerWindow) {
+      try {
+        CustomerEvent event = generateCustomersEvent();
+        this.output.emit(event);
+      } catch (Exception ex) {
+        throw new RuntimeException(ex);
+      }
+    }
+  }
+
+  public long getMaxTuplesPerWindow()
+  {
+    return maxTuplesPerWindow;
+  }
+
+  public void setMaxTuplesPerWindow(long maxTuplesPerWindow)
+  {
+    this.maxTuplesPerWindow = maxTuplesPerWindow;
+  }
+
+  public int getMaxAddressLength()
+  {
+    return maxAddressLength;
+  }
+
+  public void setMaxAddressLength(int maxAddressLength)
+  {
+    this.maxAddressLength = maxAddressLength;
+  }
+
+  public int getMaxNameLength()
+  {
+    return maxNameLength;
+  }
+
+  public void setMaxNameLength(int maxNameLength)
+  {
+    this.maxNameLength = maxNameLength;
+  }
+
+  public int getMaxCustomerId()
+  {
+    return maxCustomerId;
+  }
+
+  public void setMaxCustomerId(int maxCustomerId)
+  {
+    this.maxCustomerId = maxCustomerId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a492e22f/examples/transform/src/main/resources/META-INF/properties.xml
----------------------------------------------------------------------
diff --git a/examples/transform/src/main/resources/META-INF/properties.xml b/examples/transform/src/main/resources/META-INF/properties.xml
new file mode 100644
index 0000000..f885664
--- /dev/null
+++ b/examples/transform/src/main/resources/META-INF/properties.xml
@@ -0,0 +1,19 @@
+<?xml version="1.0"?>
+<configuration>
+  <property>
+    <name>dt.application.*.operator.transform.attr.MEMORY_MB</name>
+    <value>1024</value>
+  </property>
+  <property>
+  <name>dt.maxThroughput</name>
+  <value>150</value>
+  </property>
+  <property>
+    <name>dt.minThroughput</name>
+    <value>50</value>
+  </property>
+  <property>
+    <name>dt.cooldown</name>
+    <value>1800000</value>
+  </property>
+</configuration>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a492e22f/examples/transform/src/test/java/com/example/transform/ApplicationTest.java
----------------------------------------------------------------------
diff --git a/examples/transform/src/test/java/com/example/transform/ApplicationTest.java b/examples/transform/src/test/java/com/example/transform/ApplicationTest.java
new file mode 100644
index 0000000..2d331d2
--- /dev/null
+++ b/examples/transform/src/test/java/com/example/transform/ApplicationTest.java
@@ -0,0 +1,21 @@
+package com.example.transform;
+
+import org.junit.Test;
+import org.apache.hadoop.conf.Configuration;
+import com.datatorrent.api.LocalMode;
+
+public class ApplicationTest
+{
+  @Test
+  public void testApplication() throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
+    lma.prepareDAG(new Application(), conf);
+    LocalMode.Controller lc = lma.getController();
+    lc.runAsync();
+    Thread.sleep(10 * 1000);
+    lc.shutdown();
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a492e22f/examples/transform/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/examples/transform/src/test/resources/log4j.properties b/examples/transform/src/test/resources/log4j.properties
new file mode 100644
index 0000000..98544e8
--- /dev/null
+++ b/examples/transform/src/test/resources/log4j.properties
@@ -0,0 +1,22 @@
+log4j.rootLogger=DEBUG,CONSOLE
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+
+log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.RFA.File=/tmp/app.log
+
+# to enable, add SYSLOG to rootLogger
+log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
+log4j.appender.SYSLOG.syslogHost=127.0.0.1
+log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
+log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
+log4j.appender.SYSLOG.Facility=LOCAL1
+
+#log4j.logger.org.apache.commons.beanutils=warn
+log4j.logger.com.datatorrent=debug
+log4j.logger.org.apache.apex=debug
+log4j.logger.org=info