Posted to commits@apex.apache.org by th...@apache.org on 2017/03/27 15:19:07 UTC

[06/19] apex-malhar git commit: SPOI-8793 Added formatter example description Rename Readme.md to README.md Updated csvformatter schema definition removed properties and files that are not needed for csvformatter Updated review comments for csvformatter

SPOI-8793
Added formatter example description
Rename Readme.md to README.md
Updated csvformatter schema definition
removed properties and files that are not needed for csvformatter
Updated review comments for csvformatter example
Updated the example with Review comments
Updated csvformatter example with PR comments
changed the name from formatter to csvformatter
Incorporated review comments
Updated the format of the Readme file
changed the input JSON generation to static values and updated the test case
updated the readme format


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/63df474b
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/63df474b
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/63df474b

Branch: refs/heads/master
Commit: 63df474b19283ab1cbd00d9e1aecbed7a7827baa
Parents: be9f3c8
Author: venkateshDT <ve...@datatorrent.com>
Authored: Wed Aug 17 22:10:09 2016 -0700
Committer: Lakshmi Prasanna Velineni <la...@datatorrent.com>
Committed: Sun Mar 26 11:43:48 2017 -0700

----------------------------------------------------------------------
 examples/csvformatter/README.md                 |  64 ++++
 .../XmlJavadocCommentsExtractor.xsl             |  44 +++
 examples/csvformatter/pom.xml                   | 297 +++++++++++++++++++
 .../csvformatter/src/assemble/appPackage.xml    |  43 +++
 .../java/com/demo/myapexapp/Application.java    |  45 +++
 .../com/demo/myapexapp/HDFSOutputOperator.java  |  87 ++++++
 .../java/com/demo/myapexapp/JsonGenerator.java  |  78 +++++
 .../main/java/com/demo/myapexapp/PojoEvent.java | 141 +++++++++
 .../src/main/resources/META-INF/properties.xml  |  40 +++
 .../csvformatter/src/main/resources/schema.json |  60 ++++
 .../com/demo/myapexapp/ApplicationTest.java     |  67 +++++
 .../src/test/resources/log4j.properties         |  22 ++
 12 files changed, 988 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/63df474b/examples/csvformatter/README.md
----------------------------------------------------------------------
diff --git a/examples/csvformatter/README.md b/examples/csvformatter/README.md
new file mode 100644
index 0000000..332818c
--- /dev/null
+++ b/examples/csvformatter/README.md
@@ -0,0 +1,64 @@
+## CsvFormatter Example
+
+`CsvFormatter` converts an incoming POJO to a delimited string: comma-separated (CSV) by default, or separated by a custom delimiter such as `|` or `:`.
+
+Accordingly, this example has an additional config file for formatting preferences (`/src/main/resources/schema.json`) besides the common properties file (`/src/main/resources/META-INF/properties.xml`).
+
+Users can choose the application and the additional configuration file to use at launch time. In this example, we use the files mentioned above to customise the schema and configure the operator properties.
+
+
+#### **Update properties in properties.xml (required to run the example):**
+
+- Update these common properties in the file `/src/main/resources/META-INF/properties.xml`:
+
+| Property Name  | Description |
+| -------------  | ----------- |
+| dt.application.CustomOutputFormatter.operator.jsonParser.prop.sleepTime | sleep time for the container |
+| dt.application.CustomOutputFormatter.operator.jsonParser.port.out.attr.TUPLE_CLASS | expected POJO object output of the JSONParser operator |
+| dt.application.CustomOutputFormatter.operator.HDFSOutputOperator.prop.filePath | output file path for the records after formatting |
+| dt.application.CustomOutputFormatter.operator.HDFSOutputOperator.prop.outFileName | output file name for the records to be written after formatting |
+
+
+#### **Update properties in Application.java (required to customise the formatter):**
+
+```
+    CsvFormatter formatter = dag.addOperator("formatter", CsvFormatter.class);
+    formatter.setSchema(SchemaUtils.jarResourceFileToString(filename));
+    dag.setInputPortAttribute(formatter.in, PortContext.TUPLE_CLASS, PojoEvent.class); 
+```
+
+`filename` above is the variable that holds the name of the schema file used in the example.
+
+The input port attribute can also be configured in properties.xml by setting the following property:
+
+`dt.application.CustomOutputFormatter.operator.formatter.port.in.attr.TUPLE_CLASS`
+
+
+
+
+#### **Sample Run:**
+
+- This example generates JSON data and feeds it to JsonParser, which creates the POJO that is passed to the CsvFormatter operator. The formatted output is written to an HDFS location.
+
+- Configure JsonGenerator.java to generate the data you need, and define the matching fields in the POJO class PojoEvent.java, which is the input to the CsvFormatter.
+  
+- The schema defined in `/src/main/resources/schema.json` is used to read the fields from the object in the CsvFormatter. Each field name in the POJO should match the corresponding field name in schema.json.
+
+- The formatting order depends on the order defined in the schema.json string.
+  
+- You can build the project and run the example as is once you configure properties.xml. You can also customise the same app to the schema you need by changing PojoEvent.java and schema.json.
+
+
+#### **Sample Output:**
+
+- After a successful run, verify that the HDFS output files contain output similar to the following:
+
+```
+   1234|SimpleCsvFormatterExample|10000.0|false|APEX
+   1234|SimpleCsvFormatterExample|10000.0|false|APEX
+   1234|SimpleCsvFormatterExample|10000.0|false|APEX
+   1234|SimpleCsvFormatterExample|10000.0|false|APEX
+   1234|SimpleCsvFormatterExample|10000.0|false|APEX
+```
+
+If you have issues configuring the operator or running the application, please send an email to users@apex.apache.org.
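
Note on the README above: the field-name and ordering rules it describes can be illustrated with a minimal, self-contained sketch. This is not the Malhar `CsvFormatter` and does not read schema.json; `DelimitedLineSketch`, `format`, and `sampleLine` are hypothetical names, the field order is assumed from the sample output (schema.json is not reproduced at this point in the message), and rejecting a missing field is a choice made for this sketch only.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration only; this is not the Malhar CsvFormatter. */
final class DelimitedLineSketch
{
  /** Joins the values of the named fields, in the given order, with the delimiter. */
  static String format(Map<String, Object> record, List<String> fieldOrder, String delimiter)
  {
    StringBuilder line = new StringBuilder();
    for (int i = 0; i < fieldOrder.size(); i++) {
      String field = fieldOrder.get(i);
      // Assumption of this sketch: a field named in the order list must exist in the record.
      if (!record.containsKey(field)) {
        throw new IllegalArgumentException("No such field in record: " + field);
      }
      if (i > 0) {
        line.append(delimiter);
      }
      line.append(String.valueOf(record.get(field)));
    }
    return line.toString();
  }

  /** Builds one record with the static values used by JsonGenerator and formats it. */
  static String sampleLine()
  {
    Map<String, Object> record = new LinkedHashMap<String, Object>();
    record.put("securityCode", "APEX");
    record.put("campaignId", 1234);
    record.put("weatherTargeting", false);
    record.put("campaignName", "SimpleCsvFormatterExample");
    record.put("campaignBudget", 10000.0);

    // The order list, not the insertion order of the record, decides the column order.
    List<String> fieldOrder = Arrays.asList(
        "campaignId", "campaignName", "campaignBudget", "weatherTargeting", "securityCode");
    return format(record, fieldOrder, "|");
  }

  public static void main(String[] args)
  {
    System.out.println(sampleLine()); // prints 1234|SimpleCsvFormatterExample|10000.0|false|APEX
  }
}
```

The record is deliberately populated in a different order from the order list, to show that the column order comes from the list alone.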

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/63df474b/examples/csvformatter/XmlJavadocCommentsExtractor.xsl
----------------------------------------------------------------------
diff --git a/examples/csvformatter/XmlJavadocCommentsExtractor.xsl b/examples/csvformatter/XmlJavadocCommentsExtractor.xsl
new file mode 100644
index 0000000..08075a9
--- /dev/null
+++ b/examples/csvformatter/XmlJavadocCommentsExtractor.xsl
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+            http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+
+<!--
+    Document   : XmlJavadocCommentsExtractor.xsl
+    Created on : September 16, 2014, 11:30 AM
+    Description:
+        The transformation strips off all information except for comments and tags from xml javadoc generated by xml-doclet.
+-->
+
+<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform" version="1.0">
+  <xsl:output method="xml" standalone="yes"/>
+
+  <!-- copy xml by selecting only the following nodes, attributes and text -->
+  <xsl:template match="node()|text()|@*">
+    <xsl:copy>
+      <xsl:apply-templates select="root|package|class|interface|method|field|type|comment|tag|text()|@name|@qualified|@text"/>
+    </xsl:copy>
+  </xsl:template>
+
+  <!-- Strip off the following paths from the selected xml -->
+  <xsl:template match="//root/package/interface/interface
+                      |//root/package/interface/method/@qualified
+                      |//root/package/class/interface
+                      |//root/package/class/class
+                      |//root/package/class/method/@qualified
+                      |//root/package/class/field/@qualified" />
+
+  <xsl:strip-space elements="*"/>
+</xsl:stylesheet>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/63df474b/examples/csvformatter/pom.xml
----------------------------------------------------------------------
diff --git a/examples/csvformatter/pom.xml b/examples/csvformatter/pom.xml
new file mode 100644
index 0000000..9033db5
--- /dev/null
+++ b/examples/csvformatter/pom.xml
@@ -0,0 +1,297 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>com.example</groupId>
+    <version>1.0-SNAPSHOT</version>
+    <artifactId>formatter</artifactId>
+    <packaging>jar</packaging>
+
+    <!-- change these to the appropriate values -->
+    <name>Formatter Apps</name>
+    <description>Applications to showcase different formatters</description>
+
+    <properties>
+        <!-- change this if you desire to use a different version of Apex Core -->
+        <apex.version>3.5.0</apex.version>
+        <apex.apppackage.classpath>lib/*.jar</apex.apppackage.classpath>
+        <malhar.version>3.6.0</malhar.version>
+    </properties>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-eclipse-plugin</artifactId>
+                <version>2.9</version>
+                <configuration>
+                    <downloadSources>true</downloadSources>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.3</version>
+                <configuration>
+                    <encoding>UTF-8</encoding>
+                    <source>1.7</source>
+                    <target>1.7</target>
+                    <debug>true</debug>
+                    <optimize>false</optimize>
+                    <showDeprecation>true</showDeprecation>
+                    <showWarnings>true</showWarnings>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <version>2.8</version>
+                <executions>
+                    <execution>
+                        <id>copy-dependencies</id>
+                        <phase>prepare-package</phase>
+                        <goals>
+                            <goal>copy-dependencies</goal>
+                        </goals>
+                        <configuration>
+                            <outputDirectory>target/deps</outputDirectory>
+                            <includeScope>runtime</includeScope>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>app-package-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                        <configuration>
+                            <finalName>${project.artifactId}-${project.version}-apexapp</finalName>
+                            <appendAssemblyId>false</appendAssemblyId>
+                            <descriptors>
+                                <descriptor>src/assemble/appPackage.xml</descriptor>
+                            </descriptors>
+                            <archiverConfig>
+                                <defaultDirectoryMode>0755</defaultDirectoryMode>
+                            </archiverConfig>
+                            <archive>
+                                <manifestEntries>
+                                    <Class-Path>${apex.apppackage.classpath}</Class-Path>
+                                    <DT-Engine-Version>${apex.version}</DT-Engine-Version>
+                                    <DT-App-Package-Group-Id>${project.groupId}</DT-App-Package-Group-Id>
+                                    <DT-App-Package-Name>${project.artifactId}</DT-App-Package-Name>
+                                    <DT-App-Package-Version>${project.version}</DT-App-Package-Version>
+                                    <DT-App-Package-Display-Name>${project.name}</DT-App-Package-Display-Name>
+                                    <DT-App-Package-Description>${project.description}</DT-App-Package-Description>
+                                </manifestEntries>
+                            </archive>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+
+            <plugin>
+                <artifactId>maven-antrun-plugin</artifactId>
+                <version>1.7</version>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <configuration>
+                            <target>
+                                <move file="${project.build.directory}/${project.artifactId}-${project.version}-apexapp.jar"
+                                      tofile="${project.build.directory}/${project.artifactId}-${project.version}.apa" />
+                            </target>
+                        </configuration>
+                        <goals>
+                            <goal>run</goal>
+                        </goals>
+                    </execution>
+                    <execution>
+                        <!-- create resource directory for xml javadoc-->
+                        <id>createJavadocDirectory</id>
+                        <phase>generate-resources</phase>
+                        <configuration>
+                            <tasks>
+                                <delete dir="${project.build.directory}/generated-resources/xml-javadoc"/>
+                                <mkdir dir="${project.build.directory}/generated-resources/xml-javadoc"/>
+                            </tasks>
+                        </configuration>
+                        <goals>
+                            <goal>run</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <version>1.9.1</version>
+                <executions>
+                    <execution>
+                        <id>attach-artifacts</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>attach-artifact</goal>
+                        </goals>
+                        <configuration>
+                            <artifacts>
+                                <artifact>
+                                    <file>target/${project.artifactId}-${project.version}.apa</file>
+                                    <type>apa</type>
+                                </artifact>
+                            </artifacts>
+                            <skipAttach>false</skipAttach>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+
+            <!-- generate javadoc -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-javadoc-plugin</artifactId>
+                <executions>
+                    <!-- generate xml javadoc -->
+                    <execution>
+                        <id>xml-doclet</id>
+                        <phase>generate-resources</phase>
+                        <goals>
+                            <goal>javadoc</goal>
+                        </goals>
+                        <configuration>
+                            <doclet>com.github.markusbernhardt.xmldoclet.XmlDoclet</doclet>
+                            <additionalparam>-d ${project.build.directory}/generated-resources/xml-javadoc -filename ${project.artifactId}-${project.version}-javadoc.xml</additionalparam>
+                            <useStandardDocletOptions>false</useStandardDocletOptions>
+                            <docletArtifact>
+                                <groupId>com.github.markusbernhardt</groupId>
+                                <artifactId>xml-doclet</artifactId>
+                                <version>1.0.4</version>
+                            </docletArtifact>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <!-- Transform xml javadoc to stripped down version containing only class/interface comments and tags-->
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>xml-maven-plugin</artifactId>
+                <version>1.0</version>
+                <executions>
+                    <execution>
+                        <id>transform-xmljavadoc</id>
+                        <phase>generate-resources</phase>
+                        <goals>
+                            <goal>transform</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <transformationSets>
+                        <transformationSet>
+                            <dir>${project.build.directory}/generated-resources/xml-javadoc</dir>
+                            <includes>
+                                <include>${project.artifactId}-${project.version}-javadoc.xml</include>
+                            </includes>
+                            <stylesheet>XmlJavadocCommentsExtractor.xsl</stylesheet>
+                            <outputDir>${project.build.directory}/generated-resources/xml-javadoc</outputDir>
+                        </transformationSet>
+                    </transformationSets>
+                </configuration>
+            </plugin>
+            <!-- copy xml javadoc to class jar -->
+            <plugin>
+                <artifactId>maven-resources-plugin</artifactId>
+                <version>2.6</version>
+                <executions>
+                    <execution>
+                        <id>copy-resources</id>
+                        <phase>process-resources</phase>
+                        <goals>
+                            <goal>copy-resources</goal>
+                        </goals>
+                        <configuration>
+                            <outputDirectory>${basedir}/target/classes</outputDirectory>
+                            <resources>
+                                <resource>
+                                    <directory>${project.build.directory}/generated-resources/xml-javadoc</directory>
+                                    <includes>
+                                        <include>${project.artifactId}-${project.version}-javadoc.xml</include>
+                                    </includes>
+                                    <filtering>true</filtering>
+                                </resource>
+                            </resources>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+
+        </plugins>
+
+    </build>
+
+    <dependencies>
+        <!-- add your dependencies here -->
+        <dependency>
+            <groupId>org.apache.apex</groupId>
+            <artifactId>malhar-library</artifactId>
+            <version>${malhar.version}</version>
+            <!--
+                 If you know that your application does not need transitive dependencies pulled in by malhar-library,
+                 uncomment the following to reduce the size of your app package.
+            -->
+            <!--
+            <exclusions>
+              <exclusion>
+                <groupId>*</groupId>
+                <artifactId>*</artifactId>
+              </exclusion>
+            </exclusions>
+            -->
+        </dependency>
+        <dependency>
+            <groupId>org.apache.apex</groupId>
+            <artifactId>malhar-contrib</artifactId>
+            <version>${malhar.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <version>2.5.4</version>
+        </dependency>
+        <dependency>
+            <groupId>com.github.fge</groupId>
+            <artifactId>json-schema-validator</artifactId>
+            <version>2.0.1</version>
+            <optional>true</optional>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.apex</groupId>
+            <artifactId>apex-common</artifactId>
+            <version>${apex.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.10</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.apex</groupId>
+            <artifactId>apex-engine</artifactId>
+            <version>${apex.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>net.sf.supercsv</groupId>
+            <artifactId>super-csv</artifactId>
+            <version>2.4.0</version>
+        </dependency>
+    </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/63df474b/examples/csvformatter/src/assemble/appPackage.xml
----------------------------------------------------------------------
diff --git a/examples/csvformatter/src/assemble/appPackage.xml b/examples/csvformatter/src/assemble/appPackage.xml
new file mode 100644
index 0000000..7ad071c
--- /dev/null
+++ b/examples/csvformatter/src/assemble/appPackage.xml
@@ -0,0 +1,43 @@
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+  <id>appPackage</id>
+  <formats>
+    <format>jar</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <fileSets>
+    <fileSet>
+      <directory>${basedir}/target/</directory>
+      <outputDirectory>/app</outputDirectory>
+      <includes>
+        <include>${project.artifactId}-${project.version}.jar</include>
+      </includes>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/target/deps</directory>
+      <outputDirectory>/lib</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/site/conf</directory>
+      <outputDirectory>/conf</outputDirectory>
+      <includes>
+        <include>*.xml</include>
+      </includes>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/main/resources/META-INF</directory>
+      <outputDirectory>/META-INF</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/main/resources/app</directory>
+      <outputDirectory>/app</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/main/resources/resources</directory>
+      <outputDirectory>/resources</outputDirectory>
+    </fileSet>
+  </fileSets>
+
+</assembly>
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/63df474b/examples/csvformatter/src/main/java/com/demo/myapexapp/Application.java
----------------------------------------------------------------------
diff --git a/examples/csvformatter/src/main/java/com/demo/myapexapp/Application.java b/examples/csvformatter/src/main/java/com/demo/myapexapp/Application.java
new file mode 100644
index 0000000..a4ff06f
--- /dev/null
+++ b/examples/csvformatter/src/main/java/com/demo/myapexapp/Application.java
@@ -0,0 +1,45 @@
+package com.demo.myapexapp;
+
+import java.util.Arrays;
+
+import com.datatorrent.contrib.parser.JsonParser;
+
+import org.apache.apex.malhar.contrib.parser.StreamingJsonParser;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.PortContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StatsListener;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.contrib.formatter.CsvFormatter;
+import com.datatorrent.lib.appdata.schemas.SchemaUtils;
+import com.datatorrent.lib.io.fs.AbstractFileInputOperator.DirectoryScanner;
+import com.datatorrent.lib.partitioner.StatelessThroughputBasedPartitioner;
+
+@ApplicationAnnotation(name = "CustomOutputFormatter")
+public class Application implements StreamingApplication
+{
+  //Set the delimiters and schema structure for the custom output in schema.json
+  private static final String filename = "schema.json";
+
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    JsonGenerator generator = dag.addOperator("JsonGenerator", JsonGenerator.class);
+    JsonParser jsonParser = dag.addOperator("jsonParser", JsonParser.class);
+
+    CsvFormatter formatter = dag.addOperator("formatter", CsvFormatter.class);
+    formatter.setSchema(SchemaUtils.jarResourceFileToString(filename));
+    dag.setInputPortAttribute(formatter.in, PortContext.TUPLE_CLASS, PojoEvent.class);
+
+    HDFSOutputOperator<String> hdfsOutput = dag.addOperator("HDFSOutputOperator", HDFSOutputOperator.class);
+    hdfsOutput.setLineDelimiter("");
+
+    dag.addStream("parserStream", generator.out, jsonParser.in);
+    dag.addStream("formatterStream", jsonParser.out, formatter.in);
+    dag.addStream("outputStream", formatter.out, hdfsOutput.input);
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/63df474b/examples/csvformatter/src/main/java/com/demo/myapexapp/HDFSOutputOperator.java
----------------------------------------------------------------------
diff --git a/examples/csvformatter/src/main/java/com/demo/myapexapp/HDFSOutputOperator.java b/examples/csvformatter/src/main/java/com/demo/myapexapp/HDFSOutputOperator.java
new file mode 100644
index 0000000..5cb162c
--- /dev/null
+++ b/examples/csvformatter/src/main/java/com/demo/myapexapp/HDFSOutputOperator.java
@@ -0,0 +1,87 @@
+package com.demo.myapexapp;
+
+import javax.validation.constraints.NotNull;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;
+
+/**
+ * HDFSoutput operator with implementation to write Objects to HDFS
+ *
+ * @param <T>
+ */
+public class HDFSOutputOperator<T> extends AbstractFileOutputOperator<T>
+{
+
+  @NotNull
+  String outFileName;
+
+  //setting default value
+  String lineDelimiter = "\n";
+
+  //Switch to write the files to HDFS - set to false to disable writes
+  private boolean writeFilesFlag = true;
+
+  int id;
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    super.setup(context);
+    id = context.getId();
+  }
+
+  public boolean isWriteFilesFlag()
+  {
+    return writeFilesFlag;
+  }
+
+  public void setWriteFilesFlag(boolean writeFilesFlag)
+  {
+    this.writeFilesFlag = writeFilesFlag;
+  }
+
+  public String getOutFileName()
+  {
+    return outFileName;
+  }
+
+  public void setOutFileName(String outFileName)
+  {
+    this.outFileName = outFileName;
+  }
+
+  @Override
+  protected String getFileName(T tuple)
+  {
+    return getOutFileName() + id;
+  }
+
+  public String getLineDelimiter()
+  {
+    return lineDelimiter;
+  }
+
+  public void setLineDelimiter(String lineDelimiter)
+  {
+    this.lineDelimiter = lineDelimiter;
+  }
+
+  @Override
+  protected byte[] getBytesForTuple(T tuple)
+  {
+    String temp = tuple.toString().concat(String.valueOf(lineDelimiter));
+    byte[] theByteArray = temp.getBytes();
+
+    return theByteArray;
+  }
+
+  @Override
+  protected void processTuple(T tuple)
+  {
+    if (writeFilesFlag) {
+      super.processTuple(tuple);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/63df474b/examples/csvformatter/src/main/java/com/demo/myapexapp/JsonGenerator.java
----------------------------------------------------------------------
diff --git a/examples/csvformatter/src/main/java/com/demo/myapexapp/JsonGenerator.java b/examples/csvformatter/src/main/java/com/demo/myapexapp/JsonGenerator.java
new file mode 100644
index 0000000..f50f300
--- /dev/null
+++ b/examples/csvformatter/src/main/java/com/demo/myapexapp/JsonGenerator.java
@@ -0,0 +1,78 @@
+package com.demo.myapexapp;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Random;
+
+import javax.validation.constraints.Min;
+
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.common.util.BaseOperator;
+
+public class JsonGenerator extends BaseOperator implements InputOperator
+{
+
+  private static final Logger LOG = LoggerFactory.getLogger(JsonGenerator.class);
+
+  @Min(1)
+  private int numTuples = 20;
+  private transient int count = 0;
+
+  public static Random rand = new Random();
+  private int sleepTime = 5;
+
+  public final transient DefaultOutputPort<byte[]> out = new DefaultOutputPort<byte[]>();
+
+  private static String getJson()
+  {
+
+    JSONObject obj = new JSONObject();
+    try {
+      obj.put("campaignId", 1234);
+      obj.put("campaignName", "SimpleCsvFormatterExample");
+      obj.put("campaignBudget", 10000.0);
+      obj.put("weatherTargeting", "false");
+      obj.put("securityCode", "APEX");
+    } catch (JSONException e) {
+      return null;
+    }
+    return obj.toString();
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    count = 0;
+  }
+
+  @Override
+  public void emitTuples()
+  {
+    if (count++ < numTuples) {
+      out.emit(getJson().getBytes());
+    } else {
+      try {
+        Thread.sleep(sleepTime);
+      } catch (InterruptedException e) {
+        LOG.info("Sleep interrupted");
+      }
+    }
+  }
+
+  public int getNumTuples()
+  {
+    return numTuples;
+  }
+
+  public void setNumTuples(int numTuples)
+  {
+    this.numTuples = numTuples;
+  }
+
+}
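
Note on JsonGenerator above: because `beginWindow` resets `count`, `numTuples` acts as a per-window cap rather than a total. The following is a minimal, self-contained sketch of that counting pattern only. It does not use the Apex API; `WindowedEmitterSketch`, `simulate`, and `callsPerWindow` are hypothetical, and in a real application the engine decides how often `emitTuples` is called within a window.

```java
/** Hypothetical illustration of a per-window emission cap; not an Apex operator. */
final class WindowedEmitterSketch
{
  private final int numTuples; // maximum tuples emitted per window
  private int count;           // calls seen in the current window
  private int emitted;         // total tuples emitted across all windows

  WindowedEmitterSketch(int numTuples)
  {
    this.numTuples = numTuples;
  }

  /** Mirrors beginWindow(): the counter starts again at zero for each window. */
  void beginWindow()
  {
    count = 0;
  }

  /** Mirrors emitTuples(): emits only while the per-window cap has not been reached. */
  void emitTuples()
  {
    if (count++ < numTuples) {
      emitted++;
    }
  }

  /** Runs the given number of windows, calling emitTuples() a fixed number of times in each. */
  static int simulate(int numTuples, int windows, int callsPerWindow)
  {
    WindowedEmitterSketch sketch = new WindowedEmitterSketch(numTuples);
    for (int w = 0; w < windows; w++) {
      sketch.beginWindow();
      for (int c = 0; c < callsPerWindow; c++) {
        sketch.emitTuples();
      }
    }
    return sketch.emitted;
  }

  public static void main(String[] args)
  {
    // Cap of 3 per window, 2 windows, 5 calls per window.
    System.out.println(simulate(3, 2, 5)); // prints 6
  }
}
```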

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/63df474b/examples/csvformatter/src/main/java/com/demo/myapexapp/PojoEvent.java
----------------------------------------------------------------------
diff --git a/examples/csvformatter/src/main/java/com/demo/myapexapp/PojoEvent.java b/examples/csvformatter/src/main/java/com/demo/myapexapp/PojoEvent.java
new file mode 100644
index 0000000..8514856
--- /dev/null
+++ b/examples/csvformatter/src/main/java/com/demo/myapexapp/PojoEvent.java
@@ -0,0 +1,141 @@
+package com.demo.myapexapp;
+
+import java.util.Date;
+
+public class PojoEvent
+{
+
+  private int advId;
+  private int campaignId;
+  private String campaignName;
+  private double campaignBudget;
+  private Date startDate;
+  private Date endDate;
+  private String securityCode;
+  private boolean weatherTargeting;
+  private boolean optimized;
+  private String parentCampaign;
+  private Character weatherTargeted;
+  private String valid;
+
+  public int getAdvId()
+  {
+    return advId;
+  }
+
+  public void setAdvId(int advId)
+  {
+    this.advId = advId;
+  }
+
+  public int getCampaignId()
+  {
+    return campaignId;
+  }
+
+  public void setCampaignId(int campaignId)
+  {
+    this.campaignId = campaignId;
+  }
+
+  public String getCampaignName()
+  {
+    return campaignName;
+  }
+
+  public void setCampaignName(String campaignName)
+  {
+    this.campaignName = campaignName;
+  }
+
+  public double getCampaignBudget()
+  {
+    return campaignBudget;
+  }
+
+  public void setCampaignBudget(double campaignBudget)
+  {
+    this.campaignBudget = campaignBudget;
+  }
+
+  public Date getStartDate()
+  {
+    return startDate;
+  }
+
+  public void setStartDate(Date startDate)
+  {
+    this.startDate = startDate;
+  }
+
+  public Date getEndDate()
+  {
+    return endDate;
+  }
+
+  public void setEndDate(Date endDate)
+  {
+    this.endDate = endDate;
+  }
+
+  public String getSecurityCode()
+  {
+    return securityCode;
+  }
+
+  public void setSecurityCode(String securityCode)
+  {
+    this.securityCode = securityCode;
+  }
+
+  public boolean isWeatherTargeting()
+  {
+    return weatherTargeting;
+  }
+
+  public void setWeatherTargeting(boolean weatherTargeting)
+  {
+    this.weatherTargeting = weatherTargeting;
+  }
+
+  public boolean isOptimized()
+  {
+    return optimized;
+  }
+
+  public void setOptimized(boolean optimized)
+  {
+    this.optimized = optimized;
+  }
+
+  public String getParentCampaign()
+  {
+    return parentCampaign;
+  }
+
+  public void setParentCampaign(String parentCampaign)
+  {
+    this.parentCampaign = parentCampaign;
+  }
+
+  public Character getWeatherTargeted()
+  {
+    return weatherTargeted;
+  }
+
+  public void setWeatherTargeted(Character weatherTargeted)
+  {
+    this.weatherTargeted = weatherTargeted;
+  }
+
+  public String getValid()
+  {
+    return valid;
+  }
+
+  public void setValid(String valid)
+  {
+    this.valid = valid;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/63df474b/examples/csvformatter/src/main/resources/META-INF/properties.xml
----------------------------------------------------------------------
diff --git a/examples/csvformatter/src/main/resources/META-INF/properties.xml b/examples/csvformatter/src/main/resources/META-INF/properties.xml
new file mode 100644
index 0000000..ed2b5ce
--- /dev/null
+++ b/examples/csvformatter/src/main/resources/META-INF/properties.xml
@@ -0,0 +1,40 @@
+<?xml version="1.0"?>
+<configuration>
+  <!-- 
+  <property>
+    <name>dt.application.{appName}.operator.{opName}.prop.{propName}</name>
+    <value>some-default-value (if value is not specified, it is required from the user or custom config when launching)</value>
+  </property>
+  -->
+  <!-- memory assigned to app master
+  <property>
+    <name>dt.attr.MASTER_MEMORY_MB</name>
+    <value>1024</value>
+  </property>
+  -->
+  <property>
+    <name>dt.application.CustomOutputFormatter.operator.jsonParser.prop.sleepTime
+    </name>
+    <value>100</value>
+  </property>
+  <property>
+    <name>dt.application.CustomOutputFormatter.operator.jsonParser.port.out.attr.TUPLE_CLASS
+    </name>
+    <value>com.demo.myapexapp.PojoEvent</value>
+  </property>
+
+
+  <property>
+    <name>dt.application.CustomOutputFormatter.operator.HDFSOutputOperator.prop.filePath
+    </name>
+    <value>/tmp/formatterApp</value>
+  </property>
+
+  <property>
+    <name>dt.application.CustomOutputFormatter.operator.HDFSOutputOperator.prop.outFileName
+    </name>
+    <value>customFormatResult</value>
+  </property>
+
+</configuration>
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/63df474b/examples/csvformatter/src/main/resources/schema.json
----------------------------------------------------------------------
diff --git a/examples/csvformatter/src/main/resources/schema.json b/examples/csvformatter/src/main/resources/schema.json
new file mode 100644
index 0000000..9209469
--- /dev/null
+++ b/examples/csvformatter/src/main/resources/schema.json
@@ -0,0 +1,60 @@
+{
+  "separator": "|",
+  "quoteChar": "\"",
+  "lineDelimiter": "\n",
+  "fields": [
+    {
+      "name": "campaignId",
+      "type": "Integer"
+    },
+    {
+      "name": "advId",
+      "type": "Integer"
+    },
+    {
+      "name": "campaignName",
+      "type": "String"
+    },
+    {
+      "name": "campaignBudget",
+      "type": "Double",
+      "constraints": {
+        "maxValue": 30000
+      }
+    },
+    {
+      "name": "startDate",
+      "type": "Date",
+      "constraints": {
+        "format": "yyyy-MM-dd"
+      }
+    },
+    {
+      "name": "endDate",
+      "type": "Date",
+      "constraints": {
+        "format": "dd/MM/yyyy"
+      }
+    },
+    {
+      "name": "securityCode",
+      "type": "String"
+    },
+    {
+      "name": "weatherTargeting",
+      "type": "Boolean"
+    },
+    {
+      "name": "optimized",
+      "type": "Boolean"
+    },
+    {
+      "name": "parentCampaign",
+      "type": "String"
+    },
+    {
+      "name": "weatherTargeted",
+      "type": "Character"
+    }
+  ]
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/63df474b/examples/csvformatter/src/test/java/com/demo/myapexapp/ApplicationTest.java
----------------------------------------------------------------------
diff --git a/examples/csvformatter/src/test/java/com/demo/myapexapp/ApplicationTest.java b/examples/csvformatter/src/test/java/com/demo/myapexapp/ApplicationTest.java
new file mode 100644
index 0000000..efe5946
--- /dev/null
+++ b/examples/csvformatter/src/test/java/com/demo/myapexapp/ApplicationTest.java
@@ -0,0 +1,67 @@
+package com.demo.myapexapp;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+
+import javax.validation.ConstraintViolationException;
+
+import org.apache.commons.io.FileUtils;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.junit.Test;
+
+import com.datatorrent.api.LocalMode;
+
+/**
+ * Test the DAG declaration in local mode.
+ */
+public class ApplicationTest
+{
+
+  private static final String FILE_NAME = "/tmp/formatterApp";
+
+  @AfterClass
+  public static void cleanup()
+  {
+    try {
+      FileUtils.deleteDirectory(new File(FILE_NAME));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Test
+  public void testApplication() throws Exception
+  {
+    try {
+      LocalMode lma = LocalMode.newInstance();
+      Configuration conf = new Configuration(false);
+      conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
+      lma.prepareDAG(new Application(), conf);
+      LocalMode.Controller lc = lma.getController();
+      lc.runAsync();
+
+      // wait for output files to roll
+      Thread.sleep(5000);
+
+      String[] extensions = {"dat.0", "tmp"};
+      Collection<File> list = FileUtils.listFiles(new File(FILE_NAME), extensions, false);
+
+      for (File file : list) {
+        for (String line : FileUtils.readLines(file)) {
+          Assert.assertEquals("Delimiter in record",
+            "1234|0|SimpleCsvFormatterExample|10000.0|||APEX|false|false||", line);
+        }
+      }
+
+    } catch (ConstraintViolationException e) {
+      Assert.fail("constraint violations: " + e.getConstraintViolations());
+    }
+  }
+
+}
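The expected line in the test can be traced back to `schema.json`: columns follow the schema's field order, the separator is `|`, and fields the generator never sets fall back to the `PojoEvent` defaults (`0`, `false`, or `null`). The sketch below only reproduces that string. It assumes that unset (`null`) fields are written as empty columns and that no quoting is needed for these values; `CsvRowSketch` and `formatRow` are hypothetical names, not the formatter operator's actual implementation.

```java
import java.util.Arrays;
import java.util.List;

// Illustration only: joins values in schema field order.
// Assumption: null (unset) values are written as empty columns.
class CsvRowSketch
{
  static String formatRow(List<Object> values, String separator)
  {
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < values.size(); i++) {
      if (i > 0) {
        sb.append(separator);
      }
      Object value = values.get(i);
      if (value != null) {
        sb.append(value);
      }
    }
    return sb.toString();
  }

  public static void main(String[] args)
  {
    // Field order from schema.json: campaignId, advId, campaignName,
    // campaignBudget, startDate, endDate, securityCode, weatherTargeting,
    // optimized, parentCampaign, weatherTargeted
    List<Object> values = Arrays.<Object>asList(
        1234, 0, "SimpleCsvFormatterExample", 10000.0, null, null,
        "APEX", false, false, null, null);
    // prints 1234|0|SimpleCsvFormatterExample|10000.0|||APEX|false|false||
    System.out.println(formatRow(values, "|"));
  }
}
```

The consecutive separators after `10000.0` correspond to the unset `startDate` and `endDate`, and the two trailing separators to `parentCampaign` and `weatherTargeted`.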

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/63df474b/examples/csvformatter/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/examples/csvformatter/src/test/resources/log4j.properties b/examples/csvformatter/src/test/resources/log4j.properties
new file mode 100644
index 0000000..98544e8
--- /dev/null
+++ b/examples/csvformatter/src/test/resources/log4j.properties
@@ -0,0 +1,22 @@
+log4j.rootLogger=DEBUG,CONSOLE
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+
+log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.RFA.File=/tmp/app.log
+
+# to enable, add SYSLOG to rootLogger
+log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
+log4j.appender.SYSLOG.syslogHost=127.0.0.1
+log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
+log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
+log4j.appender.SYSLOG.Facility=LOCAL1
+
+#log4j.logger.org.apache.commons.beanutils=warn
+log4j.logger.com.datatorrent=debug
+log4j.logger.org.apache.apex=debug
+log4j.logger.org=info