You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/03/27 15:19:12 UTC

[11/19] apex-malhar git commit: Add example of custom partitioner and stream codec Update index and revise some README files

Add example of custom partitioner and stream codec
Update index and revise some README files


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/f7c7b7cf
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/f7c7b7cf
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/f7c7b7cf

Branch: refs/heads/master
Commit: f7c7b7cf476e0fe7ee82ffd85a2ed14a82cbcaed
Parents: 041af06
Author: Munagala V. Ramanath <ra...@datatorrent.com>
Authored: Sun Mar 13 17:37:45 2016 -0700
Committer: Lakshmi Prasanna Velineni <la...@datatorrent.com>
Committed: Sun Mar 26 11:43:48 2017 -0700

----------------------------------------------------------------------
 examples/partition/README.md                    |  12 +
 .../partition/XmlJavadocCommentsExtractor.xsl   |  44 +++
 examples/partition/pom.xml                      | 274 +++++++++++++++++++
 examples/partition/src/assemble/appPackage.xml  |  43 +++
 .../java/com/example/myapexapp/Application.java |  27 ++
 .../main/java/com/example/myapexapp/Codec3.java |  13 +
 .../myapexapp/RandomNumberGenerator.java        |  83 ++++++
 .../com/example/myapexapp/TestPartition.java    | 164 +++++++++++
 .../src/main/resources/META-INF/properties.xml  |  33 +++
 .../src/main/resources/my-log4j.properties      |  16 ++
 .../com/example/myapexapp/ApplicationTest.java  |  37 +++
 .../src/test/resources/log4j.properties         |  21 ++
 12 files changed, 767 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f7c7b7cf/examples/partition/README.md
----------------------------------------------------------------------
diff --git a/examples/partition/README.md b/examples/partition/README.md
new file mode 100644
index 0000000..6a46599
--- /dev/null
+++ b/examples/partition/README.md
@@ -0,0 +1,12 @@
+This example shows how to define custom partitions and a custom `StreamCodec` to customize
+the set of tuples that reach each partition.
+
+There are two operators: `RandomNumberGenerator` (generates random integers) and
+`TestPartition` (logs input tuples).
+
+The application also uses a StreamCodec called `Codec3` to tag each tuple with a
+partition tag based on whether the number is divisible by 2 or 4.
+
+`TestPartition` has code to create 3 partitions: one gets odd numbers, one gets multiples
+of 4 and the last gets the rest. The `PartitionKeys` associated with each partition use
+the partition tag to select the set of tuples to be handled by that partition.

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f7c7b7cf/examples/partition/XmlJavadocCommentsExtractor.xsl
----------------------------------------------------------------------
diff --git a/examples/partition/XmlJavadocCommentsExtractor.xsl b/examples/partition/XmlJavadocCommentsExtractor.xsl
new file mode 100644
index 0000000..08075a9
--- /dev/null
+++ b/examples/partition/XmlJavadocCommentsExtractor.xsl
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+            http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+
+<!--
+    Document   : XmlJavadocCommentsExtractor.xsl
+    Created on : September 16, 2014, 11:30 AM
+    Description:
+        The transformation strips off all information except for comments and tags from xml javadoc generated by xml-doclet.
+-->
+
+<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform" version="1.0">
+  <xsl:output method="xml" standalone="yes"/>
+
+  <!-- copy xml by selecting only the following nodes, attributes and text -->
+  <xsl:template match="node()|text()|@*">
+    <xsl:copy>
+      <xsl:apply-templates select="root|package|class|interface|method|field|type|comment|tag|text()|@name|@qualified|@text"/>
+    </xsl:copy>
+  </xsl:template>
+
+  <!-- Strip off the following paths from the selected xml -->
+  <xsl:template match="//root/package/interface/interface
+                      |//root/package/interface/method/@qualified
+                      |//root/package/class/interface
+                      |//root/package/class/class
+                      |//root/package/class/method/@qualified
+                      |//root/package/class/field/@qualified" />
+
+  <xsl:strip-space elements="*"/>
+</xsl:stylesheet>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f7c7b7cf/examples/partition/pom.xml
----------------------------------------------------------------------
diff --git a/examples/partition/pom.xml b/examples/partition/pom.xml
new file mode 100644
index 0000000..ac15981
--- /dev/null
+++ b/examples/partition/pom.xml
@@ -0,0 +1,274 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  
+  <groupId>com.example</groupId>
+  <version>1.0-SNAPSHOT</version>
+  <artifactId>Test-Ram</artifactId>
+  <packaging>jar</packaging>
+
+  <!-- change these to the appropriate values -->
+  <name>Test_ram</name>
+  <description>Test_ram</description>
+
+  <properties>
+    <!-- change this if you desire to use a different version of Apex core -->
+    <apex.version>3.5.0</apex.version>
+    <apex.apppackage.classpath>lib/*.jar</apex.apppackage.classpath>
+  </properties>
+
+  <build>
+    <plugins>
+       <plugin>
+         <groupId>org.apache.maven.plugins</groupId>
+         <artifactId>maven-eclipse-plugin</artifactId>
+         <version>2.9</version>
+         <configuration>
+           <downloadSources>true</downloadSources>
+         </configuration>
+       </plugin>
+       <plugin>
+         <artifactId>maven-compiler-plugin</artifactId>
+         <version>3.3</version>
+         <configuration>
+           <encoding>UTF-8</encoding>
+           <source>1.7</source>
+           <target>1.7</target>
+           <debug>true</debug>
+           <optimize>false</optimize>
+           <showDeprecation>true</showDeprecation>
+           <showWarnings>true</showWarnings>
+         </configuration>
+       </plugin>
+       <plugin>
+         <artifactId>maven-dependency-plugin</artifactId>
+         <version>2.8</version>
+         <executions>
+           <execution>
+             <id>copy-dependencies</id>
+             <phase>prepare-package</phase>
+             <goals>
+               <goal>copy-dependencies</goal>
+             </goals>
+             <configuration>
+               <outputDirectory>target/deps</outputDirectory>
+               <includeScope>runtime</includeScope>
+             </configuration>
+           </execution>
+         </executions>
+       </plugin>
+
+       <plugin>
+         <artifactId>maven-assembly-plugin</artifactId>
+         <executions>
+           <execution>
+             <id>app-package-assembly</id>
+             <phase>package</phase>
+             <goals>
+               <goal>single</goal>
+             </goals>
+             <configuration>
+               <finalName>${project.artifactId}-${project.version}-apexapp</finalName>
+               <appendAssemblyId>false</appendAssemblyId>
+               <descriptors>
+                 <descriptor>src/assemble/appPackage.xml</descriptor>
+               </descriptors>
+               <archiverConfig>
+                 <defaultDirectoryMode>0755</defaultDirectoryMode>
+               </archiverConfig>                  
+               <archive>
+                 <manifestEntries>
+                   <Class-Path>${apex.apppackage.classpath}</Class-Path>
+                   <DT-Engine-Version>${apex.version}</DT-Engine-Version>
+                   <DT-App-Package-Name>${project.artifactId}</DT-App-Package-Name>
+                   <DT-App-Package-Version>${project.version}</DT-App-Package-Version>
+                   <DT-App-Package-Display-Name>${project.name}</DT-App-Package-Display-Name>
+                   <DT-App-Package-Description>${project.description}</DT-App-Package-Description>
+                 </manifestEntries>
+               </archive>
+             </configuration>
+           </execution>
+         </executions>
+       </plugin>
+
+       <plugin>
+         <artifactId>maven-antrun-plugin</artifactId>
+         <version>1.7</version>
+         <executions>
+           <execution>
+             <phase>package</phase>
+             <configuration>
+               <target>
+                 <move file="${project.build.directory}/${project.artifactId}-${project.version}-apexapp.jar"
+                       tofile="${project.build.directory}/${project.artifactId}-${project.version}.apa" />
+               </target>
+             </configuration>
+             <goals>
+               <goal>run</goal>
+             </goals>
+           </execution>
+           <execution>
+             <!-- create resource directory for xml javadoc-->
+             <id>createJavadocDirectory</id>
+             <phase>generate-resources</phase>
+             <configuration>
+               <tasks>
+                 <delete dir="${project.build.directory}/generated-resources/xml-javadoc"/>
+                 <mkdir dir="${project.build.directory}/generated-resources/xml-javadoc"/>
+               </tasks>
+             </configuration>
+             <goals>
+               <goal>run</goal>
+             </goals>
+           </execution>
+         </executions>
+       </plugin>
+
+       <plugin>
+         <groupId>org.codehaus.mojo</groupId>
+         <artifactId>build-helper-maven-plugin</artifactId>
+         <version>1.9.1</version>
+         <executions>
+           <execution>
+             <id>attach-artifacts</id>
+             <phase>package</phase>
+             <goals>
+               <goal>attach-artifact</goal>
+             </goals>
+             <configuration>
+               <artifacts>
+                 <artifact>
+                   <file>target/${project.artifactId}-${project.version}.apa</file>
+                   <type>apa</type>
+                 </artifact>
+               </artifacts>
+               <skipAttach>false</skipAttach>
+             </configuration>
+           </execution>
+         </executions>
+       </plugin>
+
+      <!-- generate javadoc -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-javadoc-plugin</artifactId>
+        <executions>
+          <!-- generate xml javadoc -->
+          <execution>
+            <id>xml-doclet</id>
+            <phase>generate-resources</phase>
+            <goals>
+              <goal>javadoc</goal>
+            </goals>
+            <configuration>
+              <doclet>com.github.markusbernhardt.xmldoclet.XmlDoclet</doclet>
+              <additionalparam>-d ${project.build.directory}/generated-resources/xml-javadoc -filename ${project.artifactId}-${project.version}-javadoc.xml</additionalparam>
+              <useStandardDocletOptions>false</useStandardDocletOptions>
+              <docletArtifact>
+                <groupId>com.github.markusbernhardt</groupId>
+                <artifactId>xml-doclet</artifactId>
+                <version>1.0.4</version>
+              </docletArtifact>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <!-- Transform xml javadoc to stripped down version containing only class/interface comments and tags-->
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>xml-maven-plugin</artifactId>
+        <version>1.0</version>
+        <executions>
+          <execution>
+            <id>transform-xmljavadoc</id>
+            <phase>generate-resources</phase>
+            <goals>
+              <goal>transform</goal>
+            </goals>
+          </execution>
+        </executions>
+        <configuration>
+          <transformationSets>
+            <transformationSet>
+              <dir>${project.build.directory}/generated-resources/xml-javadoc</dir>
+              <includes>
+                <include>${project.artifactId}-${project.version}-javadoc.xml</include>
+              </includes>
+              <stylesheet>XmlJavadocCommentsExtractor.xsl</stylesheet>
+              <outputDir>${project.build.directory}/generated-resources/xml-javadoc</outputDir>
+            </transformationSet>
+          </transformationSets>
+        </configuration>
+      </plugin>
+      <!-- copy xml javadoc to class jar -->
+      <plugin>
+        <artifactId>maven-resources-plugin</artifactId>
+        <version>2.6</version>
+        <executions>
+          <execution>
+            <id>copy-resources</id>
+            <phase>process-resources</phase>
+            <goals>
+              <goal>copy-resources</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>${basedir}/target/classes</outputDirectory>
+              <resources>
+                <resource>
+                  <directory>${project.build.directory}/generated-resources/xml-javadoc</directory>
+                  <includes>
+                    <include>${project.artifactId}-${project.version}-javadoc.xml</include>
+                  </includes>
+                  <filtering>true</filtering>
+                </resource>
+              </resources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+    </plugins>
+
+  </build>
+
+  <dependencies>
+    <!-- add your dependencies here -->
+    <dependency>
+      <groupId>org.apache.apex</groupId>
+      <artifactId>malhar-library</artifactId>
+      <version>3.6.0</version>
+
+      <!-- 
+           If you know that your application does not need transitive dependencies pulled in by malhar-library,
+           uncomment the following to reduce the size of your app package.
+      -->
+      <!--    
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+      -->
+    </dependency>
+    <dependency>
+      <groupId>org.apache.apex</groupId>
+      <artifactId>apex-common</artifactId>
+      <version>${apex.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>4.10</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.apex</groupId>
+      <artifactId>apex-engine</artifactId>
+      <version>${apex.version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f7c7b7cf/examples/partition/src/assemble/appPackage.xml
----------------------------------------------------------------------
diff --git a/examples/partition/src/assemble/appPackage.xml b/examples/partition/src/assemble/appPackage.xml
new file mode 100644
index 0000000..7ad071c
--- /dev/null
+++ b/examples/partition/src/assemble/appPackage.xml
@@ -0,0 +1,43 @@
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+  <id>appPackage</id>
+  <formats>
+    <format>jar</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <fileSets>
+    <fileSet>
+      <directory>${basedir}/target/</directory>
+      <outputDirectory>/app</outputDirectory>
+      <includes>
+        <include>${project.artifactId}-${project.version}.jar</include>
+      </includes>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/target/deps</directory>
+      <outputDirectory>/lib</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/site/conf</directory>
+      <outputDirectory>/conf</outputDirectory>
+      <includes>
+        <include>*.xml</include>
+      </includes>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/main/resources/META-INF</directory>
+      <outputDirectory>/META-INF</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/main/resources/app</directory>
+      <outputDirectory>/app</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/main/resources/resources</directory>
+      <outputDirectory>/resources</outputDirectory>
+    </fileSet>
+  </fileSets>
+
+</assembly>
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f7c7b7cf/examples/partition/src/main/java/com/example/myapexapp/Application.java
----------------------------------------------------------------------
diff --git a/examples/partition/src/main/java/com/example/myapexapp/Application.java b/examples/partition/src/main/java/com/example/myapexapp/Application.java
new file mode 100644
index 0000000..e1ca2ff
--- /dev/null
+++ b/examples/partition/src/main/java/com/example/myapexapp/Application.java
@@ -0,0 +1,27 @@
+package com.example.myapexapp;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.Context.PortContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+
+/**
+ * Application that connects a {@link RandomNumberGenerator} to a {@link TestPartition}
+ * and installs {@link Codec3} as the stream codec of the downstream input port.
+ */
+@ApplicationAnnotation(name="TestStuff")
+public class Application implements StreamingApplication
+{
+
+  /**
+   * Adds the two operators to the DAG, sets the stream codec on the input port of
+   * {@code testPartition} and links the operators with the stream {@code randomData}.
+   *
+   * @param dag  the DAG to populate
+   * @param conf configuration passed in by the caller; not read by this method
+   */
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    final RandomNumberGenerator source = dag.addOperator("randomInt", RandomNumberGenerator.class);
+    final TestPartition sink = dag.addOperator("testPartition", TestPartition.class);
+
+    // the codec decides the partition code of each tuple entering the sink
+    dag.setInputPortAttribute(sink.in, PortContext.STREAM_CODEC, new Codec3());
+
+    //Add locality if needed, e.g.: .setLocality(Locality.CONTAINER_LOCAL);
+    dag.addStream("randomData", source.out, sink.in);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f7c7b7cf/examples/partition/src/main/java/com/example/myapexapp/Codec3.java
----------------------------------------------------------------------
diff --git a/examples/partition/src/main/java/com/example/myapexapp/Codec3.java b/examples/partition/src/main/java/com/example/myapexapp/Codec3.java
new file mode 100644
index 0000000..2754e9b
--- /dev/null
+++ b/examples/partition/src/main/java/com/example/myapexapp/Codec3.java
@@ -0,0 +1,13 @@
+package com.example.myapexapp;
+
+import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
+
+public class Codec3 extends KryoSerializableStreamCodec<Integer> {
+    @Override
+    public int getPartition(Integer tuple) {
+      final int v = tuple;
+      return (1 == (v & 1)) ? 0      // odd
+           : (0 == (v & 3)) ? 1      // divisible by 4
+           : 2;                      // divisible by 2 but not 4
+    }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f7c7b7cf/examples/partition/src/main/java/com/example/myapexapp/RandomNumberGenerator.java
----------------------------------------------------------------------
diff --git a/examples/partition/src/main/java/com/example/myapexapp/RandomNumberGenerator.java b/examples/partition/src/main/java/com/example/myapexapp/RandomNumberGenerator.java
new file mode 100644
index 0000000..de2797b
--- /dev/null
+++ b/examples/partition/src/main/java/com/example/myapexapp/RandomNumberGenerator.java
@@ -0,0 +1,83 @@
+package com.example.myapexapp;
+
+import java.util.Random;
+
+import javax.validation.constraints.Min;
+import javax.validation.constraints.Size;
+import javax.validation.ConstraintViolation;
+import javax.validation.ValidatorFactory;
+import javax.validation.Validator;
+import javax.validation.Validation;
+
+import com.datatorrent.api.Attribute;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+
+/**
+ * Input operator that emits random integers on {@link #out}, at most {@code numTuples}
+ * of them per streaming window.
+ */
+public class RandomNumberGenerator extends BaseOperator implements InputOperator
+{
+  private static final Logger LOG = LoggerFactory.getLogger(RandomNumberGenerator.class);
+
+  // maximum number of tuples emitted in a single window
+  @Min(1)
+  private int numTuples = 20;
+
+  // number of emitTuples() calls seen in the current window; reset in beginWindow()
+  private transient int count = 0;
+
+  // pause in milliseconds once the per-window quota is used up; read from SPIN_MILLIS in setup()
+  private int sleepTime;
+  private transient Random random = new Random();
+
+  public final transient DefaultOutputPort<Integer> out = new DefaultOutputPort<Integer>();
+
+  /**
+   * Reads the activation window id (logged only) and the spin interval used as the
+   * sleep time in {@link #emitTuples()}.
+   *
+   * @param context operator context supplying the attribute values
+   */
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    super.setup(context);
+
+    // the attributes are static constants, so reference them through the type, not the instance
+    long appWindowId = context.getValue(OperatorContext.ACTIVATION_WINDOW_ID);
+    sleepTime = context.getValue(OperatorContext.SPIN_MILLIS);
+    LOG.debug("Started setup, appWindowId = {}, sleepTime = {}", appWindowId, sleepTime);
+  }
+
+  /**
+   * Starts a new window by resetting the per-window call counter.
+   *
+   * @param windowId id of the window that begins
+   */
+  @Override
+  public void beginWindow(long windowId)
+  {
+    count = 0;
+    LOG.debug("beginWindow: windowId = {}", windowId);
+  }
+
+  /**
+   * Emits one random integer per call until {@code numTuples} have been emitted in the
+   * current window; after that each call sleeps for {@code sleepTime} milliseconds.
+   */
+  @Override
+  public void emitTuples()
+  {
+    if (count++ < numTuples) {
+      out.emit(random.nextInt());
+    } else {
+      LOG.debug("count = {}, time = {}", count, System.currentTimeMillis());
+
+      try {
+        // avoid repeated calls to this function
+        Thread.sleep(sleepTime);
+      } catch (InterruptedException e) {
+        LOG.info("Sleep interrupted");
+        // restore the interrupt flag so the calling thread can still observe the interruption
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  /**
+   * @return the maximum number of tuples emitted per window
+   */
+  public int getNumTuples()
+  {
+    return numTuples;
+  }
+
+  /**
+   * @param numTuples the maximum number of tuples to emit per window
+   */
+  public void setNumTuples(int numTuples)
+  {
+    this.numTuples = numTuples;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f7c7b7cf/examples/partition/src/main/java/com/example/myapexapp/TestPartition.java
----------------------------------------------------------------------
diff --git a/examples/partition/src/main/java/com/example/myapexapp/TestPartition.java b/examples/partition/src/main/java/com/example/myapexapp/TestPartition.java
new file mode 100644
index 0000000..1f77e72
--- /dev/null
+++ b/examples/partition/src/main/java/com/example/myapexapp/TestPartition.java
@@ -0,0 +1,164 @@
+package com.example.myapexapp;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Random;
+
+import javax.validation.constraints.Max;
+import javax.validation.constraints.Min;
+import javax.validation.constraints.Size;
+import javax.validation.ConstraintViolation;
+import javax.validation.ValidatorFactory;
+import javax.validation.Validator;
+import javax.validation.Validation;
+
+import com.datatorrent.api.Attribute;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.DefaultPartition;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.Partitioner;
+import com.datatorrent.api.Partitioner.Partition;
+import com.datatorrent.api.Partitioner.PartitionKeys;
+import com.datatorrent.api.Partitioner.PartitioningContext;
+
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * Simple operator to test partitioning. It logs every tuple arriving on {@link #in} and
+ * implements {@link Partitioner} itself: {@code definePartitions} creates
+ * {@code nPartitions} partitions and gives partition {@code i} the single partition
+ * key {@code i} on the input port.
+ */
+public class TestPartition extends BaseOperator implements Partitioner<TestPartition>
+{
+  private static final Logger LOG = LoggerFactory.getLogger(TestPartition.class);
+
+  private transient int id;             // operator/partition id
+  private transient long curWindowId;   // current window id
+  private transient long cnt;           // per-window tuple count
+
+  // number of partitions created by definePartitions()
+  @Min(1) @Max(20)
+  private int nPartitions = 3;
+
+  public final transient DefaultInputPort<Integer> in = new DefaultInputPort<Integer>() {
+    @Override
+    public void process(Integer tuple)
+    {
+      LOG.debug("{}: tuple = {}, operator id = {}", cnt, tuple, id);
+      ++cnt;
+    }
+  };
+
+  /**
+   * Records the operator id for use in log messages and logs the activation window id.
+   *
+   * @param context operator context supplying the id and attribute values
+   */
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    super.setup(context);
+
+    // the attribute is a static constant, so reference it through the type, not the instance
+    long appWindowId = context.getValue(OperatorContext.ACTIVATION_WINDOW_ID);
+    id = context.getId();
+    LOG.debug("Started setup, appWindowId = {}, operator id = {}", appWindowId, id);
+  }
+
+  /**
+   * Resets the per-window tuple count and remembers the window id.
+   *
+   * @param windowId id of the window that begins
+   */
+  @Override
+  public void beginWindow(long windowId)
+  {
+    cnt = 0;
+    curWindowId = windowId;
+    LOG.debug("window id = {}, operator id = {}", curWindowId, id);
+  }
+
+  /**
+   * Logs the number of tuples processed in the window that just ended.
+   */
+  @Override
+  public void endWindow()
+  {
+    LOG.debug("window id = {}, operator id = {}, cnt = {}", curWindowId, id, cnt);
+  }
+
+  @Override
+  public void partitioned(Map<Integer, Partition<TestPartition>> partitions)
+  {
+    //Do nothing
+  }
+
+  /**
+   * Builds {@code nPartitions} new partitions. Partition {@code i} receives the partition
+   * key {@code i} under the mask returned by {@link #getMask(int)}.
+   *
+   * With the default of 3 partitions the mask is 0x03 and the keys are 0, 1 and 2. These
+   * are the codes produced by {@code Codec3}: odd numbers go to partition 0, multiples of 4
+   * to partition 1 and the remaining even numbers to partition 2. A partition code whose
+   * masked value is not one of the keys matches no partition created here.
+   *
+   * @param partitions the current partitions; only their count is used, for logging
+   * @param context    partitioning context; not used
+   * @return the new partitions, one per key
+   */
+  @Override
+  public Collection<Partition<TestPartition>> definePartitions(
+    Collection<Partition<TestPartition>> partitions,
+    PartitioningContext context)
+  {
+    int oldSize = partitions.size();
+    LOG.debug("partitionCount: current = {} requested = {}", oldSize, nPartitions);
+
+    return createPartitions();
+  }  // definePartitions
+
+  // creates one partition per key 0..nPartitions-1, all sharing the same mask
+  private Collection<Partition<TestPartition>> createPartitions()
+  {
+    Collection<Partition<TestPartition>> result
+      = new ArrayList<Partition<TestPartition>>(nPartitions);
+
+    // mask used to extract discriminant from the partition code of a tuple
+    final int mask = getMask(nPartitions);
+    for (int i = 0; i < nPartitions; ++i) {
+      HashSet<Integer> set = new HashSet<>();
+      set.add(i);
+
+      // keep the configured partition count on the new instance so it matches this one
+      TestPartition operator = new TestPartition();
+      operator.setNPartitions(nPartitions);
+
+      Partition<TestPartition> partition = new DefaultPartition<TestPartition>(operator);
+      partition.getPartitionKeys().put(in, new PartitionKeys(mask, set));
+      result.add(partition);
+    }
+
+    return result;
+  }  // createPartitions
+
+  // return mask with bits 0..N set where N is the highest set bit of argument,
+  // e.g. 3 -> 0x3 and 4 -> 0x7; the argument is expected to be at least 1
+  private int getMask(final int n) {
+    return -1 >>> Integer.numberOfLeadingZeros(n);
+  }  // getMask
+
+  // accessors
+  public int getNPartitions() { return nPartitions; }
+  public void setNPartitions(int v) { nPartitions = v; }
+}
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f7c7b7cf/examples/partition/src/main/resources/META-INF/properties.xml
----------------------------------------------------------------------
diff --git a/examples/partition/src/main/resources/META-INF/properties.xml b/examples/partition/src/main/resources/META-INF/properties.xml
new file mode 100644
index 0000000..bf30603
--- /dev/null
+++ b/examples/partition/src/main/resources/META-INF/properties.xml
@@ -0,0 +1,33 @@
+<?xml version="1.0"?>
+<configuration>
+  <!-- 
+  <property>
+    <name>dt.application.{appName}.operator.{opName}.prop.{propName}</name>
+    <value>some-default-value (if value is not specified, it is required from the user or custom config when launching)</value>
+  </property>
+  -->
+  <!-- memory assigned to app master
+  <property>
+    <name>dt.attr.MASTER_MEMORY_MB</name>
+    <value>1024</value>
+  </property>
+  -->
+
+  <!-- log4j configuration for all the operators -->
+  <property>
+    <name>dt.operator.*.attr.JVM_OPTIONS</name>
+    <value> -Dlog4j.configuration=my-log4j.properties</value>
+  </property>
+
+  <property>
+    <name>dt.application.*.operator.randomInt.prop.numTuples</name>
+    <value>10</value>
+  </property>
+  <!--
+  <property>
+    <name>dt.application.TestStuff.operator.testPartition.attr.PARTITIONER</name>
+    <value>com.datatorrent.common.partitioner.StatelessPartitioner:2</value>
+  </property>
+  -->
+</configuration>
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f7c7b7cf/examples/partition/src/main/resources/my-log4j.properties
----------------------------------------------------------------------
diff --git a/examples/partition/src/main/resources/my-log4j.properties b/examples/partition/src/main/resources/my-log4j.properties
new file mode 100644
index 0000000..21ead89
--- /dev/null
+++ b/examples/partition/src/main/resources/my-log4j.properties
@@ -0,0 +1,16 @@
+log4j.rootLogger=DEBUG,CONSOLE
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} %M - %m%n
+
+log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.RFA.File=/tmp/app.log
+
+log4j.logger.org=INFO
+log4j.logger.org.apache=INFO
+log4j.logger.com.datatorrent=INFO
+
+#log4j.logger.com.example.myapexapp.TestPartition=DEBUG, CONSOLE

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f7c7b7cf/examples/partition/src/test/java/com/example/myapexapp/ApplicationTest.java
----------------------------------------------------------------------
diff --git a/examples/partition/src/test/java/com/example/myapexapp/ApplicationTest.java b/examples/partition/src/test/java/com/example/myapexapp/ApplicationTest.java
new file mode 100644
index 0000000..5f490d8
--- /dev/null
+++ b/examples/partition/src/test/java/com/example/myapexapp/ApplicationTest.java
@@ -0,0 +1,37 @@
+/**
+ * Put your copyright and license info here.
+ */
+package com.example.myapexapp;
+
+import java.io.IOException;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+import com.datatorrent.api.LocalMode;
+import com.example.myapexapp.Application;
+
+/**
+ * Smoke test that builds the application DAG and runs it in local mode.
+ */
+public class ApplicationTest {
+
+  /**
+   * Prepares the DAG with the settings from {@code /META-INF/properties.xml}, runs it
+   * for five seconds and fails the test if a constraint violation is raised.
+   */
+  @Test
+  public void testApplication() throws IOException, Exception {
+    try {
+      final LocalMode localMode = LocalMode.newInstance();
+
+      final Configuration configuration = new Configuration(false);
+      final java.io.InputStream properties
+        = this.getClass().getResourceAsStream("/META-INF/properties.xml");
+      configuration.addResource(properties);
+
+      localMode.prepareDAG(new Application(), configuration);
+
+      final LocalMode.Controller controller = localMode.getController();
+      controller.run(5000); // runs for 5 seconds and quits
+    } catch (ConstraintViolationException violation) {
+      Assert.fail("constraint violations: " + violation.getConstraintViolations());
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f7c7b7cf/examples/partition/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/examples/partition/src/test/resources/log4j.properties b/examples/partition/src/test/resources/log4j.properties
new file mode 100644
index 0000000..3bfcdc5
--- /dev/null
+++ b/examples/partition/src/test/resources/log4j.properties
@@ -0,0 +1,21 @@
+log4j.rootLogger=DEBUG,CONSOLE
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+
+log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.RFA.File=/tmp/app.log
+
+# to enable, add SYSLOG to rootLogger
+log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
+log4j.appender.SYSLOG.syslogHost=127.0.0.1
+log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
+log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
+log4j.appender.SYSLOG.Facility=LOCAL1
+
+log4j.logger.org=info
+#log4j.logger.org.apache.commons.beanutils=warn
+log4j.logger.com.datatorrent=debug